diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 9d293b4b7..7bfd94891 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -30,10 +30,14 @@ func (p *PDUStreamProvider) CompleteSync( req *types.SyncRequest, ) types.StreamPosition { to := p.LatestPosition(ctx) + from := to - 20 + if from < 0 { + from = 0 + } // Get the current sync position which we will base the sync response on. r := types.Range{ - From: 0, + From: from, To: to, } @@ -50,7 +54,7 @@ func (p *PDUStreamProvider) CompleteSync( for _, roomID := range joinedRoomIDs { var jr *types.JoinResponse jr, err = p.getJoinResponseForCompleteSync( - ctx, roomID, r, &stateFilter, 20, req.Device, + ctx, roomID, r, &stateFilter, req.Limit, req.Device, ) if err != nil { req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed") @@ -70,7 +74,7 @@ func (p *PDUStreamProvider) CompleteSync( if !peek.Deleted { var jr *types.JoinResponse jr, err = p.getJoinResponseForCompleteSync( - ctx, peek.RoomID, r, &stateFilter, 20, req.Device, + ctx, peek.RoomID, r, &stateFilter, req.Limit, req.Device, ) if err != nil { req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed") @@ -80,7 +84,7 @@ func (p *PDUStreamProvider) CompleteSync( } } - return p.LatestPosition(ctx) + return to } // nolint:gocyclo diff --git a/syncapi/sync/request.go b/syncapi/sync/request.go index dba5fff7f..5f89ffc33 100644 --- a/syncapi/sync/request.go +++ b/syncapi/sync/request.go @@ -76,6 +76,8 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat } } + filter := gomatrixserverlib.DefaultEventFilter() + filter.Limit = timelineLimit // TODO: Additional query params: set_presence, filter logger := util.GetLogger(req.Context()).WithFields(logrus.Fields{ @@ -87,16 +89,16 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat }) return &types.SyncRequest{ - Context: req.Context(), // - Log: logger, // - Device: &device, // - Response: types.NewResponse(), // Populated by all streams - Filter: gomatrixserverlib.DefaultEventFilter(), // - Since: since, // - Timeout: timeout, // - Limit: timelineLimit, // - Rooms: make(map[string]string), // Populated by the PDU stream - WantFullState: wantFullState, // + Context: req.Context(), // + Log: logger, // + Device: &device, // + Response: types.NewResponse(), // Populated by all streams + Filter: filter, // + Since: since, // + Timeout: timeout, // + Limit: timelineLimit, // + Rooms: make(map[string]string), // Populated by the PDU stream + WantFullState: wantFullState, // }, nil } diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index c2f80b7f4..43ac01ec5 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -176,15 +176,21 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. return giveup() case <-rp.streams.PDUStreamProvider.NotifyAfter(waitctx, device, syncReq.Since.PDUPosition): + syncReq.Log.Debugln("Responding to sync after PDU") case <-rp.streams.TypingStreamProvider.NotifyAfter(waitctx, device, syncReq.Since.TypingPosition): + syncReq.Log.Debugln("Responding to sync after typing notification") case <-rp.streams.ReceiptStreamProvider.NotifyAfter(waitctx, device, syncReq.Since.ReceiptPosition): + syncReq.Log.Debugln("Responding to sync after read receipt") case <-rp.streams.InviteStreamProvider.NotifyAfter(waitctx, device, syncReq.Since.InvitePosition): + syncReq.Log.Debugln("Responding to sync after invite") case <-rp.streams.SendToDeviceStreamProvider.NotifyAfter(waitctx, device, syncReq.Since.SendToDevicePosition): + syncReq.Log.Debugln("Responding to sync after send-to-device message") case <-rp.streams.AccountDataStreamProvider.NotifyAfter(waitctx, device, syncReq.Since.AccountDataPosition): + syncReq.Log.Debugln("Responding to sync after account data") case <-rp.streams.DeviceListStreamProvider.NotifyAfter(waitctx, device, syncReq.Since.DeviceListPosition): + syncReq.Log.Debugln("Responding to sync after device list update") } - syncReq.Log.Debugln("Responding to sync after wakeup") waitcancel() } else { syncReq.Log.Debugln("Responding to sync immediately") @@ -311,6 +317,5 @@ func (rp *RequestPool) shouldReturnImmediately(syncReq *types.SyncRequest) bool if syncReq.Since.IsEmpty() || syncReq.Timeout == 0 || syncReq.WantFullState { return true } - waiting, werr := rp.db.SendToDeviceUpdatesWaiting(context.TODO(), syncReq.Device.UserID, syncReq.Device.ID) - return werr == nil && waiting + return false }