syncapi: don't return early for no-op incremental syncs (#2473)

* syncapi: don't return early for no-op incremental syncs

Comments explain why, but basically it's an inefficient use
of bandwidth and some sytests rely on /sync to block.

* Honour timeouts

* Actually return a response with timeout=0
This commit is contained in:
kegsay 2022-05-19 09:00:56 +01:00 committed by GitHub
parent f321a7d55e
commit 21dd5a7176
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 149 additions and 110 deletions

View file

@ -251,125 +251,151 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
waitingSyncRequests.Inc() waitingSyncRequests.Inc()
defer waitingSyncRequests.Dec() defer waitingSyncRequests.Dec()
currentPos := rp.Notifier.CurrentPosition() // loop until we get some data
for {
startTime := time.Now()
currentPos := rp.Notifier.CurrentPosition()
if !rp.shouldReturnImmediately(syncReq, currentPos) { // if the since token matches the current positions, wait via the notifier
timer := time.NewTimer(syncReq.Timeout) // case of timeout=0 is handled above if !rp.shouldReturnImmediately(syncReq, currentPos) {
defer timer.Stop() timer := time.NewTimer(syncReq.Timeout) // case of timeout=0 is handled above
defer timer.Stop()
userStreamListener := rp.Notifier.GetListener(*syncReq) userStreamListener := rp.Notifier.GetListener(*syncReq)
defer userStreamListener.Close() defer userStreamListener.Close()
giveup := func() util.JSONResponse { giveup := func() util.JSONResponse {
syncReq.Log.Debugln("Responding to sync since client gave up or timeout was reached") syncReq.Log.Debugln("Responding to sync since client gave up or timeout was reached")
syncReq.Response.NextBatch = syncReq.Since syncReq.Response.NextBatch = syncReq.Since
// We should always try to include OTKs in sync responses, otherwise clients might upload keys // We should always try to include OTKs in sync responses, otherwise clients might upload keys
// even if that's not required. See also: // even if that's not required. See also:
// https://github.com/matrix-org/synapse/blob/29f06704b8871a44926f7c99e73cf4a978fb8e81/synapse/rest/client/sync.py#L276-L281 // https://github.com/matrix-org/synapse/blob/29f06704b8871a44926f7c99e73cf4a978fb8e81/synapse/rest/client/sync.py#L276-L281
// Only try to get OTKs if the context isn't already done. // Only try to get OTKs if the context isn't already done.
if syncReq.Context.Err() == nil { if syncReq.Context.Err() == nil {
err = internal.DeviceOTKCounts(syncReq.Context, rp.keyAPI, syncReq.Device.UserID, syncReq.Device.ID, syncReq.Response) err = internal.DeviceOTKCounts(syncReq.Context, rp.keyAPI, syncReq.Device.UserID, syncReq.Device.ID, syncReq.Response)
if err != nil && err != context.Canceled { if err != nil && err != context.Canceled {
syncReq.Log.WithError(err).Warn("failed to get OTK counts") syncReq.Log.WithError(err).Warn("failed to get OTK counts")
}
}
return util.JSONResponse{
Code: http.StatusOK,
JSON: syncReq.Response,
} }
} }
return util.JSONResponse{
Code: http.StatusOK, select {
JSON: syncReq.Response, case <-syncReq.Context.Done(): // Caller gave up
return giveup()
case <-timer.C: // Timeout reached
return giveup()
case <-userStreamListener.GetNotifyChannel(syncReq.Since):
syncReq.Log.Debugln("Responding to sync after wake-up")
currentPos.ApplyUpdates(userStreamListener.GetSyncPosition())
}
} else {
syncReq.Log.WithField("currentPos", currentPos).Debugln("Responding to sync immediately")
}
if syncReq.Since.IsEmpty() {
// Complete sync
syncReq.Response.NextBatch = types.StreamingToken{
PDUPosition: rp.streams.PDUStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
TypingPosition: rp.streams.TypingStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
ReceiptPosition: rp.streams.ReceiptStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
InvitePosition: rp.streams.InviteStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
SendToDevicePosition: rp.streams.SendToDeviceStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
AccountDataPosition: rp.streams.AccountDataStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
NotificationDataPosition: rp.streams.NotificationDataStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
DeviceListPosition: rp.streams.DeviceListStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
PresencePosition: rp.streams.PresenceStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
}
} else {
// Incremental sync
syncReq.Response.NextBatch = types.StreamingToken{
PDUPosition: rp.streams.PDUStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.PDUPosition, currentPos.PDUPosition,
),
TypingPosition: rp.streams.TypingStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.TypingPosition, currentPos.TypingPosition,
),
ReceiptPosition: rp.streams.ReceiptStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.ReceiptPosition, currentPos.ReceiptPosition,
),
InvitePosition: rp.streams.InviteStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.InvitePosition, currentPos.InvitePosition,
),
SendToDevicePosition: rp.streams.SendToDeviceStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.SendToDevicePosition, currentPos.SendToDevicePosition,
),
AccountDataPosition: rp.streams.AccountDataStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.AccountDataPosition, currentPos.AccountDataPosition,
),
NotificationDataPosition: rp.streams.NotificationDataStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.NotificationDataPosition, currentPos.NotificationDataPosition,
),
DeviceListPosition: rp.streams.DeviceListStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.DeviceListPosition, currentPos.DeviceListPosition,
),
PresencePosition: rp.streams.PresenceStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.PresencePosition, currentPos.PresencePosition,
),
}
// it's possible for there to be no updates for this user even though since < current pos,
// e.g busy servers with a quiet user. In this scenario, we don't want to return a no-op
// response immediately, so let's try this again but pretend they bumped their since token.
// If the incremental sync was processed very quickly then we expect the next loop to block
// with a notifier, but if things are slow it's entirely possible that currentPos is no
// longer the current position so we will hit this code path again. We need to do this and
// not return a no-op response because:
// - It's an inefficient use of bandwidth.
// - Some sytests which test 'waking up' sync rely on some sync requests to block, which
// they weren't always doing, resulting in flakey tests.
if !syncReq.Response.HasUpdates() {
syncReq.Since = currentPos
// do not loop again if the ?timeout= is 0 as that means "return immediately"
if syncReq.Timeout > 0 {
syncReq.Timeout = syncReq.Timeout - time.Since(startTime)
if syncReq.Timeout < 0 {
syncReq.Timeout = 0
}
continue
}
} }
} }
select { return util.JSONResponse{
case <-syncReq.Context.Done(): // Caller gave up Code: http.StatusOK,
return giveup() JSON: syncReq.Response,
case <-timer.C: // Timeout reached
return giveup()
case <-userStreamListener.GetNotifyChannel(syncReq.Since):
syncReq.Log.Debugln("Responding to sync after wake-up")
currentPos.ApplyUpdates(userStreamListener.GetSyncPosition())
} }
} else {
syncReq.Log.WithField("currentPos", currentPos).Debugln("Responding to sync immediately")
}
if syncReq.Since.IsEmpty() {
// Complete sync
syncReq.Response.NextBatch = types.StreamingToken{
PDUPosition: rp.streams.PDUStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
TypingPosition: rp.streams.TypingStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
ReceiptPosition: rp.streams.ReceiptStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
InvitePosition: rp.streams.InviteStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
SendToDevicePosition: rp.streams.SendToDeviceStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
AccountDataPosition: rp.streams.AccountDataStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
NotificationDataPosition: rp.streams.NotificationDataStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
DeviceListPosition: rp.streams.DeviceListStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
PresencePosition: rp.streams.PresenceStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
}
} else {
// Incremental sync
syncReq.Response.NextBatch = types.StreamingToken{
PDUPosition: rp.streams.PDUStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.PDUPosition, currentPos.PDUPosition,
),
TypingPosition: rp.streams.TypingStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.TypingPosition, currentPos.TypingPosition,
),
ReceiptPosition: rp.streams.ReceiptStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.ReceiptPosition, currentPos.ReceiptPosition,
),
InvitePosition: rp.streams.InviteStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.InvitePosition, currentPos.InvitePosition,
),
SendToDevicePosition: rp.streams.SendToDeviceStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.SendToDevicePosition, currentPos.SendToDevicePosition,
),
AccountDataPosition: rp.streams.AccountDataStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.AccountDataPosition, currentPos.AccountDataPosition,
),
NotificationDataPosition: rp.streams.NotificationDataStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.NotificationDataPosition, currentPos.NotificationDataPosition,
),
DeviceListPosition: rp.streams.DeviceListStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.DeviceListPosition, currentPos.DeviceListPosition,
),
PresencePosition: rp.streams.PresenceStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.PresencePosition, currentPos.PresencePosition,
),
}
}
return util.JSONResponse{
Code: http.StatusOK,
JSON: syncReq.Response,
} }
} }

View file

@ -350,6 +350,19 @@ type Response struct {
DeviceListsOTKCount map[string]int `json:"device_one_time_keys_count,omitempty"` DeviceListsOTKCount map[string]int `json:"device_one_time_keys_count,omitempty"`
} }
func (r *Response) HasUpdates() bool {
// purposefully exclude DeviceListsOTKCount as we always include them
return (len(r.AccountData.Events) > 0 ||
len(r.Presence.Events) > 0 ||
len(r.Rooms.Invite) > 0 ||
len(r.Rooms.Join) > 0 ||
len(r.Rooms.Leave) > 0 ||
len(r.Rooms.Peek) > 0 ||
len(r.ToDevice.Events) > 0 ||
len(r.DeviceLists.Changed) > 0 ||
len(r.DeviceLists.Left) > 0)
}
// NewResponse creates an empty response with initialised maps. // NewResponse creates an empty response with initialised maps.
func NewResponse() *Response { func NewResponse() *Response {
res := Response{} res := Response{}