diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go index 457664e5e..4712a2c74 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go @@ -108,11 +108,8 @@ func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, userID string, pos ty // GetListener returns a UserStreamListener that can be used to wait for // updates for a user. Must be closed. -// sincePos specifies from which point we want to be notified about, i.e. don't // notify for anything before sincePos -func (n *Notifier) GetListener( - req syncRequest, sincePos types.StreamPosition, -) UserStreamListener { +func (n *Notifier) GetListener(req syncRequest) UserStreamListener { // Do what synapse does: https://github.com/matrix-org/synapse/blob/v0.20.0/synapse/notifier.py#L298 // - Bucket request into a lookup map keyed off a list of joined room IDs and separately a user ID // - Incoming events wake requests for a matching room ID @@ -126,7 +123,7 @@ func (n *Notifier) GetListener( n.removeEmptyUserStreams() - return n.fetchUserStream(req.userID, true).GetListener(sincePos) + return n.fetchUserStream(req.userID, true).GetListener(req.ctx) } // Load the membership states required to notify users correctly. diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go index e087c51c8..6ee259681 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go @@ -256,7 +256,7 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) { // same as Notifier.WaitForEvents but with a timeout. func waitForEvents(n *Notifier, req syncRequest) (types.StreamPosition, error) { - listener := n.GetListener(req, req.since) + listener := n.GetListener(req) defer listener.Close() select { @@ -264,7 +264,7 @@ func waitForEvents(n *Notifier, req syncRequest) (types.StreamPosition, error) { return types.StreamPosition(0), fmt.Errorf( "waitForEvents timed out waiting for %s (pos=%d)", req.userID, req.since, ) - case <-listener.GetNotifyChannel(): + case <-listener.GetNotifyChannel(req.since): p := listener.GetStreamPosition() return p, nil } diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go index aa9beef5f..7e8afb4c9 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go @@ -82,13 +82,13 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype timer := time.NewTimer(syncReq.timeout) // case of timeout=0 is handled above defer timer.Stop() - userStream := rp.notifier.GetListener(*syncReq, currPos) + userStream := rp.notifier.GetListener(*syncReq) defer userStream.Close() for { select { // Wait for notifier to wake us up - case <-userStream.GetNotifyChannel(): + case <-userStream.GetNotifyChannel(currPos): currPos = userStream.GetStreamPosition() // Or for timeout to expire case <-timer.C: diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/userstream.go b/src/github.com/matrix-org/dendrite/syncapi/sync/userstream.go index a3f2b4739..ef84816a0 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/userstream.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/userstream.go @@ -15,10 +15,13 @@ package sync import ( + "context" + "runtime" "sync" "time" "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/util" ) // UserStream represents a communication mechanism between the /sync request goroutine @@ -42,7 +45,9 @@ type UserStream struct { // UserStreamListener allows a sync request to wait for updates for a user. type UserStreamListener struct { *UserStream - sincePos types.StreamPosition + + // Whether the stream has been closed + hasClosed bool } // NewUserStream creates a new user stream @@ -57,18 +62,26 @@ func NewUserStream(userID string, currPos types.StreamPosition) *UserStream { // GetListener returns UserStreamListener that a sync request can use to wait // for new updates with. -// sincePos specifies from which point we want to be notified about // UserStreamListener must be closed -func (s *UserStream) GetListener(sincePos types.StreamPosition) UserStreamListener { +func (s *UserStream) GetListener(ctx context.Context) UserStreamListener { s.lock.Lock() defer s.lock.Unlock() s.numWaiting++ // We decrement when UserStreamListener is closed - return UserStreamListener{ + listener := UserStreamListener{ UserStream: s, - sincePos: sincePos, } + + // Lets be a bit paranoid here and check that Close() is being called + runtime.SetFinalizer(&listener, func(l *UserStreamListener) { + if !l.hasClosed { + util.GetLogger(ctx).Warn("Didn't call Close on UserStreamListener") + l.Close() + } + }) + + return listener } // Broadcast a new stream position for this user. @@ -116,11 +129,12 @@ func (s *UserStream) GetStreamPosition() types.StreamPosition { // GetNotifyChannel returns a channel that is closed when there may be an // update for the user. -func (s *UserStreamListener) GetNotifyChannel() <-chan struct{} { +// sincePos specifies from which point we want to be notified about +func (s *UserStreamListener) GetNotifyChannel(sincePos types.StreamPosition) <-chan struct{} { s.lock.Lock() defer s.lock.Unlock() - if s.sincePos < s.pos { + if sincePos < s.pos { posChannel := make(chan struct{}) close(posChannel) return posChannel @@ -134,6 +148,10 @@ func (s *UserStreamListener) Close() { s.lock.Lock() defer s.lock.Unlock() - s.numWaiting-- - s.timeOfLastChannel = time.Now() + if !s.hasClosed { + s.numWaiting-- + s.timeOfLastChannel = time.Now() + } + + s.hasClosed = true }