diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index b8c8669d3..9a27ba1eb 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -26,7 +26,6 @@ import ( "github.com/Arceliar/phony" "github.com/getsentry/sentry-go" fedapi "github.com/matrix-org/dendrite/federationapi/api" - "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/roomserver/acls" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/internal/query" @@ -103,7 +102,8 @@ func (r *Inputer) Start() error { _ = msg.InProgress() // resets the acknowledgement wait timer defer eventsInProgress.Delete(index) defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec() - if err := r.processRoomEventUsingUpdater(context.Background(), roomID, &inputRoomEvent); err != nil { + retry, err := r.processRoomEventUsingUpdater(context.Background(), roomID, &inputRoomEvent) + if err != nil { if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) { sentry.CaptureException(err) } @@ -113,7 +113,11 @@ func (r *Inputer) Start() error { "type": inputRoomEvent.Event.Type(), }).Warn("Roomserver failed to process async event") } - _ = msg.Ack() + if !retry { + _ = msg.Ack() + } else { + _ = msg.Nak() + } }) }, // NATS wants to acknowledge automatically by default when the message is @@ -133,26 +137,34 @@ func (r *Inputer) Start() error { return err } +// processRoomEventUsingUpdater opens up a room updater and tries to +// process the event. It returns two values: a bool signifying whether +// we should retry later if possible (e.g. by NAKing the NATS message when +// the room updater couldn't be opened or the transaction couldn't be +// committed) and an error describing what went wrong, if anything.
func (r *Inputer) processRoomEventUsingUpdater( ctx context.Context, roomID string, inputRoomEvent *api.InputRoomEvent, -) error { +) (bool, error) { roomInfo, err := r.DB.RoomInfo(ctx, roomID) if err != nil { - return fmt.Errorf("r.DB.RoomInfo: %w", err) + return false, fmt.Errorf("r.DB.RoomInfo: %w", err) } updater, err := r.DB.GetRoomUpdater(ctx, *roomInfo) if err != nil { - return fmt.Errorf("r.DB.GetRoomUpdater: %w", err) + return true, fmt.Errorf("r.DB.GetRoomUpdater: %w", err) } - succeeded := false - defer sqlutil.EndTransactionWithCheck(updater, &succeeded, &err) if err = r.processRoomEvent(ctx, updater, inputRoomEvent); err != nil { - return fmt.Errorf("r.processRoomEvent: %w", err) + if rerr := updater.Rollback(); rerr != nil { + return false, fmt.Errorf("r.processRoomEvent: %w (with additional error rolling back transaction: %s)", err, rerr) + } + return false, fmt.Errorf("r.processRoomEvent: %w", err) } - succeeded = true - return nil + if err = updater.Commit(); err != nil { + return true, fmt.Errorf("updater.Commit: %w", err) + } + return false, nil } // InputRoomEvents implements api.RoomserverInternalAPI @@ -202,7 +214,7 @@ func (r *Inputer) InputRoomEvents( worker.Act(nil, func() { defer eventsInProgress.Delete(index) defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec() - err := r.processRoomEventUsingUpdater(ctx, roomID, &inputRoomEvent) + _, err := r.processRoomEventUsingUpdater(ctx, roomID, &inputRoomEvent) if err != nil { if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) { sentry.CaptureException(err)