From ea1e8397fc6d4e96c74f4a131069350195b7e2b6 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 1 Jul 2020 11:19:48 +0100 Subject: [PATCH] Thread safety on transaction ID/count --- federationsender/queue/destinationqueue.go | 54 +++++++++++----------- 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index b6c3aa23a..a736b3852 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -38,25 +38,25 @@ const maxPDUsPerTransaction = 50 // ensures that only one request is in flight to a given destination // at a time. type destinationQueue struct { - db storage.Database - signing *SigningInfo - rsAPI api.RoomserverInternalAPI - client *gomatrixserverlib.FederationClient // federation client - origin gomatrixserverlib.ServerName // origin of requests - destination gomatrixserverlib.ServerName // destination of requests - running atomic.Bool // is the queue worker running? - backingOff atomic.Bool // true if we're backing off - statistics *types.ServerStatistics // statistics about this remote server - incomingInvites chan *gomatrixserverlib.InviteV2Request // invites to send - incomingEDUs chan *gomatrixserverlib.EDU // EDUs to send - transactionMutex sync.Mutex // protects transactionID and transactionCount - transactionID gomatrixserverlib.TransactionID // last transaction ID - transactionCount int // how many events in this transaction so far - pendingPDUs atomic.Int32 // how many PDUs are waiting to be sent - pendingEDUs []*gomatrixserverlib.EDU // owned by backgroundSend - pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend - wakeServerCh chan bool // interrupts idle wait - retryServerCh chan bool // interrupts backoff + db storage.Database + signing *SigningInfo + rsAPI api.RoomserverInternalAPI + client *gomatrixserverlib.FederationClient // federation client + origin gomatrixserverlib.ServerName // origin of requests + destination gomatrixserverlib.ServerName // destination of requests + running atomic.Bool // is the queue worker running? + backingOff atomic.Bool // true if we're backing off + statistics *types.ServerStatistics // statistics about this remote server + incomingInvites chan *gomatrixserverlib.InviteV2Request // invites to send + incomingEDUs chan *gomatrixserverlib.EDU // EDUs to send + transactionIDMutex sync.Mutex // protects transactionID + transactionID gomatrixserverlib.TransactionID // last transaction ID + transactionCount atomic.Int32 // how many events in this transaction so far + pendingPDUs atomic.Int32 // how many PDUs are waiting to be sent + pendingEDUs []*gomatrixserverlib.EDU // owned by backgroundSend + pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend + wakeServerCh chan bool // interrupts idle wait + retryServerCh chan bool // interrupts backoff } // retry will clear the blacklist state and attempt to send built up events to the server, @@ -95,13 +95,13 @@ func (oq *destinationQueue) sendEvent(nid int64) { // one made up yet, or if we've exceeded the number of maximum // events allowed in a single tranaction. We'll reset the counter // when we do. - oq.transactionMutex.Lock() - if oq.transactionID == "" || oq.transactionCount >= maxPDUsPerTransaction { + oq.transactionIDMutex.Lock() + if oq.transactionID == "" || oq.transactionCount.Load() >= maxPDUsPerTransaction { now := gomatrixserverlib.AsTimestamp(time.Now()) oq.transactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, oq.statistics.SuccessCount())) - oq.transactionCount = 0 + oq.transactionCount.Store(0) } - oq.transactionMutex.Unlock() + oq.transactionIDMutex.Unlock() // Create a database entry that associates the given PDU NID with // this destination queue. We'll then be able to retrieve the PDU // later. @@ -116,7 +116,7 @@ func (oq *destinationQueue) sendEvent(nid int64) { } // We've successfully added a PDU to the transaction so increase // the counter. - oq.transactionCount++ + oq.transactionCount.Add(1) // If the queue isn't running at this point then start it. if !oq.running.Load() { go oq.backgroundSend() @@ -300,10 +300,10 @@ func (oq *destinationQueue) nextTransaction( // Otherwise it's possible that we'll pick up an incomplete // transaction and end up nuking the rest of the events at the // cleanup stage. - oq.transactionMutex.Lock() + oq.transactionIDMutex.Lock() oq.transactionID = "" - oq.transactionCount = 0 - oq.transactionMutex.Unlock() + oq.transactionIDMutex.Unlock() + oq.transactionCount.Store(0) // Create the transaction. t := gomatrixserverlib.Transaction{