This commit is contained in:
Neil Alexander 2020-05-05 14:13:28 +01:00
parent c4ee20c95e
commit 93eb8163a1

View file

@ -24,7 +24,7 @@ import (
"github.com/matrix-org/dendrite/federationsender/producers"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
"go.uber.org/atomic"
)
@ -51,6 +51,7 @@ type destinationQueue struct {
idleCounter atomic.Uint32 // how many ticks have we done nothing?
failCounter atomic.Uint32 // how many times have we failed?
sentCounter atomic.Uint32 // how many times have we succeeded?
wakeup chan bool // wakes up a sleeping worker
runningMutex sync.RWMutex // protects the below
lastTransactionIDs []gomatrixserverlib.TransactionID // protected by runningMutex
pendingPDUs []*gomatrixserverlib.HeaderedEvent // protected by runningMutex
@ -73,12 +74,14 @@ func (oq *destinationQueue) backoff() bool {
// backoff based on how many times we have failed already. The
// worker goroutine will wait until this time before processing
// anything from the queue.
backoffSeconds := math.Exp2(float64(failCounter))
backoffSeconds := time.Second * time.Duration(math.Exp2(float64(failCounter)))
oq.backoffUntil.Store(
time.Now().Add(time.Second * time.Duration(backoffSeconds)),
time.Now().Add(backoffSeconds),
)
logrus.WithField("server_name", oq.destination).Infof("Increasing backoff to %s", backoffSeconds)
return false // Don't give up yet.
} else {
logrus.WithField("server_name", oq.destination).Infof("Backlisting due to errors")
// We've exceeded the maximum number of times we're willing
// to back off, which is probably in the region of hours by
// now. Just give up - clear the queues and reset the queue
@ -112,10 +115,13 @@ func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.HeaderedEvent) {
return
}
oq.runningMutex.Lock()
oq.pendingPDUs = append(oq.pendingPDUs, ev)
evCopy := *ev
oq.pendingPDUs = append(oq.pendingPDUs, &evCopy)
oq.runningMutex.Unlock()
if !oq.running.Load() {
go oq.backgroundSend()
} else {
oq.wakeup <- true
}
}
@ -128,10 +134,13 @@ func (oq *destinationQueue) sendEDU(e *gomatrixserverlib.EDU) {
return
}
oq.runningMutex.Lock()
oq.pendingEDUs = append(oq.pendingEDUs, e)
eCopy := *e
oq.pendingEDUs = append(oq.pendingEDUs, &eCopy)
oq.runningMutex.Unlock()
if !oq.running.Load() {
go oq.backgroundSend()
} else {
oq.wakeup <- true
}
}
@ -144,18 +153,23 @@ func (oq *destinationQueue) sendInvite(ev *gomatrixserverlib.InviteV2Request) {
return
}
oq.runningMutex.Lock()
oq.pendingInvites = append(oq.pendingInvites, ev)
evCopy := *ev
oq.pendingInvites = append(oq.pendingInvites, &evCopy)
oq.runningMutex.Unlock()
if !oq.running.Load() {
go oq.backgroundSend()
} else {
oq.wakeup <- true
}
}
// backgroundSend is the worker goroutine for sending events.
func (oq *destinationQueue) backgroundSend() {
// Mark the worker as running for its lifetime.
oq.wakeup = make(chan bool)
oq.running.Store(true)
defer oq.running.Store(false)
defer close(oq.wakeup)
for {
// Wait for our backoff timer.
@ -163,6 +177,8 @@ func (oq *destinationQueue) backgroundSend() {
if b, ok := oq.backoffUntil.Load().(time.Time); ok {
backoffUntil = b
}
// If we have a backoff period then sit and wait for it.
if backoffUntil.After(time.Now()) {
<-time.After(time.Until(backoffUntil))
}
@ -212,27 +228,24 @@ func (oq *destinationQueue) backgroundSend() {
// If we successfully sent the invites then clear out
// the pending invites.
if invites {
oq.success()
oq.runningMutex.Lock()
oq.pendingInvites = oq.pendingInvites[:0]
oq.runningMutex.Unlock()
}
}
// At this point, if we did everything successfully,
// we can reset the backoff duration.
if idleCounter >= 5 {
// Wait either for a few seconds, or until a new event is
// available.
select {
case <-oq.wakeup:
case <-time.After(time.Second * 5):
// If this worker has been idle for a while then stop
// running it, otherwise the goroutine will just tick
// endlessly. It'll get automatically restarted when
// a new event needs to be sent.
if idleCounter >= 5 {
return
} else {
// Otherwise, add to the ticker counter and ask the
// next iteration to wait for a second (to stop CPU
// spinning).
oq.idleCounter.Store(idleCounter + 1)
oq.backoffUntil.Store(time.Now().Add(time.Second))
}
}
}
}
@ -271,7 +284,7 @@ func (oq *destinationQueue) nextTransaction(
t.EDUs = append(t.EDUs, *edu)
}
util.GetLogger(context.TODO()).Infof("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs))
logrus.WithField("server_name", oq.destination).Infof("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs))
_, err := oq.client.SendTransaction(context.TODO(), t)
if err != nil {