Remodel a bit with channels

This commit is contained in:
Neil Alexander 2020-05-06 15:43:38 +01:00
parent 1be77d2a97
commit e4f828101d
2 changed files with 73 additions and 126 deletions

View file

@ -18,7 +18,6 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"sync"
"time" "time"
"github.com/matrix-org/dendrite/federationsender/producers" "github.com/matrix-org/dendrite/federationsender/producers"
@ -34,32 +33,19 @@ import (
// ensures that only one request is in flight to a given destination // ensures that only one request is in flight to a given destination
// at a time. // at a time.
type destinationQueue struct { type destinationQueue struct {
rsProducer *producers.RoomserverProducer // roomserver producer rsProducer *producers.RoomserverProducer // roomserver producer
client *gomatrixserverlib.FederationClient // federation client client *gomatrixserverlib.FederationClient // federation client
origin gomatrixserverlib.ServerName // origin of requests origin gomatrixserverlib.ServerName // origin of requests
destination gomatrixserverlib.ServerName // destination of requests destination gomatrixserverlib.ServerName // destination of requests
running atomic.Bool // is the queue worker running? running atomic.Bool // is the queue worker running?
wakeup chan bool // wakes up a sleeping worker statistics *types.ServerStatistics // statistics about this remote server
statistics *types.ServerStatistics // statistics about this remote server incomingPDUs chan *gomatrixserverlib.HeaderedEvent // PDUs to send
idleCounter atomic.Uint32 // how many ticks have we done nothing? incomingEDUs chan *gomatrixserverlib.EDU // EDUs to send
runningMutex sync.RWMutex // protects the below incomingInvites chan *gomatrixserverlib.InviteV2Request // invites to send
lastTransactionIDs []gomatrixserverlib.TransactionID // protected by runningMutex lastTransactionIDs []gomatrixserverlib.TransactionID // last transaction ID
pendingPDUs []*gomatrixserverlib.HeaderedEvent // protected by runningMutex pendingPDUs []*gomatrixserverlib.HeaderedEvent // owned by backgroundSend
pendingEDUs []*gomatrixserverlib.EDU // protected by runningMutex pendingEDUs []*gomatrixserverlib.EDU // owned by backgroundSend
pendingInvites []*gomatrixserverlib.InviteV2Request // protected by runningMutex pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend
}
// Start the destination queue if it needs to be started, or
// otherwise signal to it that it should wake up from sleep.
func (oq *destinationQueue) wake() {
if !oq.running.Load() {
go oq.backgroundSend()
} else {
select {
case oq.wakeup <- true:
default:
}
}
} }
// Send event adds the event to the pending queue for the destination. // Send event adds the event to the pending queue for the destination.
@ -70,10 +56,10 @@ func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.HeaderedEvent) {
// If the destination is blacklisted then drop the event. // If the destination is blacklisted then drop the event.
return return
} }
oq.runningMutex.Lock() if !oq.running.Load() {
oq.pendingPDUs = append(oq.pendingPDUs, ev) go oq.backgroundSend()
oq.runningMutex.Unlock() }
oq.wake() oq.incomingPDUs <- ev
} }
// sendEDU adds the EDU event to the pending queue for the destination. // sendEDU adds the EDU event to the pending queue for the destination.
@ -84,10 +70,10 @@ func (oq *destinationQueue) sendEDU(ev *gomatrixserverlib.EDU) {
// If the destination is blacklisted then drop the event. // If the destination is blacklisted then drop the event.
return return
} }
oq.runningMutex.Lock() if !oq.running.Load() {
oq.pendingEDUs = append(oq.pendingEDUs, ev) go oq.backgroundSend()
oq.runningMutex.Unlock() }
oq.wake() oq.incomingEDUs <- ev
} }
// sendInvite adds the invite event to the pending queue for the // sendInvite adds the invite event to the pending queue for the
@ -98,51 +84,51 @@ func (oq *destinationQueue) sendInvite(ev *gomatrixserverlib.InviteV2Request) {
// If the destination is blacklisted then drop the event. // If the destination is blacklisted then drop the event.
return return
} }
oq.runningMutex.Lock() if !oq.running.Load() {
oq.pendingInvites = append(oq.pendingInvites, ev) go oq.backgroundSend()
oq.runningMutex.Unlock() }
oq.wake() oq.incomingInvites <- ev
} }
// backgroundSend is the worker goroutine for sending events. // backgroundSend is the worker goroutine for sending events.
// nolint:gocyclo // nolint:gocyclo
func (oq *destinationQueue) backgroundSend() { func (oq *destinationQueue) backgroundSend() {
// Mark the worker as running for its lifetime. // Mark the worker as running for its lifetime.
oq.wakeup = make(chan bool)
oq.running.Store(true) oq.running.Store(true)
defer oq.running.Store(false) defer oq.running.Store(false)
defer close(oq.wakeup)
for { for {
// Wait either for incoming events, or until we hit an
// idle timeout.
select {
case pdu := <-oq.incomingPDUs:
oq.pendingPDUs = append(oq.pendingPDUs, pdu)
case edu := <-oq.incomingEDUs:
oq.pendingEDUs = append(oq.pendingEDUs, edu)
case invite := <-oq.incomingInvites:
oq.pendingInvites = append(oq.pendingInvites, invite)
case <-time.After(time.Second * 30):
// The worker is idle so stop the goroutine. It'll
// get restarted automatically the next time we
// get an event.
return
}
// If we are backing off this server then wait for the // If we are backing off this server then wait for the
// backoff duration to complete first. // backoff duration to complete first.
if backoff, duration := oq.statistics.BackoffDuration(); backoff { if backoff, duration := oq.statistics.BackoffDuration(); backoff {
<-time.After(duration) <-time.After(duration)
} }
// Retrieve any waiting things. // How many things do we have waiting?
oq.runningMutex.RLock() numPDUs := len(oq.pendingPDUs)
pendingPDUs, numPDUs := oq.pendingPDUs, len(oq.pendingPDUs) numEDUs := len(oq.pendingEDUs)
pendingEDUs, numEDUs := oq.pendingEDUs, len(oq.pendingEDUs) numInvites := len(oq.pendingInvites)
pendingInvites := oq.pendingInvites
idleCounter, sentCounter := oq.idleCounter.Load(), oq.statistics.SuccessCount()
oq.runningMutex.RUnlock()
// If this worker has been idle for a while then stop
// running it, otherwise the goroutine will just tick
// endlessly. It'll get automatically restarted when
// a new event needs to be sent.
if idleCounter >= 5 {
return
}
if len(pendingInvites) == 0 && len(pendingPDUs) == 0 && len(pendingEDUs) == 0 {
oq.idleCounter.Add(1)
}
// If we have pending PDUs or EDUs then construct a transaction. // If we have pending PDUs or EDUs then construct a transaction.
if len(pendingPDUs) > 0 || len(pendingEDUs) > 0 { if numPDUs > 0 || numEDUs > 0 {
// Try sending the next transaction and see what happens. // Try sending the next transaction and see what happens.
transaction, terr := oq.nextTransaction(pendingPDUs, pendingEDUs, sentCounter) transaction, terr := oq.nextTransaction(oq.pendingPDUs, oq.pendingEDUs, oq.statistics.SuccessCount())
if terr != nil { if terr != nil {
// We failed to send the transaction. // We failed to send the transaction.
if giveUp := oq.statistics.Failure(); giveUp { if giveUp := oq.statistics.Failure(); giveUp {
@ -154,7 +140,6 @@ func (oq *destinationQueue) backgroundSend() {
// If we successfully sent the transaction then clear out // If we successfully sent the transaction then clear out
// the pending events and EDUs. // the pending events and EDUs.
oq.statistics.Success() oq.statistics.Success()
oq.runningMutex.Lock()
// Reallocate so that the underlying arrays can be GC'd, as // Reallocate so that the underlying arrays can be GC'd, as
// opposed to growing forever. // opposed to growing forever.
for i := 0; i < numPDUs; i++ { for i := 0; i < numPDUs; i++ {
@ -171,13 +156,12 @@ func (oq *destinationQueue) backgroundSend() {
[]*gomatrixserverlib.EDU{}, []*gomatrixserverlib.EDU{},
oq.pendingEDUs[numEDUs:]..., oq.pendingEDUs[numEDUs:]...,
) )
oq.runningMutex.Unlock()
} }
} }
// Try sending the next invite and see what happens. // Try sending the next invite and see what happens.
if len(pendingInvites) > 0 { if numInvites > 0 {
sent, ierr := oq.nextInvites(pendingInvites) sent, ierr := oq.nextInvites(oq.pendingInvites)
if ierr != nil { if ierr != nil {
// We failed to send the transaction so increase the // We failed to send the transaction so increase the
// backoff and give it another go shortly. // backoff and give it another go shortly.
@ -190,23 +174,14 @@ func (oq *destinationQueue) backgroundSend() {
// If we successfully sent the invites then clear out // If we successfully sent the invites then clear out
// the pending invites. // the pending invites.
oq.statistics.Success() oq.statistics.Success()
oq.runningMutex.Lock()
// Reallocate so that the underlying array can be GC'd, as // Reallocate so that the underlying array can be GC'd, as
// opposed to growing forever. // opposed to growing forever.
oq.pendingInvites = append( oq.pendingInvites = append(
[]*gomatrixserverlib.InviteV2Request{}, []*gomatrixserverlib.InviteV2Request{},
oq.pendingInvites[sent:]..., oq.pendingInvites[sent:]...,
) )
oq.runningMutex.Unlock()
} }
} }
// Wait either for a few seconds, or until a new event is
// available.
select {
case <-oq.wakeup:
case <-time.After(time.Second * 5):
}
} }
} }

View file

@ -32,7 +32,7 @@ type OutgoingQueues struct {
origin gomatrixserverlib.ServerName origin gomatrixserverlib.ServerName
client *gomatrixserverlib.FederationClient client *gomatrixserverlib.FederationClient
statistics *types.Statistics statistics *types.Statistics
queuesMutex sync.RWMutex // protects the below queuesMutex sync.Mutex // protects the below
queues map[gomatrixserverlib.ServerName]*destinationQueue queues map[gomatrixserverlib.ServerName]*destinationQueue
} }
@ -52,6 +52,26 @@ func NewOutgoingQueues(
} }
} }
func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *destinationQueue {
oqs.queuesMutex.Lock()
defer oqs.queuesMutex.Unlock()
oq := oqs.queues[destination]
if oq == nil {
oq = &destinationQueue{
rsProducer: oqs.rsProducer,
origin: oqs.origin,
destination: destination,
client: oqs.client,
statistics: oqs.statistics.ForServer(destination),
incomingPDUs: make(chan *gomatrixserverlib.HeaderedEvent, 128),
incomingEDUs: make(chan *gomatrixserverlib.EDU, 128),
incomingInvites: make(chan *gomatrixserverlib.InviteV2Request, 128),
}
oqs.queues[destination] = oq
}
return oq
}
// SendEvent sends an event to the destinations // SendEvent sends an event to the destinations
func (oqs *OutgoingQueues) SendEvent( func (oqs *OutgoingQueues) SendEvent(
ev *gomatrixserverlib.HeaderedEvent, origin gomatrixserverlib.ServerName, ev *gomatrixserverlib.HeaderedEvent, origin gomatrixserverlib.ServerName,
@ -73,23 +93,7 @@ func (oqs *OutgoingQueues) SendEvent(
}).Info("Sending event") }).Info("Sending event")
for _, destination := range destinations { for _, destination := range destinations {
oqs.queuesMutex.RLock() oqs.getQueue(destination).sendEvent(ev)
oq := oqs.queues[destination]
oqs.queuesMutex.RUnlock()
if oq == nil {
oq = &destinationQueue{
rsProducer: oqs.rsProducer,
origin: oqs.origin,
destination: destination,
client: oqs.client,
statistics: oqs.statistics.ForServer(destination),
}
oqs.queuesMutex.Lock()
oqs.queues[destination] = oq
oqs.queuesMutex.Unlock()
}
go oq.sendEvent(ev)
} }
return nil return nil
@ -122,23 +126,7 @@ func (oqs *OutgoingQueues) SendInvite(
"server_name": destination, "server_name": destination,
}).Info("Sending invite") }).Info("Sending invite")
oqs.queuesMutex.RLock() oqs.getQueue(destination).sendInvite(inviteReq)
oq := oqs.queues[destination]
oqs.queuesMutex.RUnlock()
if oq == nil {
oq = &destinationQueue{
rsProducer: oqs.rsProducer,
origin: oqs.origin,
destination: destination,
client: oqs.client,
statistics: oqs.statistics.ForServer(destination),
}
oqs.queuesMutex.Lock()
oqs.queues[destination] = oq
oqs.queuesMutex.Unlock()
}
go oq.sendInvite(inviteReq)
return nil return nil
} }
@ -166,23 +154,7 @@ func (oqs *OutgoingQueues) SendEDU(
} }
for _, destination := range destinations { for _, destination := range destinations {
oqs.queuesMutex.RLock() oqs.getQueue(destination).sendEDU(e)
oq := oqs.queues[destination]
oqs.queuesMutex.RUnlock()
if oq == nil {
oq = &destinationQueue{
rsProducer: oqs.rsProducer,
origin: oqs.origin,
destination: destination,
client: oqs.client,
statistics: oqs.statistics.ForServer(destination),
}
oqs.queuesMutex.Lock()
oqs.queues[destination] = oq
oqs.queuesMutex.Unlock()
}
go oq.sendEDU(e)
} }
return nil return nil