From 8fdc520e3204e7636d5b6819514955338d9fc522 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 Oct 2017 11:50:57 +0100 Subject: [PATCH] Don't use implicit UserStream --- .../dendrite/syncapi/sync/userstream.go | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/userstream.go b/src/github.com/matrix-org/dendrite/syncapi/sync/userstream.go index 8f90b7bfb..eabe442b0 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/userstream.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/userstream.go @@ -44,7 +44,7 @@ type UserStream struct { // UserStreamListener allows a sync request to wait for updates for a user. type UserStreamListener struct { - *UserStream + userStream *UserStream // Whether the stream has been closed hasClosed bool @@ -70,7 +70,7 @@ func (s *UserStream) GetListener(ctx context.Context) UserStreamListener { s.numWaiting++ // We decrement when UserStreamListener is closed listener := UserStreamListener{ - UserStream: s, + userStream: s, } // Lets be a bit paranoid here and check that Close() is being called @@ -120,21 +120,21 @@ func (s *UserStream) TimeOfLastNonEmpty() time.Time { // GetStreamPosition returns last stream position which the UserStream was // notified about -func (s *UserStream) GetStreamPosition() types.StreamPosition { - s.lock.Lock() - defer s.lock.Unlock() +func (s *UserStreamListener) GetStreamPosition() types.StreamPosition { + s.userStream.lock.Lock() + defer s.userStream.lock.Unlock() - return s.pos + return s.userStream.pos } // GetNotifyChannel returns a channel that is closed when there may be an // update for the user. // sincePos specifies from which point we want to be notified about func (s *UserStreamListener) GetNotifyChannel(sincePos types.StreamPosition) <-chan struct{} { - s.lock.Lock() - defer s.lock.Unlock() + s.userStream.lock.Lock() + defer s.userStream.lock.Unlock() - if sincePos < s.pos { + if sincePos < s.userStream.pos { // If the listener is behind, i.e. missed a potential update, then we // want them to wake up immediately. We do this by returning a new // closed stream, which returns immediately when selected. @@ -143,17 +143,17 @@ func (s *UserStreamListener) GetNotifyChannel(sincePos types.StreamPosition) <-c return closedChannel } - return s.signalChannel + return s.userStream.signalChannel } // Close cleans up resources used func (s *UserStreamListener) Close() { - s.lock.Lock() - defer s.lock.Unlock() + s.userStream.lock.Lock() + defer s.userStream.lock.Unlock() if !s.hasClosed { - s.numWaiting-- - s.timeOfLastChannel = time.Now() + s.userStream.numWaiting-- + s.userStream.timeOfLastChannel = time.Now() } s.hasClosed = true