This commit is contained in:
Till Faelligen 2021-11-19 10:38:17 +01:00
parent 09573f2f8f
commit e2afaab50c
3 changed files with 7 additions and 7 deletions

View file

@ -18,7 +18,7 @@ type Streams struct {
InviteStreamProvider types.StreamProvider
SendToDeviceStreamProvider types.StreamProvider
AccountDataStreamProvider types.StreamProvider
PresenceDataStreamProdiver types.StreamProvider
PresenceDataStreamProvider types.StreamProvider
DeviceListStreamProvider types.PartitionedStreamProvider
}
@ -48,7 +48,7 @@ func NewSyncStreamProviders(
StreamProvider: StreamProvider{DB: d},
userAPI: userAPI,
},
PresenceDataStreamProdiver: &PresenceStreamProvider{
PresenceDataStreamProvider: &PresenceStreamProvider{
StreamProvider: StreamProvider{DB: d},
UserAPI: userAPI,
},
@ -66,7 +66,7 @@ func NewSyncStreamProviders(
streams.SendToDeviceStreamProvider.Setup()
streams.AccountDataStreamProvider.Setup()
streams.DeviceListStreamProvider.Setup()
streams.PresenceDataStreamProdiver.Setup()
streams.PresenceDataStreamProvider.Setup()
return streams
}
@ -80,6 +80,6 @@ func (s *Streams) Latest(ctx context.Context) types.StreamingToken {
SendToDevicePosition: s.SendToDeviceStreamProvider.LatestPosition(ctx),
AccountDataPosition: s.AccountDataStreamProvider.LatestPosition(ctx),
DeviceListPosition: s.DeviceListStreamProvider.LatestPosition(ctx),
PresenceDataPosition: s.PresenceDataStreamProdiver.LatestPosition(ctx),
PresenceDataPosition: s.PresenceDataStreamProvider.LatestPosition(ctx),
}
}

View file

@ -210,7 +210,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
DeviceListPosition: rp.streams.DeviceListStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
PresenceDataPosition: rp.streams.PresenceDataStreamProdiver.CompleteSync(
PresenceDataPosition: rp.streams.PresenceDataStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
}
@ -245,7 +245,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
syncReq.Context, syncReq,
syncReq.Since.DeviceListPosition, currentPos.DeviceListPosition,
),
PresenceDataPosition: rp.streams.PresenceDataStreamProdiver.IncrementalSync(
PresenceDataPosition: rp.streams.PresenceDataStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.PresenceDataPosition, currentPos.PresenceDataPosition,
),

View file

@ -109,7 +109,7 @@ func AddPublicRoutes(
}
presenceConsumer := consumers.NewOutputPresenceDataConsumer(
process, cfg, consumer, syncDB, notifier, streams.PresenceDataStreamProdiver,
process, cfg, consumer, syncDB, notifier, streams.PresenceDataStreamProvider,
)
if err = presenceConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start presence consumer")