mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-21 05:43:09 -06:00
Don't block if there are PDUs waiting
This commit is contained in:
parent
5082179ce3
commit
51dba37d7a
|
|
@ -184,44 +184,46 @@ func (oq *destinationQueue) backgroundSend() {
|
||||||
|
|
||||||
// Wait either for incoming events, or until we hit an
|
// Wait either for incoming events, or until we hit an
|
||||||
// idle timeout.
|
// idle timeout.
|
||||||
select {
|
if len(oq.pendingPDUs) == 0 {
|
||||||
case <-oq.incomingPDUs:
|
select {
|
||||||
// There are new PDUs waiting in the database.
|
case <-oq.incomingPDUs:
|
||||||
case edu := <-oq.incomingEDUs:
|
// There are new PDUs waiting in the database.
|
||||||
// Likewise for EDUs, although we should probably not try
|
case edu := <-oq.incomingEDUs:
|
||||||
// too hard with some EDUs (like typing notifications) after
|
// Likewise for EDUs, although we should probably not try
|
||||||
// a certain amount of time has passed.
|
// too hard with some EDUs (like typing notifications) after
|
||||||
// TODO: think about EDU expiry some more
|
// a certain amount of time has passed.
|
||||||
oq.pendingEDUs = append(oq.pendingEDUs, edu)
|
// TODO: think about EDU expiry some more
|
||||||
// If there are any more things waiting in the channel queue
|
oq.pendingEDUs = append(oq.pendingEDUs, edu)
|
||||||
// then read them. This is safe because we guarantee only
|
// If there are any more things waiting in the channel queue
|
||||||
// having one goroutine per destination queue, so the channel
|
// then read them. This is safe because we guarantee only
|
||||||
// isn't being consumed anywhere else.
|
// having one goroutine per destination queue, so the channel
|
||||||
for len(oq.incomingEDUs) > 0 {
|
// isn't being consumed anywhere else.
|
||||||
oq.pendingEDUs = append(oq.pendingEDUs, <-oq.incomingEDUs)
|
for len(oq.incomingEDUs) > 0 {
|
||||||
|
oq.pendingEDUs = append(oq.pendingEDUs, <-oq.incomingEDUs)
|
||||||
|
}
|
||||||
|
case invite := <-oq.incomingInvites:
|
||||||
|
// There's no strict ordering requirement for invites like
|
||||||
|
// there is for transactions, so we put the invite onto the
|
||||||
|
// front of the queue. This means that if an invite that is
|
||||||
|
// stuck failing already, that it won't block our new invite
|
||||||
|
// from being sent.
|
||||||
|
oq.pendingInvites = append(
|
||||||
|
[]*gomatrixserverlib.InviteV2Request{invite},
|
||||||
|
oq.pendingInvites...,
|
||||||
|
)
|
||||||
|
// If there are any more things waiting in the channel queue
|
||||||
|
// then read them. This is safe because we guarantee only
|
||||||
|
// having one goroutine per destination queue, so the channel
|
||||||
|
// isn't being consumed anywhere else.
|
||||||
|
for len(oq.incomingInvites) > 0 {
|
||||||
|
oq.pendingInvites = append(oq.pendingInvites, <-oq.incomingInvites)
|
||||||
|
}
|
||||||
|
case <-time.After(time.Second * 30):
|
||||||
|
// The worker is idle so stop the goroutine. It'll
|
||||||
|
// get restarted automatically the next time we
|
||||||
|
// get an event.
|
||||||
|
return
|
||||||
}
|
}
|
||||||
case invite := <-oq.incomingInvites:
|
|
||||||
// There's no strict ordering requirement for invites like
|
|
||||||
// there is for transactions, so we put the invite onto the
|
|
||||||
// front of the queue. This means that if an invite that is
|
|
||||||
// stuck failing already, that it won't block our new invite
|
|
||||||
// from being sent.
|
|
||||||
oq.pendingInvites = append(
|
|
||||||
[]*gomatrixserverlib.InviteV2Request{invite},
|
|
||||||
oq.pendingInvites...,
|
|
||||||
)
|
|
||||||
// If there are any more things waiting in the channel queue
|
|
||||||
// then read them. This is safe because we guarantee only
|
|
||||||
// having one goroutine per destination queue, so the channel
|
|
||||||
// isn't being consumed anywhere else.
|
|
||||||
for len(oq.incomingInvites) > 0 {
|
|
||||||
oq.pendingInvites = append(oq.pendingInvites, <-oq.incomingInvites)
|
|
||||||
}
|
|
||||||
case <-time.After(time.Second * 30):
|
|
||||||
// The worker is idle so stop the goroutine. It'll
|
|
||||||
// get restarted automatically the next time we
|
|
||||||
// get an event.
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we are backing off this server then wait for the
|
// If we are backing off this server then wait for the
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue