This commit is contained in:
Neil Alexander 2020-06-30 15:36:42 +01:00
parent 61ff558fef
commit b4bbefd523
2 changed files with 69 additions and 44 deletions

View file

@ -160,14 +160,24 @@ func (oq *destinationQueue) backgroundSend() {
}
defer oq.running.Store(false)
// Since the PDU buffer can be rehydrated from the database, we
// dont need to keep it in memory after the worker stops running.
// transactionID and transactionCount are enough to help queue up
// additional event in the same transaction.
defer oq.cleanPendingPDUs()
for {
// For now we don't know the next transaction ID that we'll
// pluck from the database.
// For now we don't know the next transaction ID. Set it to an
// empty one. The next step will populate it if we have pending
// PDUs in the database. Otherwise we'll generate one later on,
// e.g. in response to EDUs.
transactionID := gomatrixserverlib.TransactionID("")
// Check to see if there are any pending PDUs in the database.
// If we haven't reached the PDU limit yet then retrieve those
// events so that they can be added into this transaction.
// If we haven't reached the PDU limit yet then rehydrate the
// PDU queue from the database. We'll retrieve the events in the
// next transaction. Note that this does *not* necessarily mean
// we will fill the buffer - the important thing here is that we
// still continue to send transactions in order.
if len(oq.pendingPDUs) < maxPDUsPerTransaction {
txid, pdus, err := oq.db.GetNextTransactionPDUs(
context.TODO(), // context
@ -219,9 +229,9 @@ func (oq *destinationQueue) backgroundSend() {
oq.pendingInvites = append(oq.pendingInvites, <-oq.incomingInvites)
}
case <-time.After(time.Second * 30):
// The worker is idle so stop the goroutine. It'll
// get restarted automatically the next time we
// get an event.
// The worker is idle so stop the goroutine. It'll get
// restarted automatically the next time we have an event to
// send.
return
}
}
@ -238,13 +248,8 @@ func (oq *destinationQueue) backgroundSend() {
oq.backingOff.Store(false)
}
// How many things do we have waiting?
numPDUs := len(oq.pendingPDUs)
numEDUs := len(oq.pendingEDUs)
numInvites := len(oq.pendingInvites)
// If we have pending PDUs or EDUs then construct a transaction.
if numPDUs > 0 || numEDUs > 0 {
if len(oq.pendingPDUs) > 0 || len(oq.pendingEDUs) > 0 {
// If we haven't got a transaction ID then we should generate
// one. Ideally we'd know this already because something queued
// in the database would give us one, but if we're dealing with
@ -259,8 +264,11 @@ func (oq *destinationQueue) backgroundSend() {
if terr != nil {
// We failed to send the transaction.
if giveUp := oq.statistics.Failure(); giveUp {
// It's been suggested that we should give up because
// the backoff has exceeded a maximum allowable value.
// It's been suggested that we should give up because the backoff
// has exceeded a maximum allowable value. Clean up the in-memory
// buffers at this point. The PDU clean-up is already on a defer.
oq.cleanPendingEDUs()
oq.cleanPendingInvites()
return
}
} else if transaction {
@ -268,22 +276,9 @@ func (oq *destinationQueue) backgroundSend() {
// the pending events and EDUs, and wipe our transaction ID.
oq.statistics.Success()
oq.transactionID = ""
// Reallocate so that the underlying arrays can be GC'd, as
// opposed to growing forever.
for i := 0; i < numPDUs; i++ {
oq.pendingPDUs[i] = nil
}
for i := 0; i < numEDUs; i++ {
oq.pendingEDUs[i] = nil
}
oq.pendingPDUs = append(
[]*gomatrixserverlib.HeaderedEvent{},
oq.pendingPDUs[numPDUs:]...,
)
oq.pendingEDUs = append(
[]*gomatrixserverlib.EDU{},
oq.pendingEDUs[numEDUs:]...,
)
// Clean up the in-memory buffers.
oq.cleanPendingPDUs()
oq.cleanPendingEDUs()
// Clean up the transaction in the database.
if err := oq.db.CleanTransactionPDUs(
context.TODO(),
@ -296,7 +291,7 @@ func (oq *destinationQueue) backgroundSend() {
}
// Try sending the next invite and see what happens.
if numInvites > 0 {
if len(oq.pendingInvites) > 0 {
sent, ierr := oq.nextInvites(oq.pendingInvites)
if ierr != nil {
// We failed to send the transaction so increase the
@ -312,15 +307,52 @@ func (oq *destinationQueue) backgroundSend() {
oq.statistics.Success()
// Reallocate so that the underlying array can be GC'd, as
// opposed to growing forever.
oq.pendingInvites = append(
[]*gomatrixserverlib.InviteV2Request{},
oq.pendingInvites[sent:]...,
)
oq.cleanPendingInvites()
}
}
}
}
// cleanPendingPDUs cleans out the pending PDU buffer, removing
// all references so that the underlying objects can be GC'd.
func (oq *destinationQueue) cleanPendingPDUs() {
numPDUs := len(oq.pendingPDUs)
for i := 0; i < numPDUs; i++ {
oq.pendingPDUs[i] = nil
}
oq.pendingPDUs = append(
[]*gomatrixserverlib.HeaderedEvent{},
oq.pendingPDUs[numPDUs:]...,
)
}
// cleanPendingEDUs cleans out the pending EDU buffer, removing
// all references so that the underlying objects can be GC'd.
func (oq *destinationQueue) cleanPendingEDUs() {
numEDUs := len(oq.pendingEDUs)
for i := 0; i < numEDUs; i++ {
oq.pendingEDUs[i] = nil
}
oq.pendingEDUs = append(
[]*gomatrixserverlib.EDU{},
oq.pendingEDUs[numEDUs:]...,
)
}
// cleanPendingInvites cleans out the pending invite buffer,
// removing all references so that the underlying objects can
// be GC'd.
func (oq *destinationQueue) cleanPendingInvites() {
numInvites := len(oq.pendingInvites)
for i := 0; i < numInvites; i++ {
oq.pendingInvites[i] = nil
}
oq.pendingInvites = append(
[]*gomatrixserverlib.InviteV2Request{},
oq.pendingInvites[numInvites:]...,
)
}
// nextTransaction creates a new transaction from the pending event
// queue and sends it. Returns true if a transaction was sent or
// false otherwise.

View file

@ -225,15 +225,11 @@ func (d *Database) CleanTransactionPDUs(
transactionID gomatrixserverlib.TransactionID,
) error {
return sqlutil.WithTransaction(d.db, func(txn *sql.Tx) error {
fmt.Println("Cleaning up after transaction", transactionID)
nids, err := d.selectQueuePDUs(ctx, txn, serverName, transactionID, 50)
if err != nil {
return fmt.Errorf("d.selectQueuePDUs: %w", err)
}
fmt.Println("Transaction", transactionID, "has", len(nids), "NIDs")
if err = d.deleteQueueTransaction(ctx, txn, serverName, transactionID); err != nil {
return fmt.Errorf("d.deleteQueueTransaction: %w", err)
}
@ -250,9 +246,6 @@ func (d *Database) CleanTransactionPDUs(
}
}
fmt.Println("There are", len(deleteNIDs), "unreferenced NIDs ready for deletion")
fmt.Println("There are", len(nids)-len(deleteNIDs), "NIDs still referenced")
if len(deleteNIDs) > 0 {
if err = d.deleteQueueJSON(ctx, txn, deleteNIDs); err != nil {
return fmt.Errorf("d.deleteQueueJSON: %w", err)