mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-12 17:33:09 -06:00
Refactor Notifier to return channel
This has two benefits: (1) using channels makes it easier to time out while waiting, and (2) it allows us to clean up goroutines that were waiting if the request times out.
This commit is contained in:
parent
bd07447abe
commit
5d6169632a
|
|
@ -17,11 +17,12 @@ package sync
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Notifier will wake up sleeping requests when there is some new data.
|
// Notifier will wake up sleeping requests when there is some new data.
|
||||||
|
|
@ -38,6 +39,8 @@ type Notifier struct {
|
||||||
currPos types.StreamPosition
|
currPos types.StreamPosition
|
||||||
// A map of user_id => UserStream which can be used to wake a given user's /sync request.
|
// A map of user_id => UserStream which can be used to wake a given user's /sync request.
|
||||||
userStreams map[string]*UserStream
|
userStreams map[string]*UserStream
|
||||||
|
// The last time we cleaned out stale entries from the userStreams map
|
||||||
|
lastCleanUpTime time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewNotifier creates a new notifier set to the given stream position.
|
// NewNotifier creates a new notifier set to the given stream position.
|
||||||
|
|
@ -49,6 +52,7 @@ func NewNotifier(pos types.StreamPosition) *Notifier {
|
||||||
roomIDToJoinedUsers: make(map[string]userIDSet),
|
roomIDToJoinedUsers: make(map[string]userIDSet),
|
||||||
userStreams: make(map[string]*UserStream),
|
userStreams: make(map[string]*UserStream),
|
||||||
streamLock: &sync.Mutex{},
|
streamLock: &sync.Mutex{},
|
||||||
|
lastCleanUpTime: time.Now(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -63,6 +67,8 @@ func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, userID string, pos ty
|
||||||
defer n.streamLock.Unlock()
|
defer n.streamLock.Unlock()
|
||||||
n.currPos = pos
|
n.currPos = pos
|
||||||
|
|
||||||
|
n.removeEmptyUserStreams()
|
||||||
|
|
||||||
if ev != nil {
|
if ev != nil {
|
||||||
// Map this event's room_id to a list of joined users, and wake them up.
|
// Map this event's room_id to a list of joined users, and wake them up.
|
||||||
userIDs := n.joinedUsers(ev.RoomID())
|
userIDs := n.joinedUsers(ev.RoomID())
|
||||||
|
|
@ -103,7 +109,7 @@ func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, userID string, pos ty
|
||||||
// WaitForEvents blocks until there are events for this request after sincePos.
|
// WaitForEvents blocks until there are events for this request after sincePos.
|
||||||
// In particular, it will return immediately if there are already events after
|
// In particular, it will return immediately if there are already events after
|
||||||
// sincePos for the request, but otherwise blocks waiting for new events.
|
// sincePos for the request, but otherwise blocks waiting for new events.
|
||||||
func (n *Notifier) WaitForEvents(req syncRequest, sincePos types.StreamPosition) types.StreamPosition {
|
func (n *Notifier) WaitForEvents(req syncRequest, sincePos types.StreamPosition) <-chan types.StreamPosition {
|
||||||
// Do what synapse does: https://github.com/matrix-org/synapse/blob/v0.20.0/synapse/notifier.py#L298
|
// Do what synapse does: https://github.com/matrix-org/synapse/blob/v0.20.0/synapse/notifier.py#L298
|
||||||
// - Bucket request into a lookup map keyed off a list of joined room IDs and separately a user ID
|
// - Bucket request into a lookup map keyed off a list of joined room IDs and separately a user ID
|
||||||
// - Incoming events wake requests for a matching room ID
|
// - Incoming events wake requests for a matching room ID
|
||||||
|
|
@ -114,23 +120,11 @@ func (n *Notifier) WaitForEvents(req syncRequest, sincePos types.StreamPosition)
|
||||||
|
|
||||||
// In a guard, check if the /sync request should block, and block it until we get woken up
|
// In a guard, check if the /sync request should block, and block it until we get woken up
|
||||||
n.streamLock.Lock()
|
n.streamLock.Lock()
|
||||||
currentPos := n.currPos
|
defer n.streamLock.Unlock()
|
||||||
|
|
||||||
// TODO: We increment the stream position for any event, so it's possible that we return immediately
|
n.removeEmptyUserStreams()
|
||||||
// with a pos which contains no new events for this user. We should probably re-wait for events
|
|
||||||
// automatically in this case.
|
|
||||||
if sincePos != currentPos {
|
|
||||||
n.streamLock.Unlock()
|
|
||||||
return currentPos
|
|
||||||
}
|
|
||||||
|
|
||||||
// wait to be woken up, and then re-check the stream position
|
return n.fetchUserStream(req.userID, true).Wait(req.ctx, sincePos)
|
||||||
req.log.WithField("user_id", req.userID).Info("Waiting for event")
|
|
||||||
|
|
||||||
// give up the stream lock prior to waiting on the user lock
|
|
||||||
stream := n.fetchUserStream(req.userID, true)
|
|
||||||
n.streamLock.Unlock()
|
|
||||||
return stream.Wait(currentPos)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load the membership states required to notify users correctly.
|
// Load the membership states required to notify users correctly.
|
||||||
|
|
@ -178,7 +172,7 @@ func (n *Notifier) fetchUserStream(userID string, makeIfNotExists bool) *UserStr
|
||||||
stream, ok := n.userStreams[userID]
|
stream, ok := n.userStreams[userID]
|
||||||
if !ok && makeIfNotExists {
|
if !ok && makeIfNotExists {
|
||||||
// TODO: Unbounded growth of streams (1 per user)
|
// TODO: Unbounded growth of streams (1 per user)
|
||||||
stream = NewUserStream(userID)
|
stream = NewUserStream(userID, n.currPos)
|
||||||
n.userStreams[userID] = stream
|
n.userStreams[userID] = stream
|
||||||
}
|
}
|
||||||
return stream
|
return stream
|
||||||
|
|
@ -208,6 +202,29 @@ func (n *Notifier) joinedUsers(roomID string) (userIDs []string) {
|
||||||
return n.roomIDToJoinedUsers[roomID].values()
|
return n.roomIDToJoinedUsers[roomID].values()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// removeEmptyUserStreams iterates through the user stream map and removes any
|
||||||
|
// that have been empty for a certain amount of time. This is a crude way of
|
||||||
|
// ensuring that the userStreams map doesn't grow forver.
|
||||||
|
// This should be called when the notifier gets called for whatever reason,
|
||||||
|
// the function itself is responsible for ensuring it doesn't iterate too
|
||||||
|
// often.
|
||||||
|
// NB: Callers should have locked the mutex before calling this function.
|
||||||
|
func (n *Notifier) removeEmptyUserStreams() {
|
||||||
|
// Only clean up now and again
|
||||||
|
now := time.Now()
|
||||||
|
if n.lastCleanUpTime.Add(time.Minute).After(now) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
n.lastCleanUpTime = now
|
||||||
|
|
||||||
|
deleteBefore := now.Add(-5 * time.Minute)
|
||||||
|
for key, value := range n.userStreams {
|
||||||
|
if value.TimeOfLastNonEmpty().Before(deleteBefore) {
|
||||||
|
delete(n.userStreams, key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// A string set, mainly existing for improving clarity of structs in this file.
|
// A string set, mainly existing for improving clarity of structs in this file.
|
||||||
type userIDSet map[string]bool
|
type userIDSet map[string]bool
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -256,18 +256,12 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
|
||||||
|
|
||||||
// same as Notifier.WaitForEvents but with a timeout.
|
// same as Notifier.WaitForEvents but with a timeout.
|
||||||
func waitForEvents(n *Notifier, req syncRequest) (types.StreamPosition, error) {
|
func waitForEvents(n *Notifier, req syncRequest) (types.StreamPosition, error) {
|
||||||
done := make(chan types.StreamPosition, 1)
|
|
||||||
go func() {
|
|
||||||
newPos := n.WaitForEvents(req, req.since)
|
|
||||||
done <- newPos
|
|
||||||
close(done)
|
|
||||||
}()
|
|
||||||
select {
|
select {
|
||||||
case <-time.After(5 * time.Second):
|
case <-time.After(5 * time.Second):
|
||||||
return types.StreamPosition(0), fmt.Errorf(
|
return types.StreamPosition(0), fmt.Errorf(
|
||||||
"waitForEvents timed out waiting for %s (pos=%d)", req.userID, req.since,
|
"waitForEvents timed out waiting for %s (pos=%d)", req.userID, req.since,
|
||||||
)
|
)
|
||||||
case p := <-done:
|
case p := <-n.WaitForEvents(req, req.since):
|
||||||
return p, nil
|
return p, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -288,5 +282,6 @@ func newTestSyncRequest(userID string, since types.StreamPosition) syncRequest {
|
||||||
wantFullState: false,
|
wantFullState: false,
|
||||||
limit: defaultTimelineLimit,
|
limit: defaultTimelineLimit,
|
||||||
log: util.GetLogger(context.TODO()),
|
log: util.GetLogger(context.TODO()),
|
||||||
|
ctx: context.TODO(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -19,7 +19,6 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
|
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
|
||||||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||||
|
|
@ -28,6 +27,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
// RequestPool manages HTTP long-poll connections for /sync
|
// RequestPool manages HTTP long-poll connections for /sync
|
||||||
|
|
@ -85,7 +85,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
// Wait for notifier to wake us up
|
// Wait for notifier to wake us up
|
||||||
case currPos = <-rp.makeNotifyChannel(*syncReq, currPos):
|
case currPos = <-rp.notifier.WaitForEvents(*syncReq, currPos):
|
||||||
// Or for timeout to expire
|
// Or for timeout to expire
|
||||||
case <-timer.C:
|
case <-timer.C:
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
|
|
@ -116,24 +116,6 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// makeNotifyChannel returns a channel that produces the current stream position
|
|
||||||
// when there *may* be something to return to the client. Only produces a single
|
|
||||||
// value and then closes the channel.
|
|
||||||
func (rp *RequestPool) makeNotifyChannel(syncReq syncRequest, sincePos types.StreamPosition) chan types.StreamPosition {
|
|
||||||
notified := make(chan types.StreamPosition)
|
|
||||||
|
|
||||||
// TODO(#303): We need to ensure that WaitForEvents gets properly cancelled
|
|
||||||
// when the request is finished, or use some other mechanism to ensure we
|
|
||||||
// don't leak goroutines here
|
|
||||||
go (func() {
|
|
||||||
currentPos := rp.notifier.WaitForEvents(syncReq, sincePos)
|
|
||||||
notified <- currentPos
|
|
||||||
close(notified)
|
|
||||||
})()
|
|
||||||
|
|
||||||
return notified
|
|
||||||
}
|
|
||||||
|
|
||||||
type stateEventInStateResp struct {
|
type stateEventInStateResp struct {
|
||||||
gomatrixserverlib.ClientEvent
|
gomatrixserverlib.ClientEvent
|
||||||
PrevContent json.RawMessage `json:"prev_content,omitempty"`
|
PrevContent json.RawMessage `json:"prev_content,omitempty"`
|
||||||
|
|
|
||||||
|
|
@ -15,7 +15,9 @@
|
||||||
package sync
|
package sync
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
)
|
)
|
||||||
|
|
@ -25,55 +27,108 @@ import (
|
||||||
// goroutines can Broadcast(streamPosition) to other goroutines.
|
// goroutines can Broadcast(streamPosition) to other goroutines.
|
||||||
type UserStream struct {
|
type UserStream struct {
|
||||||
UserID string
|
UserID string
|
||||||
// Because this is a Cond, we can notify all waiting goroutines so this works
|
// The waiting channels....... TODO
|
||||||
// across devices for the same user. Protects pos.
|
waitingChannels []chan<- types.StreamPosition
|
||||||
cond *sync.Cond
|
// The lock that protects pos
|
||||||
|
lock sync.Mutex
|
||||||
// The position to broadcast to callers of Wait().
|
// The position to broadcast to callers of Wait().
|
||||||
pos types.StreamPosition
|
pos types.StreamPosition
|
||||||
// The number of goroutines blocked on Wait() - used for testing and metrics
|
// The time when waitingChannels was last non-empty
|
||||||
numWaiting int
|
timeOfLastChannel time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewUserStream creates a new user stream
|
// NewUserStream creates a new user stream
|
||||||
func NewUserStream(userID string) *UserStream {
|
func NewUserStream(userID string, currPos types.StreamPosition) *UserStream {
|
||||||
return &UserStream{
|
return &UserStream{
|
||||||
UserID: userID,
|
UserID: userID,
|
||||||
cond: sync.NewCond(&sync.Mutex{}),
|
timeOfLastChannel: time.Now(),
|
||||||
|
pos: currPos,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait blocks until there is a new stream position for this user, which is then returned.
|
// Wait blocks until there is a new stream position for this user, which is then returned.
|
||||||
// waitAtPos should be the position the stream thinks it should be waiting at.
|
// waitAtPos should be the position the stream thinks it should be waiting at.
|
||||||
func (s *UserStream) Wait(waitAtPos types.StreamPosition) (pos types.StreamPosition) {
|
func (s *UserStream) Wait(ctx context.Context, waitAtPos types.StreamPosition) <-chan types.StreamPosition {
|
||||||
s.cond.L.Lock()
|
posChannel := make(chan types.StreamPosition, 1)
|
||||||
|
|
||||||
|
s.lock.Lock()
|
||||||
|
defer s.lock.Unlock()
|
||||||
|
|
||||||
// Before we start blocking, we need to make sure that we didn't race with a call
|
// Before we start blocking, we need to make sure that we didn't race with a call
|
||||||
// to Broadcast() between calling Wait() and actually sleeping. We check the last
|
// to Broadcast() between calling Wait() and actually sleeping. We check the last
|
||||||
// broadcast pos to see if it is newer than the pos we are meant to wait at. If it
|
// broadcast pos to see if it is newer than the pos we are meant to wait at. If it
|
||||||
// is newer, something has Broadcast to this stream more recently so return immediately.
|
// is newer, something has Broadcast to this stream more recently so return immediately.
|
||||||
if s.pos > waitAtPos {
|
if s.pos > waitAtPos {
|
||||||
pos = s.pos
|
posChannel <- s.pos
|
||||||
s.cond.L.Unlock()
|
close(posChannel)
|
||||||
return
|
return posChannel
|
||||||
}
|
}
|
||||||
s.numWaiting++
|
|
||||||
s.cond.Wait()
|
s.waitingChannels = append(s.waitingChannels, posChannel)
|
||||||
pos = s.pos
|
|
||||||
s.numWaiting--
|
// We spawn off a goroutine that waits for the request to finish and removes the
|
||||||
s.cond.L.Unlock()
|
// channel from waitingChannels
|
||||||
return
|
go func() {
|
||||||
|
<-ctx.Done()
|
||||||
|
|
||||||
|
s.lock.Lock()
|
||||||
|
defer s.lock.Unlock()
|
||||||
|
|
||||||
|
// Icky but efficient way of filtering out the given channel
|
||||||
|
for idx, ch := range s.waitingChannels {
|
||||||
|
if posChannel == ch {
|
||||||
|
lastIdx := len(s.waitingChannels)
|
||||||
|
s.waitingChannels[idx] = s.waitingChannels[lastIdx]
|
||||||
|
s.waitingChannels[lastIdx] = nil
|
||||||
|
s.waitingChannels = s.waitingChannels[:lastIdx]
|
||||||
|
|
||||||
|
if len(s.waitingChannels) == 0 {
|
||||||
|
s.timeOfLastChannel = time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return posChannel
|
||||||
}
|
}
|
||||||
|
|
||||||
// Broadcast a new stream position for this user.
|
// Broadcast a new stream position for this user.
|
||||||
func (s *UserStream) Broadcast(pos types.StreamPosition) {
|
func (s *UserStream) Broadcast(pos types.StreamPosition) {
|
||||||
s.cond.L.Lock()
|
s.lock.Lock()
|
||||||
|
defer s.lock.Unlock()
|
||||||
|
|
||||||
|
if len(s.waitingChannels) != 0 {
|
||||||
|
s.timeOfLastChannel = time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
s.pos = pos
|
s.pos = pos
|
||||||
s.cond.L.Unlock()
|
|
||||||
s.cond.Broadcast()
|
for _, c := range s.waitingChannels {
|
||||||
|
c <- pos
|
||||||
|
close(c)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.waitingChannels = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// NumWaiting returns the number of goroutines waiting for Wait() to return. Used for metrics and testing.
|
// NumWaiting returns the number of goroutines waiting for Wait() to return. Used for metrics and testing.
|
||||||
func (s *UserStream) NumWaiting() int {
|
func (s *UserStream) NumWaiting() int {
|
||||||
s.cond.L.Lock()
|
s.lock.Lock()
|
||||||
defer s.cond.L.Unlock()
|
defer s.lock.Unlock()
|
||||||
return s.numWaiting
|
return len(s.waitingChannels)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TimeOfLastNonEmpty returns the last time that the number of waiting channels
|
||||||
|
// was non-empty, may be time.Now() if number of waiting channels is currently
|
||||||
|
// non-empty.
|
||||||
|
func (s *UserStream) TimeOfLastNonEmpty() time.Time {
|
||||||
|
s.lock.Lock()
|
||||||
|
defer s.lock.Unlock()
|
||||||
|
|
||||||
|
if len(s.waitingChannels) > 0 {
|
||||||
|
return time.Now()
|
||||||
|
}
|
||||||
|
return s.timeOfLastChannel
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue