From 97eadc91f610064a9676f26777a2ab48ff05d192 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 20 Oct 2017 15:29:53 +0100 Subject: [PATCH] Fixup to use close(chan) as signalling mechanism --- .../dendrite/syncapi/sync/notifier.go | 10 +- .../dendrite/syncapi/sync/notifier_test.go | 8 +- .../dendrite/syncapi/sync/requestpool.go | 6 +- .../dendrite/syncapi/sync/userstream.go | 134 +++++++++--------- 4 files changed, 82 insertions(+), 76 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go index 11b07f23d..457664e5e 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go @@ -106,13 +106,13 @@ func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, userID string, pos ty } } -// GetNotifyChannel returns a channel that produces a single stream position when -// a new event *may* be available to return to the client. +// GetListener returns a UserStreamListener that can be used to wait for +// updates for a user. Must be closed. // sincePos specifies from which point we want to be notified about, i.e. don't // notify for anything before sincePos -func (n *Notifier) GetNotifyChannel( +func (n *Notifier) GetListener( req syncRequest, sincePos types.StreamPosition, -) <-chan types.StreamPosition { +) UserStreamListener { // Do what synapse does: https://github.com/matrix-org/synapse/blob/v0.20.0/synapse/notifier.py#L298 // - Bucket request into a lookup map keyed off a list of joined room IDs and separately a user ID // - Incoming events wake requests for a matching room ID @@ -126,7 +126,7 @@ func (n *Notifier) GetNotifyChannel( n.removeEmptyUserStreams() - return n.fetchUserStream(req.userID, true).GetNotifyChannel(req.ctx, sincePos) + return n.fetchUserStream(req.userID, true).GetListener(sincePos) } // Load the membership states required to notify users correctly. diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go index b24843e76..e087c51c8 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go @@ -256,18 +256,22 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) { // same as Notifier.WaitForEvents but with a timeout. func waitForEvents(n *Notifier, req syncRequest) (types.StreamPosition, error) { + listener := n.GetListener(req, req.since) + defer listener.Close() + select { case <-time.After(5 * time.Second): return types.StreamPosition(0), fmt.Errorf( "waitForEvents timed out waiting for %s (pos=%d)", req.userID, req.since, ) - case p := <-n.GetNotifyChannel(req, req.since): + case <-listener.GetNotifyChannel(): + p := listener.GetStreamPosition() return p, nil } } // Wait until something is Wait()ing on the user stream. -func waitForBlocking(s *UserStream, numBlocking int) { +func waitForBlocking(s *UserStream, numBlocking uint) { for numBlocking != s.NumWaiting() { // This is horrible but I don't want to add a signalling mechanism JUST for testing. time.Sleep(1 * time.Microsecond) diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go index dde4c5272..aa9beef5f 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go @@ -82,10 +82,14 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype timer := time.NewTimer(syncReq.timeout) // case of timeout=0 is handled above defer timer.Stop() + userStream := rp.notifier.GetListener(*syncReq, currPos) + defer userStream.Close() + for { select { // Wait for notifier to wake us up - case currPos = <-rp.notifier.GetNotifyChannel(*syncReq, currPos): + case <-userStream.GetNotifyChannel(): + currPos = userStream.GetStreamPosition() // Or for timeout to expire case <-timer.C: return util.JSONResponse{ diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/userstream.go b/src/github.com/matrix-org/dendrite/syncapi/sync/userstream.go index ee1ce444b..213a0387b 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/userstream.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/userstream.go @@ -15,7 +15,6 @@ package sync import ( - "context" "sync" "time" @@ -23,18 +22,23 @@ import ( ) // UserStream represents a communication mechanism between the /sync request goroutine -// and the underlying sync server goroutines. Goroutines can Wait() for a stream position and -// goroutines can Broadcast(streamPosition) to other goroutines. +// and the underlying sync server goroutines. +// Goroutines can get a UserStreamListener to wait for updates, and can Broadcast() +// updates. type UserStream struct { UserID string // The lock that protects changes to this struct - lock sync.Mutex - // The channels waiting for updates for this user - waitingChannels []chan<- types.StreamPosition - // The position to broadcast to callers of Wait(). - pos types.StreamPosition - // The time when waitingChannels was last non-empty + lock sync.Mutex + signalChannel chan struct{} + pos types.StreamPosition timeOfLastChannel time.Time + numWaiting uint +} + +// UserStreamListener allows a sync request to wait for updates for a user. +type UserStreamListener struct { + *UserStream + sincePos types.StreamPosition } // NewUserStream creates a new user stream @@ -43,58 +47,24 @@ func NewUserStream(userID string, currPos types.StreamPosition) *UserStream { UserID: userID, timeOfLastChannel: time.Now(), pos: currPos, + signalChannel: make(chan struct{}), } } -// GetNotifyChannel returns a channel that produces a single stream position when -// a new event *may* be available to return to the client. +// GetListener returns UserStreamListener a sync request can use to wait for +// new updates. // sincePos specifies from which point we want to be notified about -func (s *UserStream) GetNotifyChannel( - ctx context.Context, sincePos types.StreamPosition, -) <-chan types.StreamPosition { - posChannel := make(chan types.StreamPosition, 1) - +// UserStreamListener must be closed +func (s *UserStream) GetListener(sincePos types.StreamPosition) UserStreamListener { s.lock.Lock() defer s.lock.Unlock() - // Before we start blocking, we need to make sure that we didn't race with a call - // to Broadcast() between calling Wait() and getting the lock. We check the last - // broadcast pos to see if it is newer than the pos we are meant to wait at. If it - // is newer, something has Broadcast to this stream more recently so return immediately. - if s.pos > sincePos { - posChannel <- s.pos - close(posChannel) - return posChannel + s.numWaiting++ // We decrement when UserStreamListener is closed + + return UserStreamListener{ + UserStream: s, + sincePos: sincePos, } - - s.waitingChannels = append(s.waitingChannels, posChannel) - - // We spawn off a goroutine that waits for the request to finish and removes the - // channel from waitingChannels - go func() { - <-ctx.Done() - - s.lock.Lock() - defer s.lock.Unlock() - - // Icky but efficient way of filtering out the given channel - for idx, ch := range s.waitingChannels { - if posChannel == ch { - lastIdx := len(s.waitingChannels) - 1 - s.waitingChannels[idx] = s.waitingChannels[lastIdx] - s.waitingChannels[lastIdx] = nil // Ensure that the channel gets GCed - s.waitingChannels = s.waitingChannels[:lastIdx] - - if len(s.waitingChannels) == 0 { - s.timeOfLastChannel = time.Now() - } - - break - } - } - }() - - return posChannel } // Broadcast a new stream position for this user. @@ -102,36 +72,64 @@ func (s *UserStream) Broadcast(pos types.StreamPosition) { s.lock.Lock() defer s.lock.Unlock() - if len(s.waitingChannels) != 0 { - s.timeOfLastChannel = time.Now() - } - s.pos = pos - for _, c := range s.waitingChannels { - c <- pos - close(c) - } + close(s.signalChannel) - s.waitingChannels = nil + s.signalChannel = make(chan struct{}) } -// NumWaiting returns the number of goroutines waiting for Wait() to return. Used for metrics and testing. -func (s *UserStream) NumWaiting() int { +// NumWaiting returns the number of goroutines waiting for waiting for updates. +// Used for metrics and testing. +func (s *UserStream) NumWaiting() uint { s.lock.Lock() defer s.lock.Unlock() - return len(s.waitingChannels) + return s.numWaiting } -// TimeOfLastNonEmpty returns the last time that the number of waiting channels -// was non-empty, may be time.Now() if number of waiting channels is currently +// TimeOfLastNonEmpty returns the last time that the number of waiting listeners +// was non-empty, may be time.Now() if number of waiting listeners is currently // non-empty. func (s *UserStream) TimeOfLastNonEmpty() time.Time { s.lock.Lock() defer s.lock.Unlock() - if len(s.waitingChannels) > 0 { + if s.numWaiting > 0 { return time.Now() } + return s.timeOfLastChannel } + +// GetStreamPosition returns last stream position which the UserStream was +// notified about +func (s *UserStream) GetStreamPosition() types.StreamPosition { + s.lock.Lock() + defer s.lock.Unlock() + + return s.pos +} + +// GetNotifyChannel returns a channel that is closed when there may be an +// update for the user. +func (s *UserStreamListener) GetNotifyChannel() <-chan struct{} { + s.lock.Lock() + defer s.lock.Unlock() + + if s.sincePos < s.pos { + posChannel := make(chan struct{}) + close(posChannel) + return posChannel + } + + return s.signalChannel +} + +// Close cleans up resources used +func (s *UserStreamListener) Close() { + s.lock.Lock() + defer s.lock.Unlock() + + s.numWaiting-- + s.timeOfLastChannel = time.Now() +}