diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index e6f325b47..7933f9750 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -120,7 +120,7 @@ func (r *Inputer) Start() error { nats.DeliverAll(), // Ensure that NATS doesn't try to resend us something that wasn't done // within the period of time that we might still be processing it. - nats.AckWait(MaximumProcessingTime+(time.Second*10)), + nats.AckWait((MaximumMissingProcessingTime*2)+(time.Second*10)), ) return err } diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index 8f262ebe6..2dc096674 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -41,7 +41,7 @@ func init() { } // TODO: Does this value make sense? -const MaximumProcessingTime = time.Minute * 2 +const MaximumMissingProcessingTime = time.Minute * 2 var processRoomEventDuration = prometheus.NewHistogramVec( prometheus.HistogramOpts{ @@ -66,11 +66,11 @@ var processRoomEventDuration = prometheus.NewHistogramVec( // TODO: Break up function - we should probably do transaction ID checks before calling this. // nolint:gocyclo func (r *Inputer) processRoomEvent( - inctx context.Context, + ctx context.Context, input *api.InputRoomEvent, ) (err error) { select { - case <-inctx.Done(): + case <-ctx.Done(): // Before we do anything, make sure the context hasn't expired for this pending task. // If it has then we'll give up straight away — it's probably a synchronous input // request and the caller has already given up, but the inbox task was still queued. @@ -78,13 +78,6 @@ func (r *Inputer) processRoomEvent( default: } - // Wrap the context with a time limit. We'll allow no more than MaximumProcessingTime for - // everything that we need to do for this event, or it's possible that we could end up wedging - // the roomserver for a very long time. - var cancel context.CancelFunc - ctx, cancel := context.WithTimeout(inctx, MaximumProcessingTime) - defer cancel() - // Measure how long it takes to process this event. started := time.Now() defer func() { @@ -344,6 +337,10 @@ func (r *Inputer) fetchAuthEvents( known map[string]*types.Event, servers []gomatrixserverlib.ServerName, ) error { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, MaximumMissingProcessingTime) + defer cancel() + unknown := map[string]struct{}{} authEventIDs := event.AuthEventIDs() if len(authEventIDs) == 0 { diff --git a/roomserver/internal/input/input_missing.go b/roomserver/internal/input/input_missing.go index 44710962c..862b3a7fe 100644 --- a/roomserver/internal/input/input_missing.go +++ b/roomserver/internal/input/input_missing.go @@ -37,6 +37,10 @@ type missingStateReq struct { func (t *missingStateReq) processEventWithMissingState( ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion, ) error { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, MaximumMissingProcessingTime) + defer cancel() + // We are missing the previous events for this events. // This means that there is a gap in our view of the history of the // room. There two ways that we can handle such a gap: