Userstreams should store and use complete pos, not pos update ("pos delta")

Signed-off-by: Alex Chen <minecnly@gmail.com>
This commit is contained in:
Cnly 2019-06-27 05:10:31 +08:00
parent 274db2fbaf
commit d79a6a21c7
3 changed files with 17 additions and 13 deletions

View file

@ -303,15 +303,15 @@ func (d *SyncServerDatasource) addTypingDeltaToResponse(
return nil return nil
} }
// addEDUDeltaToResponse adds updates for each type of EDUs since fromPos if // addEDUDeltaToResponse adds updates for EDUs of each type since fromPos if
// the position for that type of EDU in toPos is not 0. // the positions of that type are not equal in fromPos and toPos.
func (d *SyncServerDatasource) addEDUDeltaToResponse( func (d *SyncServerDatasource) addEDUDeltaToResponse(
fromPos, toPos types.SyncPosition, fromPos, toPos types.SyncPosition,
joinedRoomIDs []string, joinedRoomIDs []string,
res *types.Response, res *types.Response,
) (err error) { ) (err error) {
if toPos.TypingPosition != 0 { if fromPos.TypingPosition != toPos.TypingPosition {
err = d.addTypingDeltaToResponse( err = d.addTypingDeltaToResponse(
fromPos.TypingPosition, joinedRoomIDs, res, fromPos.TypingPosition, joinedRoomIDs, res,
) )
@ -336,7 +336,7 @@ func (d *SyncServerDatasource) IncrementalSync(
var joinedRoomIDs []string var joinedRoomIDs []string
var err error var err error
if toPos.PDUPosition != 0 { if fromPos.PDUPosition != toPos.PDUPosition {
joinedRoomIDs, err = d.addPDUDeltaToResponse( joinedRoomIDs, err = d.addPDUDeltaToResponse(
ctx, device, fromPos.PDUPosition, toPos.PDUPosition, numRecentEventsPerRoom, res, ctx, device, fromPos.PDUPosition, toPos.PDUPosition, numRecentEventsPerRoom, res,
) )

View file

@ -60,13 +60,17 @@ func NewNotifier(pos types.SyncPosition) *Notifier {
// called from a single goroutine, to avoid races between updates which could set the // called from a single goroutine, to avoid races between updates which could set the
// current position in the stream incorrectly. // current position in the stream incorrectly.
// Can be called with one among: a *gomatrixserverlib.Event, a room ID, or a list of user IDs. // Can be called with one among: a *gomatrixserverlib.Event, a room ID, or a list of user IDs.
// If a position in pos is 0, it means no updates available of that type. // posUpdate contains the latest position(s) for one or more types of events.
func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, roomID string, userIDs []string, pos types.SyncPosition) { // If a position in posUpdate is 0, it means no updates available of that type.
// Typically a consumer supplies a posUpdate with and only with the sync position
// for the type of event it handles to the latest.
func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, roomID string, userIDs []string, posUpdate types.SyncPosition) {
// update the current position then notify relevant /sync streams. // update the current position then notify relevant /sync streams.
// This needs to be done PRIOR to waking up users as they will read this value. // This needs to be done PRIOR to waking up users as they will read this value.
n.streamLock.Lock() n.streamLock.Lock()
defer n.streamLock.Unlock() defer n.streamLock.Unlock()
n.currPos = n.currPos.WithUpdates(pos) latestPos := n.currPos.WithUpdates(posUpdate)
n.currPos = latestPos
n.removeEmptyUserStreams() n.removeEmptyUserStreams()
@ -99,11 +103,11 @@ func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, roomID string, userID
} }
} }
n.wakeupUsers(usersToNotify, pos) n.wakeupUsers(usersToNotify, latestPos)
} else if roomID != "" { } else if roomID != "" {
n.wakeupUsers(n.joinedUsers(roomID), pos) n.wakeupUsers(n.joinedUsers(roomID), latestPos)
} else if len(userIDs) > 0 { } else if len(userIDs) > 0 {
n.wakeupUsers(userIDs, pos) n.wakeupUsers(userIDs, latestPos)
} else { } else {
log.Warn("WARNING: Notifier.OnNewEvent called but caller supplied no user to wake up") log.Warn("WARNING: Notifier.OnNewEvent called but caller supplied no user to wake up")
} }

View file

@ -130,19 +130,19 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
} }
} }
func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.SyncPosition) (res *types.Response, err error) { func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.SyncPosition) (res *types.Response, err error) {
// TODO: handle ignored users // TODO: handle ignored users
if req.since == nil { if req.since == nil {
res, err = rp.db.CompleteSync(req.ctx, req.device.UserID, req.limit) res, err = rp.db.CompleteSync(req.ctx, req.device.UserID, req.limit)
} else { } else {
res, err = rp.db.IncrementalSync(req.ctx, req.device, *req.since, currentPos, req.limit) res, err = rp.db.IncrementalSync(req.ctx, req.device, *req.since, latestPos, req.limit)
} }
if err != nil { if err != nil {
return return
} }
res, err = rp.appendAccountData(res, req.device.UserID, req, currentPos.PDUPosition) res, err = rp.appendAccountData(res, req.device.UserID, req, latestPos.PDUPosition)
return return
} }