From d9acb07326c9cede5cef26f6b0db0fca538c26ae Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 15 Sep 2020 17:50:19 +0100 Subject: [PATCH] Implement rejected events Critically, rejected events CAN cause state resolution to happen as it can merge forks in the DAG. This is fine, _provided_ we do not add the rejected event when performing state resolution, which is what this PR does. It also fixes the error handling when NotAllowed happens, as we were checking too early and needlessly handling NotAllowed in more than one place. --- federationapi/routing/send.go | 18 +++---------- roomserver/internal/input/input_events.go | 29 ++++++++++++--------- roomserver/internal/query/query.go | 1 + roomserver/state/state.go | 5 ++-- roomserver/storage/postgres/events_table.go | 3 ++- roomserver/storage/sqlite3/events_table.go | 3 ++- roomserver/types/types.go | 3 +++ sytest-whitelist | 4 ++- 8 files changed, 34 insertions(+), 32 deletions(-) diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index e3f84b697..cb7bea6c4 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -17,7 +17,6 @@ package routing import ( "context" "encoding/json" - "errors" "fmt" "net/http" "time" @@ -373,13 +372,10 @@ func (t *txnReq) processEvent(ctx context.Context, e gomatrixserverlib.Event, is return t.processEventWithMissingState(ctx, e, stateResp.RoomVersion, isInboundTxn) } - // Check that the event is allowed by the state at the event. - if err := checkAllowedByState(e, gomatrixserverlib.UnwrapEventHeaders(stateResp.StateEvents)); err != nil { - return err - } - - // pass the event to the roomserver - err := api.SendEvents( + // pass the event to the roomserver which will do auth checks + // If the event fail auth checks, gmsl.NotAllowed error will be returned which we be silently + // discarded by the caller of this function + return api.SendEvents( context.Background(), t.rsAPI, []gomatrixserverlib.HeaderedEvent{ @@ -388,12 +384,6 @@ func (t *txnReq) processEvent(ctx context.Context, e gomatrixserverlib.Event, is api.DoNotSendToOtherServers, nil, ) - var notAllowed *gomatrixserverlib.NotAllowed - if errors.As(err, ¬Allowed) { - // the event was rejected, silently ignore. - err = nil - } - return err } func checkAllowedByState(e gomatrixserverlib.Event, stateEvents []gomatrixserverlib.Event) error { diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index 7ffef1c06..0558cd763 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -70,17 +70,9 @@ func (r *Inputer) processRoomEvent( if err != nil { return "", fmt.Errorf("r.DB.StoreEvent: %w", err) } - // We stop here if the event is rejected: We've stored it but won't update forward extremities or notify anyone about it. - if isRejected { - logrus.WithFields(logrus.Fields{ - "event_id": event.EventID(), - "type": event.Type(), - "room": event.RoomID(), - }).Debug("Stored rejected event") - return event.EventID(), rejectionErr - } + // if storing this event results in it being redacted then do so. - if redactedEventID == event.EventID() { + if !isRejected && redactedEventID == event.EventID() { r, rerr := eventutil.RedactEvent(redactionEvent, &event) if rerr != nil { return "", fmt.Errorf("eventutil.RedactEvent: %w", rerr) @@ -111,12 +103,22 @@ func (r *Inputer) processRoomEvent( if stateAtEvent.BeforeStateSnapshotNID == 0 { // We haven't calculated a state for this event yet. // Lets calculate one. - err = r.calculateAndSetState(ctx, input, *roomInfo, &stateAtEvent, event) + err = r.calculateAndSetState(ctx, input, *roomInfo, &stateAtEvent, event, isRejected) if err != nil { return "", fmt.Errorf("r.calculateAndSetState: %w", err) } } + // We stop here if the event is rejected: We've stored it but won't update forward extremities or notify anyone about it. + if isRejected { + logrus.WithFields(logrus.Fields{ + "event_id": event.EventID(), + "type": event.Type(), + "room": event.RoomID(), + }).Debug("Stored rejected event") + return event.EventID(), rejectionErr + } + if input.Kind == api.KindRewrite { logrus.WithFields(logrus.Fields{ "event_id": event.EventID(), @@ -167,11 +169,12 @@ func (r *Inputer) calculateAndSetState( roomInfo types.RoomInfo, stateAtEvent *types.StateAtEvent, event gomatrixserverlib.Event, + isRejected bool, ) error { var err error roomState := state.NewStateResolution(r.DB, roomInfo) - if input.HasState { + if input.HasState && !isRejected { // Check here if we think we're in the room already. stateAtEvent.Overwrite = true var joinEventNIDs []types.EventNID @@ -198,7 +201,7 @@ func (r *Inputer) calculateAndSetState( stateAtEvent.Overwrite = false // We haven't been told what the state at the event is so we need to calculate it from the prev_events - if stateAtEvent.BeforeStateSnapshotNID, err = roomState.CalculateAndStoreStateBeforeEvent(ctx, event); err != nil { + if stateAtEvent.BeforeStateSnapshotNID, err = roomState.CalculateAndStoreStateBeforeEvent(ctx, event, isRejected); err != nil { return fmt.Errorf("roomState.CalculateAndStoreStateBeforeEvent: %w", err) } } diff --git a/roomserver/internal/query/query.go b/roomserver/internal/query/query.go index b34ae7701..fb981447f 100644 --- a/roomserver/internal/query/query.go +++ b/roomserver/internal/query/query.go @@ -70,6 +70,7 @@ func (r *Queryer) QueryStateAfterEvents( if err != nil { switch err.(type) { case types.MissingEventError: + util.GetLogger(ctx).Errorf("QueryStateAfterEvents: MissingEventError: %s", err) return nil default: return err diff --git a/roomserver/state/state.go b/roomserver/state/state.go index 37e6807a3..9ee6f40d4 100644 --- a/roomserver/state/state.go +++ b/roomserver/state/state.go @@ -159,7 +159,7 @@ func (v StateResolution) LoadCombinedStateAfterEvents( } fullState = append(fullState, entries...) } - if prevState.IsStateEvent() { + if prevState.IsStateEvent() && !prevState.IsRejected { // If the prev event was a state event then add an entry for the event itself // so that we get the state after the event rather than the state before. fullState = append(fullState, prevState.StateEntry) @@ -523,6 +523,7 @@ func init() { func (v StateResolution) CalculateAndStoreStateBeforeEvent( ctx context.Context, event gomatrixserverlib.Event, + isRejected bool, ) (types.StateSnapshotNID, error) { // Load the state at the prev events. prevEventRefs := event.PrevEvents() @@ -561,7 +562,7 @@ func (v StateResolution) CalculateAndStoreStateAfterEvents( if len(prevStates) == 1 { prevState := prevStates[0] - if prevState.EventStateKeyNID == 0 { + if prevState.EventStateKeyNID == 0 || prevState.IsRejected { // 3) None of the previous events were state events and they all // have the same state, so this event has exactly the same state // as the previous events. diff --git a/roomserver/storage/postgres/events_table.go b/roomserver/storage/postgres/events_table.go index 4fad1c76e..c8eb8e2d2 100644 --- a/roomserver/storage/postgres/events_table.go +++ b/roomserver/storage/postgres/events_table.go @@ -89,7 +89,7 @@ const bulkSelectStateEventByIDSQL = "" + " ORDER BY event_type_nid, event_state_key_nid ASC" const bulkSelectStateAtEventByIDSQL = "" + - "SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid FROM roomserver_events" + + "SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid, is_rejected FROM roomserver_events" + " WHERE event_id = ANY($1)" const updateEventStateSQL = "" + @@ -258,6 +258,7 @@ func (s *eventStatements) BulkSelectStateAtEventByID( &result.EventStateKeyNID, &result.EventNID, &result.BeforeStateSnapshotNID, + &result.IsRejected, ); err != nil { return nil, err } diff --git a/roomserver/storage/sqlite3/events_table.go b/roomserver/storage/sqlite3/events_table.go index 67d111f52..773e9ade3 100644 --- a/roomserver/storage/sqlite3/events_table.go +++ b/roomserver/storage/sqlite3/events_table.go @@ -64,7 +64,7 @@ const bulkSelectStateEventByIDSQL = "" + " ORDER BY event_type_nid, event_state_key_nid ASC" const bulkSelectStateAtEventByIDSQL = "" + - "SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid FROM roomserver_events" + + "SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid, is_rejected FROM roomserver_events" + " WHERE event_id IN ($1)" const updateEventStateSQL = "" + @@ -263,6 +263,7 @@ func (s *eventStatements) BulkSelectStateAtEventByID( &result.EventStateKeyNID, &result.EventNID, &result.BeforeStateSnapshotNID, + &result.IsRejected, ); err != nil { return nil, err } diff --git a/roomserver/types/types.go b/roomserver/types/types.go index f5b45763f..c0fcef65e 100644 --- a/roomserver/types/types.go +++ b/roomserver/types/types.go @@ -101,6 +101,9 @@ type StateAtEvent struct { Overwrite bool // The state before the event. BeforeStateSnapshotNID StateSnapshotNID + // True if this StateEntry is rejected. State resolution should then treat this + // StateEntry as being a message event (not a state event). + IsRejected bool // The state entry for the event itself, allows us to calculate the state after the event. StateEntry } diff --git a/sytest-whitelist b/sytest-whitelist index 0adeaee6f..91516428d 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -470,4 +470,6 @@ We can't peek into rooms with shared history_visibility We can't peek into rooms with invited history_visibility We can't peek into rooms with joined history_visibility Local users can peek by room alias -Peeked rooms only turn up in the sync for the device who peeked them \ No newline at end of file +Peeked rooms only turn up in the sync for the device who peeked them +Room state at a rejected message event is the same as its predecessor +Room state at a rejected state event is the same as its predecessor \ No newline at end of file