Use the pos returned by Wait() rather than re-querying

This commit is contained in:
Kegan Dougal 2017-05-15 11:36:17 +01:00
parent baa47b5a71
commit 37ded4fe2f

View file

@ -107,23 +107,22 @@ func (n *Notifier) WaitForEvents(req syncRequest) types.StreamPosition {
// TODO: v1 /events 'peeking' has an 'explicit room ID' which is also tracked, // TODO: v1 /events 'peeking' has an 'explicit room ID' which is also tracked,
// but given we don't do /events, let's pretend it doesn't exist. // but given we don't do /events, let's pretend it doesn't exist.
for {
// In a guard, check if the /sync request should block, and block it until we get woken up
n.currPosMutex.RLock()
currentPos := n.currPos
n.currPosMutex.RUnlock()
// TODO: We increment the stream position for any event, so it's possible that we return immediately // In a guard, check if the /sync request should block, and block it until we get woken up
// with a pos which contains no new events for this user. We should probably re-wait for events n.currPosMutex.RLock()
// automatically in this case. currentPos := n.currPos
if req.since != currentPos { n.currPosMutex.RUnlock()
return currentPos
}
// wait to be woken up, and then re-check the stream position // TODO: We increment the stream position for any event, so it's possible that we return immediately
req.log.WithField("user_id", req.userID).Info("Waiting for event") // with a pos which contains no new events for this user. We should probably re-wait for events
n.blockUser(req.userID) // automatically in this case.
if req.since != currentPos {
return currentPos
} }
// wait to be woken up, and then re-check the stream position
req.log.WithField("user_id", req.userID).Info("Waiting for event")
return n.blockUser(req.userID)
} }
// Load the membership states required to notify users correctly. // Load the membership states required to notify users correctly.
@ -161,17 +160,15 @@ func (n *Notifier) wakeupUser(userID string, newPos types.StreamPosition) {
stream.Broadcast(newPos) // wakeup all goroutines Wait()ing on this stream stream.Broadcast(newPos) // wakeup all goroutines Wait()ing on this stream
} }
func (n *Notifier) blockUser(userID string) { func (n *Notifier) blockUser(userID string) types.StreamPosition {
stream := n.fetchUserStream(userID, true) stream := n.fetchUserStream(userID, true)
stream.Wait() return stream.Wait()
} }
// fetchUserStream retrieves a stream unique to the given user. If makeIfNotExists is true, // fetchUserStream retrieves a stream unique to the given user. If makeIfNotExists is true,
// a stream will be made for this user if one doesn't exist and it will be returned. This // a stream will be made for this user if one doesn't exist and it will be returned. This
// function does not wait for data to be available on the stream. // function does not wait for data to be available on the stream.
func (n *Notifier) fetchUserStream(userID string, makeIfNotExists bool) *UserStream { func (n *Notifier) fetchUserStream(userID string, makeIfNotExists bool) *UserStream {
// There is a bit of a locking dance here, we want to lock the mutex protecting the map
// but NOT the Cond that we may be returning/creating.
n.userStreamsMutex.Lock() n.userStreamsMutex.Lock()
defer n.userStreamsMutex.Unlock() defer n.userStreamsMutex.Unlock()
stream, ok := n.userStreams[userID] stream, ok := n.userStreams[userID]