Initial work to send send-to-device messages over federation

This commit is contained in:
Neil Alexander 2020-07-14 10:04:10 +01:00
parent 396219ef53
commit 666a9061fc
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
3 changed files with 75 additions and 18 deletions

View file

@ -26,31 +26,35 @@ import (
log "github.com/sirupsen/logrus"
)
// OutputTypingEventConsumer consumes events that originate in EDU server.
type OutputTypingEventConsumer struct {
// OutputEDUConsumer consumes events that originate in EDU server.
type OutputEDUConsumer struct {
consumer *internal.ContinualConsumer
db storage.Database
queues *queue.OutgoingQueues
ServerName gomatrixserverlib.ServerName
TypingTopic string
SendToDeviceTopic string
}
// NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer. Call Start() to begin consuming from EDU servers.
func NewOutputTypingEventConsumer(
// NewOutputEDUConsumer creates a new OutputEDUConsumer. Call Start() to begin consuming from EDU servers.
func NewOutputEDUConsumer(
cfg *config.Dendrite,
kafkaConsumer sarama.Consumer,
queues *queue.OutgoingQueues,
store storage.Database,
) *OutputTypingEventConsumer {
) *OutputEDUConsumer {
consumer := internal.ContinualConsumer{
Topic: string(cfg.Kafka.Topics.OutputTypingEvent),
Consumer: kafkaConsumer,
PartitionStore: store,
}
c := &OutputTypingEventConsumer{
c := &OutputEDUConsumer{
consumer: &consumer,
queues: queues,
db: store,
ServerName: cfg.Matrix.ServerName,
TypingTopic: string(cfg.Kafka.Topics.OutputTypingEvent),
SendToDeviceTopic: string(cfg.Kafka.Topics.OutputSendToDeviceEvent),
}
consumer.ProcessMessage = c.onMessage
@ -58,13 +62,66 @@ func NewOutputTypingEventConsumer(
}
// Start consuming from EDU servers
func (t *OutputTypingEventConsumer) Start() error {
func (t *OutputEDUConsumer) Start() error {
return t.consumer.Start()
}
// onMessage is called for OutputTypingEvent received from the EDU servers.
// Parses the msg, creates a matrix federation EDU and sends it to joined hosts.
func (t *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
func (t *OutputEDUConsumer) onMessage(msg *sarama.ConsumerMessage) error {
switch msg.Topic {
case t.TypingTopic:
return t.onTypingEvent(msg)
case t.SendToDeviceTopic:
return t.onSendToDeviceEvent(msg)
default:
log.Warnf("Ignoring message with unexpected topic %q", msg.Topic)
}
return nil
}
// onSendToDeviceEvent is called in response to a message received on the
// send-to-device events topic from the EDU server.
func (t *OutputEDUConsumer) onSendToDeviceEvent(msg *sarama.ConsumerMessage) error {
// Extract the send-to-device event from msg.
var ote api.OutputSendToDeviceEvent
if err := json.Unmarshal(msg.Value, &ote); err != nil {
log.WithError(err).Errorf("eduserver output log: message parse failed")
return nil
}
// only send typing events which originated from us
_, originServerName, err := gomatrixserverlib.SplitID('@', ote.Sender)
if err != nil {
log.WithError(err).WithField("user_id", ote.Sender).Error("Failed to extract domain from send-to-device sender")
return nil
}
if originServerName != t.ServerName {
log.WithField("other_server", originServerName).Info("Suppressing send-to-device: originated elsewhere")
return nil
}
_, destServerName, err := gomatrixserverlib.SplitID('@', ote.UserID)
if err != nil {
log.WithError(err).WithField("user_id", ote.UserID).Error("Failed to extract domain from send-to-device destination")
return nil
}
edu := &gomatrixserverlib.EDU{Type: ote.Type}
if edu.Content, err = json.Marshal(map[string]interface{}{
"sender": ote.SendToDeviceEvent.Sender,
"user_id": ote.UserID,
"device_id": ote.DeviceID,
}); err != nil {
return err
}
return t.queues.SendEDU(edu, t.ServerName, []gomatrixserverlib.ServerName{destServerName})
}
// onTypingEvent is called in response to a message received on the typing
// events topic from the EDU server.
func (t *OutputEDUConsumer) onTypingEvent(msg *sarama.ConsumerMessage) error {
// Extract the typing event from msg.
var ote api.OutputTypingEvent
if err := json.Unmarshal(msg.Value, &ote); err != nil {

View file

@ -66,7 +66,7 @@ func NewInternalAPI(
logrus.WithError(err).Panic("failed to start room server consumer")
}
tsConsumer := consumers.NewOutputTypingEventConsumer(
tsConsumer := consumers.NewOutputEDUConsumer(
base.Cfg, base.KafkaConsumer, queues, federationSenderDB,
)
if err := tsConsumer.Start(); err != nil {

View file

@ -174,7 +174,7 @@ func (oqs *OutgoingQueues) SendInvite(
return nil
}
// SendEDU sends an EDU event to the destinations
// SendEDU sends an EDU event to the destinations.
func (oqs *OutgoingQueues) SendEDU(
e *gomatrixserverlib.EDU, origin gomatrixserverlib.ServerName,
destinations []gomatrixserverlib.ServerName,