diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index dd01d3e45..6a00abb4a 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -173,12 +173,11 @@ func (oq *destinationQueue) backgroundSend() { // e.g. in response to EDUs. transactionID := gomatrixserverlib.TransactionID("") - // If we haven't reached the PDU limit yet then rehydrate the - // PDU queue from the database. We'll retrieve the events in the - // next transaction. Note that this does *not* necessarily mean - // we will fill the buffer - the important thing here is that we - // still continue to send transactions in order. - if len(oq.pendingPDUs) < maxPDUsPerTransaction { + // Retrieve the events in the next transaction. Note that this + // does *not* necessarily mean we will fill the buffer - the + // important thing here is that we still continue to send + // transactions in order. + hydrate := func() { txid, pdus, err := oq.db.GetNextTransactionPDUs( context.TODO(), // context oq.destination, // server name @@ -186,18 +185,25 @@ func (oq *destinationQueue) backgroundSend() { ) if err != nil { log.WithError(err).Errorf("failed to get next transaction PDUs for server %q", oq.destination) - continue + return } transactionID = txid oq.pendingPDUs = append(oq.pendingPDUs, pdus...) } + // If we haven't reached the PDU limit yet then rehydrate the + // PDU queue from the database. + if len(oq.pendingPDUs) < maxPDUsPerTransaction { + hydrate() + } + // Wait either for incoming events, or until we hit an // idle timeout. if len(oq.pendingPDUs) == 0 { select { case <-oq.incomingPDUs: // There are new PDUs waiting in the database. + hydrate() case edu := <-oq.incomingEDUs: // Likewise for EDUs, although we should probably not try // too hard with some EDUs (like typing notifications) after