From 37ad7297ccee5a85c9a9657d90e545c9db268f87 Mon Sep 17 00:00:00 2001 From: Till Faelligen Date: Tue, 5 Apr 2022 15:10:19 +0200 Subject: [PATCH] Change options to allow inbound/outbound presence --- clientapi/routing/presence.go | 2 +- cmd/generate-config/main.go | 4 ++++ federationapi/consumers/presence.go | 37 ++++++++++++++++------------- federationapi/routing/send.go | 36 +++++++++++++++------------- setup/config/config_global.go | 8 +++++-- syncapi/consumers/presence.go | 9 +++++-- syncapi/sync/requestpool.go | 3 ++- syncapi/sync/requestpool_test.go | 5 +++- 8 files changed, 65 insertions(+), 39 deletions(-) diff --git a/clientapi/routing/presence.go b/clientapi/routing/presence.go index 2d0672e91..80314bfef 100644 --- a/clientapi/routing/presence.go +++ b/clientapi/routing/presence.go @@ -45,7 +45,7 @@ func SetPresence( producer *producers.SyncAPIProducer, userID string, ) util.JSONResponse { - if cfg.Matrix.DisablePresence { + if !cfg.Matrix.Presence.EnableInbound { return util.JSONResponse{ Code: http.StatusOK, JSON: struct{}{}, diff --git a/cmd/generate-config/main.go b/cmd/generate-config/main.go index ba5a87a7a..24085afaa 100644 --- a/cmd/generate-config/main.go +++ b/cmd/generate-config/main.go @@ -91,6 +91,10 @@ func main() { cfg.UserAPI.BCryptCost = bcrypt.MinCost cfg.Global.JetStream.InMemory = true cfg.ClientAPI.RegistrationSharedSecret = "complement" + cfg.Global.Presence = config.PresenceOptions{ + EnableInbound: true, + EnableOutbound: true, + } } j, err := yaml.Marshal(cfg) diff --git a/federationapi/consumers/presence.go b/federationapi/consumers/presence.go index f52d9a772..bfce1b28b 100644 --- a/federationapi/consumers/presence.go +++ b/federationapi/consumers/presence.go @@ -33,16 +33,17 @@ import ( // OutputReceiptConsumer consumes events that originate in the clientapi. type OutputPresenceConsumer struct { - ctx context.Context - jetstream nats.JetStreamContext - durable string - db storage.Database - queues *queue.OutgoingQueues - ServerName gomatrixserverlib.ServerName - topic string + ctx context.Context + jetstream nats.JetStreamContext + durable string + db storage.Database + queues *queue.OutgoingQueues + ServerName gomatrixserverlib.ServerName + topic string + outboundPresenceEnabled bool } -// NewOutputReceiptConsumer creates a new OutputReceiptConsumer. Call Start() to begin consuming typing events. +// NewOutputPresenceConsumer creates a new OutputPresenceConsumer. Call Start() to begin consuming events. func NewOutputPresenceConsumer( process *process.ProcessContext, cfg *config.FederationAPI, @@ -51,25 +52,29 @@ func NewOutputPresenceConsumer( store storage.Database, ) *OutputPresenceConsumer { return &OutputPresenceConsumer{ - ctx: process.Context(), - jetstream: js, - queues: queues, - db: store, - ServerName: cfg.Matrix.ServerName, - durable: cfg.Matrix.JetStream.Durable("FederationAPIPresenceConsumer"), - topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent), + ctx: process.Context(), + jetstream: js, + queues: queues, + db: store, + ServerName: cfg.Matrix.ServerName, + durable: cfg.Matrix.JetStream.Durable("FederationAPIPresenceConsumer"), + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent), + outboundPresenceEnabled: cfg.Matrix.Presence.EnableOutbound, } } // Start consuming from the clientapi func (t *OutputPresenceConsumer) Start() error { + if !t.outboundPresenceEnabled { + return nil + } return jetstream.JetStreamConsumer( t.ctx, t.jetstream, t.topic, t.durable, t.onMessage, nats.DeliverAll(), nats.ManualAck(), nats.HeadersOnly(), ) } -// onMessage is called in response to a message received on the receipt +// onMessage is called in response to a message received on the presence // events topic from the client api. func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { // only send presence events which originated from us diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index a575040e2..1bba632b5 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -128,13 +128,14 @@ func Send( defer inFlightTxnsPerOrigin.Delete(index) t := txnReq{ - rsAPI: rsAPI, - keys: keys, - federation: federation, - servers: servers, - keyAPI: keyAPI, - roomsMu: mu, - producer: producer, + rsAPI: rsAPI, + keys: keys, + federation: federation, + servers: servers, + keyAPI: keyAPI, + roomsMu: mu, + producer: producer, + inboundPresenceEnabled: cfg.Matrix.Presence.EnableInbound, } var txnEvents struct { @@ -186,13 +187,14 @@ func Send( type txnReq struct { gomatrixserverlib.Transaction - rsAPI api.RoomserverInternalAPI - keyAPI keyapi.KeyInternalAPI - keys gomatrixserverlib.JSONVerifier - federation txnFederationClient - roomsMu *internal.MutexByRoom - servers federationAPI.ServersInRoomProvider - producer *producers.SyncAPIProducer + rsAPI api.RoomserverInternalAPI + keyAPI keyapi.KeyInternalAPI + keys gomatrixserverlib.JSONVerifier + federation txnFederationClient + roomsMu *internal.MutexByRoom + servers federationAPI.ServersInRoomProvider + producer *producers.SyncAPIProducer + inboundPresenceEnabled bool } // A subset of FederationClient functionality that txn requires. Useful for testing. @@ -391,8 +393,10 @@ func (t *txnReq) processEDUs(ctx context.Context) { logrus.WithError(err).Errorf("Failed to process signing key update") } case gomatrixserverlib.MPresence: - if err := t.processPresence(ctx, e); err != nil { - logrus.WithError(err).Errorf("Failed to process presence update") + if t.inboundPresenceEnabled { + if err := t.processPresence(ctx, e); err != nil { + logrus.WithError(err).Errorf("Failed to process presence update") + } } default: util.GetLogger(ctx).WithField("type", e.Type).Debug("Unhandled EDU") diff --git a/setup/config/config_global.go b/setup/config/config_global.go index 61086a4dd..870d1e6bd 100644 --- a/setup/config/config_global.go +++ b/setup/config/config_global.go @@ -42,7 +42,7 @@ type Global struct { DisableFederation bool `yaml:"disable_federation"` // Disable presence. Dendrite will not handle presence events. - DisablePresence bool `yaml:"disable_presence"` + Presence PresenceOptions `yaml:"presence"` // List of domains that the server will trust as identity servers to // verify third-party identifiers. @@ -71,7 +71,6 @@ func (c *Global) Defaults(generate bool) { c.PrivateKeyPath = "matrix_key.pem" _, c.PrivateKey, _ = ed25519.GenerateKey(rand.New(rand.NewSource(0))) c.KeyID = "ed25519:auto" - c.DisablePresence = false } c.KeyValidityPeriod = time.Hour * 24 * 7 @@ -229,3 +228,8 @@ func (c *DNSCacheOptions) Verify(configErrs *ConfigErrors, isMonolith bool) { checkPositive(configErrs, "cache_size", int64(c.CacheSize)) checkPositive(configErrs, "cache_lifetime", int64(c.CacheLifetime)) } + +type PresenceOptions struct { + EnableInbound bool `yaml:"enable_inbound"` + EnableOutbound bool `yaml:"enable_outbound"` +} diff --git a/syncapi/consumers/presence.go b/syncapi/consumers/presence.go index 2eb5784e5..b198b2292 100644 --- a/syncapi/consumers/presence.go +++ b/syncapi/consumers/presence.go @@ -42,10 +42,11 @@ type PresenceConsumer struct { stream types.StreamProvider notifier *notifier.Notifier deviceAPI api.UserDeviceAPI + cfg *config.SyncAPI } -// NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer. -// Call Start() to begin consuming from the EDU server. +// NewPresenceConsumer creates a new PresenceConsumer. +// Call Start() to begin consuming events. func NewPresenceConsumer( process *process.ProcessContext, cfg *config.SyncAPI, @@ -67,6 +68,7 @@ func NewPresenceConsumer( notifier: notifier, stream: stream, deviceAPI: deviceAPI, + cfg: cfg, } } @@ -115,6 +117,9 @@ func (s *PresenceConsumer) Start() error { if err != nil { return err } + if !s.cfg.Matrix.Presence.EnableInbound && !s.cfg.Matrix.Presence.EnableOutbound { + return nil + } return jetstream.JetStreamConsumer( s.ctx, s.jetstream, s.presenceTopic, s.durable, s.onMessage, nats.DeliverAll(), nats.ManualAck(), nats.HeadersOnly(), diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 4ca6e4563..7af685506 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -80,6 +80,7 @@ func NewRequestPool( producer: producer, } go rp.cleanLastSeen() + go rp.cleanPresence(db, time.Minute*5) return rp } @@ -110,7 +111,7 @@ func (rp *RequestPool) cleanPresence(db storage.Presence, cleanupTime time.Durat // updatePresence sends presence updates to the SyncAPI and FederationAPI func (rp *RequestPool) updatePresence(db storage.Presence, presence string, userID string) { - if rp.cfg.Matrix.DisablePresence { + if !rp.cfg.Matrix.Presence.EnableInbound { return } if presence == "" { diff --git a/syncapi/sync/requestpool_test.go b/syncapi/sync/requestpool_test.go index 81022b50d..a80089945 100644 --- a/syncapi/sync/requestpool_test.go +++ b/syncapi/sync/requestpool_test.go @@ -106,7 +106,10 @@ func TestRequestPool_updatePresence(t *testing.T) { JetStream: config.JetStream{ TopicPrefix: "Dendrite", }, - DisablePresence: false, + Presence: config.PresenceOptions{ + EnableInbound: true, + EnableOutbound: true, + }, }, }, }