Dedupe destinations, fix other bug hopefully

This commit is contained in:
Neil Alexander 2020-05-06 14:00:04 +01:00
parent 9d293caec8
commit 18cb75b0ac
2 changed files with 21 additions and 9 deletions

View file

@ -122,7 +122,8 @@ func (oq *destinationQueue) backgroundSend() {
// Retrieve any waiting things.
oq.runningMutex.RLock()
pendingPDUs, pendingEDUs := oq.pendingPDUs, oq.pendingEDUs
pendingPDUs, numPDUs := oq.pendingPDUs, len(oq.pendingPDUs)
pendingEDUs, numEDUs := oq.pendingEDUs, len(oq.pendingEDUs)
pendingInvites := oq.pendingInvites
idleCounter, sentCounter := oq.idleCounter.Load(), oq.statistics.SuccessCount()
oq.runningMutex.RUnlock()
@ -156,13 +157,19 @@ func (oq *destinationQueue) backgroundSend() {
oq.runningMutex.Lock()
// Reallocate so that the underlying arrays can be GC'd, as
// opposed to growing forever.
for i := 0; i < numPDUs; i++ {
oq.pendingPDUs[i] = nil
}
for i := 0; i < numEDUs; i++ {
oq.pendingEDUs[i] = nil
}
oq.pendingPDUs = append(
[]*gomatrixserverlib.HeaderedEvent{},
oq.pendingPDUs[len(pendingPDUs):]...,
oq.pendingPDUs[numPDUs:]...,
)
oq.pendingEDUs = append(
[]*gomatrixserverlib.EDU{},
oq.pendingEDUs[len(pendingEDUs):]...,
oq.pendingEDUs[numEDUs:]...,
)
oq.runningMutex.Unlock()
}
@ -179,7 +186,6 @@ func (oq *destinationQueue) backgroundSend() {
// the backoff has exceeded a maximum allowable value.
return
}
continue
} else if sent > 0 {
// If we successfully sent the invites then clear out
// the pending invites.

View file

@ -21,6 +21,7 @@ import (
"github.com/matrix-org/dendrite/federationsender/producers"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
log "github.com/sirupsen/logrus"
)
@ -189,13 +190,18 @@ func (oqs *OutgoingQueues) SendEDU(
// filterDestinations removes our own server from the list of destinations.
// Otherwise we could end up trying to talk to ourselves.
func filterDestinations(origin gomatrixserverlib.ServerName, destinations []gomatrixserverlib.ServerName) []gomatrixserverlib.ServerName {
var result []gomatrixserverlib.ServerName
for _, destination := range destinations {
if destination == origin {
func filterDestinations(origin gomatrixserverlib.ServerName, destinations []gomatrixserverlib.ServerName) (
result []gomatrixserverlib.ServerName,
) {
strs := make([]string, len(destinations))
for i, d := range destinations {
strs[i] = string(d)
}
for _, destination := range util.UniqueStrings(strs) {
if gomatrixserverlib.ServerName(destination) == origin {
continue
}
result = append(result, destination)
result = append(result, gomatrixserverlib.ServerName(destination))
}
return result
}