Fixup names and comments

This commit is contained in:
Erik Johnston 2017-10-18 11:26:00 +01:00
parent fd61776f93
commit 5bd570df64
4 changed files with 19 additions and 12 deletions

View file

@ -106,9 +106,13 @@ func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, userID string, pos ty
}
}
// WaitForEvents returns a channel that produces a single stream position when
// GetNotifyChannel returns a channel that produces a single stream position when
// a new event *may* be available to return to the client.
func (n *Notifier) WaitForEvents(req syncRequest, sincePos types.StreamPosition) <-chan types.StreamPosition {
// sincePos specifies from which point we want to be notified about, i.e. don't
// notify for anything before sincePos
func (n *Notifier) GetNotifyChannel(
req syncRequest, sincePos types.StreamPosition,
) <-chan types.StreamPosition {
// Do what synapse does: https://github.com/matrix-org/synapse/blob/v0.20.0/synapse/notifier.py#L298
// - Bucket request into a lookup map keyed off a list of joined room IDs and separately a user ID
// - Incoming events wake requests for a matching room ID
@ -123,7 +127,7 @@ func (n *Notifier) WaitForEvents(req syncRequest, sincePos types.StreamPosition)
n.removeEmptyUserStreams()
return n.fetchUserStream(req.userID, true).Wait(req.ctx, sincePos)
return n.fetchUserStream(req.userID, true).GetNotifyChannel(req.ctx, sincePos)
}
// Load the membership states required to notify users correctly.

View file

@ -261,7 +261,7 @@ func waitForEvents(n *Notifier, req syncRequest) (types.StreamPosition, error) {
return types.StreamPosition(0), fmt.Errorf(
"waitForEvents timed out waiting for %s (pos=%d)", req.userID, req.since,
)
case p := <-n.WaitForEvents(req, req.since):
case p := <-n.GetNotifyChannel(req, req.since):
return p, nil
}
}

View file

@ -85,7 +85,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
for {
select {
// Wait for notifier to wake us up
case currPos = <-rp.notifier.WaitForEvents(*syncReq, currPos):
case currPos = <-rp.notifier.GetNotifyChannel(*syncReq, currPos):
// Or for timeout to expire
case <-timer.C:
return util.JSONResponse{

View file

@ -27,10 +27,10 @@ import (
// goroutines can Broadcast(streamPosition) to other goroutines.
type UserStream struct {
UserID string
// The waiting channels....... TODO
waitingChannels []chan<- types.StreamPosition
// The lock that protects pos
// The lock that protects changes to this struct
lock sync.Mutex
// The channels waiting for updates for this user
waitingChannels []chan<- types.StreamPosition
// The position to broadcast to callers of Wait().
pos types.StreamPosition
// The time when waitingChannels was last non-empty
@ -46,19 +46,22 @@ func NewUserStream(userID string, currPos types.StreamPosition) *UserStream {
}
}
// Wait returns a channel that produces a single stream position when
// GetNotifyChannel returns a channel that produces a single stream position when
// a new event *may* be available to return to the client.
func (s *UserStream) Wait(ctx context.Context, waitAtPos types.StreamPosition) <-chan types.StreamPosition {
// sincePos specifies from which point we want to be notified about
func (s *UserStream) GetNotifyChannel(
ctx context.Context, sincePos types.StreamPosition,
) <-chan types.StreamPosition {
posChannel := make(chan types.StreamPosition, 1)
s.lock.Lock()
defer s.lock.Unlock()
// Before we start blocking, we need to make sure that we didn't race with a call
// to Broadcast() between calling Wait() and actually sleeping. We check the last
// to Broadcast() between calling Wait() and getting the lock. We check the last
// broadcast pos to see if it is newer than the pos we are meant to wait at. If it
// is newer, something has Broadcast to this stream more recently so return immediately.
if s.pos > waitAtPos {
if s.pos > sincePos {
posChannel <- s.pos
close(posChannel)
return posChannel