Remove channels, as they add extra complexity and could deadlock
This commit is contained in:
parent
613a47354f
commit
36356ac8d1
|
@ -57,11 +57,10 @@ type destinationQueue struct {
|
||||||
statistics *statistics.ServerStatistics // statistics about this remote server
|
statistics *statistics.ServerStatistics // statistics about this remote server
|
||||||
transactionIDMutex sync.Mutex // protects transactionID
|
transactionIDMutex sync.Mutex // protects transactionID
|
||||||
transactionID gomatrixserverlib.TransactionID // last transaction ID
|
transactionID gomatrixserverlib.TransactionID // last transaction ID
|
||||||
notifyPDUs chan *queuedPDU // interrupts idle wait for PDUs that have just been queued
|
notify chan struct{} // interrupts idle wait when PDUs/EDUs have been queued in memory or retrieved from the database
|
||||||
notifyEDUs chan *queuedEDU // interrupts idle wait for EDUs that have just been queued
|
pendingPDUs []*queuedPDU // PDUs waiting to be sent
|
||||||
notifyOverflow chan struct{} // interrupts idle wait for overflowed PDUs/EDUs from the database
|
pendingEDUs []*queuedEDU // EDUs waiting to be sent
|
||||||
pendingPDUs []*queuedPDU // PDUs waiting to be sent, owned by backgroundSender goroutine once started
|
pendingMutex sync.RWMutex // protects pendingPDUs and pendingEDUs
|
||||||
pendingEDUs []*queuedEDU // EDUs waiting to be sent, owned by backgroundSender goroutine once started
|
|
||||||
interruptBackoff chan bool // interrupts backoff
|
interruptBackoff chan bool // interrupts backoff
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -85,17 +84,23 @@ func (oq *destinationQueue) sendEvent(event *gomatrixserverlib.HeaderedEvent, re
|
||||||
log.WithError(err).Errorf("failed to associate PDU %q with destination %q", event.EventID(), oq.destination)
|
log.WithError(err).Errorf("failed to associate PDU %q with destination %q", event.EventID(), oq.destination)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
// If there's room in memory to hold the event then add it to the
|
||||||
|
// list.
|
||||||
|
oq.pendingMutex.Lock()
|
||||||
|
if len(oq.pendingPDUs) < maxPDUsInMemory {
|
||||||
|
oq.pendingPDUs = append(oq.pendingPDUs, &queuedPDU{
|
||||||
|
pdu: event,
|
||||||
|
receipt: receipt,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
oq.pendingMutex.Unlock()
|
||||||
// Check if the destination is blacklisted. If it isn't then wake
|
// Check if the destination is blacklisted. If it isn't then wake
|
||||||
// up the queue.
|
// up the queue.
|
||||||
if !oq.statistics.Blacklisted() {
|
if !oq.statistics.Blacklisted() {
|
||||||
// Wake up the queue if it's asleep.
|
// Wake up the queue if it's asleep.
|
||||||
oq.wakeQueueIfNeeded()
|
oq.wakeQueueIfNeeded()
|
||||||
// Queue the PDU.
|
|
||||||
select {
|
select {
|
||||||
case oq.notifyPDUs <- &queuedPDU{
|
case oq.notify <- struct{}{}:
|
||||||
receipt: receipt,
|
|
||||||
pdu: event,
|
|
||||||
}:
|
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -120,6 +125,16 @@ func (oq *destinationQueue) sendEDU(event *gomatrixserverlib.EDU, receipt *share
|
||||||
log.WithError(err).Errorf("failed to associate EDU with destination %q", oq.destination)
|
log.WithError(err).Errorf("failed to associate EDU with destination %q", oq.destination)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
// If there's room in memory to hold the event then add it to the
|
||||||
|
// list.
|
||||||
|
oq.pendingMutex.Lock()
|
||||||
|
if len(oq.pendingEDUs) < maxEDUsInMemory {
|
||||||
|
oq.pendingEDUs = append(oq.pendingEDUs, &queuedEDU{
|
||||||
|
edu: event,
|
||||||
|
receipt: receipt,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
oq.pendingMutex.Unlock()
|
||||||
// Check if the destination is blacklisted. If it isn't then wake
|
// Check if the destination is blacklisted. If it isn't then wake
|
||||||
// up the queue.
|
// up the queue.
|
||||||
if !oq.statistics.Blacklisted() {
|
if !oq.statistics.Blacklisted() {
|
||||||
|
@ -127,10 +142,7 @@ func (oq *destinationQueue) sendEDU(event *gomatrixserverlib.EDU, receipt *share
|
||||||
oq.wakeQueueIfNeeded()
|
oq.wakeQueueIfNeeded()
|
||||||
// Queue the EDU.
|
// Queue the EDU.
|
||||||
select {
|
select {
|
||||||
case oq.notifyEDUs <- &queuedEDU{
|
case oq.notify <- struct{}{}:
|
||||||
receipt: receipt,
|
|
||||||
edu: event,
|
|
||||||
}:
|
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -154,15 +166,14 @@ func (oq *destinationQueue) wakeQueueIfNeeded() {
|
||||||
|
|
||||||
// getPendingFromDatabase will look at the database and see if
|
// getPendingFromDatabase will look at the database and see if
|
||||||
// there are any persisted events that haven't been sent to this
|
// there are any persisted events that haven't been sent to this
|
||||||
// destination yet. If so, they will be queued up. This function
|
// destination yet. If so, they will be queued up.
|
||||||
// MUST be called from backgroundSend() goroutine ONLY because
|
|
||||||
// it modifies oq.pendingPDUs/oq.pendingEDUs and they aren't
|
|
||||||
// mutexed.
|
|
||||||
func (oq *destinationQueue) getPendingFromDatabase() {
|
func (oq *destinationQueue) getPendingFromDatabase() {
|
||||||
// Check to see if there's anything to do for this server
|
// Check to see if there's anything to do for this server
|
||||||
// in the database.
|
// in the database.
|
||||||
retrieved := false
|
retrieved := false
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
oq.pendingMutex.Lock()
|
||||||
|
defer oq.pendingMutex.Unlock()
|
||||||
if pduCapacity := maxPDUsInMemory - len(oq.pendingPDUs); pduCapacity > 0 {
|
if pduCapacity := maxPDUsInMemory - len(oq.pendingPDUs); pduCapacity > 0 {
|
||||||
// We have room in memory for some PDUs - let's request no more than that.
|
// We have room in memory for some PDUs - let's request no more than that.
|
||||||
if pdus, err := oq.db.GetPendingPDUs(ctx, oq.destination, pduCapacity); err == nil {
|
if pdus, err := oq.db.GetPendingPDUs(ctx, oq.destination, pduCapacity); err == nil {
|
||||||
|
@ -191,7 +202,7 @@ func (oq *destinationQueue) getPendingFromDatabase() {
|
||||||
oq.overflowed.Store(false)
|
oq.overflowed.Store(false)
|
||||||
}
|
}
|
||||||
if retrieved {
|
if retrieved {
|
||||||
oq.notifyOverflow <- struct{}{}
|
oq.notify <- struct{}{}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -214,47 +225,11 @@ func (oq *destinationQueue) backgroundSend() {
|
||||||
|
|
||||||
// If we have nothing to do then wait either for incoming events, or
|
// If we have nothing to do then wait either for incoming events, or
|
||||||
// until we hit an idle timeout.
|
// until we hit an idle timeout.
|
||||||
awaitSelect:
|
|
||||||
select {
|
select {
|
||||||
case <-oq.notifyOverflow:
|
case <-oq.notify:
|
||||||
// getPendingFromDatabase has woken us up because of pending
|
// There's work to do, either because getPendingFromDatabase
|
||||||
// work.
|
// told us there is, or because a new event has come in via
|
||||||
case pdu := <-oq.notifyPDUs:
|
// sendEvent/sendEDU.
|
||||||
// We were woken up because there are new PDUs waiting in the
|
|
||||||
// database.
|
|
||||||
if len(oq.pendingPDUs) > maxPDUsInMemory {
|
|
||||||
oq.overflowed.Store(true)
|
|
||||||
break awaitSelect
|
|
||||||
}
|
|
||||||
oq.pendingPDUs = append(oq.pendingPDUs, pdu)
|
|
||||||
pendingPDULoop:
|
|
||||||
for i := 1; i < maxPDUsInMemory-len(oq.pendingPDUs); i++ {
|
|
||||||
select {
|
|
||||||
case pdu := <-oq.notifyPDUs:
|
|
||||||
oq.pendingPDUs = append(oq.pendingPDUs, pdu)
|
|
||||||
default:
|
|
||||||
break pendingPDULoop
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
case edu := <-oq.notifyEDUs:
|
|
||||||
// We were woken up because there are new PDUs waiting in the
|
|
||||||
// database.
|
|
||||||
if len(oq.pendingEDUs) > maxEDUsInMemory {
|
|
||||||
oq.overflowed.Store(true)
|
|
||||||
break awaitSelect
|
|
||||||
}
|
|
||||||
oq.pendingEDUs = append(oq.pendingEDUs, edu)
|
|
||||||
pendingEDULoop:
|
|
||||||
for i := 1; i < maxEDUsInMemory-len(oq.pendingEDUs); i++ {
|
|
||||||
select {
|
|
||||||
case edu := <-oq.notifyEDUs:
|
|
||||||
oq.pendingEDUs = append(oq.pendingEDUs, edu)
|
|
||||||
default:
|
|
||||||
break pendingEDULoop
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
case <-time.After(queueIdleTimeout):
|
case <-time.After(queueIdleTimeout):
|
||||||
// The worker is idle so stop the goroutine. It'll get
|
// The worker is idle so stop the goroutine. It'll get
|
||||||
// restarted automatically the next time we have an event to
|
// restarted automatically the next time we have an event to
|
||||||
|
@ -272,6 +247,7 @@ func (oq *destinationQueue) backgroundSend() {
|
||||||
// has exceeded a maximum allowable value. Clean up the in-memory
|
// has exceeded a maximum allowable value. Clean up the in-memory
|
||||||
// buffers at this point. The PDU clean-up is already on a defer.
|
// buffers at this point. The PDU clean-up is already on a defer.
|
||||||
log.Warnf("Blacklisting %q due to exceeding backoff threshold", oq.destination)
|
log.Warnf("Blacklisting %q due to exceeding backoff threshold", oq.destination)
|
||||||
|
oq.pendingMutex.Lock()
|
||||||
for i := range oq.pendingPDUs {
|
for i := range oq.pendingPDUs {
|
||||||
oq.pendingPDUs[i] = nil
|
oq.pendingPDUs[i] = nil
|
||||||
}
|
}
|
||||||
|
@ -280,6 +256,7 @@ func (oq *destinationQueue) backgroundSend() {
|
||||||
}
|
}
|
||||||
oq.pendingPDUs = nil
|
oq.pendingPDUs = nil
|
||||||
oq.pendingEDUs = nil
|
oq.pendingEDUs = nil
|
||||||
|
oq.pendingMutex.Lock()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if until != nil && until.After(time.Now()) {
|
if until != nil && until.After(time.Now()) {
|
||||||
|
@ -293,6 +270,7 @@ func (oq *destinationQueue) backgroundSend() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
oq.pendingMutex.RLock()
|
||||||
pduCount := len(oq.pendingPDUs)
|
pduCount := len(oq.pendingPDUs)
|
||||||
eduCount := len(oq.pendingEDUs)
|
eduCount := len(oq.pendingEDUs)
|
||||||
if pduCount > maxPDUsPerTransaction {
|
if pduCount > maxPDUsPerTransaction {
|
||||||
|
@ -305,13 +283,16 @@ func (oq *destinationQueue) backgroundSend() {
|
||||||
// If we have pending PDUs or EDUs then construct a transaction.
|
// If we have pending PDUs or EDUs then construct a transaction.
|
||||||
// Try sending the next transaction and see what happens.
|
// Try sending the next transaction and see what happens.
|
||||||
transaction, pc, ec, terr := oq.nextTransaction(oq.pendingPDUs[:pduCount], oq.pendingEDUs[:eduCount])
|
transaction, pc, ec, terr := oq.nextTransaction(oq.pendingPDUs[:pduCount], oq.pendingEDUs[:eduCount])
|
||||||
|
oq.pendingMutex.RUnlock()
|
||||||
if terr != nil {
|
if terr != nil {
|
||||||
// We failed to send the transaction. Mark it as a failure.
|
// We failed to send the transaction. Mark it as a failure.
|
||||||
oq.statistics.Failure()
|
oq.statistics.Failure()
|
||||||
|
|
||||||
} else if transaction {
|
} else if transaction {
|
||||||
// If we successfully sent the transaction then clear out
|
// If we successfully sent the transaction then clear out
|
||||||
// the pending events and EDUs, and wipe our transaction ID.
|
// the pending events and EDUs, and wipe our transaction ID.
|
||||||
oq.statistics.Success()
|
oq.statistics.Success()
|
||||||
|
oq.pendingMutex.Lock()
|
||||||
for i := range oq.pendingPDUs {
|
for i := range oq.pendingPDUs {
|
||||||
oq.pendingPDUs[i] = nil
|
oq.pendingPDUs[i] = nil
|
||||||
}
|
}
|
||||||
|
@ -320,6 +301,7 @@ func (oq *destinationQueue) backgroundSend() {
|
||||||
}
|
}
|
||||||
oq.pendingPDUs = oq.pendingPDUs[pc:]
|
oq.pendingPDUs = oq.pendingPDUs[pc:]
|
||||||
oq.pendingEDUs = oq.pendingEDUs[ec:]
|
oq.pendingEDUs = oq.pendingEDUs[ec:]
|
||||||
|
oq.pendingMutex.Unlock()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -123,9 +123,7 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d
|
||||||
destination: destination,
|
destination: destination,
|
||||||
client: oqs.client,
|
client: oqs.client,
|
||||||
statistics: oqs.statistics.ForServer(destination),
|
statistics: oqs.statistics.ForServer(destination),
|
||||||
notifyPDUs: make(chan *queuedPDU, 16),
|
notify: make(chan struct{}, 1),
|
||||||
notifyEDUs: make(chan *queuedEDU, 16),
|
|
||||||
notifyOverflow: make(chan struct{}, 1),
|
|
||||||
interruptBackoff: make(chan bool),
|
interruptBackoff: make(chan bool),
|
||||||
signing: oqs.signing,
|
signing: oqs.signing,
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue