diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index e1833501b..44c0fbca5 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -1070,18 +1070,20 @@ func (d *Database) SendToDeviceUpdatesForSync( return nil, fmt.Errorf("d.SendToDevice.SelectSendToDeviceMessages: %w", err) } - // Start by cleaning up any send-to-device messages that have older sent-by-tokens. - // This means that they were sent in a previous /sync and the client has happily - // progressed onto newer sync tokens. + // Work out whether we need to update any of the database entries. toUpdate := []types.SendToDeviceNID{} toDelete := []types.SendToDeviceNID{} for pos, event := range events { if event.SentByToken == nil { - // Mark the event for update and keep it in our list of return events. + // If the event has no sent-by token yet then we haven't attempted to send + // it. Record the current requested sync token in the database. toUpdate = append(toUpdate, event.ID) event.SentByToken = &token } else if token.IsAfter(*event.SentByToken) { - // Mark the event for deletion and remove it from our list of return events. + // The event had a sync token, therefore we've sent it before. The current + // sync token is now after the stored one so we can assume that the client + // successfully completed the previous sync (it would re-request it otherwise) + // so we can remove the entry from the database. toDelete = append(toDelete, event.ID) events = append(events[:pos], events[pos+1:]...) } diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index a2b361e64..9952825ed 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -245,6 +245,11 @@ func (rp *RequestPool) appendAccountData( func (rp *RequestPool) appendSendToDeviceMessages( data *types.Response, userID string, req syncRequest, currentPos types.StreamingToken, ) (*types.Response, error) { + nextPos, err := types.NewStreamTokenFromString(data.NextBatch) + if err != nil { + return nil, err + } + events, err := rp.db.SendToDeviceUpdatesForSync( context.TODO(), userID, @@ -257,7 +262,9 @@ func (rp *RequestPool) appendSendToDeviceMessages( for _, event := range events { data.ToDevice.Events = append(data.ToDevice.Events, event.SendToDeviceEvent) + nextPos.Positions[1]++ } + data.NextBatch = nextPos.String() return data, nil }