mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-06 13:43:09 -06:00
Tweak ordering in OnIncomingSyncRequest
This commit is contained in:
parent
d9e71b93b6
commit
3d4985ef10
|
|
@ -105,7 +105,7 @@ func (rp *RequestPool) cleanPresence(db storage.Presence, cleanupTime time.Durat
|
||||||
rp.presence.Range(func(key interface{}, v interface{}) bool {
|
rp.presence.Range(func(key interface{}, v interface{}) bool {
|
||||||
p := v.(types.PresenceInternal)
|
p := v.(types.PresenceInternal)
|
||||||
if time.Since(p.LastActiveTS.Time()) > cleanupTime {
|
if time.Since(p.LastActiveTS.Time()) > cleanupTime {
|
||||||
rp.updatePresence(db, types.PresenceUnavailable.String(), p.UserID)
|
rp.updatePresence(context.Background(), db, types.PresenceUnavailable.String(), p.UserID)
|
||||||
rp.presence.Delete(key)
|
rp.presence.Delete(key)
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
|
|
@ -115,7 +115,7 @@ func (rp *RequestPool) cleanPresence(db storage.Presence, cleanupTime time.Durat
|
||||||
}
|
}
|
||||||
|
|
||||||
// updatePresence sends presence updates to the SyncAPI and FederationAPI
|
// updatePresence sends presence updates to the SyncAPI and FederationAPI
|
||||||
func (rp *RequestPool) updatePresence(db storage.Presence, presence string, userID string) {
|
func (rp *RequestPool) updatePresence(ctx context.Context, db storage.Presence, presence string, userID string) {
|
||||||
if !rp.cfg.Matrix.Presence.EnableOutbound {
|
if !rp.cfg.Matrix.Presence.EnableOutbound {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
@ -135,7 +135,7 @@ func (rp *RequestPool) updatePresence(db storage.Presence, presence string, user
|
||||||
}
|
}
|
||||||
|
|
||||||
// ensure we also send the current status_msg to federated servers and not nil
|
// ensure we also send the current status_msg to federated servers and not nil
|
||||||
dbPresence, err := db.GetPresence(context.Background(), userID)
|
dbPresence, err := db.GetPresence(ctx, userID)
|
||||||
if err != nil && err != sql.ErrNoRows {
|
if err != nil && err != sql.ErrNoRows {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
@ -212,6 +212,8 @@ var waitingSyncRequests = prometheus.NewGauge(
|
||||||
// called in a dedicated goroutine for this request. This function will block the goroutine
|
// called in a dedicated goroutine for this request. This function will block the goroutine
|
||||||
// until a response is ready, or it times out.
|
// until a response is ready, or it times out.
|
||||||
func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.Device) util.JSONResponse {
|
func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.Device) util.JSONResponse {
|
||||||
|
currentPos := rp.Notifier.CurrentPosition()
|
||||||
|
|
||||||
// Extract values from request
|
// Extract values from request
|
||||||
syncReq, err := newSyncRequest(req, *device, rp.db)
|
syncReq, err := newSyncRequest(req, *device, rp.db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -230,15 +232,10 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
|
||||||
activeSyncRequests.Inc()
|
activeSyncRequests.Inc()
|
||||||
defer activeSyncRequests.Dec()
|
defer activeSyncRequests.Dec()
|
||||||
|
|
||||||
rp.updateLastSeen(req, device)
|
|
||||||
rp.updatePresence(rp.db, req.FormValue("set_presence"), device.UserID)
|
|
||||||
|
|
||||||
waitingSyncRequests.Inc()
|
|
||||||
defer waitingSyncRequests.Dec()
|
|
||||||
|
|
||||||
currentPos := rp.Notifier.CurrentPosition()
|
|
||||||
|
|
||||||
if !rp.shouldReturnImmediately(syncReq, currentPos) {
|
if !rp.shouldReturnImmediately(syncReq, currentPos) {
|
||||||
|
waitingSyncRequests.Inc()
|
||||||
|
defer waitingSyncRequests.Dec()
|
||||||
|
|
||||||
timer := time.NewTimer(syncReq.Timeout) // case of timeout=0 is handled above
|
timer := time.NewTimer(syncReq.Timeout) // case of timeout=0 is handled above
|
||||||
defer timer.Stop()
|
defer timer.Stop()
|
||||||
|
|
||||||
|
|
@ -341,6 +338,9 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rp.updateLastSeen(req, device)
|
||||||
|
rp.updatePresence(req.Context(), rp.db, req.FormValue("set_presence"), device.UserID)
|
||||||
|
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
Code: http.StatusOK,
|
Code: http.StatusOK,
|
||||||
JSON: syncReq.Response,
|
JSON: syncReq.Response,
|
||||||
|
|
|
||||||
|
|
@ -118,7 +118,7 @@ func TestRequestPool_updatePresence(t *testing.T) {
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
beforeCount := publisher.count
|
beforeCount := publisher.count
|
||||||
rp.updatePresence(db, tt.args.presence, tt.args.userID)
|
rp.updatePresence(context.Background(), db, tt.args.presence, tt.args.userID)
|
||||||
if tt.wantIncrease && publisher.count <= beforeCount {
|
if tt.wantIncrease && publisher.count <= beforeCount {
|
||||||
t.Fatalf("expected count to increase: %d <= %d", publisher.count, beforeCount)
|
t.Fatalf("expected count to increase: %d <= %d", publisher.count, beforeCount)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue