Only send presence to joined hosts of the user

This commit is contained in:
Till Faelligen 2022-10-21 12:51:24 +02:00
parent eeabe892a9
commit f6f70150fb
No known key found for this signature in database
GPG key ID: ACCDC9606D472758
2 changed files with 27 additions and 9 deletions

View file

@ -19,9 +19,11 @@ import (
"encoding/json" "encoding/json"
"strconv" "strconv"
"github.com/getsentry/sentry-go"
"github.com/matrix-org/dendrite/federationapi/queue" "github.com/matrix-org/dendrite/federationapi/queue"
"github.com/matrix-org/dendrite/federationapi/storage" "github.com/matrix-org/dendrite/federationapi/storage"
fedTypes "github.com/matrix-org/dendrite/federationapi/types" fedTypes "github.com/matrix-org/dendrite/federationapi/types"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/setup/process"
@ -33,12 +35,14 @@ import (
// OutputReceiptConsumer consumes events that originate in the clientapi. // OutputReceiptConsumer consumes events that originate in the clientapi.
type OutputPresenceConsumer struct { type OutputPresenceConsumer struct {
ctx context.Context ctx context.Context
jetstream nats.JetStreamContext jetstream nats.JetStreamContext
durable string durable string
db storage.Database db storage.Database
queues *queue.OutgoingQueues queues *queue.OutgoingQueues
isLocalServerName func(gomatrixserverlib.ServerName) bool isLocalServerName func(gomatrixserverlib.ServerName) bool
rsAPI roomserverAPI.FederationRoomserverAPI
roomserverAPI.FederationRoomserverAPI
topic string topic string
outboundPresenceEnabled bool outboundPresenceEnabled bool
} }
@ -50,6 +54,7 @@ func NewOutputPresenceConsumer(
js nats.JetStreamContext, js nats.JetStreamContext,
queues *queue.OutgoingQueues, queues *queue.OutgoingQueues,
store storage.Database, store storage.Database,
rsAPI roomserverAPI.FederationRoomserverAPI,
) *OutputPresenceConsumer { ) *OutputPresenceConsumer {
return &OutputPresenceConsumer{ return &OutputPresenceConsumer{
ctx: process.Context(), ctx: process.Context(),
@ -60,6 +65,7 @@ func NewOutputPresenceConsumer(
durable: cfg.Matrix.JetStream.Durable("FederationAPIPresenceConsumer"), durable: cfg.Matrix.JetStream.Durable("FederationAPIPresenceConsumer"),
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent), topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent),
outboundPresenceEnabled: cfg.Matrix.Presence.EnableOutbound, outboundPresenceEnabled: cfg.Matrix.Presence.EnableOutbound,
rsAPI: rsAPI,
} }
} }
@ -89,6 +95,16 @@ func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msgs []*nats.Msg
return true return true
} }
var queryRes roomserverAPI.QueryRoomsForUserResponse
err = t.rsAPI.QueryRoomsForUser(t.ctx, &roomserverAPI.QueryRoomsForUserRequest{
UserID: userID,
WantMembership: "join",
}, &queryRes)
if err != nil {
log.WithError(err).Error("failed to calculate joined rooms for user")
return true
}
presence := msg.Header.Get("presence") presence := msg.Header.Get("presence")
ts, err := strconv.Atoi(msg.Header.Get("last_active_ts")) ts, err := strconv.Atoi(msg.Header.Get("last_active_ts"))
@ -96,11 +112,13 @@ func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msgs []*nats.Msg
return true return true
} }
joined, err := t.db.GetAllJoinedHosts(ctx) // send this key change to all servers who share rooms with this user.
joined, err := t.db.GetJoinedHostsForRooms(t.ctx, queryRes.RoomIDs, true)
if err != nil { if err != nil {
log.WithError(err).Error("failed to get joined hosts") sentry.CaptureException(err)
return true return true
} }
if len(joined) == 0 { if len(joined) == 0 {
return true return true
} }

View file

@ -164,7 +164,7 @@ func NewInternalAPI(
} }
presenceConsumer := consumers.NewOutputPresenceConsumer( presenceConsumer := consumers.NewOutputPresenceConsumer(
base.ProcessContext, cfg, js, queues, federationDB, base.ProcessContext, cfg, js, queues, federationDB, rsAPI,
) )
if err = presenceConsumer.Start(); err != nil { if err = presenceConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start presence consumer") logrus.WithError(err).Panic("failed to start presence consumer")