From 0634d9fed13d3c0d9a9440d08cb7403e45af0a28 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 11 Dec 2020 10:42:55 +0000 Subject: [PATCH] Fix positions, add ApplyUpdates --- syncapi/internal/keychange.go | 2 +- syncapi/storage/shared/syncserver.go | 6 +++-- syncapi/sync/requestpool.go | 2 +- syncapi/types/types.go | 35 ++++++++++++++++------------ 4 files changed, 26 insertions(+), 19 deletions(-) diff --git a/syncapi/internal/keychange.go b/syncapi/internal/keychange.go index b0e56a465..f171e2899 100644 --- a/syncapi/internal/keychange.go +++ b/syncapi/internal/keychange.go @@ -134,7 +134,7 @@ func DeviceListCatchup( Partition: queryRes.Partition, Offset: queryRes.Offset, }) - res.NextBatch = res.NextBatch.WithUpdates(to) + res.NextBatch.ApplyUpdates(to) return hasNew, nil } diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index b8cee13ec..69dcf4d11 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -575,6 +575,7 @@ func (d *Database) addTypingDeltaToResponse( jr = *types.NewJoinResponse() } jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev) + res.NextBatch.TypingPosition++ res.Rooms.Join[roomID] = jr } } @@ -630,6 +631,7 @@ func (d *Database) addReceiptDeltaToResponse( } jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev) + res.NextBatch.ReceiptPosition++ res.Rooms.Join[roomID] = jr } @@ -686,7 +688,7 @@ func (d *Database) IncrementalSync( numRecentEventsPerRoom int, wantFullState bool, ) (*types.Response, error) { - res.NextBatch = res.NextBatch.WithUpdates(toPos) + res.NextBatch = fromPos.WithUpdates(toPos) var joinedRoomIDs []string var err error @@ -778,7 +780,7 @@ func (d *Database) getResponseWithPDUsForCompleteSync( To: toPos.PDUPosition, } - res.NextBatch = res.NextBatch.WithUpdates(toPos) + res.NextBatch.ApplyUpdates(toPos) // Extract room state and recent events for all rooms the user is joined to. joinedRoomIDs, err = d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join) diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 5507085a8..1102f2e16 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -427,7 +427,7 @@ func (rp *RequestPool) appendAccountData( // or timeout=0, or full_state=true, in any of the cases the request should // return immediately. func (rp *RequestPool) shouldReturnImmediately(syncReq *syncRequest) bool { - if syncReq.since == nil || syncReq.timeout == 0 || syncReq.wantFullState { + if syncReq.since.IsEmpty() || syncReq.timeout == 0 || syncReq.wantFullState { return true } waiting, werr := rp.db.SendToDeviceUpdatesWaiting(context.TODO(), syncReq.device.UserID, syncReq.device.ID) diff --git a/syncapi/types/types.go b/syncapi/types/types.go index 3108a1033..1bd5aabf0 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -187,28 +187,33 @@ func (t *StreamingToken) IsEmpty() bool { // If the latter StreamingToken contains a field that is not 0, it is considered an update, // and its value will replace the corresponding value in the StreamingToken on which WithUpdates is called. // If the other token has a log, they will replace any existing log on this token. -func (t *StreamingToken) WithUpdates(other StreamingToken) (ret StreamingToken) { - ret = *t +func (t *StreamingToken) WithUpdates(other StreamingToken) StreamingToken { + ret := *t + ret.ApplyUpdates(other) + return ret +} + +// ApplyUpdates applies any changes from the supplied StreamingToken. If the supplied +// streaming token contains any positions that are not 0, they are considered updates +// and will overwrite the value in the token. +func (t *StreamingToken) ApplyUpdates(other StreamingToken) { switch { case other.PDUPosition > 0: - ret.PDUPosition = other.PDUPosition + t.PDUPosition = other.PDUPosition case other.TypingPosition > 0: - ret.TypingPosition = other.TypingPosition + t.TypingPosition = other.TypingPosition case other.ReceiptPosition > 0: - ret.ReceiptPosition = other.ReceiptPosition + t.ReceiptPosition = other.ReceiptPosition case other.SendToDevicePosition > 0: - ret.SendToDevicePosition = other.SendToDevicePosition + t.SendToDevicePosition = other.SendToDevicePosition } - ret.Logs = make(map[string]*LogPosition) - for name := range t.Logs { - otherLog := other.Log(name) - if otherLog == nil { - continue - } - copy := *otherLog - ret.Logs[name] = © + if t.Logs == nil { + t.Logs = make(map[string]*LogPosition) + } + for name, pos := range other.Logs { + copy := *pos + t.Logs[name] = © } - return ret } type TopologyToken struct {