From e4f828101d877cb2188b83d2355cf67837a7c24f Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 6 May 2020 15:43:38 +0100 Subject: [PATCH] Remodel a bit with channels --- federationsender/queue/destinationqueue.go | 123 ++++++++------------- federationsender/queue/queue.go | 76 ++++--------- 2 files changed, 73 insertions(+), 126 deletions(-) diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index 76357e83b..7ab6f1f16 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -18,7 +18,6 @@ import ( "context" "encoding/json" "fmt" - "sync" "time" "github.com/matrix-org/dendrite/federationsender/producers" @@ -34,32 +33,19 @@ import ( // ensures that only one request is in flight to a given destination // at a time. type destinationQueue struct { - rsProducer *producers.RoomserverProducer // roomserver producer - client *gomatrixserverlib.FederationClient // federation client - origin gomatrixserverlib.ServerName // origin of requests - destination gomatrixserverlib.ServerName // destination of requests - running atomic.Bool // is the queue worker running? - wakeup chan bool // wakes up a sleeping worker - statistics *types.ServerStatistics // statistics about this remote server - idleCounter atomic.Uint32 // how many ticks have we done nothing? - runningMutex sync.RWMutex // protects the below - lastTransactionIDs []gomatrixserverlib.TransactionID // protected by runningMutex - pendingPDUs []*gomatrixserverlib.HeaderedEvent // protected by runningMutex - pendingEDUs []*gomatrixserverlib.EDU // protected by runningMutex - pendingInvites []*gomatrixserverlib.InviteV2Request // protected by runningMutex -} - -// Start the destination queue if it needs to be started, or -// otherwise signal to it that it should wake up from sleep. -func (oq *destinationQueue) wake() { - if !oq.running.Load() { - go oq.backgroundSend() - } else { - select { - case oq.wakeup <- true: - default: - } - } + rsProducer *producers.RoomserverProducer // roomserver producer + client *gomatrixserverlib.FederationClient // federation client + origin gomatrixserverlib.ServerName // origin of requests + destination gomatrixserverlib.ServerName // destination of requests + running atomic.Bool // is the queue worker running? + statistics *types.ServerStatistics // statistics about this remote server + incomingPDUs chan *gomatrixserverlib.HeaderedEvent // PDUs to send + incomingEDUs chan *gomatrixserverlib.EDU // EDUs to send + incomingInvites chan *gomatrixserverlib.InviteV2Request // invites to send + lastTransactionIDs []gomatrixserverlib.TransactionID // last transaction ID + pendingPDUs []*gomatrixserverlib.HeaderedEvent // owned by backgroundSend + pendingEDUs []*gomatrixserverlib.EDU // owned by backgroundSend + pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend } // Send event adds the event to the pending queue for the destination. @@ -70,10 +56,10 @@ func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.HeaderedEvent) { // If the destination is blacklisted then drop the event. return } - oq.runningMutex.Lock() - oq.pendingPDUs = append(oq.pendingPDUs, ev) - oq.runningMutex.Unlock() - oq.wake() + if !oq.running.Load() { + go oq.backgroundSend() + } + oq.incomingPDUs <- ev } // sendEDU adds the EDU event to the pending queue for the destination. @@ -84,10 +70,10 @@ func (oq *destinationQueue) sendEDU(ev *gomatrixserverlib.EDU) { // If the destination is blacklisted then drop the event. return } - oq.runningMutex.Lock() - oq.pendingEDUs = append(oq.pendingEDUs, ev) - oq.runningMutex.Unlock() - oq.wake() + if !oq.running.Load() { + go oq.backgroundSend() + } + oq.incomingEDUs <- ev } // sendInvite adds the invite event to the pending queue for the @@ -98,51 +84,51 @@ func (oq *destinationQueue) sendInvite(ev *gomatrixserverlib.InviteV2Request) { // If the destination is blacklisted then drop the event. return } - oq.runningMutex.Lock() - oq.pendingInvites = append(oq.pendingInvites, ev) - oq.runningMutex.Unlock() - oq.wake() + if !oq.running.Load() { + go oq.backgroundSend() + } + oq.incomingInvites <- ev } // backgroundSend is the worker goroutine for sending events. // nolint:gocyclo func (oq *destinationQueue) backgroundSend() { // Mark the worker as running for its lifetime. - oq.wakeup = make(chan bool) oq.running.Store(true) defer oq.running.Store(false) - defer close(oq.wakeup) for { + // Wait either for incoming events, or until we hit an + // idle timeout. + select { + case pdu := <-oq.incomingPDUs: + oq.pendingPDUs = append(oq.pendingPDUs, pdu) + case edu := <-oq.incomingEDUs: + oq.pendingEDUs = append(oq.pendingEDUs, edu) + case invite := <-oq.incomingInvites: + oq.pendingInvites = append(oq.pendingInvites, invite) + case <-time.After(time.Second * 30): + // The worker is idle so stop the goroutine. It'll + // get restarted automatically the next time we + // get an event. + return + } + // If we are backing off this server then wait for the // backoff duration to complete first. if backoff, duration := oq.statistics.BackoffDuration(); backoff { <-time.After(duration) } - // Retrieve any waiting things. - oq.runningMutex.RLock() - pendingPDUs, numPDUs := oq.pendingPDUs, len(oq.pendingPDUs) - pendingEDUs, numEDUs := oq.pendingEDUs, len(oq.pendingEDUs) - pendingInvites := oq.pendingInvites - idleCounter, sentCounter := oq.idleCounter.Load(), oq.statistics.SuccessCount() - oq.runningMutex.RUnlock() - - // If this worker has been idle for a while then stop - // running it, otherwise the goroutine will just tick - // endlessly. It'll get automatically restarted when - // a new event needs to be sent. - if idleCounter >= 5 { - return - } - if len(pendingInvites) == 0 && len(pendingPDUs) == 0 && len(pendingEDUs) == 0 { - oq.idleCounter.Add(1) - } + // How many things do we have waiting? + numPDUs := len(oq.pendingPDUs) + numEDUs := len(oq.pendingEDUs) + numInvites := len(oq.pendingInvites) // If we have pending PDUs or EDUs then construct a transaction. - if len(pendingPDUs) > 0 || len(pendingEDUs) > 0 { + if numPDUs > 0 || numEDUs > 0 { // Try sending the next transaction and see what happens. - transaction, terr := oq.nextTransaction(pendingPDUs, pendingEDUs, sentCounter) + transaction, terr := oq.nextTransaction(oq.pendingPDUs, oq.pendingEDUs, oq.statistics.SuccessCount()) if terr != nil { // We failed to send the transaction. if giveUp := oq.statistics.Failure(); giveUp { @@ -154,7 +140,6 @@ func (oq *destinationQueue) backgroundSend() { // If we successfully sent the transaction then clear out // the pending events and EDUs. oq.statistics.Success() - oq.runningMutex.Lock() // Reallocate so that the underlying arrays can be GC'd, as // opposed to growing forever. for i := 0; i < numPDUs; i++ { @@ -171,13 +156,12 @@ func (oq *destinationQueue) backgroundSend() { []*gomatrixserverlib.EDU{}, oq.pendingEDUs[numEDUs:]..., ) - oq.runningMutex.Unlock() } } // Try sending the next invite and see what happens. - if len(pendingInvites) > 0 { - sent, ierr := oq.nextInvites(pendingInvites) + if numInvites > 0 { + sent, ierr := oq.nextInvites(oq.pendingInvites) if ierr != nil { // We failed to send the transaction so increase the // backoff and give it another go shortly. @@ -190,23 +174,14 @@ func (oq *destinationQueue) backgroundSend() { // If we successfully sent the invites then clear out // the pending invites. oq.statistics.Success() - oq.runningMutex.Lock() // Reallocate so that the underlying array can be GC'd, as // opposed to growing forever. oq.pendingInvites = append( []*gomatrixserverlib.InviteV2Request{}, oq.pendingInvites[sent:]..., ) - oq.runningMutex.Unlock() } } - - // Wait either for a few seconds, or until a new event is - // available. - select { - case <-oq.wakeup: - case <-time.After(time.Second * 5): - } } } diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go index 1c1dc4485..35b0bb7f0 100644 --- a/federationsender/queue/queue.go +++ b/federationsender/queue/queue.go @@ -32,7 +32,7 @@ type OutgoingQueues struct { origin gomatrixserverlib.ServerName client *gomatrixserverlib.FederationClient statistics *types.Statistics - queuesMutex sync.RWMutex // protects the below + queuesMutex sync.Mutex // protects the below queues map[gomatrixserverlib.ServerName]*destinationQueue } @@ -52,6 +52,26 @@ func NewOutgoingQueues( } } +func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *destinationQueue { + oqs.queuesMutex.Lock() + defer oqs.queuesMutex.Unlock() + oq := oqs.queues[destination] + if oq == nil { + oq = &destinationQueue{ + rsProducer: oqs.rsProducer, + origin: oqs.origin, + destination: destination, + client: oqs.client, + statistics: oqs.statistics.ForServer(destination), + incomingPDUs: make(chan *gomatrixserverlib.HeaderedEvent, 128), + incomingEDUs: make(chan *gomatrixserverlib.EDU, 128), + incomingInvites: make(chan *gomatrixserverlib.InviteV2Request, 128), + } + oqs.queues[destination] = oq + } + return oq +} + // SendEvent sends an event to the destinations func (oqs *OutgoingQueues) SendEvent( ev *gomatrixserverlib.HeaderedEvent, origin gomatrixserverlib.ServerName, @@ -73,23 +93,7 @@ func (oqs *OutgoingQueues) SendEvent( }).Info("Sending event") for _, destination := range destinations { - oqs.queuesMutex.RLock() - oq := oqs.queues[destination] - oqs.queuesMutex.RUnlock() - if oq == nil { - oq = &destinationQueue{ - rsProducer: oqs.rsProducer, - origin: oqs.origin, - destination: destination, - client: oqs.client, - statistics: oqs.statistics.ForServer(destination), - } - oqs.queuesMutex.Lock() - oqs.queues[destination] = oq - oqs.queuesMutex.Unlock() - } - - go oq.sendEvent(ev) + oqs.getQueue(destination).sendEvent(ev) } return nil @@ -122,23 +126,7 @@ func (oqs *OutgoingQueues) SendInvite( "server_name": destination, }).Info("Sending invite") - oqs.queuesMutex.RLock() - oq := oqs.queues[destination] - oqs.queuesMutex.RUnlock() - if oq == nil { - oq = &destinationQueue{ - rsProducer: oqs.rsProducer, - origin: oqs.origin, - destination: destination, - client: oqs.client, - statistics: oqs.statistics.ForServer(destination), - } - oqs.queuesMutex.Lock() - oqs.queues[destination] = oq - oqs.queuesMutex.Unlock() - } - - go oq.sendInvite(inviteReq) + oqs.getQueue(destination).sendInvite(inviteReq) return nil } @@ -166,23 +154,7 @@ func (oqs *OutgoingQueues) SendEDU( } for _, destination := range destinations { - oqs.queuesMutex.RLock() - oq := oqs.queues[destination] - oqs.queuesMutex.RUnlock() - if oq == nil { - oq = &destinationQueue{ - rsProducer: oqs.rsProducer, - origin: oqs.origin, - destination: destination, - client: oqs.client, - statistics: oqs.statistics.ForServer(destination), - } - oqs.queuesMutex.Lock() - oqs.queues[destination] = oq - oqs.queuesMutex.Unlock() - } - - go oq.sendEDU(e) + oqs.getQueue(destination).sendEDU(e) } return nil