diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index 89526fcfd..07d84dd7c 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "fmt" + "math" "sync" "time" @@ -28,33 +29,91 @@ import ( "go.uber.org/atomic" ) +const ( + // FailuresUntilBlacklist is how many consecutive failures we tolerate + // before blacklisting the host altogether. The backoff is exponential + // (2**failures seconds), so the final wait before giving up is 2**15s. + FailuresUntilBlacklist = 16 +) + // destinationQueue is a queue of events for a single destination. // It is responsible for sending the events to the destination and // ensures that only one request is in flight to a given destination // at a time. type destinationQueue struct { - rsProducer *producers.RoomserverProducer - client *gomatrixserverlib.FederationClient - origin gomatrixserverlib.ServerName - destination gomatrixserverlib.ServerName - running atomic.Bool - // The running mutex protects sentCounter, lastTransactionIDs and - // pendingEvents, pendingEDUs. - runningMutex sync.Mutex - sentCounter int - lastTransactionIDs []gomatrixserverlib.TransactionID - pendingEvents []*gomatrixserverlib.HeaderedEvent - pendingEDUs []*gomatrixserverlib.EDU - pendingInvites []*gomatrixserverlib.InviteV2Request + rsProducer *producers.RoomserverProducer // returns signed invites to the roomserver + client *gomatrixserverlib.FederationClient // federation client; presumably sends the requests - confirm + origin gomatrixserverlib.ServerName // copied into Transaction.Origin + destination gomatrixserverlib.ServerName // copied into Transaction.Destination + running atomic.Bool // is the queue worker running? + blacklisted atomic.Bool // is the remote side dead? + backoffUntil atomic.Value // time.Time + idleCounter atomic.Uint32 // how many ticks have we done nothing? + failCounter atomic.Uint32 // how many times have we failed? + sentCounter atomic.Uint32 // how many times have we succeeded?
+ runningMutex sync.RWMutex // protects the below + lastTransactionIDs []gomatrixserverlib.TransactionID // protected by runningMutex + pendingPDUs []*gomatrixserverlib.HeaderedEvent // protected by runningMutex + pendingEDUs []*gomatrixserverlib.EDU // protected by runningMutex + pendingInvites []*gomatrixserverlib.InviteV2Request // protected by runningMutex +} + +// backoff marks a failure and works out when to back off until. It +// returns true if the worker should give up altogether because of +// too many consecutive failures. +func (oq *destinationQueue) backoff() bool { + // Increase the fail counter. + failCounter := oq.failCounter.Load() + failCounter++ + oq.failCounter.Store(failCounter) + + // Check that we haven't failed more times than is acceptable. + if failCounter < FailuresUntilBlacklist { + // We're still under the threshold so work out the exponential + // backoff based on how many times we have failed already. The + // worker goroutine will wait until this time before processing + // anything from the queue. + backoffSeconds := math.Exp2(float64(failCounter)) + oq.backoffUntil.Store( + time.Now().Add(time.Second * time.Duration(backoffSeconds)), + ) + return false // Don't give up yet. + } else { + // We've exceeded the maximum amount of times we're willing + // to back off, which is probably in the region of hours by + // now. Just give up - clear the queues and reset the queue + // back to its default state. + oq.blacklisted.Store(true) + oq.runningMutex.Lock() + oq.pendingPDUs = nil + oq.pendingEDUs = nil + oq.pendingInvites = nil + oq.runningMutex.Unlock() + return true // Give up. + } +} + +// success resets the idle and fail counters and increments the sent counter. +func (oq *destinationQueue) success() { + // Reset the idle and fail counters. + oq.idleCounter.Store(0) + oq.failCounter.Store(0) + + // Increase the sent counter (atomically; it feeds the transaction ID). + oq.sentCounter.Inc() } // Send event adds the event to the pending queue for the destination.
// If the queue is empty then it starts a background goroutine to // start sending events to that destination. func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.HeaderedEvent) { + if oq.blacklisted.Load() { + // If the destination is blacklisted then drop the event. + return + } oq.runningMutex.Lock() - defer oq.runningMutex.Unlock() - oq.pendingEvents = append(oq.pendingEvents, ev) + oq.pendingPDUs = append(oq.pendingPDUs, ev) + oq.runningMutex.Unlock() - if !oq.running.Load() { + if oq.running.CAS(false, true) { // only the caller that flips the flag starts the worker go oq.backgroundSend() } @@ -64,9 +123,13 @@ func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.HeaderedEvent) { // If the queue is empty then it starts a background goroutine to // start sending events to that destination. func (oq *destinationQueue) sendEDU(e *gomatrixserverlib.EDU) { + if oq.blacklisted.Load() { + // If the destination is blacklisted then drop the event. + return + } oq.runningMutex.Lock() - defer oq.runningMutex.Unlock() oq.pendingEDUs = append(oq.pendingEDUs, e) + oq.runningMutex.Unlock() - if !oq.running.Load() { + if oq.running.CAS(false, true) { // only the caller that flips the flag starts the worker go oq.backgroundSend() } @@ -76,9 +139,13 @@ func (oq *destinationQueue) sendEDU(e *gomatrixserverlib.EDU) { // destination. If the queue is empty then it starts a background // goroutine to start sending events to that destination. func (oq *destinationQueue) sendInvite(ev *gomatrixserverlib.InviteV2Request) { + if oq.blacklisted.Load() { + // If the destination is blacklisted then drop the event. + return + } oq.runningMutex.Lock() - defer oq.runningMutex.Unlock() oq.pendingInvites = append(oq.pendingInvites, ev) + oq.runningMutex.Unlock() - if !oq.running.Load() { + if oq.running.CAS(false, true) { // only the caller that flips the flag starts the worker go oq.backgroundSend() } @@ -86,39 +153,104 @@ func (oq *destinationQueue) sendInvite(ev *gomatrixserverlib.InviteV2Request) { // backgroundSend is the worker goroutine for sending events. func (oq *destinationQueue) backgroundSend() { + // Mark the worker as running for its lifetime.
oq.running.Store(true) defer oq.running.Store(false) for { - transaction, invites := oq.nextTransaction(), oq.nextInvites() - if !transaction && !invites { - // If the queue is empty then stop processing for this destination. - // TODO: Remove this destination from the queue map. - return + // Wait for our backoff timer (a zero/unset value means no wait). + backoffUntil := time.Now() + if b, ok := oq.backoffUntil.Load().(time.Time); ok { + backoffUntil = b + } + if backoffUntil.After(time.Now()) { + <-time.After(time.Until(backoffUntil)) } - // TODO: handle retries. - // TODO: blacklist uncooperative servers. + // Snapshot the pending queues and counters under the read lock. + oq.runningMutex.RLock() + pendingPDUs, pendingEDUs := oq.pendingPDUs, oq.pendingEDUs + pendingInvites := oq.pendingInvites + idleCounter, sentCounter := oq.idleCounter.Load(), oq.sentCounter.Load() + oq.runningMutex.RUnlock() + + // If we have pending PDUs or EDUs then construct a transaction. + if len(pendingPDUs) > 0 || len(pendingEDUs) > 0 { + // Try sending the next transaction and see what happens. + transaction, terr := oq.nextTransaction(pendingPDUs, pendingEDUs, sentCounter) + if terr != nil { + // We failed to send the transaction. + if giveUp := oq.backoff(); giveUp { + // It's been suggested that we should give up because + // the backoff has exceeded a maximum allowable value. + return + } + continue + } + + // If we successfully sent the transaction then clear out + // the pending events and EDUs. NOTE(review): this truncates the live queues, so anything appended since the snapshot above looks like it gets dropped too - confirm. + if transaction { + oq.success() + oq.runningMutex.Lock() + oq.pendingPDUs = oq.pendingPDUs[:0] + oq.pendingEDUs = oq.pendingEDUs[:0] + oq.runningMutex.Unlock() + } + } + + // Try sending the next invite and see what happens. + if len(pendingInvites) > 0 { + invites, ierr := oq.nextInvites(pendingInvites) + if ierr != nil { + // We failed to send the invites so increase the + // backoff and give it another go shortly.
+ if giveUp := oq.backoff(); giveUp { + return + } + continue + } + + // If we successfully sent the invites then clear out + // the pending invites. + if invites { + oq.success() + oq.runningMutex.Lock() + oq.pendingInvites = oq.pendingInvites[:0] + oq.runningMutex.Unlock() + } + } + + // Nothing failed on this pass, so decide whether to keep ticking. + if idleCounter >= 5 { + // If this worker has been idle for a while then stop + // running it, otherwise the goroutine will just tick + // endlessly. It'll get automatically restarted when + // a new event needs to be sent. + return + } + // Otherwise, add to the ticker counter and ask the + // next iteration to wait for a second (to stop CPU + // spinning). + oq.idleCounter.Store(idleCounter + 1) + oq.backoffUntil.Store(time.Now().Add(time.Second)) } } // nextTransaction creates a new transaction from the pending event // queue and sends it. Returns true if a transaction was sent or // false otherwise.
-func (oq *destinationQueue) nextTransaction() bool { - oq.runningMutex.Lock() - defer oq.runningMutex.Unlock() - - if len(oq.pendingEvents) == 0 && len(oq.pendingEDUs) == 0 { - return false - } - +func (oq *destinationQueue) nextTransaction( + pendingPDUs []*gomatrixserverlib.HeaderedEvent, + pendingEDUs []*gomatrixserverlib.EDU, + sentCounter uint32, // used to build the transaction ID +) (bool, error) { t := gomatrixserverlib.Transaction{ PDUs: []json.RawMessage{}, EDUs: []gomatrixserverlib.EDU{}, } now := gomatrixserverlib.AsTimestamp(time.Now()) - t.TransactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, oq.sentCounter)) + t.TransactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, sentCounter)) t.Origin = oq.origin t.Destination = oq.destination t.OriginServerTS = now @@ -129,19 +261,15 @@ func (oq *destinationQueue) nextTransaction() bool { oq.lastTransactionIDs = []gomatrixserverlib.TransactionID{t.TransactionID} - for _, pdu := range oq.pendingEvents { + for _, pdu := range pendingPDUs { // Append the JSON of the event, since this is a json.RawMessage type in the // gomatrixserverlib.Transaction struct t.PDUs = append(t.PDUs, (*pdu).JSON()) } - oq.pendingEvents = nil - oq.sentCounter += len(t.PDUs) - for _, edu := range oq.pendingEDUs { + for _, edu := range pendingEDUs { t.EDUs = append(t.EDUs, *edu) } - oq.pendingEDUs = nil - oq.sentCounter += len(t.EDUs) util.GetLogger(context.TODO()).Infof("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs)) @@ -151,22 +279,18 @@ func (oq *destinationQueue) nextTransaction() bool { "destination": oq.destination, log.ErrorKey: err, }).Info("problem sending transaction") + return false, err // surfaced so the worker can back off and retry } - return true + return true, nil } -// nextInvite takes pending invite events from the queue and sends -// them. Returns true if a transaction was sent or false otherwise. +// nextInvites sends each of the given pending invites in turn. It returns +// true if all were sent, or false and the error on the first failure.
-func (oq *destinationQueue) nextInvites() bool { - oq.runningMutex.Lock() - defer oq.runningMutex.Unlock() - - if len(oq.pendingInvites) == 0 { - return false - } - - for _, inviteReq := range oq.pendingInvites { +func (oq *destinationQueue) nextInvites( + pendingInvites []*gomatrixserverlib.InviteV2Request, +) (bool, error) { + for _, inviteReq := range pendingInvites { ev, roomVersion := inviteReq.Event(), inviteReq.RoomVersion() log.WithFields(log.Fields{ @@ -186,7 +310,7 @@ func (oq *destinationQueue) nextInvites() bool { "state_key": ev.StateKey(), "destination": oq.destination, }).WithError(err).Error("failed to send invite") - continue + return false, err // NOTE(review): invites already sent earlier in this loop stay queued, so a retry looks like it re-sends them - confirm } if _, err = oq.rsProducer.SendInviteResponse( @@ -199,10 +323,9 @@ func (oq *destinationQueue) nextInvites() bool { "state_key": ev.StateKey(), "destination": oq.destination, }).WithError(err).Error("failed to return signed invite to roomserver") + return false, err } } - oq.pendingInvites = nil - - return true + return true, nil } diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go index 33abc8fdd..cdcd0da81 100644 --- a/federationsender/queue/queue.go +++ b/federationsender/queue/queue.go @@ -30,7 +30,7 @@ type OutgoingQueues struct { origin gomatrixserverlib.ServerName client *gomatrixserverlib.FederationClient // The queuesMutex protects queues - queuesMutex sync.Mutex + queuesMutex sync.RWMutex queues map[gomatrixserverlib.ServerName]*destinationQueue } @@ -68,10 +68,10 @@ func (oqs *OutgoingQueues) SendEvent( "destinations": destinations, "event": ev.EventID(), }).Info("Sending event") - oqs.queuesMutex.Lock() - defer oqs.queuesMutex.Unlock() for _, destination := range destinations { + oqs.queuesMutex.RLock() oq := oqs.queues[destination] + oqs.queuesMutex.RUnlock() // NOTE(review): the lock is released before the nil check and insert, so two callers can both create a queue for one destination - confirm if oq == nil { oq = &destinationQueue{ rsProducer: oqs.rsProducer, @@ -79,10 +79,12 @@ func (oqs *OutgoingQueues) SendEvent( destination: destination, client: oqs.client, } + oqs.queuesMutex.Lock()
oqs.queues[destination] = oq + oqs.queuesMutex.Unlock() } - oq.sendEvent(ev) + oq.sendEvent(ev) // enqueue synchronously (it only takes a short lock) so events keep their order } return nil @@ -114,9 +116,9 @@ func (oqs *OutgoingQueues) SendInvite( "event_id": ev.EventID(), }).Info("Sending invite") - oqs.queuesMutex.Lock() - defer oqs.queuesMutex.Unlock() + oqs.queuesMutex.RLock() oq := oqs.queues[destination] + oqs.queuesMutex.RUnlock() if oq == nil { oq = &destinationQueue{ rsProducer: oqs.rsProducer, @@ -124,10 +126,12 @@ func (oqs *OutgoingQueues) SendInvite( destination: destination, client: oqs.client, } + oqs.queuesMutex.Lock() oqs.queues[destination] = oq + oqs.queuesMutex.Unlock() } - oq.sendInvite(inviteReq) + oq.sendInvite(inviteReq) // enqueue synchronously (it only takes a short lock) so invites keep their order return nil } @@ -154,10 +158,10 @@ func (oqs *OutgoingQueues) SendEDU( }).Info("Sending EDU event") } - oqs.queuesMutex.Lock() - defer oqs.queuesMutex.Unlock() for _, destination := range destinations { + oqs.queuesMutex.RLock() oq := oqs.queues[destination] + oqs.queuesMutex.RUnlock() if oq == nil { oq = &destinationQueue{ rsProducer: oqs.rsProducer, @@ -165,10 +169,12 @@ func (oqs *OutgoingQueues) SendEDU( destination: destination, client: oqs.client, } + oqs.queuesMutex.Lock() oqs.queues[destination] = oq + oqs.queuesMutex.Unlock() } - oq.sendEDU(e) + oq.sendEDU(e) // enqueue synchronously (it only takes a short lock) so EDUs keep their order } return nil