diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index a9609361b..76357e83b 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -122,7 +122,8 @@ func (oq *destinationQueue) backgroundSend() { // Retrieve any waiting things. oq.runningMutex.RLock() - pendingPDUs, pendingEDUs := oq.pendingPDUs, oq.pendingEDUs + pendingPDUs, numPDUs := oq.pendingPDUs, len(oq.pendingPDUs) + pendingEDUs, numEDUs := oq.pendingEDUs, len(oq.pendingEDUs) pendingInvites := oq.pendingInvites idleCounter, sentCounter := oq.idleCounter.Load(), oq.statistics.SuccessCount() oq.runningMutex.RUnlock() @@ -156,13 +157,19 @@ func (oq *destinationQueue) backgroundSend() { oq.runningMutex.Lock() // Reallocate so that the underlying arrays can be GC'd, as // opposed to growing forever. + for i := 0; i < numPDUs; i++ { + oq.pendingPDUs[i] = nil + } + for i := 0; i < numEDUs; i++ { + oq.pendingEDUs[i] = nil + } oq.pendingPDUs = append( []*gomatrixserverlib.HeaderedEvent{}, - oq.pendingPDUs[len(pendingPDUs):]..., + oq.pendingPDUs[numPDUs:]..., ) oq.pendingEDUs = append( []*gomatrixserverlib.EDU{}, - oq.pendingEDUs[len(pendingEDUs):]..., + oq.pendingEDUs[numEDUs:]..., ) oq.runningMutex.Unlock() } @@ -179,7 +186,6 @@ func (oq *destinationQueue) backgroundSend() { // the backoff has exceeded a maximum allowable value. return } - continue } else if sent > 0 { // If we successfully sent the invites then clear out // the pending invites. diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go index 74e6da3a9..031e9538c 100644 --- a/federationsender/queue/queue.go +++ b/federationsender/queue/queue.go @@ -21,6 +21,7 @@ import ( "github.com/matrix-org/dendrite/federationsender/producers" "github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" log "github.com/sirupsen/logrus" ) @@ -189,13 +190,18 @@ func (oqs *OutgoingQueues) SendEDU( // filterDestinations removes our own server from the list of destinations. // Otherwise we could end up trying to talk to ourselves. -func filterDestinations(origin gomatrixserverlib.ServerName, destinations []gomatrixserverlib.ServerName) []gomatrixserverlib.ServerName { - var result []gomatrixserverlib.ServerName - for _, destination := range destinations { - if destination == origin { +func filterDestinations(origin gomatrixserverlib.ServerName, destinations []gomatrixserverlib.ServerName) ( + result []gomatrixserverlib.ServerName, +) { + strs := make([]string, len(destinations)) + for i, d := range destinations { + strs[i] = string(d) + } + for _, destination := range util.UniqueStrings(strs) { + if gomatrixserverlib.ServerName(destination) == origin { continue } - result = append(result, destination) + result = append(result, gomatrixserverlib.ServerName(destination)) } return result }