mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-07 06:53:09 -06:00
Fix race between Wait() and Broadcast()
This commit is contained in:
parent
f641e7af14
commit
3f334fac75
|
|
@ -122,7 +122,7 @@ func (n *Notifier) WaitForEvents(req syncRequest) types.StreamPosition {
|
|||
// give up the stream lock prior to waiting on the user lock
|
||||
stream := n.fetchUserStream(req.userID, true)
|
||||
n.streamLock.Unlock()
|
||||
return stream.Wait()
|
||||
return stream.Wait(currentPos)
|
||||
}
|
||||
|
||||
// Load the membership states required to notify users correctly.
|
||||
|
|
|
|||
|
|
@ -43,8 +43,18 @@ func NewUserStream(userID string) *UserStream {
|
|||
}
|
||||
|
||||
// Wait blocks until there is a new stream position for this user, which is then returned.
|
||||
func (s *UserStream) Wait() (pos types.StreamPosition) {
|
||||
// waitAtPos should be the position the stream thinks it should be waiting at.
|
||||
func (s *UserStream) Wait(waitAtPos types.StreamPosition) (pos types.StreamPosition) {
|
||||
s.cond.L.Lock()
|
||||
// Before we start blocking, we need to make sure that we didn't race with a call
|
||||
// to Broadcast() between calling Wait() and actually sleeping. We check the last
|
||||
// broadcast pos to see if it is newer than the pos we are meant to wait at. If it
|
||||
// is newer, something has Broadcast to this stream more recently so return immediately.
|
||||
if s.pos > waitAtPos {
|
||||
pos = s.pos
|
||||
s.cond.L.Unlock()
|
||||
return
|
||||
}
|
||||
s.numWaiting++
|
||||
s.cond.Wait()
|
||||
pos = s.pos
|
||||
|
|
|
|||
Loading…
Reference in a new issue