From 0214ce3f4f4b738a2f2b6a4dfbc40de360c5f31c Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Thu, 2 Jul 2020 16:19:48 +0100 Subject: [PATCH] Don't spin --- federationsender/queue/destinationqueue.go | 83 +++++++++++----------- 1 file changed, 43 insertions(+), 40 deletions(-) diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index a6e41b8d4..ce706768e 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -164,6 +164,9 @@ func (oq *destinationQueue) wakeQueueIfNeeded() { } else { log.WithError(err).Errorf("Can't get pending PDU count for %q destination queue", oq.destination) } + if count > 0 { + oq.wakeServerCh <- true + } // Then start the queue. go oq.backgroundSend() } @@ -182,46 +185,44 @@ func (oq *destinationQueue) backgroundSend() { for { // If we have nothing to do then wait either for incoming events, or // until we hit an idle timeout. - if oq.pendingPDUs.Load() == 0 && len(oq.pendingEDUs) == 0 && len(oq.pendingInvites) == 0 { - select { - case <-oq.wakeServerCh: - // We were woken up because there are new PDUs waiting in the - // database. - case edu := <-oq.incomingEDUs: - // EDUs are handled in-memory for now. We will try to keep - // the ordering intact. - // TODO: Certain EDU types need persistence, e.g. send-to-device - oq.pendingEDUs = append(oq.pendingEDUs, edu) - // If there are any more things waiting in the channel queue - // then read them. This is safe because we guarantee only - // having one goroutine per destination queue, so the channel - // isn't being consumed anywhere else. - for len(oq.incomingEDUs) > 0 { - oq.pendingEDUs = append(oq.pendingEDUs, <-oq.incomingEDUs) - } - case invite := <-oq.incomingInvites: - // There's no strict ordering requirement for invites like - // there is for transactions, so we put the invite onto the - // front of the queue. This means that if an invite that is - // stuck failing already, that it won't block our new invite - // from being sent. - oq.pendingInvites = append( - []*gomatrixserverlib.InviteV2Request{invite}, - oq.pendingInvites..., - ) - // If there are any more things waiting in the channel queue - // then read them. This is safe because we guarantee only - // having one goroutine per destination queue, so the channel - // isn't being consumed anywhere else. - for len(oq.incomingInvites) > 0 { - oq.pendingInvites = append(oq.pendingInvites, <-oq.incomingInvites) - } - case <-time.After(time.Second * 30): - // The worker is idle so stop the goroutine. It'll get - // restarted automatically the next time we have an event to - // send. - return + select { + case <-oq.wakeServerCh: + // We were woken up because there are new PDUs waiting in the + // database. + case edu := <-oq.incomingEDUs: + // EDUs are handled in-memory for now. We will try to keep + // the ordering intact. + // TODO: Certain EDU types need persistence, e.g. send-to-device + oq.pendingEDUs = append(oq.pendingEDUs, edu) + // If there are any more things waiting in the channel queue + // then read them. This is safe because we guarantee only + // having one goroutine per destination queue, so the channel + // isn't being consumed anywhere else. + for len(oq.incomingEDUs) > 0 { + oq.pendingEDUs = append(oq.pendingEDUs, <-oq.incomingEDUs) } + case invite := <-oq.incomingInvites: + // There's no strict ordering requirement for invites like + // there is for transactions, so we put the invite onto the + // front of the queue. This means that if an invite that is + // stuck failing already, that it won't block our new invite + // from being sent. + oq.pendingInvites = append( + []*gomatrixserverlib.InviteV2Request{invite}, + oq.pendingInvites..., + ) + // If there are any more things waiting in the channel queue + // then read them. This is safe because we guarantee only + // having one goroutine per destination queue, so the channel + // isn't being consumed anywhere else. + for len(oq.incomingInvites) > 0 { + oq.pendingInvites = append(oq.pendingInvites, <-oq.incomingInvites) + } + case <-time.After(time.Second * 30): + // The worker is idle so stop the goroutine. It'll get + // restarted automatically the next time we have an event to + // send. + return } // If we are backing off this server then wait for the @@ -329,8 +330,10 @@ func (oq *destinationQueue) nextTransaction( // Ask the database for any pending PDUs from the next transaction. // maxPDUsPerTransaction is an upper limit but we probably won't // actually retrieve that many events. + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() txid, pdus, err := oq.db.GetNextTransactionPDUs( - context.TODO(), // context + ctx, // context oq.destination, // server name maxPDUsPerTransaction, // max events to retrieve )