Limit fixes, amongst other things

This commit is contained in:
Neil Alexander 2021-01-07 16:59:35 +00:00
parent ac525fba47
commit 4506b50828
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
3 changed files with 28 additions and 17 deletions

View file

@ -30,10 +30,14 @@ func (p *PDUStreamProvider) CompleteSync(
req *types.SyncRequest, req *types.SyncRequest,
) types.StreamPosition { ) types.StreamPosition {
to := p.LatestPosition(ctx) to := p.LatestPosition(ctx)
from := to - 20
if from < 0 {
from = 0
}
// Get the current sync position which we will base the sync response on. // Get the current sync position which we will base the sync response on.
r := types.Range{ r := types.Range{
From: 0, From: from,
To: to, To: to,
} }
@ -50,7 +54,7 @@ func (p *PDUStreamProvider) CompleteSync(
for _, roomID := range joinedRoomIDs { for _, roomID := range joinedRoomIDs {
var jr *types.JoinResponse var jr *types.JoinResponse
jr, err = p.getJoinResponseForCompleteSync( jr, err = p.getJoinResponseForCompleteSync(
ctx, roomID, r, &stateFilter, 20, req.Device, ctx, roomID, r, &stateFilter, req.Limit, req.Device,
) )
if err != nil { if err != nil {
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed") req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
@ -70,7 +74,7 @@ func (p *PDUStreamProvider) CompleteSync(
if !peek.Deleted { if !peek.Deleted {
var jr *types.JoinResponse var jr *types.JoinResponse
jr, err = p.getJoinResponseForCompleteSync( jr, err = p.getJoinResponseForCompleteSync(
ctx, peek.RoomID, r, &stateFilter, 20, req.Device, ctx, peek.RoomID, r, &stateFilter, req.Limit, req.Device,
) )
if err != nil { if err != nil {
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed") req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
@ -80,7 +84,7 @@ func (p *PDUStreamProvider) CompleteSync(
} }
} }
return p.LatestPosition(ctx) return to
} }
// nolint:gocyclo // nolint:gocyclo

View file

@ -76,6 +76,8 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat
} }
} }
filter := gomatrixserverlib.DefaultEventFilter()
filter.Limit = timelineLimit
// TODO: Additional query params: set_presence, filter // TODO: Additional query params: set_presence, filter
logger := util.GetLogger(req.Context()).WithFields(logrus.Fields{ logger := util.GetLogger(req.Context()).WithFields(logrus.Fields{
@ -87,16 +89,16 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat
}) })
return &types.SyncRequest{ return &types.SyncRequest{
Context: req.Context(), // Context: req.Context(), //
Log: logger, // Log: logger, //
Device: &device, // Device: &device, //
Response: types.NewResponse(), // Populated by all streams Response: types.NewResponse(), // Populated by all streams
Filter: gomatrixserverlib.DefaultEventFilter(), // Filter: filter, //
Since: since, // Since: since, //
Timeout: timeout, // Timeout: timeout, //
Limit: timelineLimit, // Limit: timelineLimit, //
Rooms: make(map[string]string), // Populated by the PDU stream Rooms: make(map[string]string), // Populated by the PDU stream
WantFullState: wantFullState, // WantFullState: wantFullState, //
}, nil }, nil
} }

View file

@ -176,15 +176,21 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
return giveup() return giveup()
case <-rp.streams.PDUStreamProvider.NotifyAfter(waitctx, device, syncReq.Since.PDUPosition): case <-rp.streams.PDUStreamProvider.NotifyAfter(waitctx, device, syncReq.Since.PDUPosition):
syncReq.Log.Debugln("Responding to sync after PDU")
case <-rp.streams.TypingStreamProvider.NotifyAfter(waitctx, device, syncReq.Since.TypingPosition): case <-rp.streams.TypingStreamProvider.NotifyAfter(waitctx, device, syncReq.Since.TypingPosition):
syncReq.Log.Debugln("Responding to sync after typing notification")
case <-rp.streams.ReceiptStreamProvider.NotifyAfter(waitctx, device, syncReq.Since.ReceiptPosition): case <-rp.streams.ReceiptStreamProvider.NotifyAfter(waitctx, device, syncReq.Since.ReceiptPosition):
syncReq.Log.Debugln("Responding to sync after read receipt")
case <-rp.streams.InviteStreamProvider.NotifyAfter(waitctx, device, syncReq.Since.InvitePosition): case <-rp.streams.InviteStreamProvider.NotifyAfter(waitctx, device, syncReq.Since.InvitePosition):
syncReq.Log.Debugln("Responding to sync after invite")
case <-rp.streams.SendToDeviceStreamProvider.NotifyAfter(waitctx, device, syncReq.Since.SendToDevicePosition): case <-rp.streams.SendToDeviceStreamProvider.NotifyAfter(waitctx, device, syncReq.Since.SendToDevicePosition):
syncReq.Log.Debugln("Responding to sync after send-to-device message")
case <-rp.streams.AccountDataStreamProvider.NotifyAfter(waitctx, device, syncReq.Since.AccountDataPosition): case <-rp.streams.AccountDataStreamProvider.NotifyAfter(waitctx, device, syncReq.Since.AccountDataPosition):
syncReq.Log.Debugln("Responding to sync after account data")
case <-rp.streams.DeviceListStreamProvider.NotifyAfter(waitctx, device, syncReq.Since.DeviceListPosition): case <-rp.streams.DeviceListStreamProvider.NotifyAfter(waitctx, device, syncReq.Since.DeviceListPosition):
syncReq.Log.Debugln("Responding to sync after device list update")
} }
syncReq.Log.Debugln("Responding to sync after wakeup")
waitcancel() waitcancel()
} else { } else {
syncReq.Log.Debugln("Responding to sync immediately") syncReq.Log.Debugln("Responding to sync immediately")
@ -311,6 +317,5 @@ func (rp *RequestPool) shouldReturnImmediately(syncReq *types.SyncRequest) bool
if syncReq.Since.IsEmpty() || syncReq.Timeout == 0 || syncReq.WantFullState { if syncReq.Since.IsEmpty() || syncReq.Timeout == 0 || syncReq.WantFullState {
return true return true
} }
waiting, werr := rp.db.SendToDeviceUpdatesWaiting(context.TODO(), syncReq.Device.UserID, syncReq.Device.ID) return false
return werr == nil && waiting
} }