This commit is contained in:
Neil Alexander 2020-05-05 14:37:06 +01:00
parent 93eb8163a1
commit c71bcb2cdf

View file

@ -81,7 +81,6 @@ func (oq *destinationQueue) backoff() bool {
logrus.WithField("server_name", oq.destination).Infof("Increasing backoff to %s", backoffSeconds) logrus.WithField("server_name", oq.destination).Infof("Increasing backoff to %s", backoffSeconds)
return false // Don't give up yet. return false // Don't give up yet.
} else { } else {
logrus.WithField("server_name", oq.destination).Infof("Backlisting due to errors")
// We've exceeded the maximum amount of times we're willing // We've exceeded the maximum amount of times we're willing
// to back off, which is probably in the region of hours by // to back off, which is probably in the region of hours by
// now. Just give up - clear the queues and reset the queue // now. Just give up - clear the queues and reset the queue
@ -92,6 +91,7 @@ func (oq *destinationQueue) backoff() bool {
oq.pendingEDUs = nil oq.pendingEDUs = nil
oq.pendingInvites = nil oq.pendingInvites = nil
oq.runningMutex.Unlock() oq.runningMutex.Unlock()
logrus.WithField("server_name", oq.destination).Infof("Blacklisting server due to %d consecutive errors", failCounter)
return true // Give up. return true // Give up.
} }
} }
@ -115,8 +115,7 @@ func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.HeaderedEvent) {
return return
} }
oq.runningMutex.Lock() oq.runningMutex.Lock()
evCopy := *ev oq.pendingPDUs = append(oq.pendingPDUs, ev)
oq.pendingPDUs = append(oq.pendingPDUs, &evCopy)
oq.runningMutex.Unlock() oq.runningMutex.Unlock()
if !oq.running.Load() { if !oq.running.Load() {
go oq.backgroundSend() go oq.backgroundSend()
@ -128,14 +127,13 @@ func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.HeaderedEvent) {
// sendEDU adds the EDU event to the pending queue for the destination. // sendEDU adds the EDU event to the pending queue for the destination.
// If the queue is empty then it starts a background goroutine to // If the queue is empty then it starts a background goroutine to
// start sending events to that destination. // start sending events to that destination.
func (oq *destinationQueue) sendEDU(e *gomatrixserverlib.EDU) { func (oq *destinationQueue) sendEDU(ev *gomatrixserverlib.EDU) {
if oq.blacklisted.Load() { if oq.blacklisted.Load() {
// If the destination is blacklisted then drop the event. // If the destination is blacklisted then drop the event.
return return
} }
oq.runningMutex.Lock() oq.runningMutex.Lock()
eCopy := *e oq.pendingEDUs = append(oq.pendingEDUs, ev)
oq.pendingEDUs = append(oq.pendingEDUs, &eCopy)
oq.runningMutex.Unlock() oq.runningMutex.Unlock()
if !oq.running.Load() { if !oq.running.Load() {
go oq.backgroundSend() go oq.backgroundSend()
@ -153,8 +151,7 @@ func (oq *destinationQueue) sendInvite(ev *gomatrixserverlib.InviteV2Request) {
return return
} }
oq.runningMutex.Lock() oq.runningMutex.Lock()
evCopy := *ev oq.pendingInvites = append(oq.pendingInvites, ev)
oq.pendingInvites = append(oq.pendingInvites, &evCopy)
oq.runningMutex.Unlock() oq.runningMutex.Unlock()
if !oq.running.Load() { if !oq.running.Load() {
go oq.backgroundSend() go oq.backgroundSend()
@ -164,6 +161,7 @@ func (oq *destinationQueue) sendInvite(ev *gomatrixserverlib.InviteV2Request) {
} }
// backgroundSend is the worker goroutine for sending events. // backgroundSend is the worker goroutine for sending events.
// nolint:gocyclo
func (oq *destinationQueue) backgroundSend() { func (oq *destinationQueue) backgroundSend() {
// Mark the worker as running for its lifetime. // Mark the worker as running for its lifetime.
oq.wakeup = make(chan bool) oq.wakeup = make(chan bool)
@ -207,7 +205,6 @@ func (oq *destinationQueue) backgroundSend() {
// If we successfully sent the transaction then clear out // If we successfully sent the transaction then clear out
// the pending events and EDUs. // the pending events and EDUs.
if transaction { if transaction {
oq.success()
oq.runningMutex.Lock() oq.runningMutex.Lock()
oq.pendingPDUs = oq.pendingPDUs[:0] oq.pendingPDUs = oq.pendingPDUs[:0]
oq.pendingEDUs = oq.pendingEDUs[:0] oq.pendingEDUs = oq.pendingEDUs[:0]
@ -234,6 +231,10 @@ func (oq *destinationQueue) backgroundSend() {
} }
} }
// If everything was fine at this point then we can update
// the counters for the transaction IDs.
oq.success()
// Wait either for a few seconds, or until a new event is // Wait either for a few seconds, or until a new event is
// available. // available.
select { select {