Use the stream positions of the notifier (#2768)

Use the stream positions of the notifier, which might have advanced
since setting it at the beginning of the loop. This possibly helps in
reducing roundtrips to the SyncAPI, just because we didn't fetch the
latest data.
Also fixes a minor oversight in the receipts stream.
This commit is contained in:
Till 2022-10-06 12:57:13 +02:00 committed by GitHub
parent d605d928bc
commit 8c5b166784
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 12 additions and 12 deletions

View file

@ -77,9 +77,9 @@ func (p *ReceiptStreamProvider) IncrementalSync(
continue continue
} }
jr := types.NewJoinResponse() jr, ok := req.Response.Rooms.Join[roomID]
if existing, ok := req.Response.Rooms.Join[roomID]; ok { if !ok {
jr = existing jr = types.NewJoinResponse()
} }
ev := gomatrixserverlib.ClientEvent{ ev := gomatrixserverlib.ClientEvent{

View file

@ -407,7 +407,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
func(txn storage.DatabaseTransaction) types.StreamPosition { func(txn storage.DatabaseTransaction) types.StreamPosition {
return rp.streams.PDUStreamProvider.IncrementalSync( return rp.streams.PDUStreamProvider.IncrementalSync(
syncReq.Context, txn, syncReq, syncReq.Context, txn, syncReq,
syncReq.Since.PDUPosition, currentPos.PDUPosition, syncReq.Since.PDUPosition, rp.Notifier.CurrentPosition().PDUPosition,
) )
}, },
), ),
@ -416,7 +416,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
func(txn storage.DatabaseTransaction) types.StreamPosition { func(txn storage.DatabaseTransaction) types.StreamPosition {
return rp.streams.TypingStreamProvider.IncrementalSync( return rp.streams.TypingStreamProvider.IncrementalSync(
syncReq.Context, txn, syncReq, syncReq.Context, txn, syncReq,
syncReq.Since.TypingPosition, currentPos.TypingPosition, syncReq.Since.TypingPosition, rp.Notifier.CurrentPosition().TypingPosition,
) )
}, },
), ),
@ -425,7 +425,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
func(txn storage.DatabaseTransaction) types.StreamPosition { func(txn storage.DatabaseTransaction) types.StreamPosition {
return rp.streams.ReceiptStreamProvider.IncrementalSync( return rp.streams.ReceiptStreamProvider.IncrementalSync(
syncReq.Context, txn, syncReq, syncReq.Context, txn, syncReq,
syncReq.Since.ReceiptPosition, currentPos.ReceiptPosition, syncReq.Since.ReceiptPosition, rp.Notifier.CurrentPosition().ReceiptPosition,
) )
}, },
), ),
@ -434,7 +434,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
func(txn storage.DatabaseTransaction) types.StreamPosition { func(txn storage.DatabaseTransaction) types.StreamPosition {
return rp.streams.InviteStreamProvider.IncrementalSync( return rp.streams.InviteStreamProvider.IncrementalSync(
syncReq.Context, txn, syncReq, syncReq.Context, txn, syncReq,
syncReq.Since.InvitePosition, currentPos.InvitePosition, syncReq.Since.InvitePosition, rp.Notifier.CurrentPosition().InvitePosition,
) )
}, },
), ),
@ -443,7 +443,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
func(txn storage.DatabaseTransaction) types.StreamPosition { func(txn storage.DatabaseTransaction) types.StreamPosition {
return rp.streams.SendToDeviceStreamProvider.IncrementalSync( return rp.streams.SendToDeviceStreamProvider.IncrementalSync(
syncReq.Context, txn, syncReq, syncReq.Context, txn, syncReq,
syncReq.Since.SendToDevicePosition, currentPos.SendToDevicePosition, syncReq.Since.SendToDevicePosition, rp.Notifier.CurrentPosition().SendToDevicePosition,
) )
}, },
), ),
@ -452,7 +452,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
func(txn storage.DatabaseTransaction) types.StreamPosition { func(txn storage.DatabaseTransaction) types.StreamPosition {
return rp.streams.AccountDataStreamProvider.IncrementalSync( return rp.streams.AccountDataStreamProvider.IncrementalSync(
syncReq.Context, txn, syncReq, syncReq.Context, txn, syncReq,
syncReq.Since.AccountDataPosition, currentPos.AccountDataPosition, syncReq.Since.AccountDataPosition, rp.Notifier.CurrentPosition().AccountDataPosition,
) )
}, },
), ),
@ -461,7 +461,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
func(txn storage.DatabaseTransaction) types.StreamPosition { func(txn storage.DatabaseTransaction) types.StreamPosition {
return rp.streams.NotificationDataStreamProvider.IncrementalSync( return rp.streams.NotificationDataStreamProvider.IncrementalSync(
syncReq.Context, txn, syncReq, syncReq.Context, txn, syncReq,
syncReq.Since.NotificationDataPosition, currentPos.NotificationDataPosition, syncReq.Since.NotificationDataPosition, rp.Notifier.CurrentPosition().NotificationDataPosition,
) )
}, },
), ),
@ -470,7 +470,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
func(txn storage.DatabaseTransaction) types.StreamPosition { func(txn storage.DatabaseTransaction) types.StreamPosition {
return rp.streams.DeviceListStreamProvider.IncrementalSync( return rp.streams.DeviceListStreamProvider.IncrementalSync(
syncReq.Context, txn, syncReq, syncReq.Context, txn, syncReq,
syncReq.Since.DeviceListPosition, currentPos.DeviceListPosition, syncReq.Since.DeviceListPosition, rp.Notifier.CurrentPosition().DeviceListPosition,
) )
}, },
), ),
@ -479,7 +479,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
func(txn storage.DatabaseTransaction) types.StreamPosition { func(txn storage.DatabaseTransaction) types.StreamPosition {
return rp.streams.PresenceStreamProvider.IncrementalSync( return rp.streams.PresenceStreamProvider.IncrementalSync(
syncReq.Context, txn, syncReq, syncReq.Context, txn, syncReq,
syncReq.Since.PresencePosition, currentPos.PresencePosition, syncReq.Since.PresencePosition, rp.Notifier.CurrentPosition().PresencePosition,
) )
}, },
), ),