From 2ca972ef7694a9f5b4f08bb8c9ccc836420419a6 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 2 Feb 2022 15:24:45 +0000 Subject: [PATCH] Pass through errors properly --- roomserver/internal/input/input.go | 16 +++++---- roomserver/internal/input/input_events.go | 38 +++++++++++----------- roomserver/internal/input/input_missing.go | 8 ++--- 3 files changed, 33 insertions(+), 29 deletions(-) diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index c7b38f231..80614bc52 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -155,14 +155,18 @@ func (r *Inputer) processRoomEventUsingUpdater( if err != nil { return true, fmt.Errorf("r.DB.GetRoomUpdater: %w", err) } - if err = r.processRoomEvent(ctx, updater, inputRoomEvent); err != nil { - if rerr := updater.Rollback(); rerr != nil { - return false, fmt.Errorf("r.processRoomEvent: %w (with additional error rolling back transaction: %s)", err, rerr) + commit, err := r.processRoomEvent(ctx, updater, inputRoomEvent) + if commit { + if err = updater.Commit(); err != nil { + return false, fmt.Errorf("updater.Commit: %w", err) + } + } else { + if rerr := updater.Rollback(); rerr != nil { + return true, fmt.Errorf("updater.Rollback: %w", err) } - return false, fmt.Errorf("r.processRoomEvent: %w", err) } - if err = updater.Commit(); err != nil { - return true, fmt.Errorf("updater.Commit: %w", err) + if err != nil { + return true, err } return false, nil } diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index 657ecab2a..e81dd803f 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -70,13 +70,13 @@ func (r *Inputer) processRoomEvent( ctx context.Context, updater *shared.RoomUpdater, input *api.InputRoomEvent, -) (err error) { +) (commit bool, err error) { select { case <-ctx.Done(): // Before we do anything, make sure the context hasn't expired for this pending task. // If it has then we'll give up straight away — it's probably a synchronous input // request and the caller has already given up, but the inbox task was still queued. - return context.DeadlineExceeded + return false, context.DeadlineExceeded default: } @@ -118,11 +118,11 @@ func (r *Inputer) processRoomEvent( case gomatrixserverlib.EventIDFormatV1: if bytes.Equal(event.EventReference().EventSHA256, evs[0].EventReference().EventSHA256) { logger.Debugf("Already processed event; ignoring") - return nil + return false, nil } default: logger.Debugf("Already processed event; ignoring") - return nil + return false, nil } } } @@ -137,7 +137,7 @@ func (r *Inputer) processRoomEvent( PrevEventIDs: event.PrevEventIDs(), } if err = r.Queryer.QueryMissingAuthPrevEvents(ctx, missingReq, missingRes); err != nil { - return fmt.Errorf("r.Queryer.QueryMissingAuthPrevEvents: %w", err) + return false, fmt.Errorf("r.Queryer.QueryMissingAuthPrevEvents: %w", err) } } missingAuth := len(missingRes.MissingAuthEventIDs) > 0 @@ -149,7 +149,7 @@ func (r *Inputer) processRoomEvent( ExcludeSelf: true, } if err = r.FSAPI.QueryJoinedHostServerNamesInRoom(ctx, serverReq, serverRes); err != nil { - return fmt.Errorf("r.FSAPI.QueryJoinedHostServerNamesInRoom: %w", err) + return false, fmt.Errorf("r.FSAPI.QueryJoinedHostServerNamesInRoom: %w", err) } // Sort all of the servers into a map so that we can randomise // their order. Then make sure that the input origin and the @@ -179,7 +179,7 @@ func (r *Inputer) processRoomEvent( authEvents := gomatrixserverlib.NewAuthEvents(nil) knownEvents := map[string]*types.Event{} if err = r.fetchAuthEvents(ctx, updater, logger, headered, &authEvents, knownEvents, serverRes.ServerNames); err != nil { - return fmt.Errorf("r.fetchAuthEvents: %w", err) + return false, fmt.Errorf("r.fetchAuthEvents: %w", err) } // Check if the event is allowed by its auth events. If it isn't then @@ -195,7 +195,7 @@ func (r *Inputer) processRoomEvent( authEventNIDs := make([]types.EventNID, 0, len(authEventIDs)) for _, authEventID := range authEventIDs { if _, ok := knownEvents[authEventID]; !ok { - return fmt.Errorf("missing auth event %s", authEventID) + return false, fmt.Errorf("missing auth event %s", authEventID) } authEventNIDs = append(authEventNIDs, knownEvents[authEventID].EventNID) } @@ -252,14 +252,14 @@ func (r *Inputer) processRoomEvent( // Store the event. _, _, stateAtEvent, redactionEvent, redactedEventID, err := updater.StoreEvent(ctx, event, authEventNIDs, isRejected) if err != nil { - return fmt.Errorf("updater.StoreEvent: %w", err) + return false, fmt.Errorf("updater.StoreEvent: %w", err) } // if storing this event results in it being redacted then do so. if !isRejected && redactedEventID == event.EventID() { r, rerr := eventutil.RedactEvent(redactionEvent, event) if rerr != nil { - return fmt.Errorf("eventutil.RedactEvent: %w", rerr) + return false, fmt.Errorf("eventutil.RedactEvent: %w", rerr) } event = r } @@ -270,15 +270,15 @@ func (r *Inputer) processRoomEvent( if input.Kind == api.KindOutlier { logger.Debug("Stored outlier") hooks.Run(hooks.KindNewEventPersisted, headered) - return nil + return true, nil } roomInfo, err := updater.RoomInfo(ctx, event.RoomID()) if err != nil { - return fmt.Errorf("updater.RoomInfo: %w", err) + return false, fmt.Errorf("updater.RoomInfo: %w", err) } if roomInfo == nil { - return fmt.Errorf("updater.RoomInfo missing for room %s", event.RoomID()) + return false, fmt.Errorf("updater.RoomInfo missing for room %s", event.RoomID()) } if !missingPrev && stateAtEvent.BeforeStateSnapshotNID == 0 { @@ -286,7 +286,7 @@ func (r *Inputer) processRoomEvent( // Lets calculate one. err = r.calculateAndSetState(ctx, updater, input, roomInfo, &stateAtEvent, event, isRejected) if err != nil { - return fmt.Errorf("r.calculateAndSetState: %w", err) + return false, fmt.Errorf("r.calculateAndSetState: %w", err) } } @@ -296,7 +296,7 @@ func (r *Inputer) processRoomEvent( "soft_fail": softfail, "missing_prev": missingPrev, }).Warn("Stored rejected event") - return nil + return true, rejectionErr } switch input.Kind { @@ -311,7 +311,7 @@ func (r *Inputer) processRoomEvent( input.TransactionID, // transaction ID input.HasState, // rewrites state? ); err != nil { - return fmt.Errorf("r.updateLatestEvents: %w", err) + return false, fmt.Errorf("r.updateLatestEvents: %w", err) } case api.KindOld: err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{ @@ -323,7 +323,7 @@ func (r *Inputer) processRoomEvent( }, }) if err != nil { - return fmt.Errorf("r.WriteOutputEvents (old): %w", err) + return false, fmt.Errorf("r.WriteOutputEvents (old): %w", err) } } @@ -342,14 +342,14 @@ func (r *Inputer) processRoomEvent( }, }) if err != nil { - return fmt.Errorf("r.WriteOutputEvents (redactions): %w", err) + return false, fmt.Errorf("r.WriteOutputEvents (redactions): %w", err) } } // Everything was OK — the latest events updater didn't error and // we've sent output events. Finally, generate a hook call. hooks.Run(hooks.KindNewEventPersisted, headered) - return nil + return true, nil } // fetchAuthEvents will check to see if any of the diff --git a/roomserver/internal/input/input_missing.go b/roomserver/internal/input/input_missing.go index e76cc2cac..5cc4a2e90 100644 --- a/roomserver/internal/input/input_missing.go +++ b/roomserver/internal/input/input_missing.go @@ -79,7 +79,7 @@ func (t *missingStateReq) processEventWithMissingState( // we can just inject all the newEvents as new as we may have only missed 1 or 2 events and have filled // in the gap in the DAG for _, newEvent := range newEvents { - err = t.inputer.processRoomEvent(ctx, t.db, &api.InputRoomEvent{ + _, err = t.inputer.processRoomEvent(ctx, t.db, &api.InputRoomEvent{ Kind: api.KindNew, Event: newEvent.Headered(roomVersion), Origin: t.origin, @@ -188,7 +188,7 @@ func (t *missingStateReq) processEventWithMissingState( } // TODO: we could do this concurrently? for _, ire := range outlierRoomEvents { - if err = t.inputer.processRoomEvent(ctx, t.db, &ire); err != nil { + if _, err = t.inputer.processRoomEvent(ctx, t.db, &ire); err != nil { return fmt.Errorf("t.inputer.processRoomEvent[outlier]: %w", err) } } @@ -201,7 +201,7 @@ func (t *missingStateReq) processEventWithMissingState( stateIDs = append(stateIDs, event.EventID()) } - err = t.inputer.processRoomEvent(ctx, t.db, &api.InputRoomEvent{ + _, err = t.inputer.processRoomEvent(ctx, t.db, &api.InputRoomEvent{ Kind: api.KindOld, Event: backwardsExtremity.Headered(roomVersion), Origin: t.origin, @@ -218,7 +218,7 @@ func (t *missingStateReq) processEventWithMissingState( // they will automatically fast-forward based on the room state at the // extremity in the last step. for _, newEvent := range newEvents { - err = t.inputer.processRoomEvent(ctx, t.db, &api.InputRoomEvent{ + _, err = t.inputer.processRoomEvent(ctx, t.db, &api.InputRoomEvent{ Kind: api.KindOld, Event: newEvent.Headered(roomVersion), Origin: t.origin,