Don't spin

This commit is contained in:
Neil Alexander 2020-07-02 16:19:48 +01:00
parent 9ad0781831
commit 0214ce3f4f

View file

@ -164,6 +164,9 @@ func (oq *destinationQueue) wakeQueueIfNeeded() {
} else {
log.WithError(err).Errorf("Can't get pending PDU count for %q destination queue", oq.destination)
}
if count > 0 {
oq.wakeServerCh <- true
}
// Then start the queue.
go oq.backgroundSend()
}
@ -182,46 +185,44 @@ func (oq *destinationQueue) backgroundSend() {
for {
// If we have nothing to do then wait either for incoming events, or
// until we hit an idle timeout.
if oq.pendingPDUs.Load() == 0 && len(oq.pendingEDUs) == 0 && len(oq.pendingInvites) == 0 {
select {
case <-oq.wakeServerCh:
// We were woken up because there are new PDUs waiting in the
// database.
case edu := <-oq.incomingEDUs:
// EDUs are handled in-memory for now. We will try to keep
// the ordering intact.
// TODO: Certain EDU types need persistence, e.g. send-to-device
oq.pendingEDUs = append(oq.pendingEDUs, edu)
// If there are any more things waiting in the channel queue
// then read them. This is safe because we guarantee only
// having one goroutine per destination queue, so the channel
// isn't being consumed anywhere else.
for len(oq.incomingEDUs) > 0 {
oq.pendingEDUs = append(oq.pendingEDUs, <-oq.incomingEDUs)
}
case invite := <-oq.incomingInvites:
// There's no strict ordering requirement for invites like
// there is for transactions, so we put the invite onto the
// front of the queue. This means that if an invite that is
// stuck failing already, that it won't block our new invite
// from being sent.
oq.pendingInvites = append(
[]*gomatrixserverlib.InviteV2Request{invite},
oq.pendingInvites...,
)
// If there are any more things waiting in the channel queue
// then read them. This is safe because we guarantee only
// having one goroutine per destination queue, so the channel
// isn't being consumed anywhere else.
for len(oq.incomingInvites) > 0 {
oq.pendingInvites = append(oq.pendingInvites, <-oq.incomingInvites)
}
case <-time.After(time.Second * 30):
// The worker is idle so stop the goroutine. It'll get
// restarted automatically the next time we have an event to
// send.
return
select {
case <-oq.wakeServerCh:
// We were woken up because there are new PDUs waiting in the
// database.
case edu := <-oq.incomingEDUs:
// EDUs are handled in-memory for now. We will try to keep
// the ordering intact.
// TODO: Certain EDU types need persistence, e.g. send-to-device
oq.pendingEDUs = append(oq.pendingEDUs, edu)
// If there are any more things waiting in the channel queue
// then read them. This is safe because we guarantee only
// having one goroutine per destination queue, so the channel
// isn't being consumed anywhere else.
for len(oq.incomingEDUs) > 0 {
oq.pendingEDUs = append(oq.pendingEDUs, <-oq.incomingEDUs)
}
case invite := <-oq.incomingInvites:
// There's no strict ordering requirement for invites like
// there is for transactions, so we put the invite onto the
// front of the queue. This means that if an invite that is
// stuck failing already, that it won't block our new invite
// from being sent.
oq.pendingInvites = append(
[]*gomatrixserverlib.InviteV2Request{invite},
oq.pendingInvites...,
)
// If there are any more things waiting in the channel queue
// then read them. This is safe because we guarantee only
// having one goroutine per destination queue, so the channel
// isn't being consumed anywhere else.
for len(oq.incomingInvites) > 0 {
oq.pendingInvites = append(oq.pendingInvites, <-oq.incomingInvites)
}
case <-time.After(time.Second * 30):
// The worker is idle so stop the goroutine. It'll get
// restarted automatically the next time we have an event to
// send.
return
}
// If we are backing off this server then wait for the
@ -329,8 +330,10 @@ func (oq *destinationQueue) nextTransaction(
// Ask the database for any pending PDUs from the next transaction.
// maxPDUsPerTransaction is an upper limit but we probably won't
// actually retrieve that many events.
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
txid, pdus, err := oq.db.GetNextTransactionPDUs(
context.TODO(), // context
ctx, // context
oq.destination, // server name
maxPDUsPerTransaction, // max events to retrieve
)