diff --git a/syncapi/streams/stream_receipt.go b/syncapi/streams/stream_receipt.go index 76927cc36..bba911022 100644 --- a/syncapi/streams/stream_receipt.go +++ b/syncapi/streams/stream_receipt.go @@ -77,9 +77,9 @@ func (p *ReceiptStreamProvider) IncrementalSync( continue } - jr := types.NewJoinResponse() - if existing, ok := req.Response.Rooms.Join[roomID]; ok { - jr = existing + jr, ok := req.Response.Rooms.Join[roomID] + if !ok { + jr = types.NewJoinResponse() } ev := gomatrixserverlib.ClientEvent{ diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index a71d32ab8..29d92b293 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -407,7 +407,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. func(txn storage.DatabaseTransaction) types.StreamPosition { return rp.streams.PDUStreamProvider.IncrementalSync( syncReq.Context, txn, syncReq, - syncReq.Since.PDUPosition, currentPos.PDUPosition, + syncReq.Since.PDUPosition, rp.Notifier.CurrentPosition().PDUPosition, ) }, ), @@ -416,7 +416,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. func(txn storage.DatabaseTransaction) types.StreamPosition { return rp.streams.TypingStreamProvider.IncrementalSync( syncReq.Context, txn, syncReq, - syncReq.Since.TypingPosition, currentPos.TypingPosition, + syncReq.Since.TypingPosition, rp.Notifier.CurrentPosition().TypingPosition, ) }, ), @@ -425,7 +425,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. func(txn storage.DatabaseTransaction) types.StreamPosition { return rp.streams.ReceiptStreamProvider.IncrementalSync( syncReq.Context, txn, syncReq, - syncReq.Since.ReceiptPosition, currentPos.ReceiptPosition, + syncReq.Since.ReceiptPosition, rp.Notifier.CurrentPosition().ReceiptPosition, ) }, ), @@ -434,7 +434,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. func(txn storage.DatabaseTransaction) types.StreamPosition { return rp.streams.InviteStreamProvider.IncrementalSync( syncReq.Context, txn, syncReq, - syncReq.Since.InvitePosition, currentPos.InvitePosition, + syncReq.Since.InvitePosition, rp.Notifier.CurrentPosition().InvitePosition, ) }, ), @@ -443,7 +443,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. func(txn storage.DatabaseTransaction) types.StreamPosition { return rp.streams.SendToDeviceStreamProvider.IncrementalSync( syncReq.Context, txn, syncReq, - syncReq.Since.SendToDevicePosition, currentPos.SendToDevicePosition, + syncReq.Since.SendToDevicePosition, rp.Notifier.CurrentPosition().SendToDevicePosition, ) }, ), @@ -452,7 +452,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. func(txn storage.DatabaseTransaction) types.StreamPosition { return rp.streams.AccountDataStreamProvider.IncrementalSync( syncReq.Context, txn, syncReq, - syncReq.Since.AccountDataPosition, currentPos.AccountDataPosition, + syncReq.Since.AccountDataPosition, rp.Notifier.CurrentPosition().AccountDataPosition, ) }, ), @@ -461,7 +461,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. func(txn storage.DatabaseTransaction) types.StreamPosition { return rp.streams.NotificationDataStreamProvider.IncrementalSync( syncReq.Context, txn, syncReq, - syncReq.Since.NotificationDataPosition, currentPos.NotificationDataPosition, + syncReq.Since.NotificationDataPosition, rp.Notifier.CurrentPosition().NotificationDataPosition, ) }, ), @@ -470,7 +470,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. func(txn storage.DatabaseTransaction) types.StreamPosition { return rp.streams.DeviceListStreamProvider.IncrementalSync( syncReq.Context, txn, syncReq, - syncReq.Since.DeviceListPosition, currentPos.DeviceListPosition, + syncReq.Since.DeviceListPosition, rp.Notifier.CurrentPosition().DeviceListPosition, ) }, ), @@ -479,7 +479,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. func(txn storage.DatabaseTransaction) types.StreamPosition { return rp.streams.PresenceStreamProvider.IncrementalSync( syncReq.Context, txn, syncReq, - syncReq.Since.PresencePosition, currentPos.PresencePosition, + syncReq.Since.PresencePosition, rp.Notifier.CurrentPosition().PresencePosition, ) }, ),