mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-21 13:53:09 -06:00
Merge branch 'kegan/fedsender-logging' into kegan/redactions-4
This commit is contained in:
commit
ca64812ef9
|
|
@ -65,6 +65,7 @@ type destinationQueue struct {
|
||||||
func (oq *destinationQueue) sendEvent(nid int64) {
|
func (oq *destinationQueue) sendEvent(nid int64) {
|
||||||
if oq.statistics.Blacklisted() {
|
if oq.statistics.Blacklisted() {
|
||||||
// If the destination is blacklisted then drop the event.
|
// If the destination is blacklisted then drop the event.
|
||||||
|
log.Infof("%s is blacklisted; dropping event", oq.destination)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
oq.wakeQueueIfNeeded()
|
oq.wakeQueueIfNeeded()
|
||||||
|
|
@ -214,6 +215,7 @@ func (oq *destinationQueue) backgroundSend() {
|
||||||
// backoff duration to complete first, or until explicitly
|
// backoff duration to complete first, or until explicitly
|
||||||
// told to retry.
|
// told to retry.
|
||||||
if backoff, duration := oq.statistics.BackoffDuration(); backoff {
|
if backoff, duration := oq.statistics.BackoffDuration(); backoff {
|
||||||
|
log.WithField("duration", duration).Infof("Backing off %s", oq.destination)
|
||||||
oq.backingOff.Store(true)
|
oq.backingOff.Store(true)
|
||||||
select {
|
select {
|
||||||
case <-time.After(duration):
|
case <-time.After(duration):
|
||||||
|
|
@ -223,7 +225,7 @@ func (oq *destinationQueue) backgroundSend() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we have pending PDUs or EDUs then construct a transaction.
|
// If we have pending PDUs or EDUs then construct a transaction.
|
||||||
if oq.pendingPDUs.Load() > 0 || len(oq.pendingEDUs) > 0 {
|
for oq.pendingPDUs.Load() > 0 || len(oq.pendingEDUs) > 0 {
|
||||||
// Try sending the next transaction and see what happens.
|
// Try sending the next transaction and see what happens.
|
||||||
transaction, terr := oq.nextTransaction(oq.pendingEDUs)
|
transaction, terr := oq.nextTransaction(oq.pendingEDUs)
|
||||||
if terr != nil {
|
if terr != nil {
|
||||||
|
|
@ -248,6 +250,8 @@ func (oq *destinationQueue) backgroundSend() {
|
||||||
oq.statistics.Success()
|
oq.statistics.Success()
|
||||||
// Clean up the in-memory buffers.
|
// Clean up the in-memory buffers.
|
||||||
oq.cleanPendingEDUs()
|
oq.cleanPendingEDUs()
|
||||||
|
} else {
|
||||||
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -336,6 +340,7 @@ func (oq *destinationQueue) nextTransaction(
|
||||||
// If we didn't get anything from the database and there are no
|
// If we didn't get anything from the database and there are no
|
||||||
// pending EDUs then there's nothing to do - stop here.
|
// pending EDUs then there's nothing to do - stop here.
|
||||||
if len(pdus) == 0 && len(pendingEDUs) == 0 {
|
if len(pdus) == 0 && len(pendingEDUs) == 0 {
|
||||||
|
log.Warnf("no pdus/edus for nextTransaction for destination %q", oq.destination)
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue