Name the mutexes

This commit is contained in:
Mark Haines 2017-06-28 14:27:56 +01:00
parent 5f0bdd68be
commit 0ba35df6b8
3 changed files with 14 additions and 12 deletions

View file

@ -28,7 +28,7 @@ import (
// ensures that only one request is in flight to a given destination // ensures that only one request is in flight to a given destination
// at a time. // at a time.
type destinationQueue struct { type destinationQueue struct {
mutex sync.Mutex runningMutex sync.Mutex
client *gomatrixserverlib.FederationClient client *gomatrixserverlib.FederationClient
origin gomatrixserverlib.ServerName origin gomatrixserverlib.ServerName
destination gomatrixserverlib.ServerName destination gomatrixserverlib.ServerName
@ -42,10 +42,11 @@ type destinationQueue struct {
// If the queue is empty then it starts a background goroutine to // If the queue is empty then it starts a background goroutine to
// start sending events to that destination. // start sending events to that destination.
func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.Event) { func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.Event) {
oq.mutex.Lock() oq.runningMutex.Lock()
defer oq.mutex.Unlock() defer oq.runningMutex.Unlock()
oq.pendingEvents = append(oq.pendingEvents, ev) oq.pendingEvents = append(oq.pendingEvents, ev)
if !oq.running { if !oq.running {
oq.running = true
go oq.backgroundSend() go oq.backgroundSend()
} }
} }
@ -76,8 +77,8 @@ func (oq *destinationQueue) backgroundSend() {
// and flushes the queue. // and flushes the queue.
// Returns nil if the queue was empty. // Returns nil if the queue was empty.
func (oq *destinationQueue) next() *gomatrixserverlib.Transaction { func (oq *destinationQueue) next() *gomatrixserverlib.Transaction {
oq.mutex.Lock() oq.runningMutex.Lock()
defer oq.mutex.Unlock() defer oq.runningMutex.Unlock()
if len(oq.pendingEvents) == 0 { if len(oq.pendingEvents) == 0 {
oq.running = false oq.running = false
return nil return nil

View file

@ -25,10 +25,10 @@ import (
// OutgoingQueues is a collection of queues for sending transactions to other // OutgoingQueues is a collection of queues for sending transactions to other
// matrix servers // matrix servers
type OutgoingQueues struct { type OutgoingQueues struct {
mutex sync.Mutex
queues map[gomatrixserverlib.ServerName]*destinationQueue
origin gomatrixserverlib.ServerName origin gomatrixserverlib.ServerName
client *gomatrixserverlib.FederationClient client *gomatrixserverlib.FederationClient
queuesMutex sync.Mutex
queues map[gomatrixserverlib.ServerName]*destinationQueue
} }
// NewOutgoingQueues makes a new OutgoingQueues // NewOutgoingQueues makes a new OutgoingQueues
@ -63,8 +63,8 @@ func (oqs *OutgoingQueues) SendEvent(
"destinations": destinations, "event": ev.EventID(), "destinations": destinations, "event": ev.EventID(),
}).Info("Sending event") }).Info("Sending event")
oqs.mutex.Lock() oqs.queuesMutex.Lock()
defer oqs.mutex.Unlock() defer oqs.queuesMutex.Unlock()
for _, destination := range destinations { for _, destination := range destinations {
oq := oqs.queues[destination] oq := oqs.queues[destination]
if oq == nil { if oq == nil {

View file

@ -70,7 +70,8 @@ func (d *Database) SetPartitionOffset(topic string, partition int32, offset int6
return d.UpsertPartitionOffset(topic, partition, offset) return d.UpsertPartitionOffset(topic, partition, offset)
} }
// UpdateRoom updates the joined hosts for a room. // UpdateRoom updates the joined hosts for a room and returns what the joined
// hosts were before the update.
func (d *Database) UpdateRoom( func (d *Database) UpdateRoom(
roomID, oldEventID, newEventID string, roomID, oldEventID, newEventID string,
addHosts []types.JoinedHost, addHosts []types.JoinedHost,