From 6f6c6a4e21e97ffbe0031b89be079addf86b11bd Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 4 Feb 2022 09:50:29 +0000 Subject: [PATCH] Clearer commit and rollback results --- roomserver/internal/input/input.go | 27 ++++++++----- roomserver/internal/input/input_events.go | 47 ++++++++++++----------- 2 files changed, 41 insertions(+), 33 deletions(-) diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index d6ad77fe6..5bdec0a24 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -39,6 +39,19 @@ import ( "github.com/tidwall/gjson" ) +type retryAction int +type commitAction int + +const ( + doNotRetry retryAction = iota + retryLater +) + +const ( + commitTransaction commitAction = iota + rollbackTransaction +) + var keyContentFields = map[string]string{ "m.room.join_rules": "join_rule", "m.room.history_visibility": "history_visibility", @@ -138,13 +151,6 @@ func (r *Inputer) Start() error { return err } -type retryAction int - -const ( - doNotRetry retryAction = iota - retryLater -) - // processRoomEventUsingUpdater opens up a room updater and tries to // process the event. It returns whether or not we should positively // or negatively acknowledge the event (i.e. for NATS) and an error @@ -162,12 +168,13 @@ func (r *Inputer) processRoomEventUsingUpdater( if err != nil { return retryLater, fmt.Errorf("r.DB.GetRoomUpdater: %w", err) } - commit, err := r.processRoomEvent(ctx, updater, inputRoomEvent) - if commit { + action, err := r.processRoomEvent(ctx, updater, inputRoomEvent) + switch action { + case commitTransaction: if cerr := updater.Commit(); cerr != nil { return retryLater, fmt.Errorf("updater.Commit: %w", cerr) } - } else { + case rollbackTransaction: if rerr := updater.Rollback(); rerr != nil { return retryLater, fmt.Errorf("updater.Rollback: %w", rerr) } diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index e81dd803f..f3fa83d83 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -70,13 +70,13 @@ func (r *Inputer) processRoomEvent( ctx context.Context, updater *shared.RoomUpdater, input *api.InputRoomEvent, -) (commit bool, err error) { +) (commitAction, error) { select { case <-ctx.Done(): // Before we do anything, make sure the context hasn't expired for this pending task. // If it has then we'll give up straight away — it's probably a synchronous input // request and the caller has already given up, but the inbox task was still queued. - return false, context.DeadlineExceeded + return rollbackTransaction, context.DeadlineExceeded default: } @@ -118,11 +118,11 @@ func (r *Inputer) processRoomEvent( case gomatrixserverlib.EventIDFormatV1: if bytes.Equal(event.EventReference().EventSHA256, evs[0].EventReference().EventSHA256) { logger.Debugf("Already processed event; ignoring") - return false, nil + return rollbackTransaction, nil } default: logger.Debugf("Already processed event; ignoring") - return false, nil + return rollbackTransaction, nil } } } @@ -136,8 +136,8 @@ func (r *Inputer) processRoomEvent( AuthEventIDs: event.AuthEventIDs(), PrevEventIDs: event.PrevEventIDs(), } - if err = r.Queryer.QueryMissingAuthPrevEvents(ctx, missingReq, missingRes); err != nil { - return false, fmt.Errorf("r.Queryer.QueryMissingAuthPrevEvents: %w", err) + if err := r.Queryer.QueryMissingAuthPrevEvents(ctx, missingReq, missingRes); err != nil { + return rollbackTransaction, fmt.Errorf("r.Queryer.QueryMissingAuthPrevEvents: %w", err) } } missingAuth := len(missingRes.MissingAuthEventIDs) > 0 @@ -148,8 +148,8 @@ func (r *Inputer) processRoomEvent( RoomID: event.RoomID(), ExcludeSelf: true, } - if err = r.FSAPI.QueryJoinedHostServerNamesInRoom(ctx, serverReq, serverRes); err != nil { - return false, fmt.Errorf("r.FSAPI.QueryJoinedHostServerNamesInRoom: %w", err) + if err := r.FSAPI.QueryJoinedHostServerNamesInRoom(ctx, serverReq, serverRes); err != nil { + return rollbackTransaction, fmt.Errorf("r.FSAPI.QueryJoinedHostServerNamesInRoom: %w", err) } // Sort all of the servers into a map so that we can randomise // their order. Then make sure that the input origin and the @@ -178,8 +178,8 @@ func (r *Inputer) processRoomEvent( isRejected := false authEvents := gomatrixserverlib.NewAuthEvents(nil) knownEvents := map[string]*types.Event{} - if err = r.fetchAuthEvents(ctx, updater, logger, headered, &authEvents, knownEvents, serverRes.ServerNames); err != nil { - return false, fmt.Errorf("r.fetchAuthEvents: %w", err) + if err := r.fetchAuthEvents(ctx, updater, logger, headered, &authEvents, knownEvents, serverRes.ServerNames); err != nil { + return rollbackTransaction, fmt.Errorf("r.fetchAuthEvents: %w", err) } // Check if the event is allowed by its auth events. If it isn't then @@ -195,7 +195,7 @@ func (r *Inputer) processRoomEvent( authEventNIDs := make([]types.EventNID, 0, len(authEventIDs)) for _, authEventID := range authEventIDs { if _, ok := knownEvents[authEventID]; !ok { - return false, fmt.Errorf("missing auth event %s", authEventID) + return rollbackTransaction, fmt.Errorf("missing auth event %s", authEventID) } authEventNIDs = append(authEventNIDs, knownEvents[authEventID].EventNID) } @@ -204,6 +204,7 @@ func (r *Inputer) processRoomEvent( if input.Kind == api.KindNew { // Check that the event passes authentication checks based on the // current room state. + var err error softfail, err = helpers.CheckForSoftFail(ctx, updater, headered, input.StateEventIDs) if err != nil { logger.WithError(err).Warn("Error authing soft-failed event") @@ -237,7 +238,7 @@ func (r *Inputer) processRoomEvent( hadEvents: map[string]bool{}, haveEvents: map[string]*gomatrixserverlib.HeaderedEvent{}, } - if err = missingState.processEventWithMissingState(ctx, event, headered.RoomVersion); err != nil { + if err := missingState.processEventWithMissingState(ctx, event, headered.RoomVersion); err != nil { isRejected = true rejectionErr = fmt.Errorf("missingState.processEventWithMissingState: %w", err) } else { @@ -252,14 +253,14 @@ func (r *Inputer) processRoomEvent( // Store the event. _, _, stateAtEvent, redactionEvent, redactedEventID, err := updater.StoreEvent(ctx, event, authEventNIDs, isRejected) if err != nil { - return false, fmt.Errorf("updater.StoreEvent: %w", err) + return rollbackTransaction, fmt.Errorf("updater.StoreEvent: %w", err) } // if storing this event results in it being redacted then do so. if !isRejected && redactedEventID == event.EventID() { r, rerr := eventutil.RedactEvent(redactionEvent, event) if rerr != nil { - return false, fmt.Errorf("eventutil.RedactEvent: %w", rerr) + return rollbackTransaction, fmt.Errorf("eventutil.RedactEvent: %w", rerr) } event = r } @@ -270,15 +271,15 @@ func (r *Inputer) processRoomEvent( if input.Kind == api.KindOutlier { logger.Debug("Stored outlier") hooks.Run(hooks.KindNewEventPersisted, headered) - return true, nil + return commitTransaction, nil } roomInfo, err := updater.RoomInfo(ctx, event.RoomID()) if err != nil { - return false, fmt.Errorf("updater.RoomInfo: %w", err) + return rollbackTransaction, fmt.Errorf("updater.RoomInfo: %w", err) } if roomInfo == nil { - return false, fmt.Errorf("updater.RoomInfo missing for room %s", event.RoomID()) + return rollbackTransaction, fmt.Errorf("updater.RoomInfo missing for room %s", event.RoomID()) } if !missingPrev && stateAtEvent.BeforeStateSnapshotNID == 0 { @@ -286,7 +287,7 @@ func (r *Inputer) processRoomEvent( // Lets calculate one. err = r.calculateAndSetState(ctx, updater, input, roomInfo, &stateAtEvent, event, isRejected) if err != nil { - return false, fmt.Errorf("r.calculateAndSetState: %w", err) + return rollbackTransaction, fmt.Errorf("r.calculateAndSetState: %w", err) } } @@ -296,7 +297,7 @@ func (r *Inputer) processRoomEvent( "soft_fail": softfail, "missing_prev": missingPrev, }).Warn("Stored rejected event") - return true, rejectionErr + return commitTransaction, rejectionErr } switch input.Kind { @@ -311,7 +312,7 @@ func (r *Inputer) processRoomEvent( input.TransactionID, // transaction ID input.HasState, // rewrites state? ); err != nil { - return false, fmt.Errorf("r.updateLatestEvents: %w", err) + return rollbackTransaction, fmt.Errorf("r.updateLatestEvents: %w", err) } case api.KindOld: err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{ @@ -323,7 +324,7 @@ func (r *Inputer) processRoomEvent( }, }) if err != nil { - return false, fmt.Errorf("r.WriteOutputEvents (old): %w", err) + return rollbackTransaction, fmt.Errorf("r.WriteOutputEvents (old): %w", err) } } @@ -342,14 +343,14 @@ func (r *Inputer) processRoomEvent( }, }) if err != nil { - return false, fmt.Errorf("r.WriteOutputEvents (redactions): %w", err) + return rollbackTransaction, fmt.Errorf("r.WriteOutputEvents (redactions): %w", err) } } // Everything was OK — the latest events updater didn't error and // we've sent output events. Finally, generate a hook call. hooks.Run(hooks.KindNewEventPersisted, headered) - return true, nil + return commitTransaction, nil } // fetchAuthEvents will check to see if any of the