Wire up send-to-device consumer, message formatting

This commit is contained in:
Neil Alexander 2020-07-14 11:28:35 +01:00
parent 9a1026ba24
commit fef9ed2a17
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944

View file

@ -15,6 +15,7 @@ package consumers
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/eduserver/api"
@ -28,12 +29,13 @@ import (
// OutputEDUConsumer consumes events that originate in EDU server. // OutputEDUConsumer consumes events that originate in EDU server.
type OutputEDUConsumer struct { type OutputEDUConsumer struct {
consumer *internal.ContinualConsumer typingConsumer *internal.ContinualConsumer
db storage.Database sendToDeviceConsumer *internal.ContinualConsumer
queues *queue.OutgoingQueues db storage.Database
ServerName gomatrixserverlib.ServerName queues *queue.OutgoingQueues
TypingTopic string ServerName gomatrixserverlib.ServerName
SendToDeviceTopic string TypingTopic string
SendToDeviceTopic string
} }
// NewOutputEDUConsumer creates a new OutputEDUConsumer. Call Start() to begin consuming from EDU servers. // NewOutputEDUConsumer creates a new OutputEDUConsumer. Call Start() to begin consuming from EDU servers.
@ -43,39 +45,36 @@ func NewOutputEDUConsumer(
queues *queue.OutgoingQueues, queues *queue.OutgoingQueues,
store storage.Database, store storage.Database,
) *OutputEDUConsumer { ) *OutputEDUConsumer {
consumer := internal.ContinualConsumer{
Topic: string(cfg.Kafka.Topics.OutputTypingEvent),
Consumer: kafkaConsumer,
PartitionStore: store,
}
c := &OutputEDUConsumer{ c := &OutputEDUConsumer{
consumer: &consumer, typingConsumer: &internal.ContinualConsumer{
Topic: string(cfg.Kafka.Topics.OutputTypingEvent),
Consumer: kafkaConsumer,
PartitionStore: store,
},
sendToDeviceConsumer: &internal.ContinualConsumer{
Topic: string(cfg.Kafka.Topics.OutputSendToDeviceEvent),
Consumer: kafkaConsumer,
PartitionStore: store,
},
queues: queues, queues: queues,
db: store, db: store,
ServerName: cfg.Matrix.ServerName, ServerName: cfg.Matrix.ServerName,
TypingTopic: string(cfg.Kafka.Topics.OutputTypingEvent), TypingTopic: string(cfg.Kafka.Topics.OutputTypingEvent),
SendToDeviceTopic: string(cfg.Kafka.Topics.OutputSendToDeviceEvent), SendToDeviceTopic: string(cfg.Kafka.Topics.OutputSendToDeviceEvent),
} }
consumer.ProcessMessage = c.onMessage c.typingConsumer.ProcessMessage = c.onTypingEvent
c.sendToDeviceConsumer.ProcessMessage = c.onSendToDeviceEvent
return c return c
} }
// Start consuming from EDU servers // Start consuming from EDU servers
func (t *OutputEDUConsumer) Start() error { func (t *OutputEDUConsumer) Start() error {
return t.consumer.Start() if err := t.typingConsumer.Start(); err != nil {
} return fmt.Errorf("t.typingConsumer.Start: %w", err)
}
// onMessage is called for OutputTypingEvent received from the EDU servers. if err := t.sendToDeviceConsumer.Start(); err != nil {
// Parses the msg, creates a matrix federation EDU and sends it to joined hosts. return fmt.Errorf("t.sendToDeviceConsumer.Start: %w", err)
func (t *OutputEDUConsumer) onMessage(msg *sarama.ConsumerMessage) error {
switch msg.Topic {
case t.TypingTopic:
return t.onTypingEvent(msg)
case t.SendToDeviceTopic:
return t.onSendToDeviceEvent(msg)
default:
log.Warnf("Ignoring message with unexpected topic %q", msg.Topic)
} }
return nil return nil
} }
@ -86,7 +85,7 @@ func (t *OutputEDUConsumer) onSendToDeviceEvent(msg *sarama.ConsumerMessage) err
// Extract the send-to-device event from msg. // Extract the send-to-device event from msg.
var ote api.OutputSendToDeviceEvent var ote api.OutputSendToDeviceEvent
if err := json.Unmarshal(msg.Value, &ote); err != nil { if err := json.Unmarshal(msg.Value, &ote); err != nil {
log.WithError(err).Errorf("eduserver output log: message parse failed") log.WithError(err).Errorf("eduserver output log: message parse failed (expected send-to-device)")
return nil return nil
} }
@ -107,15 +106,24 @@ func (t *OutputEDUConsumer) onSendToDeviceEvent(msg *sarama.ConsumerMessage) err
return nil return nil
} }
edu := &gomatrixserverlib.EDU{Type: ote.Type} edu := &gomatrixserverlib.EDU{
if edu.Content, err = json.Marshal(map[string]interface{}{ Type: gomatrixserverlib.MDirectToDevice,
"sender": ote.SendToDeviceEvent.Sender, Origin: string(t.ServerName),
"user_id": ote.UserID, }
"device_id": ote.DeviceID, tdm := gomatrixserverlib.ToDeviceMessage{
}); err != nil { Sender: ote.Sender,
Type: ote.Type,
Messages: map[string]map[string]json.RawMessage{
ote.UserID: {
ote.DeviceID: ote.Content,
},
},
}
if edu.Content, err = json.Marshal(tdm); err != nil {
return err return err
} }
log.Infof("Sending send-to-device message into %q destination queue", destServerName)
return t.queues.SendEDU(edu, t.ServerName, []gomatrixserverlib.ServerName{destServerName}) return t.queues.SendEDU(edu, t.ServerName, []gomatrixserverlib.ServerName{destServerName})
} }
@ -126,7 +134,7 @@ func (t *OutputEDUConsumer) onTypingEvent(msg *sarama.ConsumerMessage) error {
var ote api.OutputTypingEvent var ote api.OutputTypingEvent
if err := json.Unmarshal(msg.Value, &ote); err != nil { if err := json.Unmarshal(msg.Value, &ote); err != nil {
// Skip this msg but continue processing messages. // Skip this msg but continue processing messages.
log.WithError(err).Errorf("eduserver output log: message parse failed") log.WithError(err).Errorf("eduserver output log: message parse failed (expected typing)")
return nil return nil
} }