mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-31 10:43:10 -06:00
Reduce chance of duplicates being sent by NATS
This commit is contained in:
parent
9ddb8749c1
commit
d0ee9f9841
|
|
@ -276,6 +276,7 @@ NextPDU:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
func fromStateTuples(tuples []gomatrixserverlib.StateKeyTuple, omitTuples []gomatrixserverlib.StateKeyTuple) (result []*gomatrixserverlib.HeaderedEvent) {
|
func fromStateTuples(tuples []gomatrixserverlib.StateKeyTuple, omitTuples []gomatrixserverlib.StateKeyTuple) (result []*gomatrixserverlib.HeaderedEvent) {
|
||||||
NextTuple:
|
NextTuple:
|
||||||
for _, t := range tuples {
|
for _, t := range tuples {
|
||||||
|
|
@ -291,6 +292,7 @@ NextTuple:
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
func assertInputRoomEvents(t *testing.T, got []api.InputRoomEvent, want []*gomatrixserverlib.HeaderedEvent) {
|
func assertInputRoomEvents(t *testing.T, got []api.InputRoomEvent, want []*gomatrixserverlib.HeaderedEvent) {
|
||||||
for _, g := range got {
|
for _, g := range got {
|
||||||
|
|
|
||||||
|
|
@ -44,7 +44,7 @@ var keyContentFields = map[string]string{
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Does this value make sense?
|
// TODO: Does this value make sense?
|
||||||
const MaximumProcessingTime = time.Minute
|
const MaximumProcessingTime = time.Minute * 2
|
||||||
|
|
||||||
type Inputer struct {
|
type Inputer struct {
|
||||||
DB storage.Database
|
DB storage.Database
|
||||||
|
|
@ -80,9 +80,11 @@ func (r *Inputer) Start() error {
|
||||||
_ = msg.Term()
|
_ = msg.Term()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_ = msg.InProgress()
|
||||||
roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc()
|
roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc()
|
||||||
worker := r.workerForRoom(roomID)
|
r.workerForRoom(roomID).Act(nil, func() {
|
||||||
worker.Act(nil, func() {
|
_ = msg.InProgress() // resets the acknowledgement wait timer
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), MaximumProcessingTime)
|
ctx, cancel := context.WithTimeout(context.Background(), MaximumProcessingTime)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
|
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
|
||||||
|
|
@ -99,15 +101,14 @@ func (r *Inputer) Start() error {
|
||||||
// sure that we only acknowledge when we're happy we've done everything we
|
// sure that we only acknowledge when we're happy we've done everything we
|
||||||
// can. This ensures we retry things when it makes sense to do so.
|
// can. This ensures we retry things when it makes sense to do so.
|
||||||
nats.ManualAck(),
|
nats.ManualAck(),
|
||||||
// NATS will try to redeliver things to us automatically if we don't ack
|
|
||||||
// or nak them within a certain amount of time. This stops that from
|
|
||||||
// happening, so we don't end up doing a lot of unnecessary duplicate work.
|
|
||||||
nats.MaxDeliver(0),
|
|
||||||
// Use a durable named consumer.
|
// Use a durable named consumer.
|
||||||
r.Durable,
|
r.Durable,
|
||||||
// If we've missed things in the stream, e.g. we restarted, then replay
|
// If we've missed things in the stream, e.g. we restarted, then replay
|
||||||
// all of the queued messages that were waiting for us.
|
// all of the queued messages that were waiting for us.
|
||||||
nats.DeliverAll(),
|
nats.DeliverAll(),
|
||||||
|
// Ensure that NATS doesn't try to resend us something that wasn't done
|
||||||
|
// within the period of time that we might still be processing it.
|
||||||
|
nats.AckWait(MaximumProcessingTime+(time.Second*10)),
|
||||||
)
|
)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue