From c2df0b3efa0afcf81eb9d4ccef444896306c3d17 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 16 Aug 2022 16:26:01 +0100 Subject: [PATCH] Reprocess outliers that were previously rejected --- roomserver/internal/input/input_events.go | 31 ++++++++------------- roomserver/storage/interface.go | 2 ++ roomserver/storage/postgres/events_table.go | 13 +++++++++ roomserver/storage/shared/storage.go | 4 +++ roomserver/storage/sqlite3/events_table.go | 13 +++++++++ roomserver/storage/tables/interface.go | 1 + 6 files changed, 45 insertions(+), 19 deletions(-) diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index 81541260c..018d5ec9f 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -17,8 +17,8 @@ package input import ( - "bytes" "context" + "database/sql" "fmt" "time" @@ -107,25 +107,18 @@ func (r *Inputer) processRoomEvent( }) } - // if we have already got this event then do not process it again, if the input kind is an outlier. - // Outliers contain no extra information which may warrant a re-processing. + // If we already know about this outlier and it hasn't been rejected + // then we won't attempt to reprocess it. If it was rejected then we + // can attempt to reprocess, in case we have learned something new + // that will allow us to accept the event this time. if input.Kind == api.KindOutlier { - evs, err2 := r.DB.EventsFromIDs(ctx, []string{event.EventID()}) - if err2 == nil && len(evs) == 1 { - // check hash matches if we're on early room versions where the event ID was a random string - idFormat, err2 := headered.RoomVersion.EventIDFormat() - if err2 == nil { - switch idFormat { - case gomatrixserverlib.EventIDFormatV1: - if bytes.Equal(event.EventReference().EventSHA256, evs[0].EventReference().EventSHA256) { - logger.Debugf("Already processed event; ignoring") - return nil - } - default: - logger.Debugf("Already processed event; ignoring") - return nil - } - } + rejected, err := r.DB.IsEventRejected(ctx, event.EventID()) + if err != nil && err != sql.ErrNoRows { + return err + } + if !rejected { + logger.Debugf("Already processed event %q, ignoring", event.EventID()) + return nil } } diff --git a/roomserver/storage/interface.go b/roomserver/storage/interface.go index b12025c41..de8da58f1 100644 --- a/roomserver/storage/interface.go +++ b/roomserver/storage/interface.go @@ -94,6 +94,8 @@ type Database interface { // Opens and returns a room updater, which locks the room and opens a transaction. // The GetRoomUpdater must have Commit or Rollback called on it if this doesn't return an error. // If this returns an error then no further action is required. + // IsEventRejected returns true if the event is known and rejected. + IsEventRejected(ctx context.Context, eventID string) (rejected bool, err error) GetRoomUpdater(ctx context.Context, roomInfo *types.RoomInfo) (*shared.RoomUpdater, error) // Look up event references for the latest events in the room and the current state snapshot. // Returns the latest events, the current state and the maximum depth of the latest events plus 1. diff --git a/roomserver/storage/postgres/events_table.go b/roomserver/storage/postgres/events_table.go index a4d05756d..bac19d304 100644 --- a/roomserver/storage/postgres/events_table.go +++ b/roomserver/storage/postgres/events_table.go @@ -136,6 +136,9 @@ const selectMaxEventDepthSQL = "" + const selectRoomNIDsForEventNIDsSQL = "" + "SELECT event_nid, room_nid FROM roomserver_events WHERE event_nid = ANY($1)" +const selectEventRejectedSQL = "" + + "SELECT is_rejected FROM roomserver_Events WHERE event_id = $1" + type eventStatements struct { insertEventStmt *sql.Stmt selectEventStmt *sql.Stmt @@ -153,6 +156,7 @@ type eventStatements struct { bulkSelectUnsentEventNIDStmt *sql.Stmt selectMaxEventDepthStmt *sql.Stmt selectRoomNIDsForEventNIDsStmt *sql.Stmt + selectEventRejectedStmt *sql.Stmt } func CreateEventsTable(db *sql.DB) error { @@ -180,6 +184,7 @@ func PrepareEventsTable(db *sql.DB) (tables.Events, error) { {&s.bulkSelectUnsentEventNIDStmt, bulkSelectUnsentEventNIDSQL}, {&s.selectMaxEventDepthStmt, selectMaxEventDepthSQL}, {&s.selectRoomNIDsForEventNIDsStmt, selectRoomNIDsForEventNIDsSQL}, + {&s.selectEventRejectedStmt, selectEventRejectedSQL}, }.Prepare(db) } @@ -540,3 +545,11 @@ func eventNIDsAsArray(eventNIDs []types.EventNID) pq.Int64Array { } return nids } + +func (s *eventStatements) SelectEventRejected( + ctx context.Context, txn *sql.Tx, eventID string, +) (rejected bool, err error) { + stmt := sqlutil.TxStmt(txn, s.selectEventRejectedStmt) + err = stmt.QueryRowContext(ctx, eventID).Scan(&rejected) + return +} diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go index cbf9c8b20..c24379035 100644 --- a/roomserver/storage/shared/storage.go +++ b/roomserver/storage/shared/storage.go @@ -567,6 +567,10 @@ func (d *Database) GetRoomUpdater( return updater, err } +func (d *Database) IsEventRejected(ctx context.Context, eventID string) (bool, error) { + return d.EventsTable.SelectEventRejected(ctx, nil, eventID) +} + func (d *Database) StoreEvent( ctx context.Context, event *gomatrixserverlib.Event, authEventNIDs []types.EventNID, isRejected bool, diff --git a/roomserver/storage/sqlite3/events_table.go b/roomserver/storage/sqlite3/events_table.go index 1dda34c36..c0d396dd5 100644 --- a/roomserver/storage/sqlite3/events_table.go +++ b/roomserver/storage/sqlite3/events_table.go @@ -109,6 +109,9 @@ const selectMaxEventDepthSQL = "" + const selectRoomNIDsForEventNIDsSQL = "" + "SELECT event_nid, room_nid FROM roomserver_events WHERE event_nid IN ($1)" +const selectEventRejectedSQL = "" + + "SELECT is_rejected FROM roomserver_Events WHERE event_id = $1" + type eventStatements struct { db *sql.DB insertEventStmt *sql.Stmt @@ -122,6 +125,7 @@ type eventStatements struct { bulkSelectStateAtEventAndReferenceStmt *sql.Stmt bulkSelectEventReferenceStmt *sql.Stmt bulkSelectEventIDStmt *sql.Stmt + selectEventRejectedStmt *sql.Stmt //bulkSelectEventNIDStmt *sql.Stmt //bulkSelectUnsentEventNIDStmt *sql.Stmt //selectRoomNIDsForEventNIDsStmt *sql.Stmt @@ -152,6 +156,7 @@ func PrepareEventsTable(db *sql.DB) (tables.Events, error) { //{&s.bulkSelectEventNIDStmt, bulkSelectEventNIDSQL}, //{&s.bulkSelectUnsentEventNIDStmt, bulkSelectUnsentEventNIDSQL}, //{&s.selectRoomNIDForEventNIDStmt, selectRoomNIDForEventNIDSQL}, + {&s.selectEventRejectedStmt, selectEventRejectedSQL}, }.Prepare(db) } @@ -614,3 +619,11 @@ func eventNIDsAsArray(eventNIDs []types.EventNID) string { b, _ := json.Marshal(eventNIDs) return string(b) } + +func (s *eventStatements) SelectEventRejected( + ctx context.Context, txn *sql.Tx, eventID string, +) (rejected bool, err error) { + stmt := sqlutil.TxStmt(txn, s.selectEventRejectedStmt) + err = stmt.QueryRowContext(ctx, eventID).Scan(&rejected) + return +} diff --git a/roomserver/storage/tables/interface.go b/roomserver/storage/tables/interface.go index 0bc389b80..c44715a3e 100644 --- a/roomserver/storage/tables/interface.go +++ b/roomserver/storage/tables/interface.go @@ -66,6 +66,7 @@ type Events interface { BulkSelectUnsentEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventNID, error) SelectMaxEventDepth(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID) (int64, error) SelectRoomNIDsForEventNIDs(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID) (roomNIDs map[types.EventNID]types.RoomNID, err error) + SelectEventRejected(ctx context.Context, txn *sql.Tx, eventID string) (rejected bool, err error) } type Rooms interface {