diff --git a/src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go b/src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go index c1bce8efc..ac243b502 100644 --- a/src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go +++ b/src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go @@ -28,7 +28,7 @@ import ( // ensures that only one request is in flight to a given destination // at a time. type destinationQueue struct { - mutex sync.Mutex + runningMutex sync.Mutex client *gomatrixserverlib.FederationClient origin gomatrixserverlib.ServerName destination gomatrixserverlib.ServerName @@ -42,10 +42,11 @@ type destinationQueue struct { // If the queue is empty then it starts a background goroutine to // start sending events to that destination. func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.Event) { - oq.mutex.Lock() - defer oq.mutex.Unlock() + oq.runningMutex.Lock() + defer oq.runningMutex.Unlock() oq.pendingEvents = append(oq.pendingEvents, ev) if !oq.running { + oq.running = true go oq.backgroundSend() } } @@ -76,8 +77,8 @@ func (oq *destinationQueue) backgroundSend() { // and flushes the queue. // Returns nil if the queue was empty. func (oq *destinationQueue) next() *gomatrixserverlib.Transaction { - oq.mutex.Lock() - defer oq.mutex.Unlock() + oq.runningMutex.Lock() + defer oq.runningMutex.Unlock() if len(oq.pendingEvents) == 0 { oq.running = false return nil diff --git a/src/github.com/matrix-org/dendrite/federationsender/queue/queue.go b/src/github.com/matrix-org/dendrite/federationsender/queue/queue.go index 52692d92d..77b245630 100644 --- a/src/github.com/matrix-org/dendrite/federationsender/queue/queue.go +++ b/src/github.com/matrix-org/dendrite/federationsender/queue/queue.go @@ -25,10 +25,10 @@ import ( // OutgoingQueues is a collection of queues for sending transactions to other // matrix servers type OutgoingQueues struct { - mutex sync.Mutex - queues map[gomatrixserverlib.ServerName]*destinationQueue - origin gomatrixserverlib.ServerName - client *gomatrixserverlib.FederationClient + origin gomatrixserverlib.ServerName + client *gomatrixserverlib.FederationClient + queuesMutex sync.Mutex + queues map[gomatrixserverlib.ServerName]*destinationQueue } // NewOutgoingQueues makes a new OutgoingQueues @@ -63,8 +63,8 @@ func (oqs *OutgoingQueues) SendEvent( "destinations": destinations, "event": ev.EventID(), }).Info("Sending event") - oqs.mutex.Lock() - defer oqs.mutex.Unlock() + oqs.queuesMutex.Lock() + defer oqs.queuesMutex.Unlock() for _, destination := range destinations { oq := oqs.queues[destination] if oq == nil { diff --git a/src/github.com/matrix-org/dendrite/federationsender/storage/storage.go b/src/github.com/matrix-org/dendrite/federationsender/storage/storage.go index ca820b195..2f98093e4 100644 --- a/src/github.com/matrix-org/dendrite/federationsender/storage/storage.go +++ b/src/github.com/matrix-org/dendrite/federationsender/storage/storage.go @@ -70,7 +70,8 @@ func (d *Database) SetPartitionOffset(topic string, partition int32, offset int6 return d.UpsertPartitionOffset(topic, partition, offset) } -// UpdateRoom updates the joined hosts for a room. +// UpdateRoom updates the joined hosts for a room and returns what the joined +// hosts were before the update. func (d *Database) UpdateRoom( roomID, oldEventID, newEventID string, addHosts []types.JoinedHost,