Try to get overflowed events from database
This commit is contained in:
parent
9700f0b31c
commit
f8825e0c12
|
@ -35,6 +35,8 @@ import (
|
|||
const (
|
||||
maxPDUsPerTransaction = 50
|
||||
maxEDUsPerTransaction = 50
|
||||
maxPDUsInMemory = 128
|
||||
maxEDUsInMemory = 128
|
||||
queueIdleTimeout = time.Second * 30
|
||||
)
|
||||
|
||||
|
@ -51,12 +53,15 @@ type destinationQueue struct {
|
|||
destination gomatrixserverlib.ServerName // destination of requests
|
||||
running atomic.Bool // is the queue worker running?
|
||||
backingOff atomic.Bool // true if we're backing off
|
||||
overflowed atomic.Bool // exceeded in-memory space?
|
||||
statistics *statistics.ServerStatistics // statistics about this remote server
|
||||
transactionIDMutex sync.Mutex // protects transactionID
|
||||
transactionID gomatrixserverlib.TransactionID // last transaction ID
|
||||
transactionCount atomic.Int32 // how many events in this transaction so far
|
||||
notifyPDUs chan *queuedPDU // interrupts idle wait for PDUs
|
||||
notifyEDUs chan *queuedEDU // interrupts idle wait for EDUs
|
||||
pendingPDUs []*queuedPDU // owned by backgroundSender goroutine
|
||||
pendingEDUs []*queuedEDU // owned by backgroundSender goroutine
|
||||
interruptBackoff chan bool // interrupts backoff
|
||||
}
|
||||
|
||||
|
@ -155,33 +160,36 @@ func (oq *destinationQueue) wakeQueueIfNeeded() {
|
|||
if !oq.running.Load() {
|
||||
// Start the queue.
|
||||
go oq.backgroundSend()
|
||||
}
|
||||
}
|
||||
|
||||
// getPendingFromDatabase will look at the database and see if
|
||||
// there are any persisted events that haven't been sent to this
|
||||
// destination yet. If so, they will be queued up.
|
||||
func (oq *destinationQueue) getPendingFromDatabase() {
|
||||
// Check to see if there's anything to do for this server
|
||||
// in the database.
|
||||
ctx := context.Background()
|
||||
go func(ctx context.Context) {
|
||||
if pdus, err := oq.db.GetPendingPDUs(ctx, oq.destination, maxPDUsPerTransaction); err == nil {
|
||||
if pduCapacity := maxPDUsInMemory - len(oq.pendingPDUs); pduCapacity > 0 {
|
||||
logrus.Infof("Retrieving up to %d pending PDUs from the database for %q", pduCapacity, oq.destination)
|
||||
if pdus, err := oq.db.GetPendingPDUs(ctx, oq.destination, pduCapacity); err == nil {
|
||||
for receipt, pdu := range pdus {
|
||||
select {
|
||||
case oq.notifyPDUs <- &queuedPDU{receipt, pdu}:
|
||||
default:
|
||||
return
|
||||
oq.pendingPDUs = append(oq.pendingPDUs, &queuedPDU{receipt, pdu})
|
||||
}
|
||||
} else {
|
||||
logrus.WithError(err).Errorf("Failed to get pending PDUs for %q", oq.destination)
|
||||
}
|
||||
}
|
||||
}
|
||||
}(ctx)
|
||||
go func(ctx context.Context) {
|
||||
if edus, err := oq.db.GetPendingEDUs(ctx, oq.destination, maxEDUsPerTransaction); err == nil {
|
||||
if eduCapacity := maxPDUsInMemory - len(oq.pendingPDUs); eduCapacity > 0 {
|
||||
logrus.Infof("Retrieving up to %d pending EDUs from the database for %q", eduCapacity, oq.destination)
|
||||
if edus, err := oq.db.GetPendingEDUs(ctx, oq.destination, eduCapacity); err == nil {
|
||||
for receipt, edu := range edus {
|
||||
select {
|
||||
case oq.notifyEDUs <- &queuedEDU{receipt, edu}:
|
||||
default:
|
||||
return
|
||||
oq.pendingEDUs = append(oq.pendingEDUs, &queuedEDU{receipt, edu})
|
||||
}
|
||||
} else {
|
||||
logrus.WithError(err).Errorf("Failed to get pending EDUs for %q", oq.destination)
|
||||
}
|
||||
}
|
||||
}(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
// backgroundSend is the worker goroutine for sending events.
|
||||
|
@ -194,22 +202,30 @@ func (oq *destinationQueue) backgroundSend() {
|
|||
}
|
||||
defer oq.running.Store(false)
|
||||
|
||||
pendingPDUs := []*queuedPDU{}
|
||||
pendingEDUs := []*queuedEDU{}
|
||||
|
||||
for {
|
||||
// If we are overflowing memory and have sent things out to the
|
||||
// database then we can look up what those things are.
|
||||
if oq.overflowed.Load() {
|
||||
oq.getPendingFromDatabase()
|
||||
}
|
||||
|
||||
// If we have nothing to do then wait either for incoming events, or
|
||||
// until we hit an idle timeout.
|
||||
awaitSelect:
|
||||
select {
|
||||
case pdu := <-oq.notifyPDUs:
|
||||
// We were woken up because there are new PDUs waiting in the
|
||||
// database.
|
||||
pendingPDUs = append(pendingPDUs, pdu)
|
||||
if len(oq.pendingPDUs) > maxPDUsInMemory {
|
||||
oq.overflowed.Store(true)
|
||||
break awaitSelect
|
||||
}
|
||||
oq.pendingPDUs = append(oq.pendingPDUs, pdu)
|
||||
pendingPDULoop:
|
||||
for i := 1; i < maxPDUsPerTransaction; i++ {
|
||||
for i := 1; i < maxPDUsInMemory-len(oq.pendingPDUs); i++ {
|
||||
select {
|
||||
case edu := <-oq.notifyEDUs:
|
||||
pendingEDUs = append(pendingEDUs, edu)
|
||||
case pdu := <-oq.notifyPDUs:
|
||||
oq.pendingPDUs = append(oq.pendingPDUs, pdu)
|
||||
default:
|
||||
break pendingPDULoop
|
||||
}
|
||||
|
@ -218,12 +234,16 @@ func (oq *destinationQueue) backgroundSend() {
|
|||
case edu := <-oq.notifyEDUs:
|
||||
// We were woken up because there are new PDUs waiting in the
|
||||
// database.
|
||||
pendingEDUs = append(pendingEDUs, edu)
|
||||
if len(oq.pendingEDUs) > maxEDUsInMemory {
|
||||
oq.overflowed.Store(true)
|
||||
break awaitSelect
|
||||
}
|
||||
oq.pendingEDUs = append(oq.pendingEDUs, edu)
|
||||
pendingEDULoop:
|
||||
for i := 1; i < maxEDUsPerTransaction; i++ {
|
||||
for i := 1; i < maxEDUsInMemory-len(oq.pendingEDUs); i++ {
|
||||
select {
|
||||
case edu := <-oq.notifyEDUs:
|
||||
pendingEDUs = append(pendingEDUs, edu)
|
||||
oq.pendingEDUs = append(oq.pendingEDUs, edu)
|
||||
default:
|
||||
break pendingEDULoop
|
||||
}
|
||||
|
@ -246,14 +266,14 @@ func (oq *destinationQueue) backgroundSend() {
|
|||
// has exceeded a maximum allowable value. Clean up the in-memory
|
||||
// buffers at this point. The PDU clean-up is already on a defer.
|
||||
log.Warnf("Blacklisting %q due to exceeding backoff threshold", oq.destination)
|
||||
for i := range pendingPDUs {
|
||||
pendingPDUs[i] = nil
|
||||
for i := range oq.pendingPDUs {
|
||||
oq.pendingPDUs[i] = nil
|
||||
}
|
||||
for i := range pendingEDUs {
|
||||
pendingEDUs[i] = nil
|
||||
for i := range oq.pendingEDUs {
|
||||
oq.pendingEDUs[i] = nil
|
||||
}
|
||||
pendingPDUs = nil
|
||||
pendingEDUs = nil
|
||||
oq.pendingPDUs = nil
|
||||
oq.pendingEDUs = nil
|
||||
return
|
||||
}
|
||||
if until != nil && until.After(time.Now()) {
|
||||
|
@ -267,9 +287,18 @@ func (oq *destinationQueue) backgroundSend() {
|
|||
}
|
||||
}
|
||||
|
||||
pduCount := len(oq.pendingPDUs)
|
||||
eduCount := len(oq.pendingEDUs)
|
||||
if pduCount > maxPDUsPerTransaction {
|
||||
pduCount = maxPDUsPerTransaction
|
||||
}
|
||||
if eduCount > maxEDUsPerTransaction {
|
||||
eduCount = maxEDUsPerTransaction
|
||||
}
|
||||
|
||||
// If we have pending PDUs or EDUs then construct a transaction.
|
||||
// Try sending the next transaction and see what happens.
|
||||
transaction, terr := oq.nextTransaction(pendingPDUs, pendingEDUs)
|
||||
transaction, pc, ec, terr := oq.nextTransaction(oq.pendingPDUs[:pduCount], oq.pendingEDUs[:eduCount])
|
||||
if terr != nil {
|
||||
// We failed to send the transaction. Mark it as a failure.
|
||||
oq.statistics.Failure()
|
||||
|
@ -277,14 +306,14 @@ func (oq *destinationQueue) backgroundSend() {
|
|||
// If we successfully sent the transaction then clear out
|
||||
// the pending events and EDUs, and wipe our transaction ID.
|
||||
oq.statistics.Success()
|
||||
for i := range pendingPDUs {
|
||||
pendingPDUs[i] = nil
|
||||
for i := range oq.pendingPDUs {
|
||||
oq.pendingPDUs[i] = nil
|
||||
}
|
||||
for i := range pendingEDUs {
|
||||
pendingEDUs[i] = nil
|
||||
for i := range oq.pendingEDUs {
|
||||
oq.pendingEDUs[i] = nil
|
||||
}
|
||||
pendingPDUs = pendingPDUs[:0]
|
||||
pendingEDUs = pendingEDUs[:0]
|
||||
oq.pendingPDUs = oq.pendingPDUs[pc:]
|
||||
oq.pendingEDUs = oq.pendingEDUs[ec:]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -296,7 +325,7 @@ func (oq *destinationQueue) backgroundSend() {
|
|||
func (oq *destinationQueue) nextTransaction(
|
||||
pdus []*queuedPDU,
|
||||
edus []*queuedEDU,
|
||||
) (bool, error) {
|
||||
) (bool, int, int, error) {
|
||||
// Before we do anything, we need to roll over the transaction
|
||||
// ID that is being used to coalesce events into the next TX.
|
||||
// Otherwise it's possible that we'll pick up an incomplete
|
||||
|
@ -319,7 +348,7 @@ func (oq *destinationQueue) nextTransaction(
|
|||
// If we didn't get anything from the database and there are no
|
||||
// pending EDUs then there's nothing to do - stop here.
|
||||
if len(pdus) == 0 && len(edus) == 0 {
|
||||
return false, nil
|
||||
return false, 0, 0, nil
|
||||
}
|
||||
|
||||
// Pick out the transaction ID from the database. If we didn't
|
||||
|
@ -372,16 +401,16 @@ func (oq *destinationQueue) nextTransaction(
|
|||
log.WithError(err).Errorf("failed to clean EDUs for server %q", t.Destination)
|
||||
}
|
||||
}
|
||||
return true, nil
|
||||
return true, len(t.PDUs), len(t.EDUs), nil
|
||||
case gomatrix.HTTPError:
|
||||
// Report that we failed to send the transaction and we
|
||||
// will retry again, subject to backoff.
|
||||
return false, err
|
||||
return false, 0, 0, err
|
||||
default:
|
||||
log.WithFields(log.Fields{
|
||||
"destination": oq.destination,
|
||||
log.ErrorKey: err,
|
||||
}).Info("problem sending transaction")
|
||||
return false, err
|
||||
return false, 0, 0, err
|
||||
}
|
||||
}
|
||||
|
|
|
@ -84,8 +84,9 @@ func NewOutgoingQueues(
|
|||
log.WithError(err).Error("Failed to get EDU server names for destination queue hydration")
|
||||
}
|
||||
for serverName := range serverNames {
|
||||
if !queues.getQueue(serverName).statistics.Blacklisted() {
|
||||
queues.getQueue(serverName).wakeQueueIfNeeded()
|
||||
queue := queues.getQueue(serverName)
|
||||
if !queue.statistics.Blacklisted() {
|
||||
queue.wakeQueueIfNeeded()
|
||||
}
|
||||
}
|
||||
})
|
||||
|
@ -123,12 +124,13 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d
|
|||
destination: destination,
|
||||
client: oqs.client,
|
||||
statistics: oqs.statistics.ForServer(destination),
|
||||
notifyPDUs: make(chan *queuedPDU, 128),
|
||||
notifyEDUs: make(chan *queuedEDU, 128),
|
||||
notifyPDUs: make(chan *queuedPDU, 16),
|
||||
notifyEDUs: make(chan *queuedEDU, 16),
|
||||
interruptBackoff: make(chan bool),
|
||||
signing: oqs.signing,
|
||||
}
|
||||
oqs.queues[destination] = oq
|
||||
oq.getPendingFromDatabase()
|
||||
}
|
||||
return oq
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue