Rehydrate more opportunistically

This commit is contained in:
Neil Alexander 2020-06-30 15:43:15 +01:00
parent b4bbefd523
commit c8ee57ba07

View file

@ -173,12 +173,11 @@ func (oq *destinationQueue) backgroundSend() {
// e.g. in response to EDUs.
transactionID := gomatrixserverlib.TransactionID("")
// If we haven't reached the PDU limit yet then rehydrate the
// PDU queue from the database. We'll retrieve the events in the
// next transaction. Note that this does *not* necessarily mean
// we will fill the buffer - the important thing here is that we
// still continue to send transactions in order.
if len(oq.pendingPDUs) < maxPDUsPerTransaction {
// Retrieve the events in the next transaction. Note that this
// does *not* necessarily mean we will fill the buffer - the
// important thing here is that we still continue to send
// transactions in order.
hydrate := func() {
txid, pdus, err := oq.db.GetNextTransactionPDUs(
context.TODO(), // context
oq.destination, // server name
@ -186,18 +185,25 @@ func (oq *destinationQueue) backgroundSend() {
)
if err != nil {
log.WithError(err).Errorf("failed to get next transaction PDUs for server %q", oq.destination)
continue
return
}
transactionID = txid
oq.pendingPDUs = append(oq.pendingPDUs, pdus...)
}
// If we haven't reached the PDU limit yet then rehydrate the
// PDU queue from the database.
if len(oq.pendingPDUs) < maxPDUsPerTransaction {
hydrate()
}
// Wait either for incoming events, or until we hit an
// idle timeout.
if len(oq.pendingPDUs) == 0 {
select {
case <-oq.incomingPDUs:
// There are new PDUs waiting in the database.
hydrate()
case edu := <-oq.incomingEDUs:
// Likewise for EDUs, although we should probably not try
// too hard with some EDUs (like typing notifications) after