diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index 948225778..6d4bd548e 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -107,26 +107,6 @@ func (r *Inputer) processRoomEvent( }) } - // If we already know about this event and it hasn't been rejected - // then we won't attempt to reprocess it. If it was rejected then we - // can attempt to reprocess, in case we have learned something new - // that will allow us to accept the event this time. - wasRejected, werr := r.DB.IsEventRejected(ctx, event.EventID()) - switch { - case werr == sql.ErrNoRows: - // We haven't seen this event before so continue. - case werr != nil: - // Something has gone wrong trying to find out if we rejected - // this event already. - logger.WithError(werr).Errorf("Failed to check if event %q is already seen", event.EventID()) - return werr - case !wasRejected: - // We've seen this event before and it wasn't rejected so we - // should ignore it. - logger.Debugf("Already processed event %q, ignoring", event.EventID()) - return nil - } - // Don't waste time processing the event if the room doesn't exist. // A room entry locally will only be created in response to a create // event. @@ -139,6 +119,28 @@ func (r *Inputer) processRoomEvent( return fmt.Errorf("room %s does not exist for event %s", event.RoomID(), event.EventID()) } + // If we already know about this event and it hasn't been rejected + // then we won't attempt to reprocess it. If it was rejected then we + // can attempt to reprocess, in case we have learned something new + // that will allow us to accept the event this time. + if roomInfo != nil { + wasRejected, werr := r.DB.IsEventRejected(ctx, roomInfo.RoomNID, event.EventID()) + switch { + case werr == sql.ErrNoRows: + // We haven't seen this event before so continue. + case werr != nil: + // Something has gone wrong trying to find out if we rejected + // this event already. + logger.WithError(werr).Errorf("Failed to check if event %q is already seen", event.EventID()) + return werr + case !wasRejected: + // We've seen this event before and it wasn't rejected so we + // should ignore it. + logger.Debugf("Already processed event %q, ignoring", event.EventID()) + return nil + } + } + var missingAuth, missingPrev bool serverRes := &fedapi.QueryJoinedHostServerNamesInRoomResponse{} if !isCreateEvent { diff --git a/roomserver/storage/interface.go b/roomserver/storage/interface.go index de8da58f1..5c068873a 100644 --- a/roomserver/storage/interface.go +++ b/roomserver/storage/interface.go @@ -95,7 +95,7 @@ type Database interface { // The GetRoomUpdater must have Commit or Rollback called on it if this doesn't return an error. // If this returns an error then no further action is required. // IsEventRejected returns true if the event is known and rejected. - IsEventRejected(ctx context.Context, eventID string) (rejected bool, err error) + IsEventRejected(ctx context.Context, roomNID types.RoomNID, eventID string) (rejected bool, err error) GetRoomUpdater(ctx context.Context, roomInfo *types.RoomInfo) (*shared.RoomUpdater, error) // Look up event references for the latest events in the room and the current state snapshot. // Returns the latest events, the current state and the maximum depth of the latest events plus 1. diff --git a/roomserver/storage/postgres/events_table.go b/roomserver/storage/postgres/events_table.go index 49be2e732..c7748d2be 100644 --- a/roomserver/storage/postgres/events_table.go +++ b/roomserver/storage/postgres/events_table.go @@ -137,7 +137,7 @@ const selectRoomNIDsForEventNIDsSQL = "" + "SELECT event_nid, room_nid FROM roomserver_events WHERE event_nid = ANY($1)" const selectEventRejectedSQL = "" + - "SELECT is_rejected FROM roomserver_events WHERE event_id = $1" + "SELECT is_rejected FROM roomserver_events WHERE room_nid = $1 AND event_id = $2" type eventStatements struct { insertEventStmt *sql.Stmt @@ -547,9 +547,9 @@ func eventNIDsAsArray(eventNIDs []types.EventNID) pq.Int64Array { } func (s *eventStatements) SelectEventRejected( - ctx context.Context, txn *sql.Tx, eventID string, + ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventID string, ) (rejected bool, err error) { stmt := sqlutil.TxStmt(txn, s.selectEventRejectedStmt) - err = stmt.QueryRowContext(ctx, eventID).Scan(&rejected) + err = stmt.QueryRowContext(ctx, roomNID, eventID).Scan(&rejected) return } diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go index c24379035..4f92adf1f 100644 --- a/roomserver/storage/shared/storage.go +++ b/roomserver/storage/shared/storage.go @@ -567,8 +567,8 @@ func (d *Database) GetRoomUpdater( return updater, err } -func (d *Database) IsEventRejected(ctx context.Context, eventID string) (bool, error) { - return d.EventsTable.SelectEventRejected(ctx, nil, eventID) +func (d *Database) IsEventRejected(ctx context.Context, roomNID types.RoomNID, eventID string) (bool, error) { + return d.EventsTable.SelectEventRejected(ctx, nil, roomNID, eventID) } func (d *Database) StoreEvent( diff --git a/roomserver/storage/sqlite3/events_table.go b/roomserver/storage/sqlite3/events_table.go index 794e554f0..174e3a9a7 100644 --- a/roomserver/storage/sqlite3/events_table.go +++ b/roomserver/storage/sqlite3/events_table.go @@ -110,7 +110,7 @@ const selectRoomNIDsForEventNIDsSQL = "" + "SELECT event_nid, room_nid FROM roomserver_events WHERE event_nid IN ($1)" const selectEventRejectedSQL = "" + - "SELECT is_rejected FROM roomserver_events WHERE event_id = $1" + "SELECT is_rejected FROM roomserver_events WHERE room_nid = $1 AND event_id = $2" type eventStatements struct { db *sql.DB @@ -621,9 +621,9 @@ func eventNIDsAsArray(eventNIDs []types.EventNID) string { } func (s *eventStatements) SelectEventRejected( - ctx context.Context, txn *sql.Tx, eventID string, + ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventID string, ) (rejected bool, err error) { stmt := sqlutil.TxStmt(txn, s.selectEventRejectedStmt) - err = stmt.QueryRowContext(ctx, eventID).Scan(&rejected) + err = stmt.QueryRowContext(ctx, roomNID, eventID).Scan(&rejected) return } diff --git a/roomserver/storage/tables/interface.go b/roomserver/storage/tables/interface.go index c44715a3e..ed67c43d8 100644 --- a/roomserver/storage/tables/interface.go +++ b/roomserver/storage/tables/interface.go @@ -66,7 +66,7 @@ type Events interface { BulkSelectUnsentEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventNID, error) SelectMaxEventDepth(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID) (int64, error) SelectRoomNIDsForEventNIDs(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID) (roomNIDs map[types.EventNID]types.RoomNID, err error) - SelectEventRejected(ctx context.Context, txn *sql.Tx, eventID string) (rejected bool, err error) + SelectEventRejected(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventID string) (rejected bool, err error) } type Rooms interface {