Query devices for last_active_ts

Fixes & tweaks
This commit is contained in:
S7evinK 2022-04-01 09:41:02 +02:00
parent 237a539e5b
commit f2c82aaf74
6 changed files with 38 additions and 16 deletions

View file

@ -108,6 +108,7 @@ func GetPresence(
statusMsg := presence.Header.Get("status_msg")
e := presence.Header.Get("error")
if e != "" {
log.Errorf("received error msg from nats: %s", e)
return util.JSONResponse{
Code: http.StatusOK,
JSON: types.PresenceClientResponse{

View file

@ -18,13 +18,13 @@ import (
"context"
"strconv"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
@ -41,7 +41,7 @@ type PresenceConsumer struct {
db storage.Database
stream types.StreamProvider
notifier *notifier.Notifier
rsAPI api.RoomserverInternalAPI
deviceAPI api.UserDeviceAPI
}
// NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer.
@ -54,7 +54,7 @@ func NewPresenceConsumer(
db storage.Database,
notifier *notifier.Notifier,
stream types.StreamProvider,
rsAPI api.RoomserverInternalAPI,
deviceAPI api.UserDeviceAPI,
) *PresenceConsumer {
return &PresenceConsumer{
ctx: process.Context(),
@ -66,7 +66,7 @@ func NewPresenceConsumer(
db: db,
notifier: notifier,
stream: stream,
rsAPI: rsAPI,
deviceAPI: deviceAPI,
}
}
@ -74,24 +74,41 @@ func NewPresenceConsumer(
func (s *PresenceConsumer) Start() error {
// Normal NATS subscription, used by Request/Reply
_, err := s.nats.Subscribe(s.requestTopic, func(msg *nats.Msg) {
presence, err := s.db.GetPresence(context.Background(), msg.Header.Get(jetstream.UserID))
userID := msg.Header.Get(jetstream.UserID)
presence, err := s.db.GetPresence(context.Background(), userID)
m := &nats.Msg{
Header: nats.Header{},
}
if err != nil {
m.Header.Set("error", err.Error())
if err = msg.RespondMsg(m); err != nil {
return
logrus.WithError(err).Error("Unable to respond to messages")
}
return
}
deviceRes := api.QueryDevicesResponse{}
if err = s.deviceAPI.QueryDevices(s.ctx, &api.QueryDevicesRequest{UserID: userID}, &deviceRes); err != nil {
m.Header.Set("error", err.Error())
if err = msg.RespondMsg(m); err != nil {
logrus.WithError(err).Error("Unable to respond to messages")
}
return
}
for i := range deviceRes.Devices {
if int64(presence.LastActiveTS) < deviceRes.Devices[i].LastSeenTS {
presence.LastActiveTS = gomatrixserverlib.Timestamp(deviceRes.Devices[i].LastSeenTS)
}
}
m.Header.Set(jetstream.UserID, presence.UserID)
m.Header.Set("presence", presence.ClientFields.Presence)
m.Header.Set("status_msg", *presence.ClientFields.StatusMsg)
m.Header.Set("last_active_ts", strconv.Itoa(int(presence.LastActiveTS)))
if err = msg.RespondMsg(m); err != nil {
logrus.WithError(err).Error("Unable to respond to messages")
return
}
})

View file

@ -107,7 +107,8 @@ func (p *PresenceStreamProvider) IncrementalSync(
if !sharedUsers[presence.UserID] {
continue
}
pres, ok := p.cache.Load(req.Device.UserID + presence.UserID)
cacheKey := req.Device.UserID + req.Device.ID + presence.UserID
pres, ok := p.cache.Load(cacheKey)
if ok {
// skip already sent presence
prevPresence := pres.(*types.Presence)
@ -137,7 +138,7 @@ func (p *PresenceStreamProvider) IncrementalSync(
if presence.StreamPos > lastPos {
lastPos = presence.StreamPos
}
p.cache.Store(req.Device.UserID+presence.UserID, presence)
p.cache.Store(cacheKey, presence)
}
return lastPos

View file

@ -122,6 +122,7 @@ func (rp *RequestPool) updatePresence(presence string, userID string) {
UserID: userID,
LastActiveTS: gomatrixserverlib.AsTimestamp(time.Now()),
}
defer rp.presence.Store(userID, newPresence)
// avoid spamming presence updates when syncing
existingPresence, ok := rp.presence.LoadOrStore(userID, newPresence)
if ok {
@ -135,8 +136,6 @@ func (rp *RequestPool) updatePresence(presence string, userID string) {
logrus.WithError(err).Error("Unable to publish presence message from sync")
return
}
rp.presence.Store(userID, newPresence)
}
func (rp *RequestPool) updateLastSeen(req *http.Request, device *userapi.Device) {

View file

@ -137,7 +137,7 @@ func AddPublicRoutes(
presenceConsumer := consumers.NewPresenceConsumer(
process, cfg, js, natsClient, syncDB,
notifier, streams.PresenceStreamProvider,
rsAPI,
userAPI,
)
if err = presenceConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start presence consumer")

View file

@ -31,12 +31,10 @@ type UserInternalAPI interface {
UserRegisterAPI
UserAccountAPI
UserThreePIDAPI
UserDeviceAPI
InputAccountData(ctx context.Context, req *InputAccountDataRequest, res *InputAccountDataResponse) error
PerformDeviceDeletion(ctx context.Context, req *PerformDeviceDeletionRequest, res *PerformDeviceDeletionResponse) error
PerformLastSeenUpdate(ctx context.Context, req *PerformLastSeenUpdateRequest, res *PerformLastSeenUpdateResponse) error
PerformDeviceUpdate(ctx context.Context, req *PerformDeviceUpdateRequest, res *PerformDeviceUpdateResponse) error
PerformOpenIDTokenCreation(ctx context.Context, req *PerformOpenIDTokenCreationRequest, res *PerformOpenIDTokenCreationResponse) error
PerformKeyBackup(ctx context.Context, req *PerformKeyBackupRequest, res *PerformKeyBackupResponse) error
PerformPusherSet(ctx context.Context, req *PerformPusherSetRequest, res *struct{}) error
@ -45,15 +43,21 @@ type UserInternalAPI interface {
QueryKeyBackup(ctx context.Context, req *QueryKeyBackupRequest, res *QueryKeyBackupResponse)
QueryAccessToken(ctx context.Context, req *QueryAccessTokenRequest, res *QueryAccessTokenResponse) error
QueryDevices(ctx context.Context, req *QueryDevicesRequest, res *QueryDevicesResponse) error
QueryAccountData(ctx context.Context, req *QueryAccountDataRequest, res *QueryAccountDataResponse) error
QueryDeviceInfos(ctx context.Context, req *QueryDeviceInfosRequest, res *QueryDeviceInfosResponse) error
QueryOpenIDToken(ctx context.Context, req *QueryOpenIDTokenRequest, res *QueryOpenIDTokenResponse) error
QueryPushers(ctx context.Context, req *QueryPushersRequest, res *QueryPushersResponse) error
QueryPushRules(ctx context.Context, req *QueryPushRulesRequest, res *QueryPushRulesResponse) error
QueryNotifications(ctx context.Context, req *QueryNotificationsRequest, res *QueryNotificationsResponse) error
}
type UserDeviceAPI interface {
PerformDeviceDeletion(ctx context.Context, req *PerformDeviceDeletionRequest, res *PerformDeviceDeletionResponse) error
PerformLastSeenUpdate(ctx context.Context, req *PerformLastSeenUpdateRequest, res *PerformLastSeenUpdateResponse) error
PerformDeviceUpdate(ctx context.Context, req *PerformDeviceUpdateRequest, res *PerformDeviceUpdateResponse) error
QueryDevices(ctx context.Context, req *QueryDevicesRequest, res *QueryDevicesResponse) error
QueryDeviceInfos(ctx context.Context, req *QueryDeviceInfosRequest, res *QueryDeviceInfosResponse) error
}
type UserDirectoryProvider interface {
QuerySearchProfiles(ctx context.Context, req *QuerySearchProfilesRequest, res *QuerySearchProfilesResponse) error
}