mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-20 21:33:19 -06:00
Update sync position
This commit is contained in:
parent
5487edf1d6
commit
cc456240fa
|
|
@ -1070,18 +1070,20 @@ func (d *Database) SendToDeviceUpdatesForSync(
|
||||||
return nil, fmt.Errorf("d.SendToDevice.SelectSendToDeviceMessages: %w", err)
|
return nil, fmt.Errorf("d.SendToDevice.SelectSendToDeviceMessages: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start by cleaning up any send-to-device messages that have older sent-by-tokens.
|
// Work out whether we need to update any of the database entries.
|
||||||
// This means that they were sent in a previous /sync and the client has happily
|
|
||||||
// progressed onto newer sync tokens.
|
|
||||||
toUpdate := []types.SendToDeviceNID{}
|
toUpdate := []types.SendToDeviceNID{}
|
||||||
toDelete := []types.SendToDeviceNID{}
|
toDelete := []types.SendToDeviceNID{}
|
||||||
for pos, event := range events {
|
for pos, event := range events {
|
||||||
if event.SentByToken == nil {
|
if event.SentByToken == nil {
|
||||||
// Mark the event for update and keep it in our list of return events.
|
// If the event has no sent-by token yet then we haven't attempted to send
|
||||||
|
// it. Record the current requested sync token in the database.
|
||||||
toUpdate = append(toUpdate, event.ID)
|
toUpdate = append(toUpdate, event.ID)
|
||||||
event.SentByToken = &token
|
event.SentByToken = &token
|
||||||
} else if token.IsAfter(*event.SentByToken) {
|
} else if token.IsAfter(*event.SentByToken) {
|
||||||
// Mark the event for deletion and remove it from our list of return events.
|
// The event had a sync token, therefore we've sent it before. The current
|
||||||
|
// sync token is now after the stored one so we can assume that the client
|
||||||
|
// successfully completed the previous sync (it would re-request it otherwise)
|
||||||
|
// so we can remove the entry from the database.
|
||||||
toDelete = append(toDelete, event.ID)
|
toDelete = append(toDelete, event.ID)
|
||||||
events = append(events[:pos], events[pos+1:]...)
|
events = append(events[:pos], events[pos+1:]...)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -245,6 +245,11 @@ func (rp *RequestPool) appendAccountData(
|
||||||
func (rp *RequestPool) appendSendToDeviceMessages(
|
func (rp *RequestPool) appendSendToDeviceMessages(
|
||||||
data *types.Response, userID string, req syncRequest, currentPos types.StreamingToken,
|
data *types.Response, userID string, req syncRequest, currentPos types.StreamingToken,
|
||||||
) (*types.Response, error) {
|
) (*types.Response, error) {
|
||||||
|
nextPos, err := types.NewStreamTokenFromString(data.NextBatch)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
events, err := rp.db.SendToDeviceUpdatesForSync(
|
events, err := rp.db.SendToDeviceUpdatesForSync(
|
||||||
context.TODO(),
|
context.TODO(),
|
||||||
userID,
|
userID,
|
||||||
|
|
@ -257,7 +262,9 @@ func (rp *RequestPool) appendSendToDeviceMessages(
|
||||||
|
|
||||||
for _, event := range events {
|
for _, event := range events {
|
||||||
data.ToDevice.Events = append(data.ToDevice.Events, event.SendToDeviceEvent)
|
data.ToDevice.Events = append(data.ToDevice.Events, event.SendToDeviceEvent)
|
||||||
|
nextPos.Positions[1]++
|
||||||
}
|
}
|
||||||
|
data.NextBatch = nextPos.String()
|
||||||
return data, nil
|
return data, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue