From 51dba37d7a7b52000c99341e0f67d789a20b77ff Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 30 Jun 2020 14:32:05 +0100 Subject: [PATCH] Don't block if there are PDUs waiting --- federationsender/queue/destinationqueue.go | 76 +++++++++++----------- 1 file changed, 39 insertions(+), 37 deletions(-) diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index 35c59edff..3ab939c68 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -184,44 +184,46 @@ func (oq *destinationQueue) backgroundSend() { // Wait either for incoming events, or until we hit an // idle timeout. - select { - case <-oq.incomingPDUs: - // There are new PDUs waiting in the database. - case edu := <-oq.incomingEDUs: - // Likewise for EDUs, although we should probably not try - // too hard with some EDUs (like typing notifications) after - // a certain amount of time has passed. - // TODO: think about EDU expiry some more - oq.pendingEDUs = append(oq.pendingEDUs, edu) - // If there are any more things waiting in the channel queue - // then read them. This is safe because we guarantee only - // having one goroutine per destination queue, so the channel - // isn't being consumed anywhere else. - for len(oq.incomingEDUs) > 0 { - oq.pendingEDUs = append(oq.pendingEDUs, <-oq.incomingEDUs) + if len(oq.pendingPDUs) == 0 { + select { + case <-oq.incomingPDUs: + // There are new PDUs waiting in the database. + case edu := <-oq.incomingEDUs: + // Likewise for EDUs, although we should probably not try + // too hard with some EDUs (like typing notifications) after + // a certain amount of time has passed. + // TODO: think about EDU expiry some more + oq.pendingEDUs = append(oq.pendingEDUs, edu) + // If there are any more things waiting in the channel queue + // then read them. This is safe because we guarantee only + // having one goroutine per destination queue, so the channel + // isn't being consumed anywhere else. + for len(oq.incomingEDUs) > 0 { + oq.pendingEDUs = append(oq.pendingEDUs, <-oq.incomingEDUs) + } + case invite := <-oq.incomingInvites: + // There's no strict ordering requirement for invites like + // there is for transactions, so we put the invite onto the + // front of the queue. This means that if an invite that is + // stuck failing already, that it won't block our new invite + // from being sent. + oq.pendingInvites = append( + []*gomatrixserverlib.InviteV2Request{invite}, + oq.pendingInvites..., + ) + // If there are any more things waiting in the channel queue + // then read them. This is safe because we guarantee only + // having one goroutine per destination queue, so the channel + // isn't being consumed anywhere else. + for len(oq.incomingInvites) > 0 { + oq.pendingInvites = append(oq.pendingInvites, <-oq.incomingInvites) + } + case <-time.After(time.Second * 30): + // The worker is idle so stop the goroutine. It'll + // get restarted automatically the next time we + // get an event. + return } - case invite := <-oq.incomingInvites: - // There's no strict ordering requirement for invites like - // there is for transactions, so we put the invite onto the - // front of the queue. This means that if an invite that is - // stuck failing already, that it won't block our new invite - // from being sent. - oq.pendingInvites = append( - []*gomatrixserverlib.InviteV2Request{invite}, - oq.pendingInvites..., - ) - // If there are any more things waiting in the channel queue - // then read them. This is safe because we guarantee only - // having one goroutine per destination queue, so the channel - // isn't being consumed anywhere else. - for len(oq.incomingInvites) > 0 { - oq.pendingInvites = append(oq.pendingInvites, <-oq.incomingInvites) - } - case <-time.After(time.Second * 30): - // The worker is idle so stop the goroutine. It'll - // get restarted automatically the next time we - // get an event. - return } // If we are backing off this server then wait for the