This commit is contained in:
Erik Johnston 2017-10-20 15:56:08 +01:00
parent b6440a198a
commit a1118b01ce
4 changed files with 33 additions and 18 deletions

View file

@ -108,11 +108,8 @@ func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, userID string, pos ty
// GetListener returns a UserStreamListener that can be used to wait for
// updates for a user. Must be closed.
// sincePos specifies from which point we want to be notified about, i.e. don't
// notify for anything before sincePos
func (n *Notifier) GetListener(
req syncRequest, sincePos types.StreamPosition,
) UserStreamListener {
func (n *Notifier) GetListener(req syncRequest) UserStreamListener {
// Do what synapse does: https://github.com/matrix-org/synapse/blob/v0.20.0/synapse/notifier.py#L298
// - Bucket request into a lookup map keyed off a list of joined room IDs and separately a user ID
// - Incoming events wake requests for a matching room ID
@ -126,7 +123,7 @@ func (n *Notifier) GetListener(
n.removeEmptyUserStreams()
return n.fetchUserStream(req.userID, true).GetListener(sincePos)
return n.fetchUserStream(req.userID, true).GetListener(req.ctx)
}
// Load the membership states required to notify users correctly.

View file

@ -256,7 +256,7 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
// same as Notifier.WaitForEvents but with a timeout.
func waitForEvents(n *Notifier, req syncRequest) (types.StreamPosition, error) {
listener := n.GetListener(req, req.since)
listener := n.GetListener(req)
defer listener.Close()
select {
@ -264,7 +264,7 @@ func waitForEvents(n *Notifier, req syncRequest) (types.StreamPosition, error) {
return types.StreamPosition(0), fmt.Errorf(
"waitForEvents timed out waiting for %s (pos=%d)", req.userID, req.since,
)
case <-listener.GetNotifyChannel():
case <-listener.GetNotifyChannel(req.since):
p := listener.GetStreamPosition()
return p, nil
}

View file

@ -82,13 +82,13 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
timer := time.NewTimer(syncReq.timeout) // case of timeout=0 is handled above
defer timer.Stop()
userStream := rp.notifier.GetListener(*syncReq, currPos)
userStream := rp.notifier.GetListener(*syncReq)
defer userStream.Close()
for {
select {
// Wait for notifier to wake us up
case <-userStream.GetNotifyChannel():
case <-userStream.GetNotifyChannel(currPos):
currPos = userStream.GetStreamPosition()
// Or for timeout to expire
case <-timer.C:

View file

@ -15,10 +15,13 @@
package sync
import (
"context"
"runtime"
"sync"
"time"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/util"
)
// UserStream represents a communication mechanism between the /sync request goroutine
@ -42,7 +45,9 @@ type UserStream struct {
// UserStreamListener allows a sync request to wait for updates for a user.
type UserStreamListener struct {
*UserStream
sincePos types.StreamPosition
// Whether the stream has been closed
hasClosed bool
}
// NewUserStream creates a new user stream
@ -57,18 +62,26 @@ func NewUserStream(userID string, currPos types.StreamPosition) *UserStream {
// GetListener returns UserStreamListener that a sync request can use to wait
// for new updates with.
// sincePos specifies from which point we want to be notified about
// UserStreamListener must be closed
func (s *UserStream) GetListener(sincePos types.StreamPosition) UserStreamListener {
func (s *UserStream) GetListener(ctx context.Context) UserStreamListener {
s.lock.Lock()
defer s.lock.Unlock()
s.numWaiting++ // We decrement when UserStreamListener is closed
return UserStreamListener{
listener := UserStreamListener{
UserStream: s,
sincePos: sincePos,
}
// Lets be a bit paranoid here and check that Close() is being called
runtime.SetFinalizer(&listener, func(l *UserStreamListener) {
if !l.hasClosed {
util.GetLogger(ctx).Warn("Didn't call Close on UserStreamListener")
l.Close()
}
})
return listener
}
// Broadcast a new stream position for this user.
@ -116,11 +129,12 @@ func (s *UserStream) GetStreamPosition() types.StreamPosition {
// GetNotifyChannel returns a channel that is closed when there may be an
// update for the user.
func (s *UserStreamListener) GetNotifyChannel() <-chan struct{} {
// sincePos specifies from which point we want to be notified about
func (s *UserStreamListener) GetNotifyChannel(sincePos types.StreamPosition) <-chan struct{} {
s.lock.Lock()
defer s.lock.Unlock()
if s.sincePos < s.pos {
if sincePos < s.pos {
posChannel := make(chan struct{})
close(posChannel)
return posChannel
@ -134,6 +148,10 @@ func (s *UserStreamListener) Close() {
s.lock.Lock()
defer s.lock.Unlock()
if !s.hasClosed {
s.numWaiting--
s.timeOfLastChannel = time.Now()
}
s.hasClosed = true
}