From aab28c297083e159174a2b7f20bc64a7350493d4 Mon Sep 17 00:00:00 2001
From: Kegan Dougal
Date: Fri, 12 May 2017 16:15:03 +0100
Subject: [PATCH] Implement selective /sync request wakeups

Based on what Synapse does. Incomplete.
---
 .../dendrite/syncapi/sync/notifier.go         | 206 ++++++++++++++++--
 .../dendrite/syncapi/sync/request.go          |   7 +-
 2 files changed, 189 insertions(+), 24 deletions(-)

diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go
index cc986579f..08b27a900 100644
--- a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go
+++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go
@@ -15,27 +15,46 @@
 package sync
 
 import (
+	"encoding/json"
 	"sync"
 
+	log "github.com/Sirupsen/logrus"
+	"github.com/matrix-org/dendrite/clientapi/events"
 	"github.com/matrix-org/dendrite/syncapi/types"
 	"github.com/matrix-org/gomatrixserverlib"
 )
 
-// Notifier will wake up sleeping requests in the request pool when there
-// is some new data. It does not tell requests what that data is, only the
-// stream position which they can use to get at it.
+// Notifier will wake up sleeping requests when there is some new data.
+// It does not tell requests what that data is, only the stream position which
+// they can use to get at it. This is done to prevent races whereby we tell the caller
+// the event, but the token has already advanced by the time they fetch it, resulting
+// in missed events.
 type Notifier struct {
-	// The latest sync stream position: guarded by 'cond'.
-	currPos types.StreamPosition
-	// A condition variable to notify all waiting goroutines of a new sync stream position
-	cond *sync.Cond
+	// The latest sync stream position: guarded by 'currPosMutex' which is RW to allow
+	// for concurrent reads on /sync requests
+	currPos      types.StreamPosition
+	currPosMutex *sync.RWMutex
+	// A map of RoomID => Set<UserID> : Map access is guarded by roomIDToJoinedUsersMutex.
+	roomIDToJoinedUsers      map[string]set
+	roomIDToJoinedUsersMutex *sync.Mutex
+	// A map of user_id => Cond which can be used to wake a given user's /sync request.
+	// Because this is a Cond, we can notify all waiting goroutines so this works
+	// across devices. Map access is guarded by userIDCondsMutex.
+	userIDConds      map[string]*sync.Cond
+	userIDCondsMutex *sync.Mutex
 }
 
 // NewNotifier creates a new notifier set to the given stream position.
+// In order for this to be of any use, the Notifier needs to be told all rooms and
+// the joined users within each of them by calling Notifier.UsersJoinedToRooms().
 func NewNotifier(pos types.StreamPosition) *Notifier {
 	return &Notifier{
-		pos,
-		sync.NewCond(&sync.Mutex{}),
+		currPos:                  pos,
+		currPosMutex:             &sync.RWMutex{},
+		roomIDToJoinedUsers:      make(map[string]set),
+		roomIDToJoinedUsersMutex: &sync.Mutex{},
+		userIDConds:              make(map[string]*sync.Cond),
+		userIDCondsMutex:         &sync.Mutex{},
 	}
 }
 
@@ -43,25 +62,166 @@ func NewNotifier(pos types.StreamPosition) *Notifier {
 // called from a single goroutine, to avoid races between updates which could set the
 // current position in the stream incorrectly.
 func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, pos types.StreamPosition) {
-	// update the current position in a guard and then notify all /sync streams
-	n.cond.L.Lock()
+	// update the current position in a guard and then notify relevant /sync streams.
+	// This needs to be done PRIOR to waking up users as they will read this value.
+	n.currPosMutex.Lock()
 	n.currPos = pos
-	n.cond.L.Unlock()
+	n.currPosMutex.Unlock()
 
-	n.cond.Broadcast() // notify ALL waiting goroutines
+	// Map this event's room_id to a list of joined users, and wake them up.
+	userIDs := n.joinedUsers(ev.RoomID())
+	// If this is an invite, also add in the invitee to this list.
+	if ev.Type() == "m.room.member" && ev.StateKey() != nil {
+		userID := *ev.StateKey()
+		var memberContent events.MemberContent
+		if err := json.Unmarshal(ev.Content(), &memberContent); err != nil {
+			log.WithError(err).WithField("event_id", ev.EventID()).Errorf(
+				"Notifier.OnNewEvent: Failed to unmarshal member event",
+			)
+		} else {
+			// Keep the joined user map up-to-date
+			switch memberContent.Membership {
+			case "invite":
+				userIDs = append(userIDs, userID)
+			case "join":
+				// userIDs was read before this join is recorded, so wake the joiner explicitly.
+				userIDs = append(userIDs, userID)
+				n.userJoined(ev.RoomID(), userID)
+			case "leave", "ban":
+				n.userLeft(ev.RoomID(), userID)
+			}
+		}
+	}
+
+	for _, userID := range userIDs {
+		n.wakeupUser(userID)
+	}
 }
 
 // WaitForEvents blocks until there are new events for this request.
 func (n *Notifier) WaitForEvents(req syncRequest) types.StreamPosition {
-	// In a guard, check if the /sync request should block, and block it until we get a new position
-	n.cond.L.Lock()
-	currentPos := n.currPos
-	for req.since == currentPos {
-		// we need to wait for a new event.
-		// TODO: This waits for ANY new event, we need to only wait for events which we care about.
-		n.cond.Wait() // atomically unlocks and blocks goroutine, then re-acquires lock on unblock
-		currentPos = n.currPos
+	// Do what synapse does: https://github.com/matrix-org/synapse/blob/v0.20.0/synapse/notifier.py#L298
+	// - Bucket request into a lookup map keyed off a list of joined room IDs and separately a user ID
+	// - Incoming events wake requests for a matching room ID
+	// - Incoming events wake requests for a matching user ID (needed for invites)
+
+	// TODO: v1 /events 'peeking' has an 'explicit room ID' which is also tracked,
+	// but given we don't do /events, let's pretend it doesn't exist.
+
+	for {
+		// In a guard, check if the /sync request should block, and block it until we get woken up
+		n.currPosMutex.RLock()
+		currentPos := n.currPos
+		n.currPosMutex.RUnlock()
+
+		// TODO: We increment the stream position for any event, so it's possible that we return immediately
+		// with a pos which contains no new events for this user. We should probably re-wait for events
+		// automatically in this case.
+		if req.since != currentPos {
+			return currentPos
+		}
+
+		// wait to be woken up, and then re-check the stream position
+		req.log.WithField("user_id", req.userID).Info("Waiting for event")
+		n.blockUser(req.userID)
 	}
-	n.cond.L.Unlock()
-	return currentPos
+}
+
+// UsersJoinedToRooms marks the given users as 'joined' to the given rooms, such that new events from
+// these rooms will wake the given users /sync requests. This should be called prior to ANY calls to
+// OnNewEvent (eg on startup) to prevent racing.
+func (n *Notifier) UsersJoinedToRooms(roomIDToUserIDs map[string][]string) {
+	// This is just the bulk form of userJoined where we only lock once.
+	n.roomIDToJoinedUsersMutex.Lock()
+	defer n.roomIDToJoinedUsersMutex.Unlock()
+	for roomID, userIDs := range roomIDToUserIDs {
+		if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
+			n.roomIDToJoinedUsers[roomID] = make(set)
+		}
+		for _, userID := range userIDs {
+			n.roomIDToJoinedUsers[roomID].add(userID)
+		}
+	}
+}
+
+func (n *Notifier) wakeupUser(userID string) {
+	cond := n.fetchUserCond(userID, false)
+	if cond == nil {
+		return
+	}
+	cond.Broadcast() // wakeup all goroutines Wait()ing on this Cond
+}
+
+func (n *Notifier) blockUser(userID string) {
+	cond := n.fetchUserCond(userID, true)
+	cond.L.Lock()
+	cond.Wait()
+	cond.L.Unlock()
+}
+
+// fetchUserCond retrieves a Cond unique to the given user. If makeIfNotExists is true,
+// a Cond will be made for this user if one doesn't exist and it will be returned; otherwise
+// nil is returned for unknown users. This function does not lock the Cond.
+func (n *Notifier) fetchUserCond(userID string, makeIfNotExists bool) *sync.Cond {
+	// There is a bit of a locking dance here, we want to lock the mutex protecting the map
+	// but NOT the Cond that we may be returning/creating.
+	n.userIDCondsMutex.Lock()
+	defer n.userIDCondsMutex.Unlock()
+	cond, ok := n.userIDConds[userID]
+	if !ok && makeIfNotExists {
+		// TODO: Unbounded growth of locks (1 per user)
+		cond = sync.NewCond(&sync.Mutex{})
+		n.userIDConds[userID] = cond
+	}
+	return cond
+}
+
+func (n *Notifier) userJoined(roomID, userID string) {
+	n.roomIDToJoinedUsersMutex.Lock()
+	defer n.roomIDToJoinedUsersMutex.Unlock()
+	if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
+		n.roomIDToJoinedUsers[roomID] = make(set)
+	}
+	n.roomIDToJoinedUsers[roomID].add(userID)
+}
+
+func (n *Notifier) userLeft(roomID, userID string) {
+	n.roomIDToJoinedUsersMutex.Lock()
+	defer n.roomIDToJoinedUsersMutex.Unlock()
+	// Rooms we are not tracking have nobody to remove; don't allocate a set for them.
+	if users, ok := n.roomIDToJoinedUsers[roomID]; ok {
+		users.remove(userID)
+	}
+}
+
+func (n *Notifier) joinedUsers(roomID string) (userIDs []string) {
+	n.roomIDToJoinedUsersMutex.Lock()
+	defer n.roomIDToJoinedUsersMutex.Unlock()
+	if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
+		return
+	}
+	return n.roomIDToJoinedUsers[roomID].values()
+}
+
+// A string set, mainly existing for improving clarity of structs in this file.
+type set map[string]bool
+
+func (s set) add(str string) {
+	s[str] = true
+}
+
+func (s set) remove(str string) {
+	delete(s, str)
+}
+
+func (s set) has(str string) bool {
+	_, ok := s[str]
+	return ok
+}
+
+func (s set) values() (vals []string) {
+	for str := range s {
+		vals = append(vals, str)
+	}
+	return
 }
diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/request.go b/src/github.com/matrix-org/dendrite/syncapi/sync/request.go
index a44f8557f..5260a3639 100644
--- a/src/github.com/matrix-org/dendrite/syncapi/sync/request.go
+++ b/src/github.com/matrix-org/dendrite/syncapi/sync/request.go
@@ -15,10 +15,13 @@
 package sync
 
 import (
-	"github.com/matrix-org/dendrite/syncapi/types"
 	"net/http"
 	"strconv"
 	"time"
+
+	log "github.com/Sirupsen/logrus"
+	"github.com/matrix-org/dendrite/syncapi/types"
+	"github.com/matrix-org/util"
 )
 
 const defaultSyncTimeout = time.Duration(30) * time.Second
@@ -31,6 +34,7 @@ type syncRequest struct {
 	timeout       time.Duration
 	since         types.StreamPosition
 	wantFullState bool
+	log           *log.Entry
 }
 
 func newSyncRequest(req *http.Request, userID string) (*syncRequest, error) {
@@ -48,6 +52,7 @@ func newSyncRequest(req *http.Request, userID string) (*syncRequest, error) {
 		since:         since,
 		wantFullState: wantFullState,
 		limit:         defaultTimelineLimit, // TODO: read from filter
+		log:           util.GetLogger(req.Context()),
 	}, nil
 }
 