From 5bd570df64f6be620bbf3dc5acdd31ba03ca3b0d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 18 Oct 2017 11:26:00 +0100 Subject: [PATCH] Fixup names and comments --- .../dendrite/syncapi/sync/notifier.go | 10 +++++++--- .../dendrite/syncapi/sync/notifier_test.go | 2 +- .../dendrite/syncapi/sync/requestpool.go | 2 +- .../dendrite/syncapi/sync/userstream.go | 17 ++++++++++------- 4 files changed, 19 insertions(+), 12 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go index 3df942060..3210a7d8b 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go @@ -106,9 +106,13 @@ func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, userID string, pos ty } } -// WaitForEvents returns a channel that produces a single stream position when +// GetNotifyChannel returns a channel that produces a single stream position when // a new event *may* be available to return to the client. -func (n *Notifier) WaitForEvents(req syncRequest, sincePos types.StreamPosition) <-chan types.StreamPosition { +// sincePos specifies from which point we want to be notified about, i.e. don't +// notify for anything before sincePos +func (n *Notifier) GetNotifyChannel( + req syncRequest, sincePos types.StreamPosition, +) <-chan types.StreamPosition { // Do what synapse does: https://github.com/matrix-org/synapse/blob/v0.20.0/synapse/notifier.py#L298 // - Bucket request into a lookup map keyed off a list of joined room IDs and separately a user ID // - Incoming events wake requests for a matching room ID @@ -123,7 +127,7 @@ func (n *Notifier) WaitForEvents(req syncRequest, sincePos types.StreamPosition) n.removeEmptyUserStreams() - return n.fetchUserStream(req.userID, true).Wait(req.ctx, sincePos) + return n.fetchUserStream(req.userID, true).GetNotifyChannel(req.ctx, sincePos) } // Load the membership states required to notify users correctly. diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go index 09af9be90..b24843e76 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go @@ -261,7 +261,7 @@ func waitForEvents(n *Notifier, req syncRequest) (types.StreamPosition, error) { return types.StreamPosition(0), fmt.Errorf( "waitForEvents timed out waiting for %s (pos=%d)", req.userID, req.since, ) - case p := <-n.WaitForEvents(req, req.since): + case p := <-n.GetNotifyChannel(req, req.since): return p, nil } } diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go index cc571a1ff..dde4c5272 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go @@ -85,7 +85,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype for { select { // Wait for notifier to wake us up - case currPos = <-rp.notifier.WaitForEvents(*syncReq, currPos): + case currPos = <-rp.notifier.GetNotifyChannel(*syncReq, currPos): // Or for timeout to expire case <-timer.C: return util.JSONResponse{ diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/userstream.go b/src/github.com/matrix-org/dendrite/syncapi/sync/userstream.go index 6dc1821c2..ee1ce444b 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/userstream.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/userstream.go @@ -27,10 +27,10 @@ import ( // goroutines can Broadcast(streamPosition) to other goroutines. type UserStream struct { UserID string - // The waiting channels....... TODO - waitingChannels []chan<- types.StreamPosition - // The lock that protects pos + // The lock that protects changes to this struct lock sync.Mutex + // The channels waiting for updates for this user + waitingChannels []chan<- types.StreamPosition // The position to broadcast to callers of Wait(). pos types.StreamPosition // The time when waitingChannels was last non-empty @@ -46,19 +46,22 @@ func NewUserStream(userID string, currPos types.StreamPosition) *UserStream { } } -// Wait returns a channel that produces a single stream position when +// GetNotifyChannel returns a channel that produces a single stream position when // a new event *may* be available to return to the client. -func (s *UserStream) Wait(ctx context.Context, waitAtPos types.StreamPosition) <-chan types.StreamPosition { +// sincePos specifies from which point we want to be notified about +func (s *UserStream) GetNotifyChannel( + ctx context.Context, sincePos types.StreamPosition, +) <-chan types.StreamPosition { posChannel := make(chan types.StreamPosition, 1) s.lock.Lock() defer s.lock.Unlock() // Before we start blocking, we need to make sure that we didn't race with a call - // to Broadcast() between calling Wait() and actually sleeping. We check the last + // to Broadcast() between calling Wait() and getting the lock. We check the last // broadcast pos to see if it is newer than the pos we are meant to wait at. If it // is newer, something has Broadcast to this stream more recently so return immediately. - if s.pos > waitAtPos { + if s.pos > sincePos { posChannel <- s.pos close(posChannel) return posChannel