From 5d6169632abe1e2b1069211362154e20dd5bc17f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 17 Oct 2017 17:01:45 +0100 Subject: [PATCH] Refactor Notifier to return channel This has two benefits: 1. Using channels makes it easier to time out while waiting 2. Allows us to clean up goroutines that were waiting if we timeout the request --- .../dendrite/syncapi/sync/notifier.go | 53 ++++++--- .../dendrite/syncapi/sync/notifier_test.go | 9 +- .../dendrite/syncapi/sync/requestpool.go | 22 +--- .../dendrite/syncapi/sync/userstream.go | 105 +++++++++++++----- 4 files changed, 119 insertions(+), 70 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go index e0e5891de..48b0ce378 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go @@ -17,11 +17,12 @@ package sync import ( "context" "sync" + "time" - log "github.com/sirupsen/logrus" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" + log "github.com/sirupsen/logrus" ) // Notifier will wake up sleeping requests when there is some new data. @@ -38,6 +39,8 @@ type Notifier struct { currPos types.StreamPosition // A map of user_id => UserStream which can be used to wake a given user's /sync request. userStreams map[string]*UserStream + // The last time we cleaned out stale entries from the userStreams map + lastCleanUpTime time.Time } // NewNotifier creates a new notifier set to the given stream position. 
@@ -49,6 +52,7 @@ func NewNotifier(pos types.StreamPosition) *Notifier { roomIDToJoinedUsers: make(map[string]userIDSet), userStreams: make(map[string]*UserStream), streamLock: &sync.Mutex{}, + lastCleanUpTime: time.Now(), } } @@ -63,6 +67,8 @@ func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, userID string, pos ty defer n.streamLock.Unlock() n.currPos = pos + n.removeEmptyUserStreams() + if ev != nil { // Map this event's room_id to a list of joined users, and wake them up. userIDs := n.joinedUsers(ev.RoomID()) @@ -103,7 +109,7 @@ func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, userID string, pos ty // WaitForEvents blocks until there are events for this request after sincePos. // In particular, it will return immediately if there are already events after // sincePos for the request, but otherwise blocks waiting for new events. -func (n *Notifier) WaitForEvents(req syncRequest, sincePos types.StreamPosition) types.StreamPosition { +func (n *Notifier) WaitForEvents(req syncRequest, sincePos types.StreamPosition) <-chan types.StreamPosition { // Do what synapse does: https://github.com/matrix-org/synapse/blob/v0.20.0/synapse/notifier.py#L298 // - Bucket request into a lookup map keyed off a list of joined room IDs and separately a user ID // - Incoming events wake requests for a matching room ID @@ -114,23 +120,11 @@ func (n *Notifier) WaitForEvents(req syncRequest, sincePos types.StreamPosition) // In a guard, check if the /sync request should block, and block it until we get woken up n.streamLock.Lock() - currentPos := n.currPos + defer n.streamLock.Unlock() - // TODO: We increment the stream position for any event, so it's possible that we return immediately - // with a pos which contains no new events for this user. We should probably re-wait for events - // automatically in this case. 
- if sincePos != currentPos { - n.streamLock.Unlock() - return currentPos - } + n.removeEmptyUserStreams() - // wait to be woken up, and then re-check the stream position - req.log.WithField("user_id", req.userID).Info("Waiting for event") - - // give up the stream lock prior to waiting on the user lock - stream := n.fetchUserStream(req.userID, true) - n.streamLock.Unlock() - return stream.Wait(currentPos) + return n.fetchUserStream(req.userID, true).Wait(req.ctx, sincePos) } // Load the membership states required to notify users correctly. @@ -178,7 +172,7 @@ func (n *Notifier) fetchUserStream(userID string, makeIfNotExists bool) *UserStr stream, ok := n.userStreams[userID] if !ok && makeIfNotExists { // TODO: Unbounded growth of streams (1 per user) - stream = NewUserStream(userID) + stream = NewUserStream(userID, n.currPos) n.userStreams[userID] = stream } return stream @@ -208,6 +202,29 @@ func (n *Notifier) joinedUsers(roomID string) (userIDs []string) { return n.roomIDToJoinedUsers[roomID].values() } +// removeEmptyUserStreams iterates through the user stream map and removes any +// that have been empty for a certain amount of time. This is a crude way of +// ensuring that the userStreams map doesn't grow forever. +// This should be called when the notifier gets called for whatever reason, +// the function itself is responsible for ensuring it doesn't iterate too +// often. +// NB: Callers should have locked the mutex before calling this function. +func (n *Notifier) removeEmptyUserStreams() { + // Only clean up now and again + now := time.Now() + if n.lastCleanUpTime.Add(time.Minute).After(now) { + return + } + n.lastCleanUpTime = now + + deleteBefore := now.Add(-5 * time.Minute) + for key, value := range n.userStreams { + if value.TimeOfLastNonEmpty().Before(deleteBefore) { + delete(n.userStreams, key) + } + } +} + // A string set, mainly existing for improving clarity of structs in this file. 
type userIDSet map[string]bool diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go index 7aab417bc..09af9be90 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go @@ -256,18 +256,12 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) { // same as Notifier.WaitForEvents but with a timeout. func waitForEvents(n *Notifier, req syncRequest) (types.StreamPosition, error) { - done := make(chan types.StreamPosition, 1) - go func() { - newPos := n.WaitForEvents(req, req.since) - done <- newPos - close(done) - }() select { case <-time.After(5 * time.Second): return types.StreamPosition(0), fmt.Errorf( "waitForEvents timed out waiting for %s (pos=%d)", req.userID, req.since, ) - case p := <-done: + case p := <-n.WaitForEvents(req, req.since): return p, nil } } @@ -288,5 +282,6 @@ func newTestSyncRequest(userID string, since types.StreamPosition) syncRequest { wantFullState: false, limit: defaultTimelineLimit, log: util.GetLogger(context.TODO()), + ctx: context.TODO(), } } diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go index 81469087d..cc571a1ff 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go @@ -19,7 +19,6 @@ import ( "net/http" "time" - log "github.com/sirupsen/logrus" "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/httputil" @@ -28,6 +27,7 @@ import ( "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" + log "github.com/sirupsen/logrus" ) // RequestPool manages HTTP long-poll connections for /sync @@ -85,7 
+85,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype for { select { // Wait for notifier to wake us up - case currPos = <-rp.makeNotifyChannel(*syncReq, currPos): + case currPos = <-rp.notifier.WaitForEvents(*syncReq, currPos): // Or for timeout to expire case <-timer.C: return util.JSONResponse{ @@ -116,24 +116,6 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype } } -// makeNotifyChannel returns a channel that produces the current stream position -// when there *may* be something to return to the client. Only produces a single -// value and then closes the channel. -func (rp *RequestPool) makeNotifyChannel(syncReq syncRequest, sincePos types.StreamPosition) chan types.StreamPosition { - notified := make(chan types.StreamPosition) - - // TODO(#303): We need to ensure that WaitForEvents gets properly cancelled - // when the request is finished, or use some other mechanism to ensure we - // don't leak goroutines here - go (func() { - currentPos := rp.notifier.WaitForEvents(syncReq, sincePos) - notified <- currentPos - close(notified) - })() - - return notified -} - type stateEventInStateResp struct { gomatrixserverlib.ClientEvent PrevContent json.RawMessage `json:"prev_content,omitempty"` diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/userstream.go b/src/github.com/matrix-org/dendrite/syncapi/sync/userstream.go index 349b3e272..5a32b86b5 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/userstream.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/userstream.go @@ -15,7 +15,9 @@ package sync import ( + "context" "sync" + "time" "github.com/matrix-org/dendrite/syncapi/types" ) @@ -25,55 +27,108 @@ import ( // goroutines can Broadcast(streamPosition) to other goroutines. type UserStream struct { UserID string - // Because this is a Cond, we can notify all waiting goroutines so this works - // across devices for the same user. Protects pos. 
- cond *sync.Cond + // The channels of callers currently waiting in Wait(); Broadcast() sends the new position to each and closes it + waitingChannels []chan<- types.StreamPosition + // The lock that protects pos, waitingChannels and timeOfLastChannel + lock sync.Mutex // The position to broadcast to callers of Wait(). pos types.StreamPosition - // The number of goroutines blocked on Wait() - used for testing and metrics - numWaiting int + // The time when waitingChannels was last non-empty + timeOfLastChannel time.Time } // NewUserStream creates a new user stream -func NewUserStream(userID string) *UserStream { +func NewUserStream(userID string, currPos types.StreamPosition) *UserStream { return &UserStream{ - UserID: userID, - cond: sync.NewCond(&sync.Mutex{}), + UserID: userID, + timeOfLastChannel: time.Now(), + pos: currPos, } } // Wait blocks until there is a new stream position for this user, which is then returned. // waitAtPos should be the position the stream thinks it should be waiting at. -func (s *UserStream) Wait(waitAtPos types.StreamPosition) (pos types.StreamPosition) { - s.cond.L.Lock() +func (s *UserStream) Wait(ctx context.Context, waitAtPos types.StreamPosition) <-chan types.StreamPosition { + posChannel := make(chan types.StreamPosition, 1) + + s.lock.Lock() + defer s.lock.Unlock() + // Before we start blocking, we need to make sure that we didn't race with a call // to Broadcast() between calling Wait() and actually sleeping. We check the last // broadcast pos to see if it is newer than the pos we are meant to wait at. If it // is newer, something has Broadcast to this stream more recently so return immediately. 
if s.pos > waitAtPos { - pos = s.pos - s.cond.L.Unlock() - return + posChannel <- s.pos + close(posChannel) + return posChannel } - s.numWaiting++ - s.cond.Wait() - pos = s.pos - s.numWaiting-- - s.cond.L.Unlock() - return + + s.waitingChannels = append(s.waitingChannels, posChannel) + + // We spawn off a goroutine that waits for the request to finish and removes the + // channel from waitingChannels + go func() { + <-ctx.Done() + + s.lock.Lock() + defer s.lock.Unlock() + + // Icky but efficient way of filtering out the given channel: swap it with the last element and truncate + for idx, ch := range s.waitingChannels { + if posChannel == ch { + lastIdx := len(s.waitingChannels) - 1 + s.waitingChannels[idx] = s.waitingChannels[lastIdx] + s.waitingChannels[lastIdx] = nil + s.waitingChannels = s.waitingChannels[:lastIdx] + + if len(s.waitingChannels) == 0 { + s.timeOfLastChannel = time.Now() + } + + break + } + } + }() + + return posChannel } // Broadcast a new stream position for this user. func (s *UserStream) Broadcast(pos types.StreamPosition) { - s.cond.L.Lock() + s.lock.Lock() + defer s.lock.Unlock() + + if len(s.waitingChannels) != 0 { + s.timeOfLastChannel = time.Now() + } + s.pos = pos - s.cond.L.Unlock() - s.cond.Broadcast() + + for _, c := range s.waitingChannels { + c <- pos + close(c) + } + + s.waitingChannels = nil } // NumWaiting returns the number of goroutines waiting for Wait() to return. Used for metrics and testing. func (s *UserStream) NumWaiting() int { - s.cond.L.Lock() - defer s.cond.L.Unlock() - return s.numWaiting + s.lock.Lock() + defer s.lock.Unlock() + return len(s.waitingChannels) +} + +// TimeOfLastNonEmpty returns the last time that the number of waiting channels +// was non-empty, may be time.Now() if number of waiting channels is currently +// non-empty. +func (s *UserStream) TimeOfLastNonEmpty() time.Time { + s.lock.Lock() + defer s.lock.Unlock() + + if len(s.waitingChannels) > 0 { + return time.Now() + } + return s.timeOfLastChannel }