From f641e7af14638105b5ec4c0177b1a803e8d61def Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Wed, 17 May 2017 13:44:17 +0100 Subject: [PATCH] Use a single lock for protecting currPos and userStreams --- .../dendrite/syncapi/sync/notifier.go | 39 ++++++++----------- 1 file changed, 17 insertions(+), 22 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go index b3753b7d7..2e54c2bde 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go @@ -31,16 +31,15 @@ import ( // the event, but the token has already advanced by the time they fetch it, resulting // in missed events. type Notifier struct { + // A map of RoomID => Set : Must only be accessed by the OnNewEvent goroutine + roomIDToJoinedUsers map[string]set + // Protects currPos and userStreams. + streamLock *sync.Mutex // The latest sync stream position: guarded by 'currPosMutex' which is RW to allow // for concurrent reads on /sync requests - currPos types.StreamPosition - currPosMutex *sync.RWMutex - // A map of RoomID => Set - roomIDToJoinedUsers map[string]set + currPos types.StreamPosition // A map of user_id => UserStream which can be used to wake a given user's /sync request. - // Map access is guarded by userStreamsMutex. - userStreams map[string]*UserStream - userStreamsMutex *sync.Mutex + userStreams map[string]*UserStream } // NewNotifier creates a new notifier set to the given stream position. @@ -49,10 +48,9 @@ type Notifier struct { func NewNotifier(pos types.StreamPosition) *Notifier { return &Notifier{ currPos: pos, - currPosMutex: &sync.RWMutex{}, roomIDToJoinedUsers: make(map[string]set), userStreams: make(map[string]*UserStream), - userStreamsMutex: &sync.Mutex{}, + streamLock: &sync.Mutex{}, } } @@ -60,11 +58,11 @@ func NewNotifier(pos types.StreamPosition) *Notifier { // called from a single goroutine, to avoid races between updates which could set the // current position in the stream incorrectly. func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, pos types.StreamPosition) { - // update the current position in a guard and then notify relevant /sync streams. + // update the current position then notify relevant /sync streams. // This needs to be done PRIOR to waking up users as they will read this value. - n.currPosMutex.Lock() + n.streamLock.Lock() + defer n.streamLock.Unlock() n.currPos = pos - n.currPosMutex.Unlock() // Map this event's room_id to a list of joined users, and wake them up. userIDs := n.joinedUsers(ev.RoomID()) @@ -107,20 +105,24 @@ func (n *Notifier) WaitForEvents(req syncRequest) types.StreamPosition { // but given we don't do /events, let's pretend it doesn't exist. // In a guard, check if the /sync request should block, and block it until we get woken up - n.currPosMutex.RLock() + n.streamLock.Lock() currentPos := n.currPos - n.currPosMutex.RUnlock() // TODO: We increment the stream position for any event, so it's possible that we return immediately // with a pos which contains no new events for this user. We should probably re-wait for events // automatically in this case. if req.since != currentPos { + n.streamLock.Unlock() return currentPos } // wait to be woken up, and then re-check the stream position req.log.WithField("user_id", req.userID).Info("Waiting for event") - return n.blockUser(req.userID) + + // give up the stream lock prior to waiting on the user lock + stream := n.fetchUserStream(req.userID, true) + n.streamLock.Unlock() + return stream.Wait() } // Load the membership states required to notify users correctly. @@ -156,17 +158,10 @@ func (n *Notifier) wakeupUser(userID string, newPos types.StreamPosition) { stream.Broadcast(newPos) // wakeup all goroutines Wait()ing on this stream } -func (n *Notifier) blockUser(userID string) types.StreamPosition { - stream := n.fetchUserStream(userID, true) - return stream.Wait() -} - // fetchUserStream retrieves a stream unique to the given user. If makeIfNotExists is true, // a stream will be made for this user if one doesn't exist and it will be returned. This // function does not wait for data to be available on the stream. func (n *Notifier) fetchUserStream(userID string, makeIfNotExists bool) *UserStream { - n.userStreamsMutex.Lock() - defer n.userStreamsMutex.Unlock() stream, ok := n.userStreams[userID] if !ok { // TODO: Unbounded growth of streams (1 per user)