|
|
|
@ -397,13 +397,19 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
// Incremental sync
|
|
|
|
|
reasonablePositions := findReasonableIncrementalSyncWindow(
|
|
|
|
|
syncReq.Since, rp.Notifier.CurrentPosition(),
|
|
|
|
|
)
|
|
|
|
|
// Also update the currentPos, which is used for the retry logic below.
|
|
|
|
|
// Otherwise we may skip over some events.
|
|
|
|
|
currentPos = reasonablePositions
|
|
|
|
|
syncReq.Response.NextBatch = types.StreamingToken{
|
|
|
|
|
PDUPosition: withTransaction(
|
|
|
|
|
syncReq.Since.PDUPosition,
|
|
|
|
|
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
|
|
|
|
return rp.streams.PDUStreamProvider.IncrementalSync(
|
|
|
|
|
syncReq.Context, txn, syncReq,
|
|
|
|
|
syncReq.Since.PDUPosition, rp.Notifier.CurrentPosition().PDUPosition,
|
|
|
|
|
syncReq.Since.PDUPosition, reasonablePositions.PDUPosition,
|
|
|
|
|
)
|
|
|
|
|
},
|
|
|
|
|
),
|
|
|
|
@ -412,7 +418,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
|
|
|
|
|
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
|
|
|
|
return rp.streams.TypingStreamProvider.IncrementalSync(
|
|
|
|
|
syncReq.Context, txn, syncReq,
|
|
|
|
|
syncReq.Since.TypingPosition, rp.Notifier.CurrentPosition().TypingPosition,
|
|
|
|
|
syncReq.Since.TypingPosition, reasonablePositions.TypingPosition,
|
|
|
|
|
)
|
|
|
|
|
},
|
|
|
|
|
),
|
|
|
|
@ -421,7 +427,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
|
|
|
|
|
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
|
|
|
|
return rp.streams.ReceiptStreamProvider.IncrementalSync(
|
|
|
|
|
syncReq.Context, txn, syncReq,
|
|
|
|
|
syncReq.Since.ReceiptPosition, rp.Notifier.CurrentPosition().ReceiptPosition,
|
|
|
|
|
syncReq.Since.ReceiptPosition, reasonablePositions.ReceiptPosition,
|
|
|
|
|
)
|
|
|
|
|
},
|
|
|
|
|
),
|
|
|
|
@ -430,7 +436,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
|
|
|
|
|
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
|
|
|
|
return rp.streams.InviteStreamProvider.IncrementalSync(
|
|
|
|
|
syncReq.Context, txn, syncReq,
|
|
|
|
|
syncReq.Since.InvitePosition, rp.Notifier.CurrentPosition().InvitePosition,
|
|
|
|
|
syncReq.Since.InvitePosition, reasonablePositions.InvitePosition,
|
|
|
|
|
)
|
|
|
|
|
},
|
|
|
|
|
),
|
|
|
|
@ -439,7 +445,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
|
|
|
|
|
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
|
|
|
|
return rp.streams.SendToDeviceStreamProvider.IncrementalSync(
|
|
|
|
|
syncReq.Context, txn, syncReq,
|
|
|
|
|
syncReq.Since.SendToDevicePosition, rp.Notifier.CurrentPosition().SendToDevicePosition,
|
|
|
|
|
syncReq.Since.SendToDevicePosition, reasonablePositions.SendToDevicePosition,
|
|
|
|
|
)
|
|
|
|
|
},
|
|
|
|
|
),
|
|
|
|
@ -448,7 +454,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
|
|
|
|
|
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
|
|
|
|
return rp.streams.AccountDataStreamProvider.IncrementalSync(
|
|
|
|
|
syncReq.Context, txn, syncReq,
|
|
|
|
|
syncReq.Since.AccountDataPosition, rp.Notifier.CurrentPosition().AccountDataPosition,
|
|
|
|
|
syncReq.Since.AccountDataPosition, reasonablePositions.AccountDataPosition,
|
|
|
|
|
)
|
|
|
|
|
},
|
|
|
|
|
),
|
|
|
|
@ -457,7 +463,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
|
|
|
|
|
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
|
|
|
|
return rp.streams.NotificationDataStreamProvider.IncrementalSync(
|
|
|
|
|
syncReq.Context, txn, syncReq,
|
|
|
|
|
syncReq.Since.NotificationDataPosition, rp.Notifier.CurrentPosition().NotificationDataPosition,
|
|
|
|
|
syncReq.Since.NotificationDataPosition, reasonablePositions.NotificationDataPosition,
|
|
|
|
|
)
|
|
|
|
|
},
|
|
|
|
|
),
|
|
|
|
@ -466,7 +472,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
|
|
|
|
|
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
|
|
|
|
return rp.streams.DeviceListStreamProvider.IncrementalSync(
|
|
|
|
|
syncReq.Context, txn, syncReq,
|
|
|
|
|
syncReq.Since.DeviceListPosition, rp.Notifier.CurrentPosition().DeviceListPosition,
|
|
|
|
|
syncReq.Since.DeviceListPosition, reasonablePositions.DeviceListPosition,
|
|
|
|
|
)
|
|
|
|
|
},
|
|
|
|
|
),
|
|
|
|
@ -475,7 +481,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
|
|
|
|
|
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
|
|
|
|
return rp.streams.PresenceStreamProvider.IncrementalSync(
|
|
|
|
|
syncReq.Context, txn, syncReq,
|
|
|
|
|
syncReq.Since.PresencePosition, rp.Notifier.CurrentPosition().PresencePosition,
|
|
|
|
|
syncReq.Since.PresencePosition, reasonablePositions.PresencePosition,
|
|
|
|
|
)
|
|
|
|
|
},
|
|
|
|
|
),
|
|
|
|
@ -585,3 +591,23 @@ func (rp *RequestPool) shouldReturnImmediately(syncReq *types.SyncRequest, curre
|
|
|
|
|
}
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func findReasonableIncrementalSyncWindow(since, limit types.StreamingToken) types.StreamingToken {
|
|
|
|
|
const windowSize = 100 // TODO: reasonable number?
|
|
|
|
|
for s, l := range map[*types.StreamPosition]types.StreamPosition{
|
|
|
|
|
&since.AccountDataPosition: limit.AccountDataPosition,
|
|
|
|
|
&since.DeviceListPosition: limit.DeviceListPosition,
|
|
|
|
|
&since.InvitePosition: limit.InvitePosition,
|
|
|
|
|
&since.NotificationDataPosition: limit.NotificationDataPosition,
|
|
|
|
|
&since.PDUPosition: limit.PDUPosition,
|
|
|
|
|
&since.PresencePosition: limit.PresencePosition,
|
|
|
|
|
&since.ReceiptPosition: limit.ReceiptPosition,
|
|
|
|
|
&since.SendToDevicePosition: limit.SendToDevicePosition,
|
|
|
|
|
&since.TypingPosition: limit.TypingPosition,
|
|
|
|
|
} {
|
|
|
|
|
if *s += windowSize; *s > l {
|
|
|
|
|
*s = l
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return since
|
|
|
|
|
}
|
|
|
|
|