mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-12 17:33:09 -06:00
Update queue to support EDU events
This commit is contained in:
parent
fe17eb5ed7
commit
0bd298ad85
|
|
@ -33,12 +33,13 @@ type destinationQueue struct {
|
|||
origin gomatrixserverlib.ServerName
|
||||
destination gomatrixserverlib.ServerName
|
||||
// The running mutex protects running, sentCounter, lastTransactionIDs and
|
||||
// pendingEvents.
|
||||
// pendingEvents, pendingEDUs.
|
||||
runningMutex sync.Mutex
|
||||
running bool
|
||||
sentCounter int
|
||||
lastTransactionIDs []gomatrixserverlib.TransactionID
|
||||
pendingEvents []*gomatrixserverlib.Event
|
||||
pendingEDUs []*gomatrixserverlib.EDU
|
||||
}
|
||||
|
||||
// Send event adds the event to the pending queue for the destination.
|
||||
|
|
@ -54,6 +55,19 @@ func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.Event) {
|
|||
}
|
||||
}
|
||||
|
||||
// sendEDU adds the EDU event to the pending queue for the destination.
|
||||
// If the queue is empty then it starts a background goroutine to
|
||||
// start sending event to that destination.
|
||||
func (oq *destinationQueue) sendEDU(ev *gomatrixserverlib.EDU) {
|
||||
oq.runningMutex.Lock()
|
||||
defer oq.runningMutex.Unlock()
|
||||
oq.pendingEDUs = append(oq.pendingEDUs, ev)
|
||||
if !oq.running {
|
||||
oq.running = true
|
||||
go oq.backgroundSend()
|
||||
}
|
||||
}
|
||||
|
||||
func (oq *destinationQueue) backgroundSend() {
|
||||
for {
|
||||
t := oq.next()
|
||||
|
|
@ -82,10 +96,12 @@ func (oq *destinationQueue) backgroundSend() {
|
|||
func (oq *destinationQueue) next() *gomatrixserverlib.Transaction {
|
||||
oq.runningMutex.Lock()
|
||||
defer oq.runningMutex.Unlock()
|
||||
if len(oq.pendingEvents) == 0 {
|
||||
|
||||
if len(oq.pendingEvents) == 0 && len(oq.pendingEDUs) == 0 {
|
||||
oq.running = false
|
||||
return nil
|
||||
}
|
||||
|
||||
var t gomatrixserverlib.Transaction
|
||||
now := gomatrixserverlib.AsTimestamp(time.Now())
|
||||
t.TransactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, oq.sentCounter))
|
||||
|
|
@ -96,11 +112,19 @@ func (oq *destinationQueue) next() *gomatrixserverlib.Transaction {
|
|||
if t.PreviousIDs == nil {
|
||||
t.PreviousIDs = []gomatrixserverlib.TransactionID{}
|
||||
}
|
||||
|
||||
oq.lastTransactionIDs = []gomatrixserverlib.TransactionID{t.TransactionID}
|
||||
|
||||
for _, pdu := range oq.pendingEvents {
|
||||
t.PDUs = append(t.PDUs, *pdu)
|
||||
}
|
||||
oq.pendingEvents = nil
|
||||
oq.sentCounter += len(t.PDUs)
|
||||
|
||||
for _, edu := range oq.pendingEDUs {
|
||||
t.EDUs = append(t.EDUs, *edu)
|
||||
}
|
||||
oq.pendingEDUs = nil
|
||||
|
||||
return &t
|
||||
}
|
||||
|
|
|
|||
|
|
@ -47,10 +47,7 @@ func (oqs *OutgoingQueues) SendEvent(
|
|||
destinations []gomatrixserverlib.ServerName,
|
||||
) error {
|
||||
if origin != oqs.origin {
|
||||
// TODO: Support virtual hosting by allowing us to send events using
|
||||
// different origin server names.
|
||||
// For now assume we are always asked to send as the single server configured
|
||||
// in the dendrite config.
|
||||
// TODO: Support virtual hosting; gh issue #577.
|
||||
return fmt.Errorf(
|
||||
"sendevent: unexpected server to send as: got %q expected %q",
|
||||
origin, oqs.origin,
|
||||
|
|
@ -76,8 +73,49 @@ func (oqs *OutgoingQueues) SendEvent(
|
|||
}
|
||||
oqs.queues[destination] = oq
|
||||
}
|
||||
|
||||
oq.sendEvent(ev)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// SendEDU sends an EDU event to the destinations
|
||||
func (oqs *OutgoingQueues) SendEDU(
|
||||
ev *gomatrixserverlib.EDU, origin gomatrixserverlib.ServerName,
|
||||
destinations []gomatrixserverlib.ServerName,
|
||||
) error {
|
||||
if origin != oqs.origin {
|
||||
// TODO: Support virtual hosting; gh issue #577.
|
||||
return fmt.Errorf(
|
||||
"sendevent: unexpected server to send as: got %q expected %q",
|
||||
origin, oqs.origin,
|
||||
)
|
||||
}
|
||||
|
||||
// Remove our own server from the list of destinations.
|
||||
destinations = filterDestinations(oqs.origin, destinations)
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"destinations": destinations, "edu_type": ev.Type,
|
||||
}).Info("Sending EDU event")
|
||||
|
||||
oqs.queuesMutex.Lock()
|
||||
defer oqs.queuesMutex.Unlock()
|
||||
for _, destination := range destinations {
|
||||
oq := oqs.queues[destination]
|
||||
if oq == nil {
|
||||
oq = &destinationQueue{
|
||||
origin: oqs.origin,
|
||||
destination: destination,
|
||||
client: oqs.client,
|
||||
}
|
||||
oqs.queues[destination] = oq
|
||||
}
|
||||
|
||||
oq.sendEDU(ev)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue