Change options to allow inbound/outbound presence

This commit is contained in:
Till Faelligen 2022-04-05 15:10:19 +02:00
parent 02d417c5c6
commit 37ad7297cc
8 changed files with 65 additions and 39 deletions

View file

@ -45,7 +45,7 @@ func SetPresence(
producer *producers.SyncAPIProducer, producer *producers.SyncAPIProducer,
userID string, userID string,
) util.JSONResponse { ) util.JSONResponse {
if cfg.Matrix.DisablePresence { if !cfg.Matrix.Presence.EnableInbound {
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusOK, Code: http.StatusOK,
JSON: struct{}{}, JSON: struct{}{},

View file

@ -91,6 +91,10 @@ func main() {
cfg.UserAPI.BCryptCost = bcrypt.MinCost cfg.UserAPI.BCryptCost = bcrypt.MinCost
cfg.Global.JetStream.InMemory = true cfg.Global.JetStream.InMemory = true
cfg.ClientAPI.RegistrationSharedSecret = "complement" cfg.ClientAPI.RegistrationSharedSecret = "complement"
cfg.Global.Presence = config.PresenceOptions{
EnableInbound: true,
EnableOutbound: true,
}
} }
j, err := yaml.Marshal(cfg) j, err := yaml.Marshal(cfg)

View file

@ -33,16 +33,17 @@ import (
// OutputReceiptConsumer consumes events that originate in the clientapi. // OutputReceiptConsumer consumes events that originate in the clientapi.
type OutputPresenceConsumer struct { type OutputPresenceConsumer struct {
ctx context.Context ctx context.Context
jetstream nats.JetStreamContext jetstream nats.JetStreamContext
durable string durable string
db storage.Database db storage.Database
queues *queue.OutgoingQueues queues *queue.OutgoingQueues
ServerName gomatrixserverlib.ServerName ServerName gomatrixserverlib.ServerName
topic string topic string
outboundPresenceEnabled bool
} }
// NewOutputReceiptConsumer creates a new OutputReceiptConsumer. Call Start() to begin consuming typing events. // NewOutputPresenceConsumer creates a new OutputPresenceConsumer. Call Start() to begin consuming events.
func NewOutputPresenceConsumer( func NewOutputPresenceConsumer(
process *process.ProcessContext, process *process.ProcessContext,
cfg *config.FederationAPI, cfg *config.FederationAPI,
@ -51,25 +52,29 @@ func NewOutputPresenceConsumer(
store storage.Database, store storage.Database,
) *OutputPresenceConsumer { ) *OutputPresenceConsumer {
return &OutputPresenceConsumer{ return &OutputPresenceConsumer{
ctx: process.Context(), ctx: process.Context(),
jetstream: js, jetstream: js,
queues: queues, queues: queues,
db: store, db: store,
ServerName: cfg.Matrix.ServerName, ServerName: cfg.Matrix.ServerName,
durable: cfg.Matrix.JetStream.Durable("FederationAPIPresenceConsumer"), durable: cfg.Matrix.JetStream.Durable("FederationAPIPresenceConsumer"),
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent), topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent),
outboundPresenceEnabled: cfg.Matrix.Presence.EnableOutbound,
} }
} }
// Start consuming from the clientapi // Start consuming from the clientapi
func (t *OutputPresenceConsumer) Start() error { func (t *OutputPresenceConsumer) Start() error {
if !t.outboundPresenceEnabled {
return nil
}
return jetstream.JetStreamConsumer( return jetstream.JetStreamConsumer(
t.ctx, t.jetstream, t.topic, t.durable, t.onMessage, t.ctx, t.jetstream, t.topic, t.durable, t.onMessage,
nats.DeliverAll(), nats.ManualAck(), nats.HeadersOnly(), nats.DeliverAll(), nats.ManualAck(), nats.HeadersOnly(),
) )
} }
// onMessage is called in response to a message received on the receipt // onMessage is called in response to a message received on the presence
// events topic from the client api. // events topic from the client api.
func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
// only send presence events which originated from us // only send presence events which originated from us

View file

@ -128,13 +128,14 @@ func Send(
defer inFlightTxnsPerOrigin.Delete(index) defer inFlightTxnsPerOrigin.Delete(index)
t := txnReq{ t := txnReq{
rsAPI: rsAPI, rsAPI: rsAPI,
keys: keys, keys: keys,
federation: federation, federation: federation,
servers: servers, servers: servers,
keyAPI: keyAPI, keyAPI: keyAPI,
roomsMu: mu, roomsMu: mu,
producer: producer, producer: producer,
inboundPresenceEnabled: cfg.Matrix.Presence.EnableInbound,
} }
var txnEvents struct { var txnEvents struct {
@ -186,13 +187,14 @@ func Send(
type txnReq struct { type txnReq struct {
gomatrixserverlib.Transaction gomatrixserverlib.Transaction
rsAPI api.RoomserverInternalAPI rsAPI api.RoomserverInternalAPI
keyAPI keyapi.KeyInternalAPI keyAPI keyapi.KeyInternalAPI
keys gomatrixserverlib.JSONVerifier keys gomatrixserverlib.JSONVerifier
federation txnFederationClient federation txnFederationClient
roomsMu *internal.MutexByRoom roomsMu *internal.MutexByRoom
servers federationAPI.ServersInRoomProvider servers federationAPI.ServersInRoomProvider
producer *producers.SyncAPIProducer producer *producers.SyncAPIProducer
inboundPresenceEnabled bool
} }
// A subset of FederationClient functionality that txn requires. Useful for testing. // A subset of FederationClient functionality that txn requires. Useful for testing.
@ -391,8 +393,10 @@ func (t *txnReq) processEDUs(ctx context.Context) {
logrus.WithError(err).Errorf("Failed to process signing key update") logrus.WithError(err).Errorf("Failed to process signing key update")
} }
case gomatrixserverlib.MPresence: case gomatrixserverlib.MPresence:
if err := t.processPresence(ctx, e); err != nil { if t.inboundPresenceEnabled {
logrus.WithError(err).Errorf("Failed to process presence update") if err := t.processPresence(ctx, e); err != nil {
logrus.WithError(err).Errorf("Failed to process presence update")
}
} }
default: default:
util.GetLogger(ctx).WithField("type", e.Type).Debug("Unhandled EDU") util.GetLogger(ctx).WithField("type", e.Type).Debug("Unhandled EDU")

View file

@ -42,7 +42,7 @@ type Global struct {
DisableFederation bool `yaml:"disable_federation"` DisableFederation bool `yaml:"disable_federation"`
// Disable presence. Dendrite will not handle presence events. // Disable presence. Dendrite will not handle presence events.
DisablePresence bool `yaml:"disable_presence"` Presence PresenceOptions `yaml:"presence"`
// List of domains that the server will trust as identity servers to // List of domains that the server will trust as identity servers to
// verify third-party identifiers. // verify third-party identifiers.
@ -71,7 +71,6 @@ func (c *Global) Defaults(generate bool) {
c.PrivateKeyPath = "matrix_key.pem" c.PrivateKeyPath = "matrix_key.pem"
_, c.PrivateKey, _ = ed25519.GenerateKey(rand.New(rand.NewSource(0))) _, c.PrivateKey, _ = ed25519.GenerateKey(rand.New(rand.NewSource(0)))
c.KeyID = "ed25519:auto" c.KeyID = "ed25519:auto"
c.DisablePresence = false
} }
c.KeyValidityPeriod = time.Hour * 24 * 7 c.KeyValidityPeriod = time.Hour * 24 * 7
@ -229,3 +228,8 @@ func (c *DNSCacheOptions) Verify(configErrs *ConfigErrors, isMonolith bool) {
checkPositive(configErrs, "cache_size", int64(c.CacheSize)) checkPositive(configErrs, "cache_size", int64(c.CacheSize))
checkPositive(configErrs, "cache_lifetime", int64(c.CacheLifetime)) checkPositive(configErrs, "cache_lifetime", int64(c.CacheLifetime))
} }
type PresenceOptions struct {
EnableInbound bool `yaml:"enable_inbound"`
EnableOutbound bool `yaml:"enable_outbound"`
}

View file

@ -42,10 +42,11 @@ type PresenceConsumer struct {
stream types.StreamProvider stream types.StreamProvider
notifier *notifier.Notifier notifier *notifier.Notifier
deviceAPI api.UserDeviceAPI deviceAPI api.UserDeviceAPI
cfg *config.SyncAPI
} }
// NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer. // NewPresenceConsumer creates a new PresenceConsumer.
// Call Start() to begin consuming from the EDU server. // Call Start() to begin consuming events.
func NewPresenceConsumer( func NewPresenceConsumer(
process *process.ProcessContext, process *process.ProcessContext,
cfg *config.SyncAPI, cfg *config.SyncAPI,
@ -67,6 +68,7 @@ func NewPresenceConsumer(
notifier: notifier, notifier: notifier,
stream: stream, stream: stream,
deviceAPI: deviceAPI, deviceAPI: deviceAPI,
cfg: cfg,
} }
} }
@ -115,6 +117,9 @@ func (s *PresenceConsumer) Start() error {
if err != nil { if err != nil {
return err return err
} }
if !s.cfg.Matrix.Presence.EnableInbound && !s.cfg.Matrix.Presence.EnableOutbound {
return nil
}
return jetstream.JetStreamConsumer( return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.presenceTopic, s.durable, s.onMessage, s.ctx, s.jetstream, s.presenceTopic, s.durable, s.onMessage,
nats.DeliverAll(), nats.ManualAck(), nats.HeadersOnly(), nats.DeliverAll(), nats.ManualAck(), nats.HeadersOnly(),

View file

@ -80,6 +80,7 @@ func NewRequestPool(
producer: producer, producer: producer,
} }
go rp.cleanLastSeen() go rp.cleanLastSeen()
go rp.cleanPresence(db, time.Minute*5) go rp.cleanPresence(db, time.Minute*5)
return rp return rp
} }
@ -110,7 +111,7 @@ func (rp *RequestPool) cleanPresence(db storage.Presence, cleanupTime time.Durat
// updatePresence sends presence updates to the SyncAPI and FederationAPI // updatePresence sends presence updates to the SyncAPI and FederationAPI
func (rp *RequestPool) updatePresence(db storage.Presence, presence string, userID string) { func (rp *RequestPool) updatePresence(db storage.Presence, presence string, userID string) {
if rp.cfg.Matrix.DisablePresence { if !rp.cfg.Matrix.Presence.EnableInbound {
return return
} }
if presence == "" { if presence == "" {

View file

@ -106,7 +106,10 @@ func TestRequestPool_updatePresence(t *testing.T) {
JetStream: config.JetStream{ JetStream: config.JetStream{
TopicPrefix: "Dendrite", TopicPrefix: "Dendrite",
}, },
DisablePresence: false, Presence: config.PresenceOptions{
EnableInbound: true,
EnableOutbound: true,
},
}, },
}, },
} }