diff --git a/clientapi/routing/presence.go b/clientapi/routing/presence.go index 076c7c571..83ff92e8c 100644 --- a/clientapi/routing/presence.go +++ b/clientapi/routing/presence.go @@ -108,6 +108,7 @@ func GetPresence( statusMsg := presence.Header.Get("status_msg") e := presence.Header.Get("error") if e != "" { + log.Errorf("received error msg from nats: %s", e) return util.JSONResponse{ Code: http.StatusOK, JSON: types.PresenceClientResponse{ diff --git a/syncapi/consumers/presence.go b/syncapi/consumers/presence.go index 0f07076e6..f768d72cd 100644 --- a/syncapi/consumers/presence.go +++ b/syncapi/consumers/presence.go @@ -18,13 +18,13 @@ import ( "context" "strconv" - "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" "github.com/nats-io/nats.go" "github.com/sirupsen/logrus" @@ -41,7 +41,7 @@ type PresenceConsumer struct { db storage.Database stream types.StreamProvider notifier *notifier.Notifier - rsAPI api.RoomserverInternalAPI + deviceAPI api.UserDeviceAPI } // NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer. @@ -54,7 +54,7 @@ func NewPresenceConsumer( db storage.Database, notifier *notifier.Notifier, stream types.StreamProvider, - rsAPI api.RoomserverInternalAPI, + deviceAPI api.UserDeviceAPI, ) *PresenceConsumer { return &PresenceConsumer{ ctx: process.Context(), @@ -66,7 +66,7 @@ func NewPresenceConsumer( db: db, notifier: notifier, stream: stream, - rsAPI: rsAPI, + deviceAPI: deviceAPI, } } @@ -74,24 +74,41 @@ func NewPresenceConsumer( func (s *PresenceConsumer) Start() error { // Normal NATS subscription, used by Request/Reply _, err := s.nats.Subscribe(s.requestTopic, func(msg *nats.Msg) { - presence, err := s.db.GetPresence(context.Background(), msg.Header.Get(jetstream.UserID)) + userID := msg.Header.Get(jetstream.UserID) + presence, err := s.db.GetPresence(context.Background(), userID) m := &nats.Msg{ Header: nats.Header{}, } if err != nil { m.Header.Set("error", err.Error()) if err = msg.RespondMsg(m); err != nil { - return + logrus.WithError(err).Error("Unable to respond to messages") } return } + deviceRes := api.QueryDevicesResponse{} + if err = s.deviceAPI.QueryDevices(s.ctx, &api.QueryDevicesRequest{UserID: userID}, &deviceRes); err != nil { + m.Header.Set("error", err.Error()) + if err = msg.RespondMsg(m); err != nil { + logrus.WithError(err).Error("Unable to respond to messages") + } + return + } + + for i := range deviceRes.Devices { + if int64(presence.LastActiveTS) < deviceRes.Devices[i].LastSeenTS { + presence.LastActiveTS = gomatrixserverlib.Timestamp(deviceRes.Devices[i].LastSeenTS) + } + } + m.Header.Set(jetstream.UserID, presence.UserID) m.Header.Set("presence", presence.ClientFields.Presence) m.Header.Set("status_msg", *presence.ClientFields.StatusMsg) m.Header.Set("last_active_ts", strconv.Itoa(int(presence.LastActiveTS))) if err = msg.RespondMsg(m); err != nil { + logrus.WithError(err).Error("Unable to respond to messages") return } }) diff --git a/syncapi/streams/stream_presence.go b/syncapi/streams/stream_presence.go index 0262178b2..babecbe32 100644 --- a/syncapi/streams/stream_presence.go +++ b/syncapi/streams/stream_presence.go @@ -107,7 +107,8 @@ func (p *PresenceStreamProvider) IncrementalSync( if !sharedUsers[presence.UserID] { continue } - pres, ok := p.cache.Load(req.Device.UserID + presence.UserID) + cacheKey := req.Device.UserID + req.Device.ID + presence.UserID + pres, ok := p.cache.Load(cacheKey) if ok { // skip already sent presence prevPresence := pres.(*types.Presence) @@ -137,7 +138,7 @@ func (p *PresenceStreamProvider) IncrementalSync( if presence.StreamPos > lastPos { lastPos = presence.StreamPos } - p.cache.Store(req.Device.UserID+presence.UserID, presence) + p.cache.Store(cacheKey, presence) } return lastPos diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index b45498eab..7fa943b3d 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -122,6 +122,7 @@ func (rp *RequestPool) updatePresence(presence string, userID string) { UserID: userID, LastActiveTS: gomatrixserverlib.AsTimestamp(time.Now()), } + defer rp.presence.Store(userID, newPresence) // avoid spamming presence updates when syncing existingPresence, ok := rp.presence.LoadOrStore(userID, newPresence) if ok { @@ -135,8 +136,6 @@ func (rp *RequestPool) updatePresence(presence string, userID string) { logrus.WithError(err).Error("Unable to publish presence message from sync") return } - - rp.presence.Store(userID, newPresence) } func (rp *RequestPool) updateLastSeen(req *http.Request, device *userapi.Device) { diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 42c570469..c83f8d9d9 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -137,7 +137,7 @@ func AddPublicRoutes( presenceConsumer := consumers.NewPresenceConsumer( process, cfg, js, natsClient, syncDB, notifier, streams.PresenceStreamProvider, - rsAPI, + userAPI, ) if err = presenceConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start presence consumer") diff --git a/userapi/api/api.go b/userapi/api/api.go index a9544f00d..b86774d14 100644 --- a/userapi/api/api.go +++ b/userapi/api/api.go @@ -31,12 +31,10 @@ type UserInternalAPI interface { UserRegisterAPI UserAccountAPI UserThreePIDAPI + UserDeviceAPI InputAccountData(ctx context.Context, req *InputAccountDataRequest, res *InputAccountDataResponse) error - PerformDeviceDeletion(ctx context.Context, req *PerformDeviceDeletionRequest, res *PerformDeviceDeletionResponse) error - PerformLastSeenUpdate(ctx context.Context, req *PerformLastSeenUpdateRequest, res *PerformLastSeenUpdateResponse) error - PerformDeviceUpdate(ctx context.Context, req *PerformDeviceUpdateRequest, res *PerformDeviceUpdateResponse) error PerformOpenIDTokenCreation(ctx context.Context, req *PerformOpenIDTokenCreationRequest, res *PerformOpenIDTokenCreationResponse) error PerformKeyBackup(ctx context.Context, req *PerformKeyBackupRequest, res *PerformKeyBackupResponse) error PerformPusherSet(ctx context.Context, req *PerformPusherSetRequest, res *struct{}) error @@ -45,15 +43,21 @@ type UserInternalAPI interface { QueryKeyBackup(ctx context.Context, req *QueryKeyBackupRequest, res *QueryKeyBackupResponse) QueryAccessToken(ctx context.Context, req *QueryAccessTokenRequest, res *QueryAccessTokenResponse) error - QueryDevices(ctx context.Context, req *QueryDevicesRequest, res *QueryDevicesResponse) error QueryAccountData(ctx context.Context, req *QueryAccountDataRequest, res *QueryAccountDataResponse) error - QueryDeviceInfos(ctx context.Context, req *QueryDeviceInfosRequest, res *QueryDeviceInfosResponse) error QueryOpenIDToken(ctx context.Context, req *QueryOpenIDTokenRequest, res *QueryOpenIDTokenResponse) error QueryPushers(ctx context.Context, req *QueryPushersRequest, res *QueryPushersResponse) error QueryPushRules(ctx context.Context, req *QueryPushRulesRequest, res *QueryPushRulesResponse) error QueryNotifications(ctx context.Context, req *QueryNotificationsRequest, res *QueryNotificationsResponse) error } +type UserDeviceAPI interface { + PerformDeviceDeletion(ctx context.Context, req *PerformDeviceDeletionRequest, res *PerformDeviceDeletionResponse) error + PerformLastSeenUpdate(ctx context.Context, req *PerformLastSeenUpdateRequest, res *PerformLastSeenUpdateResponse) error + PerformDeviceUpdate(ctx context.Context, req *PerformDeviceUpdateRequest, res *PerformDeviceUpdateResponse) error + QueryDevices(ctx context.Context, req *QueryDevicesRequest, res *QueryDevicesResponse) error + QueryDeviceInfos(ctx context.Context, req *QueryDeviceInfosRequest, res *QueryDeviceInfosResponse) error +} + type UserDirectoryProvider interface { QuerySearchProfiles(ctx context.Context, req *QuerySearchProfilesRequest, res *QuerySearchProfilesResponse) error }