From 0bd298ad85387a9b4dabfb9f195734c2622d2b7e Mon Sep 17 00:00:00 2001 From: Anant Prakash Date: Tue, 7 Aug 2018 22:00:12 +0530 Subject: [PATCH] Update queue to support EDU events --- .../queue/destinationqueue.go | 28 ++++++++++- .../dendrite/federationsender/queue/queue.go | 46 +++++++++++++++++-- 2 files changed, 68 insertions(+), 6 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go b/src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go index 2013a7a4b..026e3e51c 100644 --- a/src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go +++ b/src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go @@ -33,12 +33,13 @@ type destinationQueue struct { origin gomatrixserverlib.ServerName destination gomatrixserverlib.ServerName // The running mutex protects running, sentCounter, lastTransactionIDs and - // pendingEvents. + // pendingEvents, pendingEDUs. runningMutex sync.Mutex running bool sentCounter int lastTransactionIDs []gomatrixserverlib.TransactionID pendingEvents []*gomatrixserverlib.Event + pendingEDUs []*gomatrixserverlib.EDU } // Send event adds the event to the pending queue for the destination. @@ -54,6 +55,19 @@ func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.Event) { } } +// sendEDU adds the EDU event to the pending queue for the destination. +// If the queue is empty then it starts a background goroutine to +// start sending event to that destination. +func (oq *destinationQueue) sendEDU(ev *gomatrixserverlib.EDU) { + oq.runningMutex.Lock() + defer oq.runningMutex.Unlock() + oq.pendingEDUs = append(oq.pendingEDUs, ev) + if !oq.running { + oq.running = true + go oq.backgroundSend() + } +} + func (oq *destinationQueue) backgroundSend() { for { t := oq.next() @@ -82,10 +96,12 @@ func (oq *destinationQueue) backgroundSend() { func (oq *destinationQueue) next() *gomatrixserverlib.Transaction { oq.runningMutex.Lock() defer oq.runningMutex.Unlock() - if len(oq.pendingEvents) == 0 { + + if len(oq.pendingEvents) == 0 && len(oq.pendingEDUs) == 0 { oq.running = false return nil } + var t gomatrixserverlib.Transaction now := gomatrixserverlib.AsTimestamp(time.Now()) t.TransactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, oq.sentCounter)) @@ -96,11 +112,19 @@ func (oq *destinationQueue) next() *gomatrixserverlib.Transaction { if t.PreviousIDs == nil { t.PreviousIDs = []gomatrixserverlib.TransactionID{} } + oq.lastTransactionIDs = []gomatrixserverlib.TransactionID{t.TransactionID} + for _, pdu := range oq.pendingEvents { t.PDUs = append(t.PDUs, *pdu) } oq.pendingEvents = nil oq.sentCounter += len(t.PDUs) + + for _, edu := range oq.pendingEDUs { + t.EDUs = append(t.EDUs, *edu) + } + oq.pendingEDUs = nil + return &t } diff --git a/src/github.com/matrix-org/dendrite/federationsender/queue/queue.go b/src/github.com/matrix-org/dendrite/federationsender/queue/queue.go index d31c12f99..8d910f627 100644 --- a/src/github.com/matrix-org/dendrite/federationsender/queue/queue.go +++ b/src/github.com/matrix-org/dendrite/federationsender/queue/queue.go @@ -47,10 +47,7 @@ func (oqs *OutgoingQueues) SendEvent( destinations []gomatrixserverlib.ServerName, ) error { if origin != oqs.origin { - // TODO: Support virtual hosting by allowing us to send events using - // different origin server names. - // For now assume we are always asked to send as the single server configured - // in the dendrite config. + // TODO: Support virtual hosting; gh issue #577. return fmt.Errorf( "sendevent: unexpected server to send as: got %q expected %q", origin, oqs.origin, @@ -76,8 +73,49 @@ func (oqs *OutgoingQueues) SendEvent( } oqs.queues[destination] = oq } + oq.sendEvent(ev) } + + return nil +} + +// SendEDU sends an EDU event to the destinations +func (oqs *OutgoingQueues) SendEDU( + ev *gomatrixserverlib.EDU, origin gomatrixserverlib.ServerName, + destinations []gomatrixserverlib.ServerName, +) error { + if origin != oqs.origin { + // TODO: Support virtual hosting; gh issue #577. + return fmt.Errorf( + "sendevent: unexpected server to send as: got %q expected %q", + origin, oqs.origin, + ) + } + + // Remove our own server from the list of destinations. + destinations = filterDestinations(oqs.origin, destinations) + + log.WithFields(log.Fields{ + "destinations": destinations, "edu_type": ev.Type, + }).Info("Sending EDU event") + + oqs.queuesMutex.Lock() + defer oqs.queuesMutex.Unlock() + for _, destination := range destinations { + oq := oqs.queues[destination] + if oq == nil { + oq = &destinationQueue{ + origin: oqs.origin, + destination: destination, + client: oqs.client, + } + oqs.queues[destination] = oq + } + + oq.sendEDU(ev) + } + return nil }