diff --git a/federationsender/consumers/eduserver.go b/federationsender/consumers/eduserver.go index c7e5d485b..c63dae14a 100644 --- a/federationsender/consumers/eduserver.go +++ b/federationsender/consumers/eduserver.go @@ -15,6 +15,7 @@ package consumers import ( "context" "encoding/json" + "fmt" "github.com/Shopify/sarama" "github.com/matrix-org/dendrite/eduserver/api" @@ -28,12 +29,13 @@ import ( // OutputEDUConsumer consumes events that originate in EDU server. type OutputEDUConsumer struct { - consumer *internal.ContinualConsumer - db storage.Database - queues *queue.OutgoingQueues - ServerName gomatrixserverlib.ServerName - TypingTopic string - SendToDeviceTopic string + typingConsumer *internal.ContinualConsumer + sendToDeviceConsumer *internal.ContinualConsumer + db storage.Database + queues *queue.OutgoingQueues + ServerName gomatrixserverlib.ServerName + TypingTopic string + SendToDeviceTopic string } // NewOutputEDUConsumer creates a new OutputEDUConsumer. Call Start() to begin consuming from EDU servers. @@ -43,39 +45,36 @@ func NewOutputEDUConsumer( queues *queue.OutgoingQueues, store storage.Database, ) *OutputEDUConsumer { - consumer := internal.ContinualConsumer{ - Topic: string(cfg.Kafka.Topics.OutputTypingEvent), - Consumer: kafkaConsumer, - PartitionStore: store, - } c := &OutputEDUConsumer{ - consumer: &consumer, + typingConsumer: &internal.ContinualConsumer{ + Topic: string(cfg.Kafka.Topics.OutputTypingEvent), + Consumer: kafkaConsumer, + PartitionStore: store, + }, + sendToDeviceConsumer: &internal.ContinualConsumer{ + Topic: string(cfg.Kafka.Topics.OutputSendToDeviceEvent), + Consumer: kafkaConsumer, + PartitionStore: store, + }, queues: queues, db: store, ServerName: cfg.Matrix.ServerName, TypingTopic: string(cfg.Kafka.Topics.OutputTypingEvent), SendToDeviceTopic: string(cfg.Kafka.Topics.OutputSendToDeviceEvent), } - consumer.ProcessMessage = c.onMessage + c.typingConsumer.ProcessMessage = c.onTypingEvent + c.sendToDeviceConsumer.ProcessMessage = c.onSendToDeviceEvent return c } // Start consuming from EDU servers func (t *OutputEDUConsumer) Start() error { - return t.consumer.Start() -} - -// onMessage is called for OutputTypingEvent received from the EDU servers. -// Parses the msg, creates a matrix federation EDU and sends it to joined hosts. -func (t *OutputEDUConsumer) onMessage(msg *sarama.ConsumerMessage) error { - switch msg.Topic { - case t.TypingTopic: - return t.onTypingEvent(msg) - case t.SendToDeviceTopic: - return t.onSendToDeviceEvent(msg) - default: - log.Warnf("Ignoring message with unexpected topic %q", msg.Topic) + if err := t.typingConsumer.Start(); err != nil { + return fmt.Errorf("t.typingConsumer.Start: %w", err) + } + if err := t.sendToDeviceConsumer.Start(); err != nil { + return fmt.Errorf("t.sendToDeviceConsumer.Start: %w", err) } return nil } @@ -86,7 +85,7 @@ func (t *OutputEDUConsumer) onSendToDeviceEvent(msg *sarama.ConsumerMessage) err // Extract the send-to-device event from msg. var ote api.OutputSendToDeviceEvent if err := json.Unmarshal(msg.Value, &ote); err != nil { - log.WithError(err).Errorf("eduserver output log: message parse failed") + log.WithError(err).Errorf("eduserver output log: message parse failed (expected send-to-device)") return nil } @@ -107,15 +106,24 @@ func (t *OutputEDUConsumer) onSendToDeviceEvent(msg *sarama.ConsumerMessage) err return nil } - edu := &gomatrixserverlib.EDU{Type: ote.Type} - if edu.Content, err = json.Marshal(map[string]interface{}{ - "sender": ote.SendToDeviceEvent.Sender, - "user_id": ote.UserID, - "device_id": ote.DeviceID, - }); err != nil { + edu := &gomatrixserverlib.EDU{ + Type: gomatrixserverlib.MDirectToDevice, + Origin: string(t.ServerName), + } + tdm := gomatrixserverlib.ToDeviceMessage{ + Sender: ote.Sender, + Type: ote.Type, + Messages: map[string]map[string]json.RawMessage{ + ote.UserID: { + ote.DeviceID: ote.Content, + }, + }, + } + if edu.Content, err = json.Marshal(tdm); err != nil { return err } + log.Infof("Sending send-to-device message into %q destination queue", destServerName) return t.queues.SendEDU(edu, t.ServerName, []gomatrixserverlib.ServerName{destServerName}) } @@ -126,7 +134,7 @@ func (t *OutputEDUConsumer) onTypingEvent(msg *sarama.ConsumerMessage) error { var ote api.OutputTypingEvent if err := json.Unmarshal(msg.Value, &ote); err != nil { // Skip this msg but continue processing messages. - log.WithError(err).Errorf("eduserver output log: message parse failed") + log.WithError(err).Errorf("eduserver output log: message parse failed (expected typing)") return nil }