diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index 12845061a..237e50829 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -170,6 +170,7 @@ func (oq *destinationQueue) wakeQueueIfNeeded() { // getPendingFromDatabase will look at the database and see if // there are any persisted events that haven't been sent to this // destination yet. If so, they will be queued up. +// nolint:gocyclo func (oq *destinationQueue) getPendingFromDatabase() { // Check to see if there's anything to do for this server // in the database. @@ -177,10 +178,27 @@ func (oq *destinationQueue) getPendingFromDatabase() { ctx := context.Background() oq.pendingMutex.Lock() defer oq.pendingMutex.Unlock() + + // Take a note of all of the PDUs and EDUs that we already + // have cached. We will index them based on the receipt, + // which ultimately just contains the index of the PDU/EDU + // in the database. + gotPDUs := map[string]struct{}{} + gotEDUs := map[string]struct{}{} + for _, pdu := range oq.pendingPDUs { + gotPDUs[pdu.receipt.String()] = struct{}{} + } + for _, edu := range oq.pendingEDUs { + gotEDUs[edu.receipt.String()] = struct{}{} + } + if pduCapacity := maxPDUsInMemory - len(oq.pendingPDUs); pduCapacity > 0 { // We have room in memory for some PDUs - let's request no more than that. if pdus, err := oq.db.GetPendingPDUs(ctx, oq.destination, pduCapacity); err == nil { for receipt, pdu := range pdus { + if _, ok := gotPDUs[receipt.String()]; ok { + continue + } oq.pendingPDUs = append(oq.pendingPDUs, &queuedPDU{receipt, pdu}) retrieved = true } @@ -192,6 +210,9 @@ func (oq *destinationQueue) getPendingFromDatabase() { // We have room in memory for some EDUs - let's request no more than that. if edus, err := oq.db.GetPendingEDUs(ctx, oq.destination, eduCapacity); err == nil { for receipt, edu := range edus { + if _, ok := gotEDUs[receipt.String()]; ok { + continue + } oq.pendingEDUs = append(oq.pendingEDUs, &queuedEDU{receipt, edu}) retrieved = true } diff --git a/federationsender/storage/shared/storage.go b/federationsender/storage/shared/storage.go index f5921672d..fbf84c705 100644 --- a/federationsender/storage/shared/storage.go +++ b/federationsender/storage/shared/storage.go @@ -46,6 +46,10 @@ type Receipt struct { nid int64 } +func (r *Receipt) String() string { + return fmt.Sprintf("%d", r.nid) +} + // UpdateRoom updates the joined hosts for a room and returns what the joined // hosts were before the update, or nil if this was a duplicate message. // This is called when we receive a message from kafka, so we pass in