diff --git a/syncapi/streams/stream_accountdata.go b/syncapi/streams/stream_accountdata.go index e9db33061..7e0fa7094 100644 --- a/syncapi/streams/stream_accountdata.go +++ b/syncapi/streams/stream_accountdata.go @@ -23,7 +23,7 @@ func (p *AccountDataStreamProvider) Setup( p.latestMutex.Lock() defer p.latestMutex.Unlock() - id, err := snapshot.MaxStreamPositionForAccountData(context.Background()) + id, err := snapshot.MaxStreamPositionForAccountData(ctx) if err != nil { panic(err) } diff --git a/syncapi/streams/stream_invite.go b/syncapi/streams/stream_invite.go index 029302262..08dccf63f 100644 --- a/syncapi/streams/stream_invite.go +++ b/syncapi/streams/stream_invite.go @@ -25,7 +25,7 @@ func (p *InviteStreamProvider) Setup( p.latestMutex.Lock() defer p.latestMutex.Unlock() - id, err := snapshot.MaxStreamPositionForInvites(context.Background()) + id, err := snapshot.MaxStreamPositionForInvites(ctx) if err != nil { panic(err) } diff --git a/syncapi/streams/stream_notificationdata.go b/syncapi/streams/stream_notificationdata.go index 6944640e9..2535cce53 100644 --- a/syncapi/streams/stream_notificationdata.go +++ b/syncapi/streams/stream_notificationdata.go @@ -16,7 +16,10 @@ func (p *NotificationDataStreamProvider) Setup( ) { p.DefaultStreamProvider.Setup(ctx, snapshot) - id, err := snapshot.MaxStreamPositionForNotificationData(context.Background()) + p.latestMutex.Lock() + defer p.latestMutex.Unlock() + + id, err := snapshot.MaxStreamPositionForNotificationData(ctx) if err != nil { panic(err) } diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 4779558bd..19b877d76 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -47,7 +47,7 @@ func (p *PDUStreamProvider) Setup( p.latestMutex.Lock() defer p.latestMutex.Unlock() - id, err := snapshot.MaxStreamPositionForPDUs(context.Background()) + id, err := snapshot.MaxStreamPositionForPDUs(ctx) if err != nil { panic(err) } diff --git a/syncapi/streams/stream_presence.go b/syncapi/streams/stream_presence.go index b2f9a0b47..a1e24f993 100644 --- a/syncapi/streams/stream_presence.go +++ b/syncapi/streams/stream_presence.go @@ -39,7 +39,10 @@ func (p *PresenceStreamProvider) Setup( ) { p.DefaultStreamProvider.Setup(ctx, snapshot) - id, err := snapshot.MaxStreamPositionForPresence(context.Background()) + p.latestMutex.Lock() + defer p.latestMutex.Unlock() + + id, err := snapshot.MaxStreamPositionForPresence(ctx) if err != nil { panic(err) } diff --git a/syncapi/streams/stream_receipt.go b/syncapi/streams/stream_receipt.go index e7140b7e1..73554d678 100644 --- a/syncapi/streams/stream_receipt.go +++ b/syncapi/streams/stream_receipt.go @@ -18,7 +18,10 @@ func (p *ReceiptStreamProvider) Setup( ) { p.DefaultStreamProvider.Setup(ctx, snapshot) - id, err := snapshot.MaxStreamPositionForReceipts(context.Background()) + p.latestMutex.Lock() + defer p.latestMutex.Unlock() + + id, err := snapshot.MaxStreamPositionForReceipts(ctx) if err != nil { panic(err) } diff --git a/syncapi/streams/stream_sendtodevice.go b/syncapi/streams/stream_sendtodevice.go index 0bd2e2c6d..4132e2e15 100644 --- a/syncapi/streams/stream_sendtodevice.go +++ b/syncapi/streams/stream_sendtodevice.go @@ -16,7 +16,10 @@ func (p *SendToDeviceStreamProvider) Setup( ) { p.DefaultStreamProvider.Setup(ctx, snapshot) - id, err := snapshot.MaxStreamPositionForSendToDeviceMessages(context.Background()) + p.latestMutex.Lock() + defer p.latestMutex.Unlock() + + id, err := snapshot.MaxStreamPositionForSendToDeviceMessages(ctx) if err != nil { panic(err) }