From 3419a7450b0821a96379f29bcf2198a40e0ed551 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 1 Jul 2020 10:12:14 +0100 Subject: [PATCH] Shuffle things around a bit --- federationsender/queue/destinationqueue.go | 149 +++++++++------------ 1 file changed, 63 insertions(+), 86 deletions(-) diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index dd8243845..72414e8c3 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -51,7 +51,6 @@ type destinationQueue struct { incomingEDUs chan *gomatrixserverlib.EDU // EDUs to send transactionID gomatrixserverlib.TransactionID // last transaction ID transactionCount int // how many events in this transaction so far - pendingPDUs []*gomatrixserverlib.HeaderedEvent // owned by backgroundSend pendingEDUs []*gomatrixserverlib.EDU // owned by backgroundSend pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend retryServerCh chan bool // interrupts backoff @@ -160,85 +159,55 @@ func (oq *destinationQueue) backgroundSend() { } defer oq.running.Store(false) - // Since the PDU buffer can be rehydrated from the database, we - // dont need to keep it in memory after the worker stops running. - // transactionID and transactionCount are enough to help queue up - // additional event in the same transaction. - defer oq.cleanPendingPDUs() - for { + pendingPDUs := false + // For now we don't know the next transaction ID. Set it to an // empty one. The next step will populate it if we have pending // PDUs in the database. Otherwise we'll generate one later on, // e.g. in response to EDUs. transactionID := gomatrixserverlib.TransactionID("") - // Retrieve the events in the next transaction. Note that this - // does *not* necessarily mean we will fill the buffer - the - // important thing here is that we still continue to send - // transactions in order. - hydrate := func() { - txid, pdus, err := oq.db.GetNextTransactionPDUs( - context.TODO(), // context - oq.destination, // server name - maxPDUsPerTransaction-len(oq.pendingPDUs), // how many events to retrieve - ) - if err != nil { - log.WithError(err).Errorf("failed to get next transaction PDUs for server %q", oq.destination) - return - } - transactionID = txid - oq.pendingPDUs = append(oq.pendingPDUs, pdus...) - } - - // If we haven't reached the PDU limit yet then rehydrate the - // PDU queue from the database. - if len(oq.pendingPDUs) < maxPDUsPerTransaction { - hydrate() - } - // Wait either for incoming events, or until we hit an // idle timeout. - if len(oq.pendingPDUs) == 0 { - select { - case <-oq.incomingPDUs: - // There are new PDUs waiting in the database. - hydrate() - case edu := <-oq.incomingEDUs: - // EDUs are handled in-memory for now. We will try to keep - // the ordering intact. - // TODO: Certain EDU types need persistence, e.g. send-to-device - oq.pendingEDUs = append(oq.pendingEDUs, edu) - // If there are any more things waiting in the channel queue - // then read them. This is safe because we guarantee only - // having one goroutine per destination queue, so the channel - // isn't being consumed anywhere else. - for len(oq.incomingEDUs) > 0 { - oq.pendingEDUs = append(oq.pendingEDUs, <-oq.incomingEDUs) - } - case invite := <-oq.incomingInvites: - // There's no strict ordering requirement for invites like - // there is for transactions, so we put the invite onto the - // front of the queue. This means that if an invite that is - // stuck failing already, that it won't block our new invite - // from being sent. - oq.pendingInvites = append( - []*gomatrixserverlib.InviteV2Request{invite}, - oq.pendingInvites..., - ) - // If there are any more things waiting in the channel queue - // then read them. This is safe because we guarantee only - // having one goroutine per destination queue, so the channel - // isn't being consumed anywhere else. - for len(oq.incomingInvites) > 0 { - oq.pendingInvites = append(oq.pendingInvites, <-oq.incomingInvites) - } - case <-time.After(time.Second * 30): - // The worker is idle so stop the goroutine. It'll get - // restarted automatically the next time we have an event to - // send. - return + select { + case <-oq.incomingPDUs: + // There are new PDUs waiting in the database. + pendingPDUs = true + case edu := <-oq.incomingEDUs: + // EDUs are handled in-memory for now. We will try to keep + // the ordering intact. + // TODO: Certain EDU types need persistence, e.g. send-to-device + oq.pendingEDUs = append(oq.pendingEDUs, edu) + // If there are any more things waiting in the channel queue + // then read them. This is safe because we guarantee only + // having one goroutine per destination queue, so the channel + // isn't being consumed anywhere else. + for len(oq.incomingEDUs) > 0 { + oq.pendingEDUs = append(oq.pendingEDUs, <-oq.incomingEDUs) } + case invite := <-oq.incomingInvites: + // There's no strict ordering requirement for invites like + // there is for transactions, so we put the invite onto the + // front of the queue. This means that if an invite that is + // stuck failing already, that it won't block our new invite + // from being sent. + oq.pendingInvites = append( + []*gomatrixserverlib.InviteV2Request{invite}, + oq.pendingInvites..., + ) + // If there are any more things waiting in the channel queue + // then read them. This is safe because we guarantee only + // having one goroutine per destination queue, so the channel + // isn't being consumed anywhere else. + for len(oq.incomingInvites) > 0 { + oq.pendingInvites = append(oq.pendingInvites, <-oq.incomingInvites) + } + case <-time.After(time.Second * 30): + // The worker is idle so stop the goroutine. It'll get + // restarted automatically the next time we have an event to + // send. + return } // If we are backing off this server then wait for the @@ -254,7 +223,7 @@ func (oq *destinationQueue) backgroundSend() { } // If we have pending PDUs or EDUs then construct a transaction. - if len(oq.pendingPDUs) > 0 || len(oq.pendingEDUs) > 0 { + if pendingPDUs || len(oq.pendingEDUs) > 0 { // If we haven't got a transaction ID then we should generate // one. Ideally we'd know this already because something queued // in the database would give us one, but if we're dealing with @@ -265,7 +234,7 @@ func (oq *destinationQueue) backgroundSend() { } // Try sending the next transaction and see what happens. - transaction, terr := oq.nextTransaction(transactionID, oq.pendingPDUs, oq.pendingEDUs) + transaction, terr := oq.nextTransaction(transactionID, oq.pendingEDUs) if terr != nil { // We failed to send the transaction. if giveUp := oq.statistics.Failure(); giveUp { @@ -282,7 +251,6 @@ func (oq *destinationQueue) backgroundSend() { oq.statistics.Success() oq.transactionID = "" // Clean up the in-memory buffers. - oq.cleanPendingPDUs() oq.cleanPendingEDUs() // Clean up the transaction in the database. if err := oq.db.CleanTransactionPDUs( @@ -318,15 +286,6 @@ func (oq *destinationQueue) backgroundSend() { } } -// cleanPendingPDUs cleans out the pending PDU buffer, removing -// all references so that the underlying objects can be GC'd. -func (oq *destinationQueue) cleanPendingPDUs() { - for i := 0; i < len(oq.pendingPDUs); i++ { - oq.pendingPDUs[i] = nil - } - oq.pendingPDUs = []*gomatrixserverlib.HeaderedEvent{} -} - // cleanPendingEDUs cleans out the pending EDU buffer, removing // all references so that the underlying objects can be GC'd. func (oq *destinationQueue) cleanPendingEDUs() { @@ -351,19 +310,37 @@ func (oq *destinationQueue) cleanPendingInvites() { // false otherwise. func (oq *destinationQueue) nextTransaction( transactionID gomatrixserverlib.TransactionID, - pendingPDUs []*gomatrixserverlib.HeaderedEvent, pendingEDUs []*gomatrixserverlib.EDU, ) (bool, error) { t := gomatrixserverlib.Transaction{ PDUs: []json.RawMessage{}, EDUs: []gomatrixserverlib.EDU{}, } - t.TransactionID = transactionID t.Origin = oq.origin t.Destination = oq.destination t.OriginServerTS = gomatrixserverlib.AsTimestamp(time.Now()) - for _, pdu := range pendingPDUs { + txid, pdus, err := oq.db.GetNextTransactionPDUs( + context.TODO(), // context + oq.destination, // server name + maxPDUsPerTransaction, // how many events to retrieve + ) + if err != nil { + log.WithError(err).Errorf("failed to get next transaction PDUs for server %q", oq.destination) + return false, fmt.Errorf("oq.db.GetNextTransactionPDUs: %w", err) + } + + if txid != "" { + // The database supplied us with a transaction ID to use + // from a failed PDU so use that. + t.TransactionID = txid + } else { + // Otherwise, use the one that the function call gave us. + // This would happen if it's EDUs only. + t.TransactionID = transactionID + } + + for _, pdu := range pdus { // Append the JSON of the event, since this is a json.RawMessage type in the // gomatrixserverlib.Transaction struct t.PDUs = append(t.PDUs, (*pdu).JSON()) @@ -378,7 +355,7 @@ func (oq *destinationQueue) nextTransaction( // TODO: we should check for 500-ish fails vs 400-ish here, // since we shouldn't queue things indefinitely in response // to a 400-ish error - _, err := oq.client.SendTransaction(context.TODO(), t) + _, err = oq.client.SendTransaction(context.TODO(), t) switch e := err.(type) { case nil: // No error was returned so the transaction looks to have