Update federation API consumers
This commit is contained in:
parent
f6dea712d2
commit
5298dd1133
|
@ -35,14 +35,14 @@ import (
|
||||||
|
|
||||||
// KeyChangeConsumer consumes events that originate in key server.
|
// KeyChangeConsumer consumes events that originate in key server.
|
||||||
type KeyChangeConsumer struct {
|
type KeyChangeConsumer struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
jetstream nats.JetStreamContext
|
jetstream nats.JetStreamContext
|
||||||
durable string
|
durable string
|
||||||
db storage.Database
|
db storage.Database
|
||||||
queues *queue.OutgoingQueues
|
queues *queue.OutgoingQueues
|
||||||
serverName gomatrixserverlib.ServerName
|
isLocalServerName func(gomatrixserverlib.ServerName) bool
|
||||||
rsAPI roomserverAPI.FederationRoomserverAPI
|
rsAPI roomserverAPI.FederationRoomserverAPI
|
||||||
topic string
|
topic string
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewKeyChangeConsumer creates a new KeyChangeConsumer. Call Start() to begin consuming from key servers.
|
// NewKeyChangeConsumer creates a new KeyChangeConsumer. Call Start() to begin consuming from key servers.
|
||||||
|
@ -55,14 +55,14 @@ func NewKeyChangeConsumer(
|
||||||
rsAPI roomserverAPI.FederationRoomserverAPI,
|
rsAPI roomserverAPI.FederationRoomserverAPI,
|
||||||
) *KeyChangeConsumer {
|
) *KeyChangeConsumer {
|
||||||
return &KeyChangeConsumer{
|
return &KeyChangeConsumer{
|
||||||
ctx: process.Context(),
|
ctx: process.Context(),
|
||||||
jetstream: js,
|
jetstream: js,
|
||||||
durable: cfg.Matrix.JetStream.Prefixed("FederationAPIKeyChangeConsumer"),
|
durable: cfg.Matrix.JetStream.Prefixed("FederationAPIKeyChangeConsumer"),
|
||||||
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent),
|
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent),
|
||||||
queues: queues,
|
queues: queues,
|
||||||
db: store,
|
db: store,
|
||||||
serverName: cfg.Matrix.ServerName,
|
isLocalServerName: cfg.Matrix.IsLocalServerName,
|
||||||
rsAPI: rsAPI,
|
rsAPI: rsAPI,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -112,7 +112,7 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) bool {
|
||||||
logger.WithError(err).Error("Failed to extract domain from key change event")
|
logger.WithError(err).Error("Failed to extract domain from key change event")
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
if originServerName != t.serverName {
|
if !t.isLocalServerName(originServerName) {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -141,7 +141,7 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) bool {
|
||||||
// Pack the EDU and marshal it
|
// Pack the EDU and marshal it
|
||||||
edu := &gomatrixserverlib.EDU{
|
edu := &gomatrixserverlib.EDU{
|
||||||
Type: gomatrixserverlib.MDeviceListUpdate,
|
Type: gomatrixserverlib.MDeviceListUpdate,
|
||||||
Origin: string(t.serverName),
|
Origin: string(originServerName),
|
||||||
}
|
}
|
||||||
event := gomatrixserverlib.DeviceListUpdateEvent{
|
event := gomatrixserverlib.DeviceListUpdateEvent{
|
||||||
UserID: m.UserID,
|
UserID: m.UserID,
|
||||||
|
@ -159,7 +159,7 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Debugf("Sending device list update message to %q", destinations)
|
logger.Debugf("Sending device list update message to %q", destinations)
|
||||||
err = t.queues.SendEDU(edu, t.serverName, destinations)
|
err = t.queues.SendEDU(edu, originServerName, destinations)
|
||||||
return err == nil
|
return err == nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -171,7 +171,7 @@ func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) bool {
|
||||||
logrus.WithError(err).Errorf("fedsender key change consumer: user ID parse failure")
|
logrus.WithError(err).Errorf("fedsender key change consumer: user ID parse failure")
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
if host != gomatrixserverlib.ServerName(t.serverName) {
|
if !t.isLocalServerName(host) {
|
||||||
// Ignore any messages that didn't originate locally, otherwise we'll
|
// Ignore any messages that didn't originate locally, otherwise we'll
|
||||||
// end up parroting information we received from other servers.
|
// end up parroting information we received from other servers.
|
||||||
return true
|
return true
|
||||||
|
@ -203,7 +203,7 @@ func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) bool {
|
||||||
// Pack the EDU and marshal it
|
// Pack the EDU and marshal it
|
||||||
edu := &gomatrixserverlib.EDU{
|
edu := &gomatrixserverlib.EDU{
|
||||||
Type: types.MSigningKeyUpdate,
|
Type: types.MSigningKeyUpdate,
|
||||||
Origin: string(t.serverName),
|
Origin: string(host),
|
||||||
}
|
}
|
||||||
if edu.Content, err = json.Marshal(output); err != nil {
|
if edu.Content, err = json.Marshal(output); err != nil {
|
||||||
sentry.CaptureException(err)
|
sentry.CaptureException(err)
|
||||||
|
@ -212,7 +212,7 @@ func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Debugf("Sending cross-signing update message to %q", destinations)
|
logger.Debugf("Sending cross-signing update message to %q", destinations)
|
||||||
err = t.queues.SendEDU(edu, t.serverName, destinations)
|
err = t.queues.SendEDU(edu, host, destinations)
|
||||||
return err == nil
|
return err == nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -38,7 +38,7 @@ type OutputPresenceConsumer struct {
|
||||||
durable string
|
durable string
|
||||||
db storage.Database
|
db storage.Database
|
||||||
queues *queue.OutgoingQueues
|
queues *queue.OutgoingQueues
|
||||||
ServerName gomatrixserverlib.ServerName
|
isLocalServerName func(gomatrixserverlib.ServerName) bool
|
||||||
topic string
|
topic string
|
||||||
outboundPresenceEnabled bool
|
outboundPresenceEnabled bool
|
||||||
}
|
}
|
||||||
|
@ -56,7 +56,7 @@ func NewOutputPresenceConsumer(
|
||||||
jetstream: js,
|
jetstream: js,
|
||||||
queues: queues,
|
queues: queues,
|
||||||
db: store,
|
db: store,
|
||||||
ServerName: cfg.Matrix.ServerName,
|
isLocalServerName: cfg.Matrix.IsLocalServerName,
|
||||||
durable: cfg.Matrix.JetStream.Durable("FederationAPIPresenceConsumer"),
|
durable: cfg.Matrix.JetStream.Durable("FederationAPIPresenceConsumer"),
|
||||||
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent),
|
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent),
|
||||||
outboundPresenceEnabled: cfg.Matrix.Presence.EnableOutbound,
|
outboundPresenceEnabled: cfg.Matrix.Presence.EnableOutbound,
|
||||||
|
@ -85,7 +85,7 @@ func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msgs []*nats.Msg
|
||||||
log.WithError(err).WithField("user_id", userID).Error("failed to extract domain from receipt sender")
|
log.WithError(err).WithField("user_id", userID).Error("failed to extract domain from receipt sender")
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
if serverName != t.ServerName {
|
if !t.isLocalServerName(serverName) {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -127,7 +127,7 @@ func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msgs []*nats.Msg
|
||||||
|
|
||||||
edu := &gomatrixserverlib.EDU{
|
edu := &gomatrixserverlib.EDU{
|
||||||
Type: gomatrixserverlib.MPresence,
|
Type: gomatrixserverlib.MPresence,
|
||||||
Origin: string(t.ServerName),
|
Origin: string(serverName),
|
||||||
}
|
}
|
||||||
if edu.Content, err = json.Marshal(content); err != nil {
|
if edu.Content, err = json.Marshal(content); err != nil {
|
||||||
log.WithError(err).Error("failed to marshal EDU JSON")
|
log.WithError(err).Error("failed to marshal EDU JSON")
|
||||||
|
@ -135,7 +135,7 @@ func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msgs []*nats.Msg
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Tracef("sending presence EDU to %d servers", len(joined))
|
log.Tracef("sending presence EDU to %d servers", len(joined))
|
||||||
if err = t.queues.SendEDU(edu, t.ServerName, joined); err != nil {
|
if err = t.queues.SendEDU(edu, serverName, joined); err != nil {
|
||||||
log.WithError(err).Error("failed to send EDU")
|
log.WithError(err).Error("failed to send EDU")
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,13 +34,13 @@ import (
|
||||||
|
|
||||||
// OutputReceiptConsumer consumes events that originate in the clientapi.
|
// OutputReceiptConsumer consumes events that originate in the clientapi.
|
||||||
type OutputReceiptConsumer struct {
|
type OutputReceiptConsumer struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
jetstream nats.JetStreamContext
|
jetstream nats.JetStreamContext
|
||||||
durable string
|
durable string
|
||||||
db storage.Database
|
db storage.Database
|
||||||
queues *queue.OutgoingQueues
|
queues *queue.OutgoingQueues
|
||||||
ServerName gomatrixserverlib.ServerName
|
isLocalServerName func(gomatrixserverlib.ServerName) bool
|
||||||
topic string
|
topic string
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewOutputReceiptConsumer creates a new OutputReceiptConsumer. Call Start() to begin consuming typing events.
|
// NewOutputReceiptConsumer creates a new OutputReceiptConsumer. Call Start() to begin consuming typing events.
|
||||||
|
@ -52,13 +52,13 @@ func NewOutputReceiptConsumer(
|
||||||
store storage.Database,
|
store storage.Database,
|
||||||
) *OutputReceiptConsumer {
|
) *OutputReceiptConsumer {
|
||||||
return &OutputReceiptConsumer{
|
return &OutputReceiptConsumer{
|
||||||
ctx: process.Context(),
|
ctx: process.Context(),
|
||||||
jetstream: js,
|
jetstream: js,
|
||||||
queues: queues,
|
queues: queues,
|
||||||
db: store,
|
db: store,
|
||||||
ServerName: cfg.Matrix.ServerName,
|
isLocalServerName: cfg.Matrix.IsLocalServerName,
|
||||||
durable: cfg.Matrix.JetStream.Durable("FederationAPIReceiptConsumer"),
|
durable: cfg.Matrix.JetStream.Durable("FederationAPIReceiptConsumer"),
|
||||||
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
|
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -95,7 +95,7 @@ func (t *OutputReceiptConsumer) onMessage(ctx context.Context, msgs []*nats.Msg)
|
||||||
log.WithError(err).WithField("user_id", receipt.UserID).Error("failed to extract domain from receipt sender")
|
log.WithError(err).WithField("user_id", receipt.UserID).Error("failed to extract domain from receipt sender")
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
if receiptServerName != t.ServerName {
|
if !t.isLocalServerName(receiptServerName) {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -134,14 +134,14 @@ func (t *OutputReceiptConsumer) onMessage(ctx context.Context, msgs []*nats.Msg)
|
||||||
|
|
||||||
edu := &gomatrixserverlib.EDU{
|
edu := &gomatrixserverlib.EDU{
|
||||||
Type: gomatrixserverlib.MReceipt,
|
Type: gomatrixserverlib.MReceipt,
|
||||||
Origin: string(t.ServerName),
|
Origin: string(receiptServerName),
|
||||||
}
|
}
|
||||||
if edu.Content, err = json.Marshal(content); err != nil {
|
if edu.Content, err = json.Marshal(content); err != nil {
|
||||||
log.WithError(err).Error("failed to marshal EDU JSON")
|
log.WithError(err).Error("failed to marshal EDU JSON")
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := t.queues.SendEDU(edu, t.ServerName, names); err != nil {
|
if err := t.queues.SendEDU(edu, receiptServerName, names); err != nil {
|
||||||
log.WithError(err).Error("failed to send EDU")
|
log.WithError(err).Error("failed to send EDU")
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,13 +34,13 @@ import (
|
||||||
|
|
||||||
// OutputSendToDeviceConsumer consumes events that originate in the clientapi.
|
// OutputSendToDeviceConsumer consumes events that originate in the clientapi.
|
||||||
type OutputSendToDeviceConsumer struct {
|
type OutputSendToDeviceConsumer struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
jetstream nats.JetStreamContext
|
jetstream nats.JetStreamContext
|
||||||
durable string
|
durable string
|
||||||
db storage.Database
|
db storage.Database
|
||||||
queues *queue.OutgoingQueues
|
queues *queue.OutgoingQueues
|
||||||
ServerName gomatrixserverlib.ServerName
|
isLocalServerName func(gomatrixserverlib.ServerName) bool
|
||||||
topic string
|
topic string
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewOutputSendToDeviceConsumer creates a new OutputSendToDeviceConsumer. Call Start() to begin consuming send-to-device events.
|
// NewOutputSendToDeviceConsumer creates a new OutputSendToDeviceConsumer. Call Start() to begin consuming send-to-device events.
|
||||||
|
@ -52,13 +52,13 @@ func NewOutputSendToDeviceConsumer(
|
||||||
store storage.Database,
|
store storage.Database,
|
||||||
) *OutputSendToDeviceConsumer {
|
) *OutputSendToDeviceConsumer {
|
||||||
return &OutputSendToDeviceConsumer{
|
return &OutputSendToDeviceConsumer{
|
||||||
ctx: process.Context(),
|
ctx: process.Context(),
|
||||||
jetstream: js,
|
jetstream: js,
|
||||||
queues: queues,
|
queues: queues,
|
||||||
db: store,
|
db: store,
|
||||||
ServerName: cfg.Matrix.ServerName,
|
isLocalServerName: cfg.Matrix.IsLocalServerName,
|
||||||
durable: cfg.Matrix.JetStream.Durable("FederationAPIESendToDeviceConsumer"),
|
durable: cfg.Matrix.JetStream.Durable("FederationAPIESendToDeviceConsumer"),
|
||||||
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
|
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -82,7 +82,7 @@ func (t *OutputSendToDeviceConsumer) onMessage(ctx context.Context, msgs []*nats
|
||||||
log.WithError(err).WithField("user_id", sender).Error("Failed to extract domain from send-to-device sender")
|
log.WithError(err).WithField("user_id", sender).Error("Failed to extract domain from send-to-device sender")
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
if originServerName != t.ServerName {
|
if !t.isLocalServerName(originServerName) {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
// Extract the send-to-device event from msg.
|
// Extract the send-to-device event from msg.
|
||||||
|
@ -101,14 +101,14 @@ func (t *OutputSendToDeviceConsumer) onMessage(ctx context.Context, msgs []*nats
|
||||||
}
|
}
|
||||||
|
|
||||||
// The SyncAPI is already handling sendToDevice for the local server
|
// The SyncAPI is already handling sendToDevice for the local server
|
||||||
if destServerName == t.ServerName {
|
if t.isLocalServerName(destServerName) {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pack the EDU and marshal it
|
// Pack the EDU and marshal it
|
||||||
edu := &gomatrixserverlib.EDU{
|
edu := &gomatrixserverlib.EDU{
|
||||||
Type: gomatrixserverlib.MDirectToDevice,
|
Type: gomatrixserverlib.MDirectToDevice,
|
||||||
Origin: string(t.ServerName),
|
Origin: string(originServerName),
|
||||||
}
|
}
|
||||||
tdm := gomatrixserverlib.ToDeviceMessage{
|
tdm := gomatrixserverlib.ToDeviceMessage{
|
||||||
Sender: ote.Sender,
|
Sender: ote.Sender,
|
||||||
|
@ -127,7 +127,7 @@ func (t *OutputSendToDeviceConsumer) onMessage(ctx context.Context, msgs []*nats
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debugf("Sending send-to-device message into %q destination queue", destServerName)
|
log.Debugf("Sending send-to-device message into %q destination queue", destServerName)
|
||||||
if err := t.queues.SendEDU(edu, t.ServerName, []gomatrixserverlib.ServerName{destServerName}); err != nil {
|
if err := t.queues.SendEDU(edu, originServerName, []gomatrixserverlib.ServerName{destServerName}); err != nil {
|
||||||
log.WithError(err).Error("failed to send EDU")
|
log.WithError(err).Error("failed to send EDU")
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,13 +31,13 @@ import (
|
||||||
|
|
||||||
// OutputTypingConsumer consumes events that originate in the clientapi.
|
// OutputTypingConsumer consumes events that originate in the clientapi.
|
||||||
type OutputTypingConsumer struct {
|
type OutputTypingConsumer struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
jetstream nats.JetStreamContext
|
jetstream nats.JetStreamContext
|
||||||
durable string
|
durable string
|
||||||
db storage.Database
|
db storage.Database
|
||||||
queues *queue.OutgoingQueues
|
queues *queue.OutgoingQueues
|
||||||
ServerName gomatrixserverlib.ServerName
|
isLocalServerName func(gomatrixserverlib.ServerName) bool
|
||||||
topic string
|
topic string
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewOutputTypingConsumer creates a new OutputTypingConsumer. Call Start() to begin consuming typing events.
|
// NewOutputTypingConsumer creates a new OutputTypingConsumer. Call Start() to begin consuming typing events.
|
||||||
|
@ -49,13 +49,13 @@ func NewOutputTypingConsumer(
|
||||||
store storage.Database,
|
store storage.Database,
|
||||||
) *OutputTypingConsumer {
|
) *OutputTypingConsumer {
|
||||||
return &OutputTypingConsumer{
|
return &OutputTypingConsumer{
|
||||||
ctx: process.Context(),
|
ctx: process.Context(),
|
||||||
jetstream: js,
|
jetstream: js,
|
||||||
queues: queues,
|
queues: queues,
|
||||||
db: store,
|
db: store,
|
||||||
ServerName: cfg.Matrix.ServerName,
|
isLocalServerName: cfg.Matrix.IsLocalServerName,
|
||||||
durable: cfg.Matrix.JetStream.Durable("FederationAPITypingConsumer"),
|
durable: cfg.Matrix.JetStream.Durable("FederationAPITypingConsumer"),
|
||||||
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
|
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -87,7 +87,7 @@ func (t *OutputTypingConsumer) onMessage(ctx context.Context, msgs []*nats.Msg)
|
||||||
_ = msg.Ack()
|
_ = msg.Ack()
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
if typingServerName != t.ServerName {
|
if !t.isLocalServerName(typingServerName) {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -111,7 +111,7 @@ func (t *OutputTypingConsumer) onMessage(ctx context.Context, msgs []*nats.Msg)
|
||||||
log.WithError(err).Error("failed to marshal EDU JSON")
|
log.WithError(err).Error("failed to marshal EDU JSON")
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
if err := t.queues.SendEDU(edu, t.ServerName, names); err != nil {
|
if err := t.queues.SendEDU(edu, typingServerName, names); err != nil {
|
||||||
log.WithError(err).Error("failed to send EDU")
|
log.WithError(err).Error("failed to send EDU")
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue