diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 20d2cfc7a..f37a04175 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -108,12 +108,16 @@ type worker struct { r *Inputer roomID string subscription *nats.Subscription + sentryHub *sentry.Hub } func (r *Inputer) startWorkerForRoom(roomID string) { v, loaded := r.workers.LoadOrStore(roomID, &worker{ r: r, roomID: roomID, + // We're cloning the CurrentHub, as we otherwise get total garbage + // in sentry, with i.e. mismatching rooms and event_ids. + sentryHub: sentry.CurrentHub().Clone(), }) w := v.(*worker) w.Lock() @@ -265,7 +269,7 @@ func (w *worker) _next() { // Look up what the next event is that's waiting to be processed. ctx, cancel := context.WithTimeout(w.r.ProcessContext.Context(), time.Minute) defer cancel() - if scope := sentry.CurrentHub().Scope(); scope != nil { + if scope := w.sentryHub.Scope(); scope != nil { scope.SetTag("room_id", w.roomID) } msgs, err := w.subscription.Fetch(1, nats.Context(ctx)) @@ -323,7 +327,7 @@ func (w *worker) _next() { return } - if scope := sentry.CurrentHub().Scope(); scope != nil { + if scope := w.sentryHub.Scope(); scope != nil { scope.SetTag("event_id", inputRoomEvent.Event.EventID()) } @@ -347,7 +351,7 @@ func (w *worker) _next() { }).Warn("Roomserver rejected event") default: if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) { - sentry.CaptureException(err) + w.sentryHub.CaptureException(err) } logrus.WithError(err).WithFields(logrus.Fields{ "room_id": w.roomID, diff --git a/roomserver/internal/input/input_latest_events.go b/roomserver/internal/input/input_latest_events.go index cf4fe04be..e07a05213 100644 --- a/roomserver/internal/input/input_latest_events.go +++ b/roomserver/internal/input/input_latest_events.go @@ -79,12 +79,26 @@ func (r *Inputer) updateLatestEvents( return nil } + // Attempt to get Sentry hub + w, loaded := r.workers.Load(event.RoomID()) + if !loaded { + // this _should_ never happen... + logrus.Panicf("failed to load worker on existing room") + } + + s, ok := w.(*worker) + if !ok { + // this _should_ never happen as well... + logrus.Panicf("failed to get sentry hub from worker") + } + u := latestEventsUpdater{ api: r, updater: updater, stateAtEvent: stateAtEvent, event: event, rewritesState: rewritesState, + sentryHub: s.sentryHub, } var updates []api.OutputEvent @@ -149,6 +163,7 @@ type latestEventsUpdater struct { // The snapshots of current state before and after processing this event oldStateNID types.StateSnapshotNID newStateNID types.StateSnapshotNID + sentryHub *sentry.Hub } func (u *latestEventsUpdater) doUpdateLatestEvents(ctx context.Context, roomInfo *types.RoomInfo) ([]api.OutputEvent, error) { @@ -288,7 +303,7 @@ func (u *latestEventsUpdater) latestState(ctx context.Context, roomInfo *types.R "rewrites_state": u.rewritesState, "state_at_event": fmt.Sprintf("%#v", u.stateAtEvent), }).Warnf("State reset detected (removing %d events)", removed) - sentry.WithScope(func(scope *sentry.Scope) { + u.sentryHub.WithScope(func(scope *sentry.Scope) { scope.SetLevel("warning") scope.SetContext("State reset", map[string]interface{}{ "Event ID": u.event.EventID(), @@ -300,7 +315,7 @@ func (u *latestEventsUpdater) latestState(ctx context.Context, roomInfo *types.R "State rewritten": fmt.Sprintf("%v", u.rewritesState), "State at event": fmt.Sprintf("%#v", u.stateAtEvent), }) - sentry.CaptureMessage("State reset detected") + u.sentryHub.CaptureMessage("State reset detected") }) }