From 4cc1eaac5ae7fb95273fc2cd997342e9f4aae6a5 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 1 Jul 2020 10:41:25 +0100 Subject: [PATCH] Don't stop when there is work to be done --- federationsender/queue/destinationqueue.go | 84 +++++++++++----------- federationsender/queue/queue.go | 2 +- 2 files changed, 44 insertions(+), 42 deletions(-) diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index 5f1c4ddec..31d79dbc6 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -46,7 +46,6 @@ type destinationQueue struct { running atomic.Bool // is the queue worker running? backingOff atomic.Bool // true if we're backing off statistics *types.ServerStatistics // statistics about this remote server - incomingPDUs chan struct{} // signal that there are PDUs waiting incomingInvites chan *gomatrixserverlib.InviteV2Request // invites to send incomingEDUs chan *gomatrixserverlib.EDU // EDUs to send transactionID gomatrixserverlib.TransactionID // last transaction ID @@ -54,6 +53,7 @@ type destinationQueue struct { pendingPDUs atomic.Int32 // how many PDUs are waiting to be sent pendingEDUs []*gomatrixserverlib.EDU // owned by backgroundSend pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend + wakeServerCh chan bool // interrupts idle wait retryServerCh chan bool // interrupts backoff } @@ -120,7 +120,7 @@ func (oq *destinationQueue) sendEvent(nid int64) { // Signal that we've sent a new PDU. This will cause the queue to // wake up if it's asleep. oq.pendingPDUs.Add(1) - oq.incomingPDUs <- struct{}{} + oq.wakeServerCh <- true } // sendEDU adds the EDU event to the pending queue for the destination. @@ -168,46 +168,48 @@ func (oq *destinationQueue) backgroundSend() { // e.g. in response to EDUs. transactionID := gomatrixserverlib.TransactionID("") - // Wait either for incoming events, or until we hit an - // idle timeout. - select { - case <-oq.incomingPDUs: - // We were woken up because there are new PDUs waiting in the - // database. - case edu := <-oq.incomingEDUs: - // EDUs are handled in-memory for now. We will try to keep - // the ordering intact. - // TODO: Certain EDU types need persistence, e.g. send-to-device - oq.pendingEDUs = append(oq.pendingEDUs, edu) - // If there are any more things waiting in the channel queue - // then read them. This is safe because we guarantee only - // having one goroutine per destination queue, so the channel - // isn't being consumed anywhere else. - for len(oq.incomingEDUs) > 0 { - oq.pendingEDUs = append(oq.pendingEDUs, <-oq.incomingEDUs) + // If we have nothing to do then wait either for incoming events, or + // until we hit an idle timeout. + if oq.pendingPDUs.Load() == 0 { + select { + case <-oq.wakeServerCh: + // We were woken up because there are new PDUs waiting in the + // database. + case edu := <-oq.incomingEDUs: + // EDUs are handled in-memory for now. We will try to keep + // the ordering intact. + // TODO: Certain EDU types need persistence, e.g. send-to-device + oq.pendingEDUs = append(oq.pendingEDUs, edu) + // If there are any more things waiting in the channel queue + // then read them. This is safe because we guarantee only + // having one goroutine per destination queue, so the channel + // isn't being consumed anywhere else. + for len(oq.incomingEDUs) > 0 { + oq.pendingEDUs = append(oq.pendingEDUs, <-oq.incomingEDUs) + } + case invite := <-oq.incomingInvites: + // There's no strict ordering requirement for invites like + // there is for transactions, so we put the invite onto the + // front of the queue. This means that if an invite that is + // stuck failing already, that it won't block our new invite + // from being sent. + oq.pendingInvites = append( + []*gomatrixserverlib.InviteV2Request{invite}, + oq.pendingInvites..., + ) + // If there are any more things waiting in the channel queue + // then read them. This is safe because we guarantee only + // having one goroutine per destination queue, so the channel + // isn't being consumed anywhere else. + for len(oq.incomingInvites) > 0 { + oq.pendingInvites = append(oq.pendingInvites, <-oq.incomingInvites) + } + case <-time.After(time.Second * 30): + // The worker is idle so stop the goroutine. It'll get + // restarted automatically the next time we have an event to + // send. + return } - case invite := <-oq.incomingInvites: - // There's no strict ordering requirement for invites like - // there is for transactions, so we put the invite onto the - // front of the queue. This means that if an invite that is - // stuck failing already, that it won't block our new invite - // from being sent. - oq.pendingInvites = append( - []*gomatrixserverlib.InviteV2Request{invite}, - oq.pendingInvites..., - ) - // If there are any more things waiting in the channel queue - // then read them. This is safe because we guarantee only - // having one goroutine per destination queue, so the channel - // isn't being consumed anywhere else. - for len(oq.incomingInvites) > 0 { - oq.pendingInvites = append(oq.pendingInvites, <-oq.incomingInvites) - } - case <-time.After(time.Second * 30): - // The worker is idle so stop the goroutine. It'll get - // restarted automatically the next time we have an event to - // send. - return } // If we are backing off this server then wait for the diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go index 2cf78e022..492d5f553 100644 --- a/federationsender/queue/queue.go +++ b/federationsender/queue/queue.go @@ -88,9 +88,9 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d destination: destination, client: oqs.client, statistics: oqs.statistics.ForServer(destination), - incomingPDUs: make(chan struct{}, 128), incomingEDUs: make(chan *gomatrixserverlib.EDU, 128), incomingInvites: make(chan *gomatrixserverlib.InviteV2Request, 128), + wakeServerCh: make(chan bool, 128), retryServerCh: make(chan bool), signing: oqs.signing, }