Use sync position supplied from the user stream listener rather than racing with the current position

This commit is contained in:
Neil Alexander 2022-05-05 13:43:34 +01:00
parent 42f35a57ac
commit 16667fd5aa
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944

View file

@ -212,6 +212,8 @@ var waitingSyncRequests = prometheus.NewGauge(
// called in a dedicated goroutine for this request. This function will block the goroutine // called in a dedicated goroutine for this request. This function will block the goroutine
// until a response is ready, or it times out. // until a response is ready, or it times out.
func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.Device) util.JSONResponse { func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.Device) util.JSONResponse {
currentPos := rp.Notifier.CurrentPosition()
// Extract values from request // Extract values from request
syncReq, err := newSyncRequest(req, *device, rp.db) syncReq, err := newSyncRequest(req, *device, rp.db)
if err != nil { if err != nil {
@ -236,8 +238,6 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
waitingSyncRequests.Inc() waitingSyncRequests.Inc()
defer waitingSyncRequests.Dec() defer waitingSyncRequests.Dec()
currentPos := rp.Notifier.CurrentPosition()
if !rp.shouldReturnImmediately(syncReq, currentPos) { if !rp.shouldReturnImmediately(syncReq, currentPos) {
timer := time.NewTimer(syncReq.Timeout) // case of timeout=0 is handled above timer := time.NewTimer(syncReq.Timeout) // case of timeout=0 is handled above
defer timer.Stop() defer timer.Stop()
@ -262,7 +262,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
case <-userStreamListener.GetNotifyChannel(syncReq.Since): case <-userStreamListener.GetNotifyChannel(syncReq.Since):
syncReq.Log.Debugln("Responding to sync after wake-up") syncReq.Log.Debugln("Responding to sync after wake-up")
currentPos.ApplyUpdates(userStreamListener.GetSyncPosition()) currentPos = userStreamListener.GetSyncPosition()
} }
} else { } else {
syncReq.Log.WithField("currentPos", currentPos).Debugln("Responding to sync immediately") syncReq.Log.WithField("currentPos", currentPos).Debugln("Responding to sync immediately")