Fixup to use close(chan) as signalling mechanism

This commit is contained in:
Erik Johnston 2017-10-20 15:29:53 +01:00
parent 4ab530b72a
commit 97eadc91f6
4 changed files with 82 additions and 76 deletions

View file

@ -106,13 +106,13 @@ func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, userID string, pos ty
} }
} }
// GetNotifyChannel returns a channel that produces a single stream position when // GetListener returns a UserStreamListener that can be used to wait for
// a new event *may* be available to return to the client. // updates for a user. Must be closed.
// sincePos specifies from which point we want to be notified about, i.e. don't // sincePos specifies from which point we want to be notified about, i.e. don't
// notify for anything before sincePos // notify for anything before sincePos
func (n *Notifier) GetNotifyChannel( func (n *Notifier) GetListener(
req syncRequest, sincePos types.StreamPosition, req syncRequest, sincePos types.StreamPosition,
) <-chan types.StreamPosition { ) UserStreamListener {
// Do what synapse does: https://github.com/matrix-org/synapse/blob/v0.20.0/synapse/notifier.py#L298 // Do what synapse does: https://github.com/matrix-org/synapse/blob/v0.20.0/synapse/notifier.py#L298
// - Bucket request into a lookup map keyed off a list of joined room IDs and separately a user ID // - Bucket request into a lookup map keyed off a list of joined room IDs and separately a user ID
// - Incoming events wake requests for a matching room ID // - Incoming events wake requests for a matching room ID
@ -126,7 +126,7 @@ func (n *Notifier) GetNotifyChannel(
n.removeEmptyUserStreams() n.removeEmptyUserStreams()
return n.fetchUserStream(req.userID, true).GetNotifyChannel(req.ctx, sincePos) return n.fetchUserStream(req.userID, true).GetListener(sincePos)
} }
// Load the membership states required to notify users correctly. // Load the membership states required to notify users correctly.

View file

@ -256,18 +256,22 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
// same as Notifier.WaitForEvents but with a timeout. // same as Notifier.WaitForEvents but with a timeout.
func waitForEvents(n *Notifier, req syncRequest) (types.StreamPosition, error) { func waitForEvents(n *Notifier, req syncRequest) (types.StreamPosition, error) {
listener := n.GetListener(req, req.since)
defer listener.Close()
select { select {
case <-time.After(5 * time.Second): case <-time.After(5 * time.Second):
return types.StreamPosition(0), fmt.Errorf( return types.StreamPosition(0), fmt.Errorf(
"waitForEvents timed out waiting for %s (pos=%d)", req.userID, req.since, "waitForEvents timed out waiting for %s (pos=%d)", req.userID, req.since,
) )
case p := <-n.GetNotifyChannel(req, req.since): case <-listener.GetNotifyChannel():
p := listener.GetStreamPosition()
return p, nil return p, nil
} }
} }
// Wait until something is Wait()ing on the user stream. // Wait until something is Wait()ing on the user stream.
func waitForBlocking(s *UserStream, numBlocking int) { func waitForBlocking(s *UserStream, numBlocking uint) {
for numBlocking != s.NumWaiting() { for numBlocking != s.NumWaiting() {
// This is horrible but I don't want to add a signalling mechanism JUST for testing. // This is horrible but I don't want to add a signalling mechanism JUST for testing.
time.Sleep(1 * time.Microsecond) time.Sleep(1 * time.Microsecond)

View file

@ -82,10 +82,14 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
timer := time.NewTimer(syncReq.timeout) // case of timeout=0 is handled above timer := time.NewTimer(syncReq.timeout) // case of timeout=0 is handled above
defer timer.Stop() defer timer.Stop()
userStream := rp.notifier.GetListener(*syncReq, currPos)
defer userStream.Close()
for { for {
select { select {
// Wait for notifier to wake us up // Wait for notifier to wake us up
case currPos = <-rp.notifier.GetNotifyChannel(*syncReq, currPos): case <-userStream.GetNotifyChannel():
currPos = userStream.GetStreamPosition()
// Or for timeout to expire // Or for timeout to expire
case <-timer.C: case <-timer.C:
return util.JSONResponse{ return util.JSONResponse{

View file

@ -15,7 +15,6 @@
package sync package sync
import ( import (
"context"
"sync" "sync"
"time" "time"
@ -23,18 +22,23 @@ import (
) )
// UserStream represents a communication mechanism between the /sync request goroutine // UserStream represents a communication mechanism between the /sync request goroutine
// and the underlying sync server goroutines. Goroutines can Wait() for a stream position and // and the underlying sync server goroutines.
// goroutines can Broadcast(streamPosition) to other goroutines. // Goroutines can get a UserStreamListener to wait for updates, and can Broadcast()
// updates.
type UserStream struct { type UserStream struct {
UserID string UserID string
// The lock that protects changes to this struct // The lock that protects changes to this struct
lock sync.Mutex lock sync.Mutex
// The channels waiting for updates for this user signalChannel chan struct{}
waitingChannels []chan<- types.StreamPosition pos types.StreamPosition
// The position to broadcast to callers of Wait().
pos types.StreamPosition
// The time when waitingChannels was last non-empty
timeOfLastChannel time.Time timeOfLastChannel time.Time
numWaiting uint
}
// UserStreamListener allows a sync request to wait for updates for a user.
type UserStreamListener struct {
*UserStream
sincePos types.StreamPosition
} }
// NewUserStream creates a new user stream // NewUserStream creates a new user stream
@ -43,58 +47,24 @@ func NewUserStream(userID string, currPos types.StreamPosition) *UserStream {
UserID: userID, UserID: userID,
timeOfLastChannel: time.Now(), timeOfLastChannel: time.Now(),
pos: currPos, pos: currPos,
signalChannel: make(chan struct{}),
} }
} }
// GetNotifyChannel returns a channel that produces a single stream position when // GetListener returns UserStreamListener a sync request can use to wait for
// a new event *may* be available to return to the client. // new updates.
// sincePos specifies from which point we want to be notified about // sincePos specifies from which point we want to be notified about
func (s *UserStream) GetNotifyChannel( // UserStreamListener must be closed
ctx context.Context, sincePos types.StreamPosition, func (s *UserStream) GetListener(sincePos types.StreamPosition) UserStreamListener {
) <-chan types.StreamPosition {
posChannel := make(chan types.StreamPosition, 1)
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
// Before we start blocking, we need to make sure that we didn't race with a call s.numWaiting++ // We decrement when UserStreamListener is closed
// to Broadcast() between calling Wait() and getting the lock. We check the last
// broadcast pos to see if it is newer than the pos we are meant to wait at. If it return UserStreamListener{
// is newer, something has Broadcast to this stream more recently so return immediately. UserStream: s,
if s.pos > sincePos { sincePos: sincePos,
posChannel <- s.pos
close(posChannel)
return posChannel
} }
s.waitingChannels = append(s.waitingChannels, posChannel)
// We spawn off a goroutine that waits for the request to finish and removes the
// channel from waitingChannels
go func() {
<-ctx.Done()
s.lock.Lock()
defer s.lock.Unlock()
// Icky but efficient way of filtering out the given channel
for idx, ch := range s.waitingChannels {
if posChannel == ch {
lastIdx := len(s.waitingChannels) - 1
s.waitingChannels[idx] = s.waitingChannels[lastIdx]
s.waitingChannels[lastIdx] = nil // Ensure that the channel gets GCed
s.waitingChannels = s.waitingChannels[:lastIdx]
if len(s.waitingChannels) == 0 {
s.timeOfLastChannel = time.Now()
}
break
}
}
}()
return posChannel
} }
// Broadcast a new stream position for this user. // Broadcast a new stream position for this user.
@ -102,36 +72,64 @@ func (s *UserStream) Broadcast(pos types.StreamPosition) {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
if len(s.waitingChannels) != 0 {
s.timeOfLastChannel = time.Now()
}
s.pos = pos s.pos = pos
for _, c := range s.waitingChannels { close(s.signalChannel)
c <- pos
close(c)
}
s.waitingChannels = nil s.signalChannel = make(chan struct{})
} }
// NumWaiting returns the number of goroutines waiting for Wait() to return. Used for metrics and testing. // NumWaiting returns the number of goroutines waiting for waiting for updates.
func (s *UserStream) NumWaiting() int { // Used for metrics and testing.
func (s *UserStream) NumWaiting() uint {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
return len(s.waitingChannels) return s.numWaiting
} }
// TimeOfLastNonEmpty returns the last time that the number of waiting channels // TimeOfLastNonEmpty returns the last time that the number of waiting listeners
// was non-empty, may be time.Now() if number of waiting channels is currently // was non-empty, may be time.Now() if number of waiting listeners is currently
// non-empty. // non-empty.
func (s *UserStream) TimeOfLastNonEmpty() time.Time { func (s *UserStream) TimeOfLastNonEmpty() time.Time {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
if len(s.waitingChannels) > 0 { if s.numWaiting > 0 {
return time.Now() return time.Now()
} }
return s.timeOfLastChannel return s.timeOfLastChannel
} }
// GetStreamPosition returns last stream position which the UserStream was
// notified about
func (s *UserStream) GetStreamPosition() types.StreamPosition {
s.lock.Lock()
defer s.lock.Unlock()
return s.pos
}
// GetNotifyChannel returns a channel that is closed when there may be an
// update for the user.
func (s *UserStreamListener) GetNotifyChannel() <-chan struct{} {
s.lock.Lock()
defer s.lock.Unlock()
if s.sincePos < s.pos {
posChannel := make(chan struct{})
close(posChannel)
return posChannel
}
return s.signalChannel
}
// Close cleans up resources used
func (s *UserStreamListener) Close() {
s.lock.Lock()
defer s.lock.Unlock()
s.numWaiting--
s.timeOfLastChannel = time.Now()
}