Update comments
This commit is contained in:
parent
a91e3d6f35
commit
7246580d54
|
@ -53,15 +53,15 @@ type destinationQueue struct {
|
||||||
destination gomatrixserverlib.ServerName // destination of requests
|
destination gomatrixserverlib.ServerName // destination of requests
|
||||||
running atomic.Bool // is the queue worker running?
|
running atomic.Bool // is the queue worker running?
|
||||||
backingOff atomic.Bool // true if we're backing off
|
backingOff atomic.Bool // true if we're backing off
|
||||||
overflowed atomic.Bool // exceeded in-memory space?
|
overflowed atomic.Bool // the queues exceed maxPDUsInMemory/maxEDUsInMemory, so we should consult the database for more
|
||||||
statistics *statistics.ServerStatistics // statistics about this remote server
|
statistics *statistics.ServerStatistics // statistics about this remote server
|
||||||
transactionIDMutex sync.Mutex // protects transactionID
|
transactionIDMutex sync.Mutex // protects transactionID
|
||||||
transactionID gomatrixserverlib.TransactionID // last transaction ID
|
transactionID gomatrixserverlib.TransactionID // last transaction ID
|
||||||
notifyPDUs chan *queuedPDU // interrupts idle wait for PDUs
|
notifyPDUs chan *queuedPDU // interrupts idle wait for PDUs that have just been queued
|
||||||
notifyEDUs chan *queuedEDU // interrupts idle wait for EDUs
|
notifyEDUs chan *queuedEDU // interrupts idle wait for EDUs that have just been queued
|
||||||
notifyOverflow chan struct{} // interrupts idle wait for overflowed PDUs/EDUs
|
notifyOverflow chan struct{} // interrupts idle wait for overflowed PDUs/EDUs from the database
|
||||||
pendingPDUs []*queuedPDU // owned by backgroundSender goroutine once started
|
pendingPDUs []*queuedPDU // PDUs waiting to be sent, owned by backgroundSender goroutine once started
|
||||||
pendingEDUs []*queuedEDU // owned by backgroundSender goroutine once started
|
pendingEDUs []*queuedEDU // EDUs waiting to be sent, owned by backgroundSender goroutine once started
|
||||||
interruptBackoff chan bool // interrupts backoff
|
interruptBackoff chan bool // interrupts backoff
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -78,7 +78,7 @@ func (oq *destinationQueue) sendEvent(event *gomatrixserverlib.HeaderedEvent, re
|
||||||
// later.
|
// later.
|
||||||
if err := oq.db.AssociatePDUWithDestination(
|
if err := oq.db.AssociatePDUWithDestination(
|
||||||
context.TODO(),
|
context.TODO(),
|
||||||
"", // the current transaction ID, TODO: do something about this
|
"", // TODO: remove this, as we don't need to persist the transaction ID
|
||||||
oq.destination, // the destination server name
|
oq.destination, // the destination server name
|
||||||
receipt, // NIDs from federationsender_queue_json table
|
receipt, // NIDs from federationsender_queue_json table
|
||||||
); err != nil {
|
); err != nil {
|
||||||
|
@ -90,8 +90,7 @@ func (oq *destinationQueue) sendEvent(event *gomatrixserverlib.HeaderedEvent, re
|
||||||
if !oq.statistics.Blacklisted() {
|
if !oq.statistics.Blacklisted() {
|
||||||
// Wake up the queue if it's asleep.
|
// Wake up the queue if it's asleep.
|
||||||
oq.wakeQueueIfNeeded()
|
oq.wakeQueueIfNeeded()
|
||||||
// If we're blocking on waiting PDUs then tell the queue that we
|
// Queue the PDU.
|
||||||
// have work to do.
|
|
||||||
select {
|
select {
|
||||||
case oq.notifyPDUs <- &queuedPDU{
|
case oq.notifyPDUs <- &queuedPDU{
|
||||||
receipt: receipt,
|
receipt: receipt,
|
||||||
|
@ -126,8 +125,7 @@ func (oq *destinationQueue) sendEDU(event *gomatrixserverlib.EDU, receipt *share
|
||||||
if !oq.statistics.Blacklisted() {
|
if !oq.statistics.Blacklisted() {
|
||||||
// Wake up the queue if it's asleep.
|
// Wake up the queue if it's asleep.
|
||||||
oq.wakeQueueIfNeeded()
|
oq.wakeQueueIfNeeded()
|
||||||
// If we're blocking on waiting EDUs then tell the queue that we
|
// Queue the EDU.
|
||||||
// have work to do.
|
|
||||||
select {
|
select {
|
||||||
case oq.notifyEDUs <- &queuedEDU{
|
case oq.notifyEDUs <- &queuedEDU{
|
||||||
receipt: receipt,
|
receipt: receipt,
|
||||||
|
@ -156,7 +154,10 @@ func (oq *destinationQueue) wakeQueueIfNeeded() {
|
||||||
|
|
||||||
// getPendingFromDatabase will look at the database and see if
|
// getPendingFromDatabase will look at the database and see if
|
||||||
// there are any persisted events that haven't been sent to this
|
// there are any persisted events that haven't been sent to this
|
||||||
// destination yet. If so, they will be queued up.
|
// destination yet. If so, they will be queued up. This function
|
||||||
|
// MUST be called from backgroundSend() goroutine ONLY because
|
||||||
|
// it modifies oq.pendingPDUs/oq.pendingEDUs and they aren't
|
||||||
|
// mutexed.
|
||||||
func (oq *destinationQueue) getPendingFromDatabase() {
|
func (oq *destinationQueue) getPendingFromDatabase() {
|
||||||
// Check to see if there's anything to do for this server
|
// Check to see if there's anything to do for this server
|
||||||
// in the database.
|
// in the database.
|
||||||
|
|
Loading…
Reference in a new issue