Bug-fixing

This commit is contained in:
Neil Alexander 2020-12-07 12:08:23 +00:00
parent 7207ad53fe
commit 880e151439
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944

View file

@ -140,7 +140,6 @@ func (oq *destinationQueue) sendEDU(event *gomatrixserverlib.EDU, receipt *share
if !oq.statistics.Blacklisted() { if !oq.statistics.Blacklisted() {
// Wake up the queue if it's asleep. // Wake up the queue if it's asleep.
oq.wakeQueueIfNeeded() oq.wakeQueueIfNeeded()
// Queue the EDU.
select { select {
case oq.notify <- struct{}{}: case oq.notify <- struct{}{}:
default: default:
@ -185,7 +184,7 @@ func (oq *destinationQueue) getPendingFromDatabase() {
logrus.WithError(err).Errorf("Failed to get pending PDUs for %q", oq.destination) logrus.WithError(err).Errorf("Failed to get pending PDUs for %q", oq.destination)
} }
} }
if eduCapacity := maxPDUsInMemory - len(oq.pendingPDUs); eduCapacity > 0 { if eduCapacity := maxEDUsInMemory - len(oq.pendingEDUs); eduCapacity > 0 {
// We have room in memory for some EDUs - let's request no more than that. // We have room in memory for some EDUs - let's request no more than that.
if edus, err := oq.db.GetPendingEDUs(ctx, oq.destination, eduCapacity); err == nil { if edus, err := oq.db.GetPendingEDUs(ctx, oq.destination, eduCapacity); err == nil {
for receipt, edu := range edus { for receipt, edu := range edus {
@ -201,8 +200,12 @@ func (oq *destinationQueue) getPendingFromDatabase() {
if len(oq.pendingPDUs) < maxPDUsInMemory && len(oq.pendingEDUs) < maxEDUsInMemory { if len(oq.pendingPDUs) < maxPDUsInMemory && len(oq.pendingEDUs) < maxEDUsInMemory {
oq.overflowed.Store(false) oq.overflowed.Store(false)
} }
// If we've retrieved some events then notify the destination queue goroutine.
if retrieved { if retrieved {
oq.notify <- struct{}{} select {
case oq.notify <- struct{}{}:
default:
}
} }
} }
@ -216,6 +219,10 @@ func (oq *destinationQueue) backgroundSend() {
} }
defer oq.running.Store(false) defer oq.running.Store(false)
// Mark the queue as overflowed, so we will consult the database
// to see if there's anything new to send.
oq.overflowed.Store(true)
for { for {
// If we are overflowing memory and have sent things out to the // If we are overflowing memory and have sent things out to the
// database then we can look up what those things are. // database then we can look up what those things are.
@ -364,7 +371,7 @@ func (oq *destinationQueue) nextTransaction(
continue continue
} }
t.EDUs = append(t.EDUs, *edu.edu) t.EDUs = append(t.EDUs, *edu.edu)
eduReceipts = append(pduReceipts, edu.receipt) eduReceipts = append(eduReceipts, edu.receipt)
} }
logrus.WithField("server_name", oq.destination).Debugf("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs)) logrus.WithField("server_name", oq.destination).Debugf("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs))