From d79a6a21c77b2f8c189ff5ac67fdee5e80fd6ac5 Mon Sep 17 00:00:00 2001 From: Cnly Date: Thu, 27 Jun 2019 05:10:31 +0800 Subject: [PATCH] Userstreams should store and use complete pos, not pos update ("pos delta") Signed-off-by: Alex Chen --- syncapi/storage/syncserver.go | 8 ++++---- syncapi/sync/notifier.go | 16 ++++++++++------ syncapi/sync/requestpool.go | 6 +++--- 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/syncapi/storage/syncserver.go b/syncapi/storage/syncserver.go index 062344a34..4b85cafdc 100644 --- a/syncapi/storage/syncserver.go +++ b/syncapi/storage/syncserver.go @@ -303,15 +303,15 @@ func (d *SyncServerDatasource) addTypingDeltaToResponse( return nil } -// addEDUDeltaToResponse adds updates for each type of EDUs since fromPos if -// the position for that type of EDU in toPos is not 0. +// addEDUDeltaToResponse adds updates for EDUs of each type since fromPos if +// the positions of that type are not equal in fromPos and toPos. func (d *SyncServerDatasource) addEDUDeltaToResponse( fromPos, toPos types.SyncPosition, joinedRoomIDs []string, res *types.Response, ) (err error) { - if toPos.TypingPosition != 0 { + if fromPos.TypingPosition != toPos.TypingPosition { err = d.addTypingDeltaToResponse( fromPos.TypingPosition, joinedRoomIDs, res, ) @@ -336,7 +336,7 @@ func (d *SyncServerDatasource) IncrementalSync( var joinedRoomIDs []string var err error - if toPos.PDUPosition != 0 { + if fromPos.PDUPosition != toPos.PDUPosition { joinedRoomIDs, err = d.addPDUDeltaToResponse( ctx, device, fromPos.PDUPosition, toPos.PDUPosition, numRecentEventsPerRoom, res, ) diff --git a/syncapi/sync/notifier.go b/syncapi/sync/notifier.go index 49a34adfa..159b7350b 100644 --- a/syncapi/sync/notifier.go +++ b/syncapi/sync/notifier.go @@ -60,13 +60,17 @@ func NewNotifier(pos types.SyncPosition) *Notifier { // called from a single goroutine, to avoid races between updates which could set the // current position in the stream incorrectly. // Can be called with one among: a *gomatrixserverlib.Event, a room ID, or a list of user IDs. -// If a position in pos is 0, it means no updates available of that type. -func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, roomID string, userIDs []string, pos types.SyncPosition) { +// posUpdate contains the latest position(s) for one or more types of events. +// If a position in posUpdate is 0, it means no updates available of that type. +// Typically a consumer supplies a posUpdate with and only with the sync position +// for the type of event it handles to the latest. +func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, roomID string, userIDs []string, posUpdate types.SyncPosition) { // update the current position then notify relevant /sync streams. // This needs to be done PRIOR to waking up users as they will read this value. n.streamLock.Lock() defer n.streamLock.Unlock() - n.currPos = n.currPos.WithUpdates(pos) + latestPos := n.currPos.WithUpdates(posUpdate) + n.currPos = latestPos n.removeEmptyUserStreams() @@ -99,11 +103,11 @@ func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, roomID string, userID } } - n.wakeupUsers(usersToNotify, pos) + n.wakeupUsers(usersToNotify, latestPos) } else if roomID != "" { - n.wakeupUsers(n.joinedUsers(roomID), pos) + n.wakeupUsers(n.joinedUsers(roomID), latestPos) } else if len(userIDs) > 0 { - n.wakeupUsers(userIDs, pos) + n.wakeupUsers(userIDs, latestPos) } else { log.Warn("WARNING: Notifier.OnNewEvent called but caller supplied no user to wake up") } diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index c7c024af6..1bfdb206e 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -130,19 +130,19 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype } } -func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.SyncPosition) (res *types.Response, err error) { +func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.SyncPosition) (res *types.Response, err error) { // TODO: handle ignored users if req.since == nil { res, err = rp.db.CompleteSync(req.ctx, req.device.UserID, req.limit) } else { - res, err = rp.db.IncrementalSync(req.ctx, req.device, *req.since, currentPos, req.limit) + res, err = rp.db.IncrementalSync(req.ctx, req.device, *req.since, latestPos, req.limit) } if err != nil { return } - res, err = rp.appendAccountData(res, req.device.UserID, req, currentPos.PDUPosition) + res, err = rp.appendAccountData(res, req.device.UserID, req, latestPos.PDUPosition) return }