From 225fb1576b447b64df6c16aa217987e9fabe3fe5 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 15 Sep 2020 16:10:56 +0100 Subject: [PATCH] WIP Event rejection --- roomserver/internal/input/input_events.go | 11 ++++++++--- roomserver/internal/perform/perform_backfill.go | 2 +- roomserver/roomserver_test.go | 8 -------- roomserver/storage/interface.go | 1 + roomserver/storage/postgres/events_table.go | 9 ++++++--- roomserver/storage/shared/storage.go | 7 +++++-- roomserver/storage/sqlite3/events_table.go | 10 ++++++---- roomserver/storage/tables/interface.go | 5 ++++- 8 files changed, 31 insertions(+), 22 deletions(-) diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index daf1afcd3..1e88e95a7 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -46,10 +46,11 @@ func (r *Inputer) processRoomEvent( // Check that the event passes authentication checks and work out // the numeric IDs for the auth events. + isRejected := false authEventNIDs, err := helpers.CheckAuthEvents(ctx, r.DB, headered, input.AuthEventIDs) if err != nil { - logrus.WithError(err).WithField("event_id", event.EventID()).WithField("auth_event_ids", input.AuthEventIDs).Error("processRoomEvent.checkAuthEvents failed for event") - return + logrus.WithError(err).WithField("event_id", event.EventID()).WithField("auth_event_ids", input.AuthEventIDs).Error("processRoomEvent.checkAuthEvents failed for event, rejecting event") + isRejected = true } // If we don't have a transaction ID then get one. @@ -65,10 +66,14 @@ func (r *Inputer) processRoomEvent( } // Store the event. - _, stateAtEvent, redactionEvent, redactedEventID, err := r.DB.StoreEvent(ctx, event, input.TransactionID, authEventNIDs) + _, stateAtEvent, redactionEvent, redactedEventID, err := r.DB.StoreEvent(ctx, event, input.TransactionID, authEventNIDs, isRejected) if err != nil { return "", fmt.Errorf("r.DB.StoreEvent: %w", err) } + // We stop here if the event is rejected: We've stored it but won't update forward extremities or notify anyone about it. + if isRejected { + return event.EventID(), nil + } // if storing this event results in it being redacted then do so. if redactedEventID == event.EventID() { r, rerr := eventutil.RedactEvent(redactionEvent, &event) diff --git a/roomserver/internal/perform/perform_backfill.go b/roomserver/internal/perform/perform_backfill.go index 668c80787..eb1aa99b8 100644 --- a/roomserver/internal/perform/perform_backfill.go +++ b/roomserver/internal/perform/perform_backfill.go @@ -535,7 +535,7 @@ func persistEvents(ctx context.Context, db storage.Database, events []gomatrixse var stateAtEvent types.StateAtEvent var redactedEventID string var redactionEvent *gomatrixserverlib.Event - roomNID, stateAtEvent, redactionEvent, redactedEventID, err = db.StoreEvent(ctx, ev.Unwrap(), nil, authNids) + roomNID, stateAtEvent, redactionEvent, redactedEventID, err = db.StoreEvent(ctx, ev.Unwrap(), nil, authNids, false) if err != nil { logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to persist event") continue diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go index 5a67a1bed..3bcfd4dc8 100644 --- a/roomserver/roomserver_test.go +++ b/roomserver/roomserver_test.go @@ -140,14 +140,6 @@ func mustCreateEvents(t *testing.T, roomVer gomatrixserverlib.RoomVersion, event return } -func eventsJSON(events []gomatrixserverlib.Event) []json.RawMessage { - result := make([]json.RawMessage, len(events)) - for i := range events { - result[i] = events[i].JSON() - } - return result -} - func mustLoadRawEvents(t *testing.T, ver gomatrixserverlib.RoomVersion, events []json.RawMessage) []gomatrixserverlib.HeaderedEvent { t.Helper() hs := make([]gomatrixserverlib.HeaderedEvent, len(events)) diff --git a/roomserver/storage/interface.go b/roomserver/storage/interface.go index be724da62..10a380e85 100644 --- a/roomserver/storage/interface.go +++ b/roomserver/storage/interface.go @@ -70,6 +70,7 @@ type Database interface { // Stores a matrix room event in the database. Returns the room NID, the state snapshot and the redacted event ID if any, or an error. StoreEvent( ctx context.Context, event gomatrixserverlib.Event, txnAndSessionID *api.TransactionID, authEventNIDs []types.EventNID, + isRejected bool, ) (types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) // Look up the state entries for a list of string event IDs // Returns an error if the there is an error talking to the database diff --git a/roomserver/storage/postgres/events_table.go b/roomserver/storage/postgres/events_table.go index e66efb097..4fad1c76e 100644 --- a/roomserver/storage/postgres/events_table.go +++ b/roomserver/storage/postgres/events_table.go @@ -65,13 +65,14 @@ CREATE TABLE IF NOT EXISTS roomserver_events ( -- Needed for setting reference hashes when sending new events. reference_sha256 BYTEA NOT NULL, -- A list of numeric IDs for events that can authenticate this event. - auth_event_nids BIGINT[] NOT NULL + auth_event_nids BIGINT[] NOT NULL, + is_rejected BOOLEAN NOT NULL DEFAULT FALSE ); ` const insertEventSQL = "" + - "INSERT INTO roomserver_events (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256, auth_event_nids, depth)" + - " VALUES ($1, $2, $3, $4, $5, $6, $7)" + + "INSERT INTO roomserver_events (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256, auth_event_nids, depth, is_rejected)" + + " VALUES ($1, $2, $3, $4, $5, $6, $7, $8)" + " ON CONFLICT ON CONSTRAINT roomserver_event_id_unique" + " DO NOTHING" + " RETURNING event_nid, state_snapshot_nid" @@ -174,12 +175,14 @@ func (s *eventStatements) InsertEvent( referenceSHA256 []byte, authEventNIDs []types.EventNID, depth int64, + isRejected bool, ) (types.EventNID, types.StateSnapshotNID, error) { var eventNID int64 var stateNID int64 err := s.insertEventStmt.QueryRowContext( ctx, int64(roomNID), int64(eventTypeNID), int64(eventStateKeyNID), eventID, referenceSHA256, eventNIDsAsArray(authEventNIDs), depth, + isRejected, ).Scan(&eventNID, &stateNID) return types.EventNID(eventNID), types.StateSnapshotNID(stateNID), err } diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go index 262b0f2f8..e710b99b7 100644 --- a/roomserver/storage/shared/storage.go +++ b/roomserver/storage/shared/storage.go @@ -382,7 +382,7 @@ func (d *Database) GetLatestEventsForUpdate( // nolint:gocyclo func (d *Database) StoreEvent( ctx context.Context, event gomatrixserverlib.Event, - txnAndSessionID *api.TransactionID, authEventNIDs []types.EventNID, + txnAndSessionID *api.TransactionID, authEventNIDs []types.EventNID, isRejected bool, ) (types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) { var ( roomNID types.RoomNID @@ -446,6 +446,7 @@ func (d *Database) StoreEvent( event.EventReference().EventSHA256, authEventNIDs, event.Depth(), + isRejected, ); err != nil { if err == sql.ErrNoRows { // We've already inserted the event so select the numeric event ID @@ -459,7 +460,9 @@ func (d *Database) StoreEvent( if err = d.EventJSONTable.InsertEventJSON(ctx, txn, eventNID, event.JSON()); err != nil { return fmt.Errorf("d.EventJSONTable.InsertEventJSON: %w", err) } - redactionEvent, redactedEventID, err = d.handleRedactions(ctx, txn, eventNID, event) + if !isRejected { // ignore rejected redaction events + redactionEvent, redactedEventID, err = d.handleRedactions(ctx, txn, eventNID, event) + } return nil }) if err != nil { diff --git a/roomserver/storage/sqlite3/events_table.go b/roomserver/storage/sqlite3/events_table.go index a866c85d0..67d111f52 100644 --- a/roomserver/storage/sqlite3/events_table.go +++ b/roomserver/storage/sqlite3/events_table.go @@ -41,13 +41,14 @@ const eventsSchema = ` depth INTEGER NOT NULL, event_id TEXT NOT NULL UNIQUE, reference_sha256 BLOB NOT NULL, - auth_event_nids TEXT NOT NULL DEFAULT '[]' + auth_event_nids TEXT NOT NULL DEFAULT '[]', + is_rejected BOOLEAN NOT NULL DEFAULT FALSE ); ` const insertEventSQL = ` - INSERT INTO roomserver_events (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256, auth_event_nids, depth) - VALUES ($1, $2, $3, $4, $5, $6, $7) + INSERT INTO roomserver_events (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256, auth_event_nids, depth, is_rejected) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT DO NOTHING; ` @@ -150,13 +151,14 @@ func (s *eventStatements) InsertEvent( referenceSHA256 []byte, authEventNIDs []types.EventNID, depth int64, + isRejected bool, ) (types.EventNID, types.StateSnapshotNID, error) { // attempt to insert: the last_row_id is the event NID var eventNID int64 insertStmt := sqlutil.TxStmt(txn, s.insertEventStmt) result, err := insertStmt.ExecContext( ctx, int64(roomNID), int64(eventTypeNID), int64(eventStateKeyNID), - eventID, referenceSHA256, eventNIDsAsArray(authEventNIDs), depth, + eventID, referenceSHA256, eventNIDsAsArray(authEventNIDs), depth, isRejected, ) if err != nil { return 0, 0, err diff --git a/roomserver/storage/tables/interface.go b/roomserver/storage/tables/interface.go index adb06212a..eba878ba5 100644 --- a/roomserver/storage/tables/interface.go +++ b/roomserver/storage/tables/interface.go @@ -34,7 +34,10 @@ type EventStateKeys interface { } type Events interface { - InsertEvent(c context.Context, txn *sql.Tx, i types.RoomNID, j types.EventTypeNID, k types.EventStateKeyNID, eventID string, referenceSHA256 []byte, authEventNIDs []types.EventNID, depth int64) (types.EventNID, types.StateSnapshotNID, error) + InsertEvent( + ctx context.Context, txn *sql.Tx, i types.RoomNID, j types.EventTypeNID, k types.EventStateKeyNID, eventID string, + referenceSHA256 []byte, authEventNIDs []types.EventNID, depth int64, isRejected bool, + ) (types.EventNID, types.StateSnapshotNID, error) SelectEvent(ctx context.Context, txn *sql.Tx, eventID string) (types.EventNID, types.StateSnapshotNID, error) // bulkSelectStateEventByID lookups a list of state events by event ID. // If any of the requested events are missing from the database it returns a types.MissingEventError