From 57841fc35ebb6f591903e20694dad3548bf71ce8 Mon Sep 17 00:00:00 2001
From: Neil Alexander <neilalexander@users.noreply.github.com>
Date: Wed, 27 May 2020 12:16:53 +0100
Subject: [PATCH] Read batches from incoming channels (#1067)

---
 federationsender/queue/destinationqueue.go | 21 +++++++++++++++++++++
 1 file changed, 21 insertions(+)

diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go
index 45faa287c..09dac464f 100644
--- a/federationsender/queue/destinationqueue.go
+++ b/federationsender/queue/destinationqueue.go
@@ -110,12 +110,26 @@ func (oq *destinationQueue) backgroundSend() {
 			// of the queue and they will all be added to transactions
 			// in order.
 			oq.pendingPDUs = append(oq.pendingPDUs, pdu)
+			// If there are any more things waiting in the channel queue
+			// then read them. This is safe because we guarantee only
+			// having one goroutine per destination queue, so the channel
+			// isn't being consumed anywhere else.
+			for len(oq.incomingPDUs) > 0 {
+				oq.pendingPDUs = append(oq.pendingPDUs, <-oq.incomingPDUs)
+			}
 		case edu := <-oq.incomingEDUs:
 			// Likewise for EDUs, although we should probably not try
 			// too hard with some EDUs (like typing notifications) after
 			// a certain amount of time has passed.
 			// TODO: think about EDU expiry some more
 			oq.pendingEDUs = append(oq.pendingEDUs, edu)
+			// If there are any more things waiting in the channel queue
+			// then read them. This is safe because we guarantee only
+			// having one goroutine per destination queue, so the channel
+			// isn't being consumed anywhere else.
+			for len(oq.incomingEDUs) > 0 {
+				oq.pendingEDUs = append(oq.pendingEDUs, <-oq.incomingEDUs)
+			}
 		case invite := <-oq.incomingInvites:
 			// There's no strict ordering requirement for invites like
 			// there is for transactions, so we put the invite onto the
@@ -126,6 +140,13 @@ func (oq *destinationQueue) backgroundSend() {
 				[]*gomatrixserverlib.InviteV2Request{invite},
 				oq.pendingInvites...,
 			)
+			// If there are any more things waiting in the channel queue
+			// then read them. This is safe because we guarantee only
+			// having one goroutine per destination queue, so the channel
+			// isn't being consumed anywhere else.
+			for len(oq.incomingInvites) > 0 {
+				oq.pendingInvites = append(oq.pendingInvites, <-oq.incomingInvites)
+			}
 		case <-time.After(time.Second * 30):
 			// The worker is idle so stop the goroutine. It'll
 			// get restarted automatically the next time we