Associate events in db before queueing them to send (#2833)
Fixes a race condition between sending federation events and having them fully associated in the database.
This commit is contained in:
parent
a74aea0714
commit
97491a174b
|
@ -76,21 +76,25 @@ func (oq *destinationQueue) sendEvent(event *gomatrixserverlib.HeaderedEvent, re
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// If there's room in memory to hold the event then add it to the
|
// Check if the destination is blacklisted. If it isn't then wake
|
||||||
// list.
|
// up the queue.
|
||||||
oq.pendingMutex.Lock()
|
if !oq.statistics.Blacklisted() {
|
||||||
if len(oq.pendingPDUs) < maxPDUsInMemory {
|
// If there's room in memory to hold the event then add it to the
|
||||||
oq.pendingPDUs = append(oq.pendingPDUs, &queuedPDU{
|
// list.
|
||||||
pdu: event,
|
oq.pendingMutex.Lock()
|
||||||
receipt: receipt,
|
if len(oq.pendingPDUs) < maxPDUsInMemory {
|
||||||
})
|
oq.pendingPDUs = append(oq.pendingPDUs, &queuedPDU{
|
||||||
} else {
|
pdu: event,
|
||||||
oq.overflowed.Store(true)
|
receipt: receipt,
|
||||||
}
|
})
|
||||||
oq.pendingMutex.Unlock()
|
} else {
|
||||||
|
oq.overflowed.Store(true)
|
||||||
|
}
|
||||||
|
oq.pendingMutex.Unlock()
|
||||||
|
|
||||||
if !oq.backingOff.Load() {
|
if !oq.backingOff.Load() {
|
||||||
oq.wakeQueueAndNotify()
|
oq.wakeQueueAndNotify()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -103,21 +107,25 @@ func (oq *destinationQueue) sendEDU(event *gomatrixserverlib.EDU, receipt *share
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// If there's room in memory to hold the event then add it to the
|
// Check if the destination is blacklisted. If it isn't then wake
|
||||||
// list.
|
// up the queue.
|
||||||
oq.pendingMutex.Lock()
|
if !oq.statistics.Blacklisted() {
|
||||||
if len(oq.pendingEDUs) < maxEDUsInMemory {
|
// If there's room in memory to hold the event then add it to the
|
||||||
oq.pendingEDUs = append(oq.pendingEDUs, &queuedEDU{
|
// list.
|
||||||
edu: event,
|
oq.pendingMutex.Lock()
|
||||||
receipt: receipt,
|
if len(oq.pendingEDUs) < maxEDUsInMemory {
|
||||||
})
|
oq.pendingEDUs = append(oq.pendingEDUs, &queuedEDU{
|
||||||
} else {
|
edu: event,
|
||||||
oq.overflowed.Store(true)
|
receipt: receipt,
|
||||||
}
|
})
|
||||||
oq.pendingMutex.Unlock()
|
} else {
|
||||||
|
oq.overflowed.Store(true)
|
||||||
|
}
|
||||||
|
oq.pendingMutex.Unlock()
|
||||||
|
|
||||||
if !oq.backingOff.Load() {
|
if !oq.backingOff.Load() {
|
||||||
oq.wakeQueueAndNotify()
|
oq.wakeQueueAndNotify()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -247,9 +247,10 @@ func (oqs *OutgoingQueues) SendEvent(
|
||||||
return fmt.Errorf("sendevent: oqs.db.StoreJSON: %w", err)
|
return fmt.Errorf("sendevent: oqs.db.StoreJSON: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
destQueues := make([]*destinationQueue, 0, len(destmap))
|
||||||
for destination := range destmap {
|
for destination := range destmap {
|
||||||
if queue := oqs.getQueue(destination); queue != nil && !queue.statistics.Blacklisted() {
|
if queue := oqs.getQueue(destination); queue != nil {
|
||||||
queue.sendEvent(ev, nid)
|
destQueues = append(destQueues, queue)
|
||||||
} else {
|
} else {
|
||||||
delete(destmap, destination)
|
delete(destmap, destination)
|
||||||
}
|
}
|
||||||
|
@ -267,6 +268,14 @@ func (oqs *OutgoingQueues) SendEvent(
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NOTE : PDUs should be associated with destinations before sending
|
||||||
|
// them, otherwise this is technically a race.
|
||||||
|
// If the send completes before they are associated then they won't
|
||||||
|
// get properly cleaned up in the database.
|
||||||
|
for _, queue := range destQueues {
|
||||||
|
queue.sendEvent(ev, nid)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -335,20 +344,21 @@ func (oqs *OutgoingQueues) SendEDU(
|
||||||
return fmt.Errorf("sendevent: oqs.db.StoreJSON: %w", err)
|
return fmt.Errorf("sendevent: oqs.db.StoreJSON: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
destQueues := make([]*destinationQueue, 0, len(destmap))
|
||||||
for destination := range destmap {
|
for destination := range destmap {
|
||||||
if queue := oqs.getQueue(destination); queue != nil && !queue.statistics.Blacklisted() {
|
if queue := oqs.getQueue(destination); queue != nil {
|
||||||
queue.sendEDU(e, nid)
|
destQueues = append(destQueues, queue)
|
||||||
} else {
|
} else {
|
||||||
delete(destmap, destination)
|
delete(destmap, destination)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a database entry that associates the given PDU NID with
|
// Create a database entry that associates the given PDU NID with
|
||||||
// this destination queue. We'll then be able to retrieve the PDU
|
// these destination queues. We'll then be able to retrieve the PDU
|
||||||
// later.
|
// later.
|
||||||
if err := oqs.db.AssociateEDUWithDestinations(
|
if err := oqs.db.AssociateEDUWithDestinations(
|
||||||
oqs.process.Context(),
|
oqs.process.Context(),
|
||||||
destmap, // the destination server name
|
destmap, // the destination server names
|
||||||
nid, // NIDs from federationapi_queue_json table
|
nid, // NIDs from federationapi_queue_json table
|
||||||
e.Type,
|
e.Type,
|
||||||
nil, // this will use the default expireEDUTypes map
|
nil, // this will use the default expireEDUTypes map
|
||||||
|
@ -357,6 +367,14 @@ func (oqs *OutgoingQueues) SendEDU(
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NOTE : EDUs should be associated with destinations before sending
|
||||||
|
// them, otherwise this is technically a race.
|
||||||
|
// If the send completes before they are associated then they won't
|
||||||
|
// get properly cleaned up in the database.
|
||||||
|
for _, queue := range destQueues {
|
||||||
|
queue.sendEDU(e, nid)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue