Scope to room

This commit is contained in:
Neil Alexander 2022-08-17 16:59:59 +01:00
parent 17378d870f
commit 37aba99e0c
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
6 changed files with 32 additions and 30 deletions

View file

@ -107,26 +107,6 @@ func (r *Inputer) processRoomEvent(
})
}
// If we already know about this event and it hasn't been rejected
// then we won't attempt to reprocess it. If it was rejected then we
// can attempt to reprocess, in case we have learned something new
// that will allow us to accept the event this time.
wasRejected, werr := r.DB.IsEventRejected(ctx, event.EventID())
switch {
case werr == sql.ErrNoRows:
// We haven't seen this event before so continue.
case werr != nil:
// Something has gone wrong trying to find out if we rejected
// this event already.
logger.WithError(werr).Errorf("Failed to check if event %q is already seen", event.EventID())
return werr
case !wasRejected:
// We've seen this event before and it wasn't rejected so we
// should ignore it.
logger.Debugf("Already processed event %q, ignoring", event.EventID())
return nil
}
// Don't waste time processing the event if the room doesn't exist.
// A room entry locally will only be created in response to a create
// event.
@ -139,6 +119,28 @@ func (r *Inputer) processRoomEvent(
return fmt.Errorf("room %s does not exist for event %s", event.RoomID(), event.EventID())
}
// If we already know about this event and it hasn't been rejected
// then we won't attempt to reprocess it. If it was rejected then we
// can attempt to reprocess, in case we have learned something new
// that will allow us to accept the event this time.
if roomInfo != nil {
wasRejected, werr := r.DB.IsEventRejected(ctx, roomInfo.RoomNID, event.EventID())
switch {
case werr == sql.ErrNoRows:
// We haven't seen this event before so continue.
case werr != nil:
// Something has gone wrong trying to find out if we rejected
// this event already.
logger.WithError(werr).Errorf("Failed to check if event %q is already seen", event.EventID())
return werr
case !wasRejected:
// We've seen this event before and it wasn't rejected so we
// should ignore it.
logger.Debugf("Already processed event %q, ignoring", event.EventID())
return nil
}
}
var missingAuth, missingPrev bool
serverRes := &fedapi.QueryJoinedHostServerNamesInRoomResponse{}
if !isCreateEvent {

View file

@ -95,7 +95,7 @@ type Database interface {
// The GetRoomUpdater must have Commit or Rollback called on it if this doesn't return an error.
// If this returns an error then no further action is required.
// IsEventRejected returns true if the event is known and rejected.
IsEventRejected(ctx context.Context, eventID string) (rejected bool, err error)
IsEventRejected(ctx context.Context, roomNID types.RoomNID, eventID string) (rejected bool, err error)
GetRoomUpdater(ctx context.Context, roomInfo *types.RoomInfo) (*shared.RoomUpdater, error)
// Look up event references for the latest events in the room and the current state snapshot.
// Returns the latest events, the current state and the maximum depth of the latest events plus 1.

View file

@ -137,7 +137,7 @@ const selectRoomNIDsForEventNIDsSQL = "" +
"SELECT event_nid, room_nid FROM roomserver_events WHERE event_nid = ANY($1)"
const selectEventRejectedSQL = "" +
"SELECT is_rejected FROM roomserver_events WHERE event_id = $1"
"SELECT is_rejected FROM roomserver_events WHERE room_nid = $1 AND event_id = $2"
type eventStatements struct {
insertEventStmt *sql.Stmt
@ -547,9 +547,9 @@ func eventNIDsAsArray(eventNIDs []types.EventNID) pq.Int64Array {
}
func (s *eventStatements) SelectEventRejected(
ctx context.Context, txn *sql.Tx, eventID string,
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventID string,
) (rejected bool, err error) {
stmt := sqlutil.TxStmt(txn, s.selectEventRejectedStmt)
err = stmt.QueryRowContext(ctx, eventID).Scan(&rejected)
err = stmt.QueryRowContext(ctx, roomNID, eventID).Scan(&rejected)
return
}

View file

@ -567,8 +567,8 @@ func (d *Database) GetRoomUpdater(
return updater, err
}
func (d *Database) IsEventRejected(ctx context.Context, eventID string) (bool, error) {
return d.EventsTable.SelectEventRejected(ctx, nil, eventID)
func (d *Database) IsEventRejected(ctx context.Context, roomNID types.RoomNID, eventID string) (bool, error) {
return d.EventsTable.SelectEventRejected(ctx, nil, roomNID, eventID)
}
func (d *Database) StoreEvent(

View file

@ -110,7 +110,7 @@ const selectRoomNIDsForEventNIDsSQL = "" +
"SELECT event_nid, room_nid FROM roomserver_events WHERE event_nid IN ($1)"
const selectEventRejectedSQL = "" +
"SELECT is_rejected FROM roomserver_events WHERE event_id = $1"
"SELECT is_rejected FROM roomserver_events WHERE room_nid = $1 AND event_id = $2"
type eventStatements struct {
db *sql.DB
@ -621,9 +621,9 @@ func eventNIDsAsArray(eventNIDs []types.EventNID) string {
}
func (s *eventStatements) SelectEventRejected(
ctx context.Context, txn *sql.Tx, eventID string,
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventID string,
) (rejected bool, err error) {
stmt := sqlutil.TxStmt(txn, s.selectEventRejectedStmt)
err = stmt.QueryRowContext(ctx, eventID).Scan(&rejected)
err = stmt.QueryRowContext(ctx, roomNID, eventID).Scan(&rejected)
return
}

View file

@ -66,7 +66,7 @@ type Events interface {
BulkSelectUnsentEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventNID, error)
SelectMaxEventDepth(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID) (int64, error)
SelectRoomNIDsForEventNIDs(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID) (roomNIDs map[types.EventNID]types.RoomNID, err error)
SelectEventRejected(ctx context.Context, txn *sql.Tx, eventID string) (rejected bool, err error)
SelectEventRejected(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventID string) (rejected bool, err error)
}
type Rooms interface {