Don't use implicit UserStream

This commit is contained in:
Erik Johnston 2017-10-24 11:50:57 +01:00
parent ce0ec60fe1
commit 8fdc520e32

View file

@ -44,7 +44,7 @@ type UserStream struct {
// UserStreamListener allows a sync request to wait for updates for a user. // UserStreamListener allows a sync request to wait for updates for a user.
type UserStreamListener struct { type UserStreamListener struct {
*UserStream userStream *UserStream
// Whether the stream has been closed // Whether the stream has been closed
hasClosed bool hasClosed bool
@ -70,7 +70,7 @@ func (s *UserStream) GetListener(ctx context.Context) UserStreamListener {
s.numWaiting++ // We decrement when UserStreamListener is closed s.numWaiting++ // We decrement when UserStreamListener is closed
listener := UserStreamListener{ listener := UserStreamListener{
UserStream: s, userStream: s,
} }
// Lets be a bit paranoid here and check that Close() is being called // Lets be a bit paranoid here and check that Close() is being called
@ -120,21 +120,21 @@ func (s *UserStream) TimeOfLastNonEmpty() time.Time {
// GetStreamPosition returns last stream position which the UserStream was // GetStreamPosition returns last stream position which the UserStream was
// notified about // notified about
func (s *UserStream) GetStreamPosition() types.StreamPosition { func (s *UserStreamListener) GetStreamPosition() types.StreamPosition {
s.lock.Lock() s.userStream.lock.Lock()
defer s.lock.Unlock() defer s.userStream.lock.Unlock()
return s.pos return s.userStream.pos
} }
// GetNotifyChannel returns a channel that is closed when there may be an // GetNotifyChannel returns a channel that is closed when there may be an
// update for the user. // update for the user.
// sincePos specifies from which point we want to be notified about // sincePos specifies from which point we want to be notified about
func (s *UserStreamListener) GetNotifyChannel(sincePos types.StreamPosition) <-chan struct{} { func (s *UserStreamListener) GetNotifyChannel(sincePos types.StreamPosition) <-chan struct{} {
s.lock.Lock() s.userStream.lock.Lock()
defer s.lock.Unlock() defer s.userStream.lock.Unlock()
if sincePos < s.pos { if sincePos < s.userStream.pos {
// If the listener is behind, i.e. missed a potential update, then we // If the listener is behind, i.e. missed a potential update, then we
// want them to wake up immediately. We do this by returning a new // want them to wake up immediately. We do this by returning a new
// closed stream, which returns immediately when selected. // closed stream, which returns immediately when selected.
@ -143,17 +143,17 @@ func (s *UserStreamListener) GetNotifyChannel(sincePos types.StreamPosition) <-c
return closedChannel return closedChannel
} }
return s.signalChannel return s.userStream.signalChannel
} }
// Close cleans up resources used // Close cleans up resources used
func (s *UserStreamListener) Close() { func (s *UserStreamListener) Close() {
s.lock.Lock() s.userStream.lock.Lock()
defer s.lock.Unlock() defer s.userStream.lock.Unlock()
if !s.hasClosed { if !s.hasClosed {
s.numWaiting-- s.userStream.numWaiting--
s.timeOfLastChannel = time.Now() s.userStream.timeOfLastChannel = time.Now()
} }
s.hasClosed = true s.hasClosed = true