Comments
This commit is contained in:
parent
e0fde0c24e
commit
7faef15339
|
@ -60,8 +60,8 @@ type destinationQueue struct {
|
||||||
transactionCount atomic.Int32 // how many events in this transaction so far
|
transactionCount atomic.Int32 // how many events in this transaction so far
|
||||||
notifyPDUs chan *queuedPDU // interrupts idle wait for PDUs
|
notifyPDUs chan *queuedPDU // interrupts idle wait for PDUs
|
||||||
notifyEDUs chan *queuedEDU // interrupts idle wait for EDUs
|
notifyEDUs chan *queuedEDU // interrupts idle wait for EDUs
|
||||||
pendingPDUs []*queuedPDU // owned by backgroundSender goroutine
|
pendingPDUs []*queuedPDU // owned by backgroundSender goroutine once started
|
||||||
pendingEDUs []*queuedEDU // owned by backgroundSender goroutine
|
pendingEDUs []*queuedEDU // owned by backgroundSender goroutine once started
|
||||||
interruptBackoff chan bool // interrupts backoff
|
interruptBackoff chan bool // interrupts backoff
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -171,6 +171,7 @@ func (oq *destinationQueue) getPendingFromDatabase() {
|
||||||
// in the database.
|
// in the database.
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
if pduCapacity := maxPDUsInMemory - len(oq.pendingPDUs); pduCapacity > 0 {
|
if pduCapacity := maxPDUsInMemory - len(oq.pendingPDUs); pduCapacity > 0 {
|
||||||
|
// We have room in memory for some PDUs - let's request no more than that.
|
||||||
if pdus, err := oq.db.GetPendingPDUs(ctx, oq.destination, pduCapacity); err == nil {
|
if pdus, err := oq.db.GetPendingPDUs(ctx, oq.destination, pduCapacity); err == nil {
|
||||||
for receipt, pdu := range pdus {
|
for receipt, pdu := range pdus {
|
||||||
oq.pendingPDUs = append(oq.pendingPDUs, &queuedPDU{receipt, pdu})
|
oq.pendingPDUs = append(oq.pendingPDUs, &queuedPDU{receipt, pdu})
|
||||||
|
@ -180,6 +181,7 @@ func (oq *destinationQueue) getPendingFromDatabase() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if eduCapacity := maxPDUsInMemory - len(oq.pendingPDUs); eduCapacity > 0 {
|
if eduCapacity := maxPDUsInMemory - len(oq.pendingPDUs); eduCapacity > 0 {
|
||||||
|
// We have room in memory for some EDUs - let's request no more than that.
|
||||||
if edus, err := oq.db.GetPendingEDUs(ctx, oq.destination, eduCapacity); err == nil {
|
if edus, err := oq.db.GetPendingEDUs(ctx, oq.destination, eduCapacity); err == nil {
|
||||||
for receipt, edu := range edus {
|
for receipt, edu := range edus {
|
||||||
oq.pendingEDUs = append(oq.pendingEDUs, &queuedEDU{receipt, edu})
|
oq.pendingEDUs = append(oq.pendingEDUs, &queuedEDU{receipt, edu})
|
||||||
|
@ -188,6 +190,11 @@ func (oq *destinationQueue) getPendingFromDatabase() {
|
||||||
logrus.WithError(err).Errorf("Failed to get pending EDUs for %q", oq.destination)
|
logrus.WithError(err).Errorf("Failed to get pending EDUs for %q", oq.destination)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// If we've retrieved all of the events from the database with room to spare
|
||||||
|
// in memory then we'll no longer consider this queue to be overflowed.
|
||||||
|
if len(oq.pendingPDUs) < maxPDUsInMemory && len(oq.pendingEDUs) < maxEDUsInMemory {
|
||||||
|
oq.overflowed.Store(false)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// backgroundSend is the worker goroutine for sending events.
|
// backgroundSend is the worker goroutine for sending events.
|
||||||
|
|
|
@ -84,8 +84,7 @@ func NewOutgoingQueues(
|
||||||
log.WithError(err).Error("Failed to get EDU server names for destination queue hydration")
|
log.WithError(err).Error("Failed to get EDU server names for destination queue hydration")
|
||||||
}
|
}
|
||||||
for serverName := range serverNames {
|
for serverName := range serverNames {
|
||||||
queue := queues.getQueue(serverName)
|
if queue := queues.getQueue(serverName); !queue.statistics.Blacklisted() {
|
||||||
if !queue.statistics.Blacklisted() {
|
|
||||||
queue.wakeQueueIfNeeded()
|
queue.wakeQueueIfNeeded()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue