diff --git a/syncapi/streams/streams.go b/syncapi/streams/streams.go index 46de8ad29..51ad1897f 100644 --- a/syncapi/streams/streams.go +++ b/syncapi/streams/streams.go @@ -50,7 +50,8 @@ func NewSyncStreamProviders( }, PresenceDataStreamProvider: &PresenceStreamProvider{ StreamProvider: StreamProvider{DB: d}, - UserAPI: userAPI, + userAPI: userAPI, + rsAPI: rsAPI, }, DeviceListStreamProvider: &DeviceListStreamProvider{ PartitionedStreamProvider: PartitionedStreamProvider{DB: d}, diff --git a/syncapi/streams/streams_presence.go b/syncapi/streams/streams_presence.go index 4db542d11..076aaf957 100644 --- a/syncapi/streams/streams_presence.go +++ b/syncapi/streams/streams_presence.go @@ -5,6 +5,7 @@ import ( "encoding/json" "time" + "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" @@ -12,14 +13,15 @@ import ( type PresenceStreamProvider struct { StreamProvider - UserAPI userapi.UserInternalAPI + userAPI userapi.UserInternalAPI + rsAPI api.RoomserverInternalAPI } func (p *PresenceStreamProvider) Setup() { p.StreamProvider.Setup() res := userapi.QueryMaxPresenceIDResponse{} - if err := p.UserAPI.QueryMaxPresenceID(context.Background(), &userapi.QueryMaxPresenceIDRequest{}, &res); err != nil { + if err := p.userAPI.QueryMaxPresenceID(context.Background(), &userapi.QueryMaxPresenceIDRequest{}, &res); err != nil { panic(err) } p.latest = types.StreamPosition(res.ID) @@ -39,7 +41,7 @@ type outputPresence struct { func (p *PresenceStreamProvider) IncrementalSync(ctx context.Context, req *types.SyncRequest, from, to types.StreamPosition) types.StreamPosition { res := userapi.QueryPresenceAfterResponse{} - if err := p.UserAPI.QueryPresenceAfter(ctx, &userapi.QueryPresenceAfterRequest{StreamPos: int64(from)}, &res); err != nil { + if err := p.userAPI.QueryPresenceAfter(ctx, &userapi.QueryPresenceAfterRequest{StreamPos: int64(from)}, &res); err != nil { req.Log.WithError(err).Error("unable to fetch presence after") return from } diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index eee453e5c..d55caeb9a 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -150,7 +150,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. activeSyncRequests.Inc() defer activeSyncRequests.Dec() - rp.updatePresence(req, device) + defer rp.updatePresence(req, device) rp.updateLastSeen(req, device) waitingSyncRequests.Inc() @@ -273,7 +273,7 @@ func (rp *RequestPool) updatePresence(req *http.Request, device *userapi.Device) Presence: types2.ToPresenceStatus(presence), LastActiveTS: time.Now().Unix(), } - go rp.userAPI.InputPresenceData(req.Context(), pReq, &userapi.InputPresenceResponse{}) // nolint:errcheck + rp.userAPI.InputPresenceData(req.Context(), pReq, &userapi.InputPresenceResponse{}) // nolint:errcheck }