diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go index a739e9126..db9fbf164 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go @@ -107,23 +107,22 @@ func (n *Notifier) WaitForEvents(req syncRequest) types.StreamPosition { // TODO: v1 /events 'peeking' has an 'explicit room ID' which is also tracked, // but given we don't do /events, let's pretend it doesn't exist. - for { - // In a guard, check if the /sync request should block, and block it until we get woken up - n.currPosMutex.RLock() - currentPos := n.currPos - n.currPosMutex.RUnlock() - // TODO: We increment the stream position for any event, so it's possible that we return immediately - // with a pos which contains no new events for this user. We should probably re-wait for events - // automatically in this case. - if req.since != currentPos { - return currentPos - } + // In a guard, check if the /sync request should block, and block it until we get woken up + n.currPosMutex.RLock() + currentPos := n.currPos + n.currPosMutex.RUnlock() - // wait to be woken up, and then re-check the stream position - req.log.WithField("user_id", req.userID).Info("Waiting for event") - n.blockUser(req.userID) + // TODO: We increment the stream position for any event, so it's possible that we return immediately + // with a pos which contains no new events for this user. We should probably re-wait for events + // automatically in this case. + if req.since != currentPos { + return currentPos } + + // wait to be woken up, and then re-check the stream position + req.log.WithField("user_id", req.userID).Info("Waiting for event") + return n.blockUser(req.userID) } // Load the membership states required to notify users correctly. @@ -161,17 +160,15 @@ func (n *Notifier) wakeupUser(userID string, newPos types.StreamPosition) { stream.Broadcast(newPos) // wakeup all goroutines Wait()ing on this stream } -func (n *Notifier) blockUser(userID string) { +func (n *Notifier) blockUser(userID string) types.StreamPosition { stream := n.fetchUserStream(userID, true) - stream.Wait() + return stream.Wait() } // fetchUserStream retrieves a stream unique to the given user. If makeIfNotExists is true, // a stream will be made for this user if one doesn't exist and it will be returned. This // function does not wait for data to be available on the stream. func (n *Notifier) fetchUserStream(userID string, makeIfNotExists bool) *UserStream { - // There is a bit of a locking dance here, we want to lock the mutex protecting the map - // but NOT the Cond that we may be returning/creating. n.userStreamsMutex.Lock() defer n.userStreamsMutex.Unlock() stream, ok := n.userStreams[userID]