From 642ac3e4ad1edcaf3399fed1cac9ae9603820f34 Mon Sep 17 00:00:00 2001 From: Till Faelligen Date: Tue, 3 Aug 2021 15:32:34 +0200 Subject: [PATCH] Add Setup method to PresenceStreamProvider --- syncapi/streams/streams.go | 2 ++ syncapi/streams/streams_presence.go | 14 +++++++++++++- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/syncapi/streams/streams.go b/syncapi/streams/streams.go index a181fcb31..48f19dcdb 100644 --- a/syncapi/streams/streams.go +++ b/syncapi/streams/streams.go @@ -66,6 +66,7 @@ func NewSyncStreamProviders( streams.SendToDeviceStreamProvider.Setup() streams.AccountDataStreamProvider.Setup() streams.DeviceListStreamProvider.Setup() + streams.PresenceDataStreamProdiver.Setup() return streams } @@ -79,5 +80,6 @@ func (s *Streams) Latest(ctx context.Context) types.StreamingToken { SendToDevicePosition: s.SendToDeviceStreamProvider.LatestPosition(ctx), AccountDataPosition: s.AccountDataStreamProvider.LatestPosition(ctx), DeviceListPosition: s.DeviceListStreamProvider.LatestPosition(ctx), + PresenceDataPosition: s.PresenceDataStreamProdiver.LatestPosition(ctx), } } diff --git a/syncapi/streams/streams_presence.go b/syncapi/streams/streams_presence.go index 5989a0409..acc27cb13 100644 --- a/syncapi/streams/streams_presence.go +++ b/syncapi/streams/streams_presence.go @@ -15,8 +15,17 @@ type PresenceStreamProvider struct { UserAPI userapi.UserInternalAPI } +func (p *PresenceStreamProvider) Setup() { + p.StreamProvider.Setup() + + res := userapi.QueryMaxPresenceIDResponse{} + if err := p.UserAPI.QueryMaxPresenceID(context.Background(), &userapi.QueryMaxPresenceIDRequest{}, &res); err != nil { + panic(err) + } + p.latest = types.StreamPosition(res.ID) +} + func (p *PresenceStreamProvider) CompleteSync(ctx context.Context, req *types.SyncRequest) types.StreamPosition { - req.Log.Debug(" CompleteSyncrequested for presence!") return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx)) } @@ -34,6 +43,9 @@ func (p *PresenceStreamProvider) IncrementalSync(ctx context.Context, req *types req.Log.WithError(err).Error("unable to fetch presence after") return from } + if len(res.Presences) == 0 { + return to + } evs := []gomatrixserverlib.ClientEvent{} var maxPos int64 for _, presence := range res.Presences {