From f6f70150fb2b4152da14874c32611b68a62bd9be Mon Sep 17 00:00:00 2001 From: Till Faelligen <2353100+S7evinK@users.noreply.github.com> Date: Fri, 21 Oct 2022 12:51:24 +0200 Subject: [PATCH] Only send presence to joined hosts of the user --- federationapi/consumers/presence.go | 34 ++++++++++++++++++++++------- federationapi/federationapi.go | 2 +- 2 files changed, 27 insertions(+), 9 deletions(-) diff --git a/federationapi/consumers/presence.go b/federationapi/consumers/presence.go index 3445d34a9..ccd70d916 100644 --- a/federationapi/consumers/presence.go +++ b/federationapi/consumers/presence.go @@ -19,9 +19,11 @@ import ( "encoding/json" "strconv" + "github.com/getsentry/sentry-go" "github.com/matrix-org/dendrite/federationapi/queue" "github.com/matrix-org/dendrite/federationapi/storage" fedTypes "github.com/matrix-org/dendrite/federationapi/types" + roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" @@ -33,12 +35,14 @@ import ( // OutputReceiptConsumer consumes events that originate in the clientapi. type OutputPresenceConsumer struct { - ctx context.Context - jetstream nats.JetStreamContext - durable string - db storage.Database - queues *queue.OutgoingQueues - isLocalServerName func(gomatrixserverlib.ServerName) bool + ctx context.Context + jetstream nats.JetStreamContext + durable string + db storage.Database + queues *queue.OutgoingQueues + isLocalServerName func(gomatrixserverlib.ServerName) bool + rsAPI roomserverAPI.FederationRoomserverAPI + roomserverAPI.FederationRoomserverAPI topic string outboundPresenceEnabled bool } @@ -50,6 +54,7 @@ func NewOutputPresenceConsumer( js nats.JetStreamContext, queues *queue.OutgoingQueues, store storage.Database, + rsAPI roomserverAPI.FederationRoomserverAPI, ) *OutputPresenceConsumer { return &OutputPresenceConsumer{ ctx: process.Context(), @@ -60,6 +65,7 @@ func NewOutputPresenceConsumer( durable: cfg.Matrix.JetStream.Durable("FederationAPIPresenceConsumer"), topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent), outboundPresenceEnabled: cfg.Matrix.Presence.EnableOutbound, + rsAPI: rsAPI, } } @@ -89,6 +95,16 @@ func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msgs []*nats.Msg return true } + var queryRes roomserverAPI.QueryRoomsForUserResponse + err = t.rsAPI.QueryRoomsForUser(t.ctx, &roomserverAPI.QueryRoomsForUserRequest{ + UserID: userID, + WantMembership: "join", + }, &queryRes) + if err != nil { + log.WithError(err).Error("failed to calculate joined rooms for user") + return true + } + presence := msg.Header.Get("presence") ts, err := strconv.Atoi(msg.Header.Get("last_active_ts")) @@ -96,11 +112,13 @@ func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msgs []*nats.Msg return true } - joined, err := t.db.GetAllJoinedHosts(ctx) + // send this key change to all servers who share rooms with this user. + joined, err := t.db.GetJoinedHostsForRooms(t.ctx, queryRes.RoomIDs, true) if err != nil { - log.WithError(err).Error("failed to get joined hosts") + sentry.CaptureException(err) return true } + if len(joined) == 0 { return true } diff --git a/federationapi/federationapi.go b/federationapi/federationapi.go index a58cba1b1..202da6c51 100644 --- a/federationapi/federationapi.go +++ b/federationapi/federationapi.go @@ -164,7 +164,7 @@ func NewInternalAPI( } presenceConsumer := consumers.NewOutputPresenceConsumer( - base.ProcessContext, cfg, js, queues, federationDB, + base.ProcessContext, cfg, js, queues, federationDB, rsAPI, ) if err = presenceConsumer.Start(); err != nil { logrus.WithError(err).Panic("failed to start presence consumer")