Thread safety on transaction ID/count

This commit is contained in:
Neil Alexander 2020-07-01 11:19:48 +01:00
parent d6c94064af
commit ea1e8397fc

View file

@ -38,25 +38,25 @@ const maxPDUsPerTransaction = 50
// ensures that only one request is in flight to a given destination
// at a time.
type destinationQueue struct {
db storage.Database
signing *SigningInfo
rsAPI api.RoomserverInternalAPI
client *gomatrixserverlib.FederationClient // federation client
origin gomatrixserverlib.ServerName // origin of requests
destination gomatrixserverlib.ServerName // destination of requests
running atomic.Bool // is the queue worker running?
backingOff atomic.Bool // true if we're backing off
statistics *types.ServerStatistics // statistics about this remote server
incomingInvites chan *gomatrixserverlib.InviteV2Request // invites to send
incomingEDUs chan *gomatrixserverlib.EDU // EDUs to send
transactionMutex sync.Mutex // protects transactionID and transactionCount
transactionID gomatrixserverlib.TransactionID // last transaction ID
transactionCount int // how many events in this transaction so far
pendingPDUs atomic.Int32 // how many PDUs are waiting to be sent
pendingEDUs []*gomatrixserverlib.EDU // owned by backgroundSend
pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend
wakeServerCh chan bool // interrupts idle wait
retryServerCh chan bool // interrupts backoff
db storage.Database
signing *SigningInfo
rsAPI api.RoomserverInternalAPI
client *gomatrixserverlib.FederationClient // federation client
origin gomatrixserverlib.ServerName // origin of requests
destination gomatrixserverlib.ServerName // destination of requests
running atomic.Bool // is the queue worker running?
backingOff atomic.Bool // true if we're backing off
statistics *types.ServerStatistics // statistics about this remote server
incomingInvites chan *gomatrixserverlib.InviteV2Request // invites to send
incomingEDUs chan *gomatrixserverlib.EDU // EDUs to send
transactionIDMutex sync.Mutex // protects transactionID
transactionID gomatrixserverlib.TransactionID // last transaction ID
transactionCount atomic.Int32 // how many events in this transaction so far
pendingPDUs atomic.Int32 // how many PDUs are waiting to be sent
pendingEDUs []*gomatrixserverlib.EDU // owned by backgroundSend
pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend
wakeServerCh chan bool // interrupts idle wait
retryServerCh chan bool // interrupts backoff
}
// retry will clear the blacklist state and attempt to send built up events to the server,
@ -95,13 +95,13 @@ func (oq *destinationQueue) sendEvent(nid int64) {
// one made up yet, or if we've exceeded the number of maximum
// events allowed in a single tranaction. We'll reset the counter
// when we do.
oq.transactionMutex.Lock()
if oq.transactionID == "" || oq.transactionCount >= maxPDUsPerTransaction {
oq.transactionIDMutex.Lock()
if oq.transactionID == "" || oq.transactionCount.Load() >= maxPDUsPerTransaction {
now := gomatrixserverlib.AsTimestamp(time.Now())
oq.transactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, oq.statistics.SuccessCount()))
oq.transactionCount = 0
oq.transactionCount.Store(0)
}
oq.transactionMutex.Unlock()
oq.transactionIDMutex.Unlock()
// Create a database entry that associates the given PDU NID with
// this destination queue. We'll then be able to retrieve the PDU
// later.
@ -116,7 +116,7 @@ func (oq *destinationQueue) sendEvent(nid int64) {
}
// We've successfully added a PDU to the transaction so increase
// the counter.
oq.transactionCount++
oq.transactionCount.Add(1)
// If the queue isn't running at this point then start it.
if !oq.running.Load() {
go oq.backgroundSend()
@ -300,10 +300,10 @@ func (oq *destinationQueue) nextTransaction(
// Otherwise it's possible that we'll pick up an incomplete
// transaction and end up nuking the rest of the events at the
// cleanup stage.
oq.transactionMutex.Lock()
oq.transactionIDMutex.Lock()
oq.transactionID = ""
oq.transactionCount = 0
oq.transactionMutex.Unlock()
oq.transactionIDMutex.Unlock()
oq.transactionCount.Store(0)
// Create the transaction.
t := gomatrixserverlib.Transaction{