diff --git a/federationapi/routing/send_test.go b/federationapi/routing/send_test.go index 8e3d7d86c..f1f6169d9 100644 --- a/federationapi/routing/send_test.go +++ b/federationapi/routing/send_test.go @@ -276,6 +276,7 @@ NextPDU: } } +/* func fromStateTuples(tuples []gomatrixserverlib.StateKeyTuple, omitTuples []gomatrixserverlib.StateKeyTuple) (result []*gomatrixserverlib.HeaderedEvent) { NextTuple: for _, t := range tuples { @@ -291,6 +292,7 @@ NextTuple: } return } +*/ func assertInputRoomEvents(t *testing.T, got []api.InputRoomEvent, want []*gomatrixserverlib.HeaderedEvent) { for _, g := range got { diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 27809e54f..c6abbae1e 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -44,7 +44,7 @@ var keyContentFields = map[string]string{ } // TODO: Does this value make sense? -const MaximumProcessingTime = time.Minute +const MaximumProcessingTime = time.Minute * 2 type Inputer struct { DB storage.Database @@ -80,9 +80,11 @@ func (r *Inputer) Start() error { _ = msg.Term() return } + + _ = msg.InProgress() roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc() - worker := r.workerForRoom(roomID) - worker.Act(nil, func() { + r.workerForRoom(roomID).Act(nil, func() { + _ = msg.InProgress() // resets the acknowledgement wait timer ctx, cancel := context.WithTimeout(context.Background(), MaximumProcessingTime) defer cancel() defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec() @@ -99,15 +101,14 @@ func (r *Inputer) Start() error { // sure that we only acknowledge when we're happy we've done everything we // can. This ensures we retry things when it makes sense to do so. nats.ManualAck(), - // NATS will try to redeliver things to us automatically if we don't ack - // or nak them within a certain amount of time. This stops that from - // happening, so we don't end up doing a lot of unnecessary duplicate work. - nats.MaxDeliver(0), // Use a durable named consumer. r.Durable, // If we've missed things in the stream, e.g. we restarted, then replay // all of the queued messages that were waiting for us. nats.DeliverAll(), + // Ensure that NATS doesn't try to resend us something that wasn't done + // within the period of time that we might still be processing it. + nats.AckWait(MaximumProcessingTime+(time.Second*10)), ) return err }