diff --git a/syncapi/storage/syncserver.go b/syncapi/storage/syncserver.go index ac77efebc..062344a34 100644 --- a/syncapi/storage/syncserver.go +++ b/syncapi/storage/syncserver.go @@ -331,7 +331,8 @@ func (d *SyncServerDatasource) IncrementalSync( fromPos, toPos types.SyncPosition, numRecentEventsPerRoom int, ) (*types.Response, error) { - res := types.NewResponse(toPos) + nextBatchPos := fromPos.WithUpdates(toPos) + res := types.NewResponse(nextBatchPos) var joinedRoomIDs []string var err error diff --git a/syncapi/sync/notifier.go b/syncapi/sync/notifier.go index 2f4d2cde7..b182e86ee 100644 --- a/syncapi/sync/notifier.go +++ b/syncapi/sync/notifier.go @@ -66,7 +66,7 @@ func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, roomID string, userID // This needs to be done PRIOR to waking up users as they will read this value. n.streamLock.Lock() defer n.streamLock.Unlock() - n.currPos = pos + n.currPos = n.currPos.WithUpdates(pos) n.removeEmptyUserStreams() diff --git a/syncapi/types/types.go b/syncapi/types/types.go index 6b1d923e6..5ecede5c7 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -41,6 +41,19 @@ func (sp SyncPosition) IsAfter(other SyncPosition) bool { sp.TypingPosition > other.TypingPosition } +// WithUpdates returns a copy of sp with updates represented by other applied. +// If a fieldn is not 0 in other, it is considered an update. +func (sp SyncPosition) WithUpdates(other SyncPosition) SyncPosition { + ret := sp + if other.PDUPosition != 0 { + ret.PDUPosition = other.PDUPosition + } + if other.TypingPosition != 0 { + ret.TypingPosition = other.TypingPosition + } + return ret +} + // PrevEventRef represents a reference to a previous event in a state event upgrade type PrevEventRef struct { PrevContent json.RawMessage `json:"prev_content"`