mirror of https://github.com/matrix-org/dendrite.git (synced 2025-12-07 15:03:09 -06:00)
Implement selective /sync request wakeups
Based on what Synapse does. Incomplete.
parent 2a9abefd92
commit aab28c2970
@@ -15,27 +15,46 @@
 package sync
 
 import (
+	"encoding/json"
 	"sync"
 
+	log "github.com/Sirupsen/logrus"
+	"github.com/matrix-org/dendrite/clientapi/events"
 	"github.com/matrix-org/dendrite/syncapi/types"
 	"github.com/matrix-org/gomatrixserverlib"
 )
 
-// Notifier will wake up sleeping requests in the request pool when there
-// is some new data. It does not tell requests what that data is, only the
-// stream position which they can use to get at it.
+// Notifier will wake up sleeping requests when there is some new data.
+// It does not tell requests what that data is, only the stream position which
+// they can use to get at it. This is done to prevent races whereby we tell the caller
+// the event, but the token has already advanced by the time they fetch it, resulting
+// in missed events.
 type Notifier struct {
-	// The latest sync stream position: guarded by 'cond'.
-	currPos types.StreamPosition
-	// A condition variable to notify all waiting goroutines of a new sync stream position
-	cond *sync.Cond
+	// The latest sync stream position: guarded by 'currPosMutex' which is RW to allow
+	// for concurrent reads on /sync requests
+	currPos      types.StreamPosition
+	currPosMutex *sync.RWMutex
+	// A map of RoomID => Set<UserID> : Map access is guarded by roomIDToJoinedUsersMutex.
+	roomIDToJoinedUsers      map[string]set
+	roomIDToJoinedUsersMutex *sync.Mutex
+	// A map of user_id => Cond which can be used to wake a given user's /sync request.
+	// Because this is a Cond, we can notify all waiting goroutines so this works
+	// across devices. Map access is guarded by userIDCondsMutex.
+	userIDConds      map[string]*sync.Cond
+	userIDCondsMutex *sync.Mutex
 }
 
 // NewNotifier creates a new notifier set to the given stream position.
+// In order for this to be of any use, the Notifier needs to be told all rooms and
+// the joined users within each of them by calling Notifier.UsersJoinedToRooms().
 func NewNotifier(pos types.StreamPosition) *Notifier {
 	return &Notifier{
-		pos,
-		sync.NewCond(&sync.Mutex{}),
+		currPos:                  pos,
+		currPosMutex:             &sync.RWMutex{},
+		roomIDToJoinedUsers:      make(map[string]set),
+		roomIDToJoinedUsersMutex: &sync.Mutex{},
+		userIDConds:              make(map[string]*sync.Cond),
+		userIDCondsMutex:         &sync.Mutex{},
 	}
 }
|
@@ -43,25 +62,166 @@ func NewNotifier(pos types.StreamPosition) *Notifier {
 // called from a single goroutine, to avoid races between updates which could set the
 // current position in the stream incorrectly.
 func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, pos types.StreamPosition) {
-	// update the current position in a guard and then notify all /sync streams
-	n.cond.L.Lock()
+	// update the current position in a guard and then notify relevant /sync streams.
+	// This needs to be done PRIOR to waking up users as they will read this value.
+	n.currPosMutex.Lock()
 	n.currPos = pos
-	n.cond.L.Unlock()
+	n.currPosMutex.Unlock()
 
-	n.cond.Broadcast() // notify ALL waiting goroutines
+	// Map this event's room_id to a list of joined users, and wake them up.
+	userIDs := n.joinedUsers(ev.RoomID())
+	// If this is an invite, also add in the invitee to this list.
+	if ev.Type() == "m.room.member" && ev.StateKey() != nil {
+		userID := *ev.StateKey()
+		var memberContent events.MemberContent
+		if err := json.Unmarshal(ev.Content(), &memberContent); err != nil {
+			log.WithError(err).WithField("event_id", ev.EventID()).Errorf(
+				"Notifier.OnNewEvent: Failed to unmarshal member event",
+			)
+		} else {
+			// Keep the joined user map up-to-date
+			switch memberContent.Membership {
+			case "invite":
+				userIDs = append(userIDs, userID)
+			case "join":
+				n.userJoined(ev.RoomID(), userID)
+			case "leave":
+				fallthrough
+			case "ban":
+				n.userLeft(ev.RoomID(), userID)
+			}
+		}
+	}
+
+	for _, userID := range userIDs {
+		n.wakeupUser(userID)
+	}
 }
 
 // WaitForEvents blocks until there are new events for this request.
 func (n *Notifier) WaitForEvents(req syncRequest) types.StreamPosition {
-	// In a guard, check if the /sync request should block, and block it until we get a new position
-	n.cond.L.Lock()
-	currentPos := n.currPos
-	for req.since == currentPos {
-		// we need to wait for a new event.
-		// TODO: This waits for ANY new event, we need to only wait for events which we care about.
-		n.cond.Wait() // atomically unlocks and blocks goroutine, then re-acquires lock on unblock
-		currentPos = n.currPos
-	}
-	n.cond.L.Unlock()
-	return currentPos
+	// Do what synapse does: https://github.com/matrix-org/synapse/blob/v0.20.0/synapse/notifier.py#L298
+	// - Bucket request into a lookup map keyed off a list of joined room IDs and separately a user ID
+	// - Incoming events wake requests for a matching room ID
+	// - Incoming events wake requests for a matching user ID (needed for invites)
+
+	// TODO: v1 /events 'peeking' has an 'explicit room ID' which is also tracked,
+	// but given we don't do /events, let's pretend it doesn't exist.
+
+	for {
+		// In a guard, check if the /sync request should block, and block it until we get woken up
+		n.currPosMutex.RLock()
+		currentPos := n.currPos
+		n.currPosMutex.RUnlock()
+
+		// TODO: We increment the stream position for any event, so it's possible that we return immediately
+		// with a pos which contains no new events for this user. We should probably re-wait for events
+		// automatically in this case.
+		if req.since != currentPos {
+			return currentPos
+		}
+
+		// wait to be woken up, and then re-check the stream position
+		req.log.WithField("user_id", req.userID).Info("Waiting for event")
+		n.blockUser(req.userID)
+	}
+}
+
+// UsersJoinedToRooms marks the given users as 'joined' to the given rooms, such that new events from
+// these rooms will wake the given users /sync requests. This should be called prior to ANY calls to
+// OnNewEvent (eg on startup) to prevent racing.
+func (n *Notifier) UsersJoinedToRooms(roomIDToUserIDs map[string][]string) {
+	// This is just the bulk form of userJoined where we only lock once.
+	n.roomIDToJoinedUsersMutex.Lock()
+	defer n.roomIDToJoinedUsersMutex.Unlock()
+	for roomID, userIDs := range roomIDToUserIDs {
+		if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
+			n.roomIDToJoinedUsers[roomID] = make(set)
+		}
+		for _, userID := range userIDs {
+			n.roomIDToJoinedUsers[roomID].add(userID)
+		}
+	}
+}
+
+func (n *Notifier) wakeupUser(userID string) {
+	cond := n.fetchUserCond(userID, false)
+	if cond == nil {
+		return
+	}
+	cond.Broadcast() // wakeup all goroutines Wait()ing on this Cond
+}
+
+func (n *Notifier) blockUser(userID string) {
+	cond := n.fetchUserCond(userID, true)
+	cond.L.Lock()
+	cond.Wait()
+	cond.L.Unlock()
+}
+
+// fetchUserCond retrieves a Cond unique to the given user. If makeIfNotExists is true,
+// a Cond will be made for this user if one doesn't exist and it will be returned. This
+// function does not lock the Cond.
+func (n *Notifier) fetchUserCond(userID string, makeIfNotExists bool) *sync.Cond {
+	// There is a bit of a locking dance here, we want to lock the mutex protecting the map
+	// but NOT the Cond that we may be returning/creating.
+	n.userIDCondsMutex.Lock()
+	defer n.userIDCondsMutex.Unlock()
+	cond, ok := n.userIDConds[userID]
+	if !ok {
+		// TODO: Unbounded growth of locks (1 per user)
+		cond = sync.NewCond(&sync.Mutex{})
+		n.userIDConds[userID] = cond
+	}
+	return cond
+}
+
+func (n *Notifier) userJoined(roomID, userID string) {
+	n.roomIDToJoinedUsersMutex.Lock()
+	defer n.roomIDToJoinedUsersMutex.Unlock()
+	if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
+		n.roomIDToJoinedUsers[roomID] = make(set)
+	}
+	n.roomIDToJoinedUsers[roomID].add(userID)
+}
+
+func (n *Notifier) userLeft(roomID, userID string) {
+	n.roomIDToJoinedUsersMutex.Lock()
+	defer n.roomIDToJoinedUsersMutex.Unlock()
+	if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
+		n.roomIDToJoinedUsers[roomID] = make(set)
+	}
+	n.roomIDToJoinedUsers[roomID].remove(userID)
+}
+
+func (n *Notifier) joinedUsers(roomID string) (userIDs []string) {
+	n.roomIDToJoinedUsersMutex.Lock()
+	defer n.roomIDToJoinedUsersMutex.Unlock()
+	if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
+		return
+	}
+	return n.roomIDToJoinedUsers[roomID].values()
+}
+
+// A string set, mainly existing for improving clarity of structs in this file.
+type set map[string]bool
+
+func (s set) add(str string) {
+	s[str] = true
+}
+
+func (s set) remove(str string) {
+	delete(s, str)
+}
+
+func (s set) has(str string) bool {
+	_, ok := s[str]
+	return ok
+}
+
+func (s set) values() (vals []string) {
+	for str := range s {
+		vals = append(vals, str)
+	}
+	return
 }
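The heart of the change: instead of one global sync.Cond whose Broadcast wakes every sleeping /sync request on every event, the notifier now keeps one Cond per user and only broadcasts to the users joined to (or invited via) the event's room. Below is a standalone, hedged sketch of that per-user wakeup pattern; every name in it is invented, and unlike the real WaitForEvents loop it does not re-check the stream position, so it papers over missed-wakeup races with a sleep.

// Standalone sketch (not dendrite code) of the per-user Cond wakeup pattern:
// block() parks a goroutine on a Cond keyed by user ID, wake() broadcasts to
// every goroutine (e.g. every device) waiting on behalf of that user.
package main

import (
	"fmt"
	"sync"
	"time"
)

type notifier struct {
	mu    sync.Mutex            // guards conds
	conds map[string]*sync.Cond // one Cond per user ID
}

func newNotifier() *notifier {
	return &notifier{conds: make(map[string]*sync.Cond)}
}

// block waits until wake() is called for the given user.
func (n *notifier) block(userID string) {
	n.mu.Lock()
	cond, ok := n.conds[userID]
	if !ok {
		cond = sync.NewCond(&sync.Mutex{})
		n.conds[userID] = cond
	}
	n.mu.Unlock()

	cond.L.Lock()
	cond.Wait() // woken by Broadcast in wake()
	cond.L.Unlock()
}

// wake wakes all goroutines currently waiting for the given user.
// A user with no registered Cond has no waiting requests, so nothing happens.
func (n *notifier) wake(userID string) {
	n.mu.Lock()
	cond, ok := n.conds[userID]
	n.mu.Unlock()
	if ok {
		cond.Broadcast()
	}
}

func main() {
	n := newNotifier()
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		n.block("@alice:example.com")
		fmt.Println("woke alice's /sync request")
	}()
	// Crude synchronisation for the sketch: give the waiter time to park.
	// The real Notifier tolerates a missed wakeup by re-checking currPos in a loop.
	time.Sleep(100 * time.Millisecond)
	n.wake("@alice:example.com")
	wg.Wait()
}

The TODO in fetchUserCond applies equally to this sketch: the map of Conds grows by one entry per user that ever waits and is never pruned.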
|
|
@@ -15,10 +15,13 @@
 package sync
 
 import (
-	"github.com/matrix-org/dendrite/syncapi/types"
 	"net/http"
 	"strconv"
 	"time"
+
+	log "github.com/Sirupsen/logrus"
+	"github.com/matrix-org/dendrite/syncapi/types"
+	"github.com/matrix-org/util"
 )
 
 const defaultSyncTimeout = time.Duration(30) * time.Second
@@ -31,6 +34,7 @@ type syncRequest struct {
 	timeout       time.Duration
 	since         types.StreamPosition
 	wantFullState bool
+	log           *log.Entry
 }
 
 func newSyncRequest(req *http.Request, userID string) (*syncRequest, error) {
@@ -48,6 +52,7 @@ func newSyncRequest(req *http.Request, userID string) (*syncRequest, error) {
 		since:         since,
 		wantFullState: wantFullState,
 		limit:         defaultTimelineLimit, // TODO: read from filter
+		log:           util.GetLogger(req.Context()),
 	}, nil
 }
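The second file gives each /sync request its own logrus entry, fetched via util.GetLogger(req.Context()) and stored on syncRequest, which is what lets WaitForEvents log "Waiting for event" with the user's ID. A hedged sketch of that stash-a-request-scoped-logger pattern follows; it substitutes a plain logrus entry for util.GetLogger (whose context wiring lives elsewhere in dendrite), and every identifier in it is invented.

// Illustrative only: build a logrus Entry once per request, stash it on the
// request struct, and reuse it so later log lines carry the same fields,
// mirroring the `log *log.Entry` field added to syncRequest.
package main

import (
	"net/http"

	log "github.com/Sirupsen/logrus"
)

// exampleRequest stands in for syncRequest; only the logging field matters here.
type exampleRequest struct {
	userID string
	log    *log.Entry
}

func handleSync(w http.ResponseWriter, req *http.Request) {
	r := &exampleRequest{
		userID: "@alice:example.com", // made up for the sketch
		log:    log.WithField("req.method", req.Method),
	}
	// Later stages reuse the same entry, as WaitForEvents does with req.log:
	r.log.WithField("user_id", r.userID).Info("Waiting for event")
	w.WriteHeader(http.StatusOK)
}

func main() {
	http.HandleFunc("/sync", handleSync)
	// ListenAndServe omitted; this is only a sketch of the logging pattern.
}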