Clearer commit and rollback results

This commit is contained in:
Neil Alexander 2022-02-04 09:50:29 +00:00
parent 8b89313851
commit 6f6c6a4e21
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
2 changed files with 41 additions and 33 deletions

View file

@ -39,6 +39,19 @@ import (
"github.com/tidwall/gjson" "github.com/tidwall/gjson"
) )
type retryAction int
type commitAction int
const (
doNotRetry retryAction = iota
retryLater
)
const (
commitTransaction commitAction = iota
rollbackTransaction
)
var keyContentFields = map[string]string{ var keyContentFields = map[string]string{
"m.room.join_rules": "join_rule", "m.room.join_rules": "join_rule",
"m.room.history_visibility": "history_visibility", "m.room.history_visibility": "history_visibility",
@ -138,13 +151,6 @@ func (r *Inputer) Start() error {
return err return err
} }
type retryAction int
const (
doNotRetry retryAction = iota
retryLater
)
// processRoomEventUsingUpdater opens up a room updater and tries to // processRoomEventUsingUpdater opens up a room updater and tries to
// process the event. It returns whether or not we should positively // process the event. It returns whether or not we should positively
// or negatively acknowledge the event (i.e. for NATS) and an error // or negatively acknowledge the event (i.e. for NATS) and an error
@ -162,12 +168,13 @@ func (r *Inputer) processRoomEventUsingUpdater(
if err != nil { if err != nil {
return retryLater, fmt.Errorf("r.DB.GetRoomUpdater: %w", err) return retryLater, fmt.Errorf("r.DB.GetRoomUpdater: %w", err)
} }
commit, err := r.processRoomEvent(ctx, updater, inputRoomEvent) action, err := r.processRoomEvent(ctx, updater, inputRoomEvent)
if commit { switch action {
case commitTransaction:
if cerr := updater.Commit(); cerr != nil { if cerr := updater.Commit(); cerr != nil {
return retryLater, fmt.Errorf("updater.Commit: %w", cerr) return retryLater, fmt.Errorf("updater.Commit: %w", cerr)
} }
} else { case rollbackTransaction:
if rerr := updater.Rollback(); rerr != nil { if rerr := updater.Rollback(); rerr != nil {
return retryLater, fmt.Errorf("updater.Rollback: %w", rerr) return retryLater, fmt.Errorf("updater.Rollback: %w", rerr)
} }

View file

@ -70,13 +70,13 @@ func (r *Inputer) processRoomEvent(
ctx context.Context, ctx context.Context,
updater *shared.RoomUpdater, updater *shared.RoomUpdater,
input *api.InputRoomEvent, input *api.InputRoomEvent,
) (commit bool, err error) { ) (commitAction, error) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
// Before we do anything, make sure the context hasn't expired for this pending task. // Before we do anything, make sure the context hasn't expired for this pending task.
// If it has then we'll give up straight away — it's probably a synchronous input // If it has then we'll give up straight away — it's probably a synchronous input
// request and the caller has already given up, but the inbox task was still queued. // request and the caller has already given up, but the inbox task was still queued.
return false, context.DeadlineExceeded return rollbackTransaction, context.DeadlineExceeded
default: default:
} }
@ -118,11 +118,11 @@ func (r *Inputer) processRoomEvent(
case gomatrixserverlib.EventIDFormatV1: case gomatrixserverlib.EventIDFormatV1:
if bytes.Equal(event.EventReference().EventSHA256, evs[0].EventReference().EventSHA256) { if bytes.Equal(event.EventReference().EventSHA256, evs[0].EventReference().EventSHA256) {
logger.Debugf("Already processed event; ignoring") logger.Debugf("Already processed event; ignoring")
return false, nil return rollbackTransaction, nil
} }
default: default:
logger.Debugf("Already processed event; ignoring") logger.Debugf("Already processed event; ignoring")
return false, nil return rollbackTransaction, nil
} }
} }
} }
@ -136,8 +136,8 @@ func (r *Inputer) processRoomEvent(
AuthEventIDs: event.AuthEventIDs(), AuthEventIDs: event.AuthEventIDs(),
PrevEventIDs: event.PrevEventIDs(), PrevEventIDs: event.PrevEventIDs(),
} }
if err = r.Queryer.QueryMissingAuthPrevEvents(ctx, missingReq, missingRes); err != nil { if err := r.Queryer.QueryMissingAuthPrevEvents(ctx, missingReq, missingRes); err != nil {
return false, fmt.Errorf("r.Queryer.QueryMissingAuthPrevEvents: %w", err) return rollbackTransaction, fmt.Errorf("r.Queryer.QueryMissingAuthPrevEvents: %w", err)
} }
} }
missingAuth := len(missingRes.MissingAuthEventIDs) > 0 missingAuth := len(missingRes.MissingAuthEventIDs) > 0
@ -148,8 +148,8 @@ func (r *Inputer) processRoomEvent(
RoomID: event.RoomID(), RoomID: event.RoomID(),
ExcludeSelf: true, ExcludeSelf: true,
} }
if err = r.FSAPI.QueryJoinedHostServerNamesInRoom(ctx, serverReq, serverRes); err != nil { if err := r.FSAPI.QueryJoinedHostServerNamesInRoom(ctx, serverReq, serverRes); err != nil {
return false, fmt.Errorf("r.FSAPI.QueryJoinedHostServerNamesInRoom: %w", err) return rollbackTransaction, fmt.Errorf("r.FSAPI.QueryJoinedHostServerNamesInRoom: %w", err)
} }
// Sort all of the servers into a map so that we can randomise // Sort all of the servers into a map so that we can randomise
// their order. Then make sure that the input origin and the // their order. Then make sure that the input origin and the
@ -178,8 +178,8 @@ func (r *Inputer) processRoomEvent(
isRejected := false isRejected := false
authEvents := gomatrixserverlib.NewAuthEvents(nil) authEvents := gomatrixserverlib.NewAuthEvents(nil)
knownEvents := map[string]*types.Event{} knownEvents := map[string]*types.Event{}
if err = r.fetchAuthEvents(ctx, updater, logger, headered, &authEvents, knownEvents, serverRes.ServerNames); err != nil { if err := r.fetchAuthEvents(ctx, updater, logger, headered, &authEvents, knownEvents, serverRes.ServerNames); err != nil {
return false, fmt.Errorf("r.fetchAuthEvents: %w", err) return rollbackTransaction, fmt.Errorf("r.fetchAuthEvents: %w", err)
} }
// Check if the event is allowed by its auth events. If it isn't then // Check if the event is allowed by its auth events. If it isn't then
@ -195,7 +195,7 @@ func (r *Inputer) processRoomEvent(
authEventNIDs := make([]types.EventNID, 0, len(authEventIDs)) authEventNIDs := make([]types.EventNID, 0, len(authEventIDs))
for _, authEventID := range authEventIDs { for _, authEventID := range authEventIDs {
if _, ok := knownEvents[authEventID]; !ok { if _, ok := knownEvents[authEventID]; !ok {
return false, fmt.Errorf("missing auth event %s", authEventID) return rollbackTransaction, fmt.Errorf("missing auth event %s", authEventID)
} }
authEventNIDs = append(authEventNIDs, knownEvents[authEventID].EventNID) authEventNIDs = append(authEventNIDs, knownEvents[authEventID].EventNID)
} }
@ -204,6 +204,7 @@ func (r *Inputer) processRoomEvent(
if input.Kind == api.KindNew { if input.Kind == api.KindNew {
// Check that the event passes authentication checks based on the // Check that the event passes authentication checks based on the
// current room state. // current room state.
var err error
softfail, err = helpers.CheckForSoftFail(ctx, updater, headered, input.StateEventIDs) softfail, err = helpers.CheckForSoftFail(ctx, updater, headered, input.StateEventIDs)
if err != nil { if err != nil {
logger.WithError(err).Warn("Error authing soft-failed event") logger.WithError(err).Warn("Error authing soft-failed event")
@ -237,7 +238,7 @@ func (r *Inputer) processRoomEvent(
hadEvents: map[string]bool{}, hadEvents: map[string]bool{},
haveEvents: map[string]*gomatrixserverlib.HeaderedEvent{}, haveEvents: map[string]*gomatrixserverlib.HeaderedEvent{},
} }
if err = missingState.processEventWithMissingState(ctx, event, headered.RoomVersion); err != nil { if err := missingState.processEventWithMissingState(ctx, event, headered.RoomVersion); err != nil {
isRejected = true isRejected = true
rejectionErr = fmt.Errorf("missingState.processEventWithMissingState: %w", err) rejectionErr = fmt.Errorf("missingState.processEventWithMissingState: %w", err)
} else { } else {
@ -252,14 +253,14 @@ func (r *Inputer) processRoomEvent(
// Store the event. // Store the event.
_, _, stateAtEvent, redactionEvent, redactedEventID, err := updater.StoreEvent(ctx, event, authEventNIDs, isRejected) _, _, stateAtEvent, redactionEvent, redactedEventID, err := updater.StoreEvent(ctx, event, authEventNIDs, isRejected)
if err != nil { if err != nil {
return false, fmt.Errorf("updater.StoreEvent: %w", err) return rollbackTransaction, fmt.Errorf("updater.StoreEvent: %w", err)
} }
// if storing this event results in it being redacted then do so. // if storing this event results in it being redacted then do so.
if !isRejected && redactedEventID == event.EventID() { if !isRejected && redactedEventID == event.EventID() {
r, rerr := eventutil.RedactEvent(redactionEvent, event) r, rerr := eventutil.RedactEvent(redactionEvent, event)
if rerr != nil { if rerr != nil {
return false, fmt.Errorf("eventutil.RedactEvent: %w", rerr) return rollbackTransaction, fmt.Errorf("eventutil.RedactEvent: %w", rerr)
} }
event = r event = r
} }
@ -270,15 +271,15 @@ func (r *Inputer) processRoomEvent(
if input.Kind == api.KindOutlier { if input.Kind == api.KindOutlier {
logger.Debug("Stored outlier") logger.Debug("Stored outlier")
hooks.Run(hooks.KindNewEventPersisted, headered) hooks.Run(hooks.KindNewEventPersisted, headered)
return true, nil return commitTransaction, nil
} }
roomInfo, err := updater.RoomInfo(ctx, event.RoomID()) roomInfo, err := updater.RoomInfo(ctx, event.RoomID())
if err != nil { if err != nil {
return false, fmt.Errorf("updater.RoomInfo: %w", err) return rollbackTransaction, fmt.Errorf("updater.RoomInfo: %w", err)
} }
if roomInfo == nil { if roomInfo == nil {
return false, fmt.Errorf("updater.RoomInfo missing for room %s", event.RoomID()) return rollbackTransaction, fmt.Errorf("updater.RoomInfo missing for room %s", event.RoomID())
} }
if !missingPrev && stateAtEvent.BeforeStateSnapshotNID == 0 { if !missingPrev && stateAtEvent.BeforeStateSnapshotNID == 0 {
@ -286,7 +287,7 @@ func (r *Inputer) processRoomEvent(
// Lets calculate one. // Lets calculate one.
err = r.calculateAndSetState(ctx, updater, input, roomInfo, &stateAtEvent, event, isRejected) err = r.calculateAndSetState(ctx, updater, input, roomInfo, &stateAtEvent, event, isRejected)
if err != nil { if err != nil {
return false, fmt.Errorf("r.calculateAndSetState: %w", err) return rollbackTransaction, fmt.Errorf("r.calculateAndSetState: %w", err)
} }
} }
@ -296,7 +297,7 @@ func (r *Inputer) processRoomEvent(
"soft_fail": softfail, "soft_fail": softfail,
"missing_prev": missingPrev, "missing_prev": missingPrev,
}).Warn("Stored rejected event") }).Warn("Stored rejected event")
return true, rejectionErr return commitTransaction, rejectionErr
} }
switch input.Kind { switch input.Kind {
@ -311,7 +312,7 @@ func (r *Inputer) processRoomEvent(
input.TransactionID, // transaction ID input.TransactionID, // transaction ID
input.HasState, // rewrites state? input.HasState, // rewrites state?
); err != nil { ); err != nil {
return false, fmt.Errorf("r.updateLatestEvents: %w", err) return rollbackTransaction, fmt.Errorf("r.updateLatestEvents: %w", err)
} }
case api.KindOld: case api.KindOld:
err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{ err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{
@ -323,7 +324,7 @@ func (r *Inputer) processRoomEvent(
}, },
}) })
if err != nil { if err != nil {
return false, fmt.Errorf("r.WriteOutputEvents (old): %w", err) return rollbackTransaction, fmt.Errorf("r.WriteOutputEvents (old): %w", err)
} }
} }
@ -342,14 +343,14 @@ func (r *Inputer) processRoomEvent(
}, },
}) })
if err != nil { if err != nil {
return false, fmt.Errorf("r.WriteOutputEvents (redactions): %w", err) return rollbackTransaction, fmt.Errorf("r.WriteOutputEvents (redactions): %w", err)
} }
} }
// Everything was OK — the latest events updater didn't error and // Everything was OK — the latest events updater didn't error and
// we've sent output events. Finally, generate a hook call. // we've sent output events. Finally, generate a hook call.
hooks.Run(hooks.KindNewEventPersisted, headered) hooks.Run(hooks.KindNewEventPersisted, headered)
return true, nil return commitTransaction, nil
} }
// fetchAuthEvents will check to see if any of the // fetchAuthEvents will check to see if any of the