From 3f334fac75bf5668775400115ec9ea18f81eb5cd Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Wed, 17 May 2017 14:55:11 +0100 Subject: [PATCH] Fix race between Wait() and Broadcast() --- .../matrix-org/dendrite/syncapi/sync/notifier.go | 2 +- .../matrix-org/dendrite/syncapi/sync/userstream.go | 12 +++++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go index 2e54c2bde..1ed0cf55d 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go @@ -122,7 +122,7 @@ func (n *Notifier) WaitForEvents(req syncRequest) types.StreamPosition { // give up the stream lock prior to waiting on the user lock stream := n.fetchUserStream(req.userID, true) n.streamLock.Unlock() - return stream.Wait() + return stream.Wait(currentPos) } // Load the membership states required to notify users correctly. diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/userstream.go b/src/github.com/matrix-org/dendrite/syncapi/sync/userstream.go index ab91161f3..349b3e272 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/userstream.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/userstream.go @@ -43,8 +43,18 @@ func NewUserStream(userID string) *UserStream { } // Wait blocks until there is a new stream position for this user, which is then returned. -func (s *UserStream) Wait() (pos types.StreamPosition) { +// waitAtPos should be the position the stream thinks it should be waiting at. +func (s *UserStream) Wait(waitAtPos types.StreamPosition) (pos types.StreamPosition) { s.cond.L.Lock() + // Before we start blocking, we need to make sure that we didn't race with a call + // to Broadcast() between calling Wait() and actually sleeping. We check the last + // broadcast pos to see if it is newer than the pos we are meant to wait at. If it + // is newer, something has Broadcast to this stream more recently so return immediately. + if s.pos > waitAtPos { + pos = s.pos + s.cond.L.Unlock() + return + } s.numWaiting++ s.cond.Wait() pos = s.pos