diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index 2d81bc67b..dd01d3e45 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -160,14 +160,24 @@ func (oq *destinationQueue) backgroundSend() { } defer oq.running.Store(false) + // Since the PDU buffer can be rehydrated from the database, we + // don't need to keep it in memory after the worker stops running. + // transactionID and transactionCount are enough to help queue up + // additional events in the same transaction. + defer oq.cleanPendingPDUs() + for { - // For now we don't know the next transaction ID that we'll - // pluck from the database. + // For now we don't know the next transaction ID. Set it to an + // empty one. The next step will populate it if we have pending + // PDUs in the database. Otherwise we'll generate one later on, + // e.g. in response to EDUs. transactionID := gomatrixserverlib.TransactionID("") - // Check to see if there are any pending PDUs in the database. - // If we haven't reached the PDU limit yet then retrieve those - // events so that they can be added into this transaction. + // If we haven't reached the PDU limit yet then rehydrate the + // PDU queue from the database. We'll retrieve the events in the + // next transaction. Note that this does *not* necessarily mean + // we will fill the buffer - the important thing here is that we + // still continue to send transactions in order. if len(oq.pendingPDUs) < maxPDUsPerTransaction { txid, pdus, err := oq.db.GetNextTransactionPDUs( context.TODO(), // context @@ -219,9 +229,9 @@ func (oq *destinationQueue) backgroundSend() { oq.pendingInvites = append(oq.pendingInvites, <-oq.incomingInvites) } case <-time.After(time.Second * 30): - // The worker is idle so stop the goroutine. It'll - // get restarted automatically the next time we - // get an event. + // The worker is idle so stop the goroutine. 
It'll get + // restarted automatically the next time we have an event to + // send. return } } @@ -238,13 +248,8 @@ func (oq *destinationQueue) backgroundSend() { oq.backingOff.Store(false) } - // How many things do we have waiting? - numPDUs := len(oq.pendingPDUs) - numEDUs := len(oq.pendingEDUs) - numInvites := len(oq.pendingInvites) - // If we have pending PDUs or EDUs then construct a transaction. - if numPDUs > 0 || numEDUs > 0 { + if len(oq.pendingPDUs) > 0 || len(oq.pendingEDUs) > 0 { // If we haven't got a transaction ID then we should generate // one. Ideally we'd know this already because something queued // in the database would give us one, but if we're dealing with @@ -259,8 +264,11 @@ func (oq *destinationQueue) backgroundSend() { if terr != nil { // We failed to send the transaction. if giveUp := oq.statistics.Failure(); giveUp { - // It's been suggested that we should give up because - // the backoff has exceeded a maximum allowable value. + // It's been suggested that we should give up because the backoff + // has exceeded a maximum allowable value. Clean up the in-memory + // buffers at this point. The PDU clean-up is already on a defer. + oq.cleanPendingEDUs() + oq.cleanPendingInvites() return } } else if transaction { @@ -268,22 +276,9 @@ func (oq *destinationQueue) backgroundSend() { // the pending events and EDUs, and wipe our transaction ID. oq.statistics.Success() oq.transactionID = "" - // Reallocate so that the underlying arrays can be GC'd, as - // opposed to growing forever. - for i := 0; i < numPDUs; i++ { - oq.pendingPDUs[i] = nil - } - for i := 0; i < numEDUs; i++ { - oq.pendingEDUs[i] = nil - } - oq.pendingPDUs = append( - []*gomatrixserverlib.HeaderedEvent{}, - oq.pendingPDUs[numPDUs:]..., - ) - oq.pendingEDUs = append( - []*gomatrixserverlib.EDU{}, - oq.pendingEDUs[numEDUs:]..., - ) + // Clean up the in-memory buffers. + oq.cleanPendingPDUs() + oq.cleanPendingEDUs() // Clean up the transaction in the database. 
if err := oq.db.CleanTransactionPDUs( context.TODO(), @@ -296,7 +291,7 @@ func (oq *destinationQueue) backgroundSend() { } // Try sending the next invite and see what happens. - if numInvites > 0 { + if len(oq.pendingInvites) > 0 { sent, ierr := oq.nextInvites(oq.pendingInvites) if ierr != nil { // We failed to send the transaction so increase the @@ -312,15 +307,52 @@ func (oq *destinationQueue) backgroundSend() { oq.statistics.Success() // Reallocate so that the underlying array can be GC'd, as // opposed to growing forever. - oq.pendingInvites = append( - []*gomatrixserverlib.InviteV2Request{}, - oq.pendingInvites[sent:]..., - ) + oq.cleanPendingInvites() } } } } +// cleanPendingPDUs cleans out the pending PDU buffer, removing +// all references so that the underlying objects can be GC'd. +func (oq *destinationQueue) cleanPendingPDUs() { + numPDUs := len(oq.pendingPDUs) + for i := 0; i < numPDUs; i++ { + oq.pendingPDUs[i] = nil + } + oq.pendingPDUs = append( + []*gomatrixserverlib.HeaderedEvent{}, + oq.pendingPDUs[numPDUs:]..., + ) +} + +// cleanPendingEDUs cleans out the pending EDU buffer, removing +// all references so that the underlying objects can be GC'd. +func (oq *destinationQueue) cleanPendingEDUs() { + numEDUs := len(oq.pendingEDUs) + for i := 0; i < numEDUs; i++ { + oq.pendingEDUs[i] = nil + } + oq.pendingEDUs = append( + []*gomatrixserverlib.EDU{}, + oq.pendingEDUs[numEDUs:]..., + ) +} + +// cleanPendingInvites cleans out the pending invite buffer, +// removing all references so that the underlying objects can +// be GC'd. +func (oq *destinationQueue) cleanPendingInvites() { + numInvites := len(oq.pendingInvites) + for i := 0; i < numInvites; i++ { + oq.pendingInvites[i] = nil + } + oq.pendingInvites = append( + []*gomatrixserverlib.InviteV2Request{}, + oq.pendingInvites[numInvites:]..., + ) +} + // nextTransaction creates a new transaction from the pending event // queue and sends it. Returns true if a transaction was sent or // false otherwise. 
diff --git a/federationsender/storage/postgres/storage.go b/federationsender/storage/postgres/storage.go index 153dec049..18d1532a4 100644 --- a/federationsender/storage/postgres/storage.go +++ b/federationsender/storage/postgres/storage.go @@ -225,15 +225,11 @@ func (d *Database) CleanTransactionPDUs( transactionID gomatrixserverlib.TransactionID, ) error { return sqlutil.WithTransaction(d.db, func(txn *sql.Tx) error { - fmt.Println("Cleaning up after transaction", transactionID) - nids, err := d.selectQueuePDUs(ctx, txn, serverName, transactionID, 50) if err != nil { return fmt.Errorf("d.selectQueuePDUs: %w", err) } - fmt.Println("Transaction", transactionID, "has", len(nids), "NIDs") - if err = d.deleteQueueTransaction(ctx, txn, serverName, transactionID); err != nil { return fmt.Errorf("d.deleteQueueTransaction: %w", err) } @@ -250,9 +246,6 @@ func (d *Database) CleanTransactionPDUs( } } - fmt.Println("There are", len(deleteNIDs), "unreferenced NIDs ready for deletion") - fmt.Println("There are", len(nids)-len(deleteNIDs), "NIDs still referenced") - if len(deleteNIDs) > 0 { if err = d.deleteQueueJSON(ctx, txn, deleteNIDs); err != nil { return fmt.Errorf("d.deleteQueueJSON: %w", err)