mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-20 21:33:19 -06:00
Don't stop when there is work to be done
This commit is contained in:
parent
a9aa3c263b
commit
4cc1eaac5a
|
|
@ -46,7 +46,6 @@ type destinationQueue struct {
|
||||||
running atomic.Bool // is the queue worker running?
|
running atomic.Bool // is the queue worker running?
|
||||||
backingOff atomic.Bool // true if we're backing off
|
backingOff atomic.Bool // true if we're backing off
|
||||||
statistics *types.ServerStatistics // statistics about this remote server
|
statistics *types.ServerStatistics // statistics about this remote server
|
||||||
incomingPDUs chan struct{} // signal that there are PDUs waiting
|
|
||||||
incomingInvites chan *gomatrixserverlib.InviteV2Request // invites to send
|
incomingInvites chan *gomatrixserverlib.InviteV2Request // invites to send
|
||||||
incomingEDUs chan *gomatrixserverlib.EDU // EDUs to send
|
incomingEDUs chan *gomatrixserverlib.EDU // EDUs to send
|
||||||
transactionID gomatrixserverlib.TransactionID // last transaction ID
|
transactionID gomatrixserverlib.TransactionID // last transaction ID
|
||||||
|
|
@ -54,6 +53,7 @@ type destinationQueue struct {
|
||||||
pendingPDUs atomic.Int32 // how many PDUs are waiting to be sent
|
pendingPDUs atomic.Int32 // how many PDUs are waiting to be sent
|
||||||
pendingEDUs []*gomatrixserverlib.EDU // owned by backgroundSend
|
pendingEDUs []*gomatrixserverlib.EDU // owned by backgroundSend
|
||||||
pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend
|
pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend
|
||||||
|
wakeServerCh chan bool // interrupts idle wait
|
||||||
retryServerCh chan bool // interrupts backoff
|
retryServerCh chan bool // interrupts backoff
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -120,7 +120,7 @@ func (oq *destinationQueue) sendEvent(nid int64) {
|
||||||
// Signal that we've sent a new PDU. This will cause the queue to
|
// Signal that we've sent a new PDU. This will cause the queue to
|
||||||
// wake up if it's asleep.
|
// wake up if it's asleep.
|
||||||
oq.pendingPDUs.Add(1)
|
oq.pendingPDUs.Add(1)
|
||||||
oq.incomingPDUs <- struct{}{}
|
oq.wakeServerCh <- true
|
||||||
}
|
}
|
||||||
|
|
||||||
// sendEDU adds the EDU event to the pending queue for the destination.
|
// sendEDU adds the EDU event to the pending queue for the destination.
|
||||||
|
|
@ -168,46 +168,48 @@ func (oq *destinationQueue) backgroundSend() {
|
||||||
// e.g. in response to EDUs.
|
// e.g. in response to EDUs.
|
||||||
transactionID := gomatrixserverlib.TransactionID("")
|
transactionID := gomatrixserverlib.TransactionID("")
|
||||||
|
|
||||||
// Wait either for incoming events, or until we hit an
|
// If we have nothing to do then wait either for incoming events, or
|
||||||
// idle timeout.
|
// until we hit an idle timeout.
|
||||||
select {
|
if oq.pendingPDUs.Load() == 0 {
|
||||||
case <-oq.incomingPDUs:
|
select {
|
||||||
// We were woken up because there are new PDUs waiting in the
|
case <-oq.wakeServerCh:
|
||||||
// database.
|
// We were woken up because there are new PDUs waiting in the
|
||||||
case edu := <-oq.incomingEDUs:
|
// database.
|
||||||
// EDUs are handled in-memory for now. We will try to keep
|
case edu := <-oq.incomingEDUs:
|
||||||
// the ordering intact.
|
// EDUs are handled in-memory for now. We will try to keep
|
||||||
// TODO: Certain EDU types need persistence, e.g. send-to-device
|
// the ordering intact.
|
||||||
oq.pendingEDUs = append(oq.pendingEDUs, edu)
|
// TODO: Certain EDU types need persistence, e.g. send-to-device
|
||||||
// If there are any more things waiting in the channel queue
|
oq.pendingEDUs = append(oq.pendingEDUs, edu)
|
||||||
// then read them. This is safe because we guarantee only
|
// If there are any more things waiting in the channel queue
|
||||||
// having one goroutine per destination queue, so the channel
|
// then read them. This is safe because we guarantee only
|
||||||
// isn't being consumed anywhere else.
|
// having one goroutine per destination queue, so the channel
|
||||||
for len(oq.incomingEDUs) > 0 {
|
// isn't being consumed anywhere else.
|
||||||
oq.pendingEDUs = append(oq.pendingEDUs, <-oq.incomingEDUs)
|
for len(oq.incomingEDUs) > 0 {
|
||||||
|
oq.pendingEDUs = append(oq.pendingEDUs, <-oq.incomingEDUs)
|
||||||
|
}
|
||||||
|
case invite := <-oq.incomingInvites:
|
||||||
|
// There's no strict ordering requirement for invites like
|
||||||
|
// there is for transactions, so we put the invite onto the
|
||||||
|
// front of the queue. This means that if an invite that is
|
||||||
|
// stuck failing already, that it won't block our new invite
|
||||||
|
// from being sent.
|
||||||
|
oq.pendingInvites = append(
|
||||||
|
[]*gomatrixserverlib.InviteV2Request{invite},
|
||||||
|
oq.pendingInvites...,
|
||||||
|
)
|
||||||
|
// If there are any more things waiting in the channel queue
|
||||||
|
// then read them. This is safe because we guarantee only
|
||||||
|
// having one goroutine per destination queue, so the channel
|
||||||
|
// isn't being consumed anywhere else.
|
||||||
|
for len(oq.incomingInvites) > 0 {
|
||||||
|
oq.pendingInvites = append(oq.pendingInvites, <-oq.incomingInvites)
|
||||||
|
}
|
||||||
|
case <-time.After(time.Second * 30):
|
||||||
|
// The worker is idle so stop the goroutine. It'll get
|
||||||
|
// restarted automatically the next time we have an event to
|
||||||
|
// send.
|
||||||
|
return
|
||||||
}
|
}
|
||||||
case invite := <-oq.incomingInvites:
|
|
||||||
// There's no strict ordering requirement for invites like
|
|
||||||
// there is for transactions, so we put the invite onto the
|
|
||||||
// front of the queue. This means that if an invite that is
|
|
||||||
// stuck failing already, that it won't block our new invite
|
|
||||||
// from being sent.
|
|
||||||
oq.pendingInvites = append(
|
|
||||||
[]*gomatrixserverlib.InviteV2Request{invite},
|
|
||||||
oq.pendingInvites...,
|
|
||||||
)
|
|
||||||
// If there are any more things waiting in the channel queue
|
|
||||||
// then read them. This is safe because we guarantee only
|
|
||||||
// having one goroutine per destination queue, so the channel
|
|
||||||
// isn't being consumed anywhere else.
|
|
||||||
for len(oq.incomingInvites) > 0 {
|
|
||||||
oq.pendingInvites = append(oq.pendingInvites, <-oq.incomingInvites)
|
|
||||||
}
|
|
||||||
case <-time.After(time.Second * 30):
|
|
||||||
// The worker is idle so stop the goroutine. It'll get
|
|
||||||
// restarted automatically the next time we have an event to
|
|
||||||
// send.
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we are backing off this server then wait for the
|
// If we are backing off this server then wait for the
|
||||||
|
|
|
||||||
|
|
@ -88,9 +88,9 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d
|
||||||
destination: destination,
|
destination: destination,
|
||||||
client: oqs.client,
|
client: oqs.client,
|
||||||
statistics: oqs.statistics.ForServer(destination),
|
statistics: oqs.statistics.ForServer(destination),
|
||||||
incomingPDUs: make(chan struct{}, 128),
|
|
||||||
incomingEDUs: make(chan *gomatrixserverlib.EDU, 128),
|
incomingEDUs: make(chan *gomatrixserverlib.EDU, 128),
|
||||||
incomingInvites: make(chan *gomatrixserverlib.InviteV2Request, 128),
|
incomingInvites: make(chan *gomatrixserverlib.InviteV2Request, 128),
|
||||||
|
wakeServerCh: make(chan bool, 128),
|
||||||
retryServerCh: make(chan bool),
|
retryServerCh: make(chan bool),
|
||||||
signing: oqs.signing,
|
signing: oqs.signing,
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue