Move SendToDevice to producer

This commit is contained in:
Till Faelligen 2022-03-22 11:15:20 +01:00
parent 8e3d4d573a
commit 9f2f4ca7d7
16 changed files with 199 additions and 192 deletions

View file

@ -57,6 +57,8 @@ func AddPublicRoutes(
JetStream: js, JetStream: js,
TopicClientData: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData), TopicClientData: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData),
TopicReceiptEvent: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), TopicReceiptEvent: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent),
TopicSendToDeviceEvent: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent),
UserAPI: userAPI,
} }
routing.Setup( routing.Setup(

View file

@ -19,8 +19,10 @@ import (
"encoding/json" "encoding/json"
"strconv" "strconv"
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/jetstream"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -30,7 +32,10 @@ import (
type SyncAPIProducer struct { type SyncAPIProducer struct {
TopicClientData string TopicClientData string
TopicReceiptEvent string TopicReceiptEvent string
TopicSendToDeviceEvent string
JetStream nats.JetStreamContext JetStream nats.JetStreamContext
ServerName gomatrixserverlib.ServerName
UserAPI userapi.UserInternalAPI
} }
// SendData sends account data to the sync API server // SendData sends account data to the sync API server
@ -80,3 +85,74 @@ func (p *SyncAPIProducer) SendReceipt(
_, err := p.JetStream.PublishMsg(m, nats.Context(ctx)) _, err := p.JetStream.PublishMsg(m, nats.Context(ctx))
return err return err
} }
func (p *SyncAPIProducer) SendToDevice(
ctx context.Context, sender, userID, deviceID, eventType string,
message interface{},
) error {
devices := []string{}
_, domain, err := gomatrixserverlib.SplitID('@', userID)
if err != nil {
return err
}
// If the event is targeted locally then we want to expand the wildcard
// out into individual device IDs so that we can send them to each respective
// device. If the event isn't targeted locally then we can't expand the
// wildcard as we don't know about the remote devices, so instead we leave it
// as-is, so that the federation sender can send it on with the wildcard intact.
if domain == p.ServerName && deviceID == "*" {
var res userapi.QueryDevicesResponse
err = p.UserAPI.QueryDevices(context.TODO(), &userapi.QueryDevicesRequest{
UserID: userID,
}, &res)
if err != nil {
return err
}
for _, dev := range res.Devices {
devices = append(devices, dev.ID)
}
} else {
devices = append(devices, deviceID)
}
js, err := json.Marshal(message)
if err != nil {
return err
}
log.WithFields(log.Fields{
"user_id": userID,
"num_devices": len(devices),
"type": eventType,
}).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent)
for _, device := range devices {
ote := &api.OutputSendToDeviceEvent{
UserID: userID,
DeviceID: device,
SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{
Sender: sender,
Type: eventType,
Content: js,
},
}
eventJSON, err := json.Marshal(ote)
if err != nil {
log.WithError(err).Error("sendToDevice failed json.Marshal")
return err
}
m := &nats.Msg{
Subject: p.TopicSendToDeviceEvent,
Data: eventJSON,
Header: nats.Header{},
}
m.Header.Set("sender", sender)
m.Header.Set(jetstream.UserID, userID)
if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil {
log.WithError(err).Error("sendToDevice failed t.Producer.SendMessage")
return err
}
}
return nil
}

View file

@ -497,7 +497,7 @@ func Setup(
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
txnID := vars["txnID"] txnID := vars["txnID"]
return SendToDevice(req, device, eduAPI, transactionsCache, vars["eventType"], &txnID) return SendToDevice(req, device, syncProducer, transactionsCache, vars["eventType"], &txnID)
}), }),
).Methods(http.MethodPut, http.MethodOptions) ).Methods(http.MethodPut, http.MethodOptions)
@ -511,7 +511,7 @@ func Setup(
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
txnID := vars["txnID"] txnID := vars["txnID"]
return SendToDevice(req, device, eduAPI, transactionsCache, vars["eventType"], &txnID) return SendToDevice(req, device, syncProducer, transactionsCache, vars["eventType"], &txnID)
}), }),
).Methods(http.MethodPut, http.MethodOptions) ).Methods(http.MethodPut, http.MethodOptions)

View file

@ -18,7 +18,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/internal/transactions" "github.com/matrix-org/dendrite/internal/transactions"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/util" "github.com/matrix-org/util"
@ -28,7 +28,7 @@ import (
// sends the device events to the EDU Server // sends the device events to the EDU Server
func SendToDevice( func SendToDevice(
req *http.Request, device *userapi.Device, req *http.Request, device *userapi.Device,
eduAPI api.EDUServerInputAPI, syncProducer *producers.SyncAPIProducer,
txnCache *transactions.Cache, txnCache *transactions.Cache,
eventType string, txnID *string, eventType string, txnID *string,
) util.JSONResponse { ) util.JSONResponse {
@ -48,8 +48,8 @@ func SendToDevice(
for userID, byUser := range httpReq.Messages { for userID, byUser := range httpReq.Messages {
for deviceID, message := range byUser { for deviceID, message := range byUser {
if err := api.SendToDevice( if err := syncProducer.SendToDevice(
req.Context(), eduAPI, device.UserID, userID, deviceID, eventType, message, req.Context(), device.UserID, userID, deviceID, eventType, message,
); err != nil { ); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("eduProducer.SendToDevice failed") util.GetLogger(req.Context()).WithError(err).Error("eduProducer.SendToDevice failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()

View file

@ -37,12 +37,6 @@ type InputTypingEvent struct {
OriginServerTS gomatrixserverlib.Timestamp `json:"origin_server_ts"` OriginServerTS gomatrixserverlib.Timestamp `json:"origin_server_ts"`
} }
type InputSendToDeviceEvent struct {
UserID string `json:"user_id"`
DeviceID string `json:"device_id"`
gomatrixserverlib.SendToDeviceEvent
}
// InputTypingEventRequest is a request to EDUServerInputAPI // InputTypingEventRequest is a request to EDUServerInputAPI
type InputTypingEventRequest struct { type InputTypingEventRequest struct {
InputTypingEvent InputTypingEvent `json:"input_typing_event"` InputTypingEvent InputTypingEvent `json:"input_typing_event"`
@ -51,14 +45,6 @@ type InputTypingEventRequest struct {
// InputTypingEventResponse is a response to InputTypingEvents // InputTypingEventResponse is a response to InputTypingEvents
type InputTypingEventResponse struct{} type InputTypingEventResponse struct{}
// InputSendToDeviceEventRequest is a request to EDUServerInputAPI
type InputSendToDeviceEventRequest struct {
InputSendToDeviceEvent InputSendToDeviceEvent `json:"input_send_to_device_event"`
}
// InputSendToDeviceEventResponse is a response to InputSendToDeviceEventRequest
type InputSendToDeviceEventResponse struct{}
type InputCrossSigningKeyUpdateRequest struct { type InputCrossSigningKeyUpdateRequest struct {
CrossSigningKeyUpdate `json:"signing_keys"` CrossSigningKeyUpdate `json:"signing_keys"`
} }
@ -72,10 +58,4 @@ type EDUServerInputAPI interface {
request *InputTypingEventRequest, request *InputTypingEventRequest,
response *InputTypingEventResponse, response *InputTypingEventResponse,
) error ) error
InputSendToDeviceEvent(
ctx context.Context,
request *InputSendToDeviceEventRequest,
response *InputSendToDeviceEventResponse,
) error
} }

View file

@ -16,7 +16,6 @@ package api
import ( import (
"context" "context"
"encoding/json"
"time" "time"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -42,28 +41,3 @@ func SendTyping(
return err return err
} }
// SendToDevice sends a typing event to EDU server
func SendToDevice(
ctx context.Context, eduAPI EDUServerInputAPI, sender, userID, deviceID, eventType string,
message interface{},
) error {
js, err := json.Marshal(message)
if err != nil {
return err
}
requestData := InputSendToDeviceEvent{
UserID: userID,
DeviceID: deviceID,
SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{
Sender: sender,
Type: eventType,
Content: js,
},
}
request := InputSendToDeviceEventRequest{
InputSendToDeviceEvent: requestData,
}
response := InputSendToDeviceEventResponse{}
return eduAPI.InputSendToDeviceEvent(ctx, &request, &response)
}

View file

@ -49,7 +49,6 @@ func NewInternalAPI(
UserAPI: userAPI, UserAPI: userAPI,
JetStream: js, JetStream: js,
OutputTypingEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent), OutputTypingEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent),
OutputSendToDeviceEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent),
ServerName: cfg.Matrix.ServerName, ServerName: cfg.Matrix.ServerName,
} }
} }

View file

@ -35,8 +35,6 @@ type EDUServerInputAPI struct {
Cache *cache.EDUCache Cache *cache.EDUCache
// The kafka topic to output new typing events to. // The kafka topic to output new typing events to.
OutputTypingEventTopic string OutputTypingEventTopic string
// The kafka topic to output new send to device events to.
OutputSendToDeviceEventTopic string
// kafka producer // kafka producer
JetStream nats.JetStreamContext JetStream nats.JetStreamContext
// Internal user query API // Internal user query API
@ -65,16 +63,6 @@ func (t *EDUServerInputAPI) InputTypingEvent(
return t.sendTypingEvent(ite) return t.sendTypingEvent(ite)
} }
// InputTypingEvent implements api.EDUServerInputAPI
func (t *EDUServerInputAPI) InputSendToDeviceEvent(
ctx context.Context,
request *api.InputSendToDeviceEventRequest,
response *api.InputSendToDeviceEventResponse,
) error {
ise := &request.InputSendToDeviceEvent
return t.sendToDeviceEvent(ise)
}
func (t *EDUServerInputAPI) sendTypingEvent(ite *api.InputTypingEvent) error { func (t *EDUServerInputAPI) sendTypingEvent(ite *api.InputTypingEvent) error {
ev := &api.TypingEvent{ ev := &api.TypingEvent{
Type: gomatrixserverlib.MTyping, Type: gomatrixserverlib.MTyping,
@ -110,60 +98,3 @@ func (t *EDUServerInputAPI) sendTypingEvent(ite *api.InputTypingEvent) error {
}) })
return err return err
} }
func (t *EDUServerInputAPI) sendToDeviceEvent(ise *api.InputSendToDeviceEvent) error {
devices := []string{}
_, domain, err := gomatrixserverlib.SplitID('@', ise.UserID)
if err != nil {
return err
}
// If the event is targeted locally then we want to expand the wildcard
// out into individual device IDs so that we can send them to each respective
// device. If the event isn't targeted locally then we can't expand the
// wildcard as we don't know about the remote devices, so instead we leave it
// as-is, so that the federation sender can send it on with the wildcard intact.
if domain == t.ServerName && ise.DeviceID == "*" {
var res userapi.QueryDevicesResponse
err = t.UserAPI.QueryDevices(context.TODO(), &userapi.QueryDevicesRequest{
UserID: ise.UserID,
}, &res)
if err != nil {
return err
}
for _, dev := range res.Devices {
devices = append(devices, dev.ID)
}
} else {
devices = append(devices, ise.DeviceID)
}
logrus.WithFields(logrus.Fields{
"user_id": ise.UserID,
"num_devices": len(devices),
"type": ise.Type,
}).Tracef("Producing to topic '%s'", t.OutputSendToDeviceEventTopic)
for _, device := range devices {
ote := &api.OutputSendToDeviceEvent{
UserID: ise.UserID,
DeviceID: device,
SendToDeviceEvent: ise.SendToDeviceEvent,
}
eventJSON, err := json.Marshal(ote)
if err != nil {
logrus.WithError(err).Error("sendToDevice failed json.Marshal")
return err
}
if _, err = t.JetStream.PublishMsg(&nats.Msg{
Subject: t.OutputSendToDeviceEventTopic,
Data: eventJSON,
}); err != nil {
logrus.WithError(err).Error("sendToDevice failed t.Producer.SendMessage")
return err
}
}
return nil
}

View file

@ -13,7 +13,6 @@ import (
// HTTP paths for the internal HTTP APIs // HTTP paths for the internal HTTP APIs
const ( const (
EDUServerInputTypingEventPath = "/eduserver/input" EDUServerInputTypingEventPath = "/eduserver/input"
EDUServerInputSendToDeviceEventPath = "/eduserver/sendToDevice"
) )
// NewEDUServerClient creates a EDUServerInputAPI implemented by talking to a HTTP POST API. // NewEDUServerClient creates a EDUServerInputAPI implemented by talking to a HTTP POST API.
@ -41,16 +40,3 @@ func (h *httpEDUServerInputAPI) InputTypingEvent(
apiURL := h.eduServerURL + EDUServerInputTypingEventPath apiURL := h.eduServerURL + EDUServerInputTypingEventPath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
} }
// InputSendToDeviceEvent implements EDUServerInputAPI
func (h *httpEDUServerInputAPI) InputSendToDeviceEvent(
ctx context.Context,
request *api.InputSendToDeviceEventRequest,
response *api.InputSendToDeviceEventResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "InputSendToDeviceEvent")
defer span.Finish()
apiURL := h.eduServerURL + EDUServerInputSendToDeviceEventPath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}

View file

@ -25,17 +25,4 @@ func AddRoutes(t api.EDUServerInputAPI, internalAPIMux *mux.Router) {
return util.JSONResponse{Code: http.StatusOK, JSON: &response} return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}), }),
) )
internalAPIMux.Handle(EDUServerInputSendToDeviceEventPath,
httputil.MakeInternalAPI("inputSendToDeviceEvents", func(req *http.Request) util.JSONResponse {
var request api.InputSendToDeviceEventRequest
var response api.InputSendToDeviceEventResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
if err := t.InputSendToDeviceEvent(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
} }

View file

@ -92,23 +92,23 @@ func (t *OutputEDUConsumer) Start() error {
// onSendToDeviceEvent is called in response to a message received on the // onSendToDeviceEvent is called in response to a message received on the
// send-to-device events topic from the EDU server. // send-to-device events topic from the EDU server.
func (t *OutputEDUConsumer) onSendToDeviceEvent(ctx context.Context, msg *nats.Msg) bool { func (t *OutputEDUConsumer) onSendToDeviceEvent(ctx context.Context, msg *nats.Msg) bool {
// Extract the send-to-device event from msg. sender := msg.Header.Get("sender")
var ote api.OutputSendToDeviceEvent
if err := json.Unmarshal(msg.Data, &ote); err != nil {
log.WithError(err).Errorf("eduserver output log: message parse failed (expected send-to-device)")
return true
}
// only send send-to-device events which originated from us // only send send-to-device events which originated from us
_, originServerName, err := gomatrixserverlib.SplitID('@', ote.Sender) _, originServerName, err := gomatrixserverlib.SplitID('@', sender)
if err != nil { if err != nil {
log.WithError(err).WithField("user_id", ote.Sender).Error("Failed to extract domain from send-to-device sender") log.WithError(err).WithField("user_id", sender).Error("Failed to extract domain from send-to-device sender")
return true return true
} }
if originServerName != t.ServerName { if originServerName != t.ServerName {
log.WithField("other_server", originServerName).Info("Suppressing send-to-device: originated elsewhere") log.WithField("other_server", originServerName).Info("Suppressing send-to-device: originated elsewhere")
return true return true
} }
// Extract the send-to-device event from msg.
var ote api.OutputSendToDeviceEvent
if err := json.Unmarshal(msg.Data, &ote); err != nil {
log.WithError(err).Errorf("eduserver output log: message parse failed (expected send-to-device)")
return true
}
_, destServerName, err := gomatrixserverlib.SplitID('@', ote.UserID) _, destServerName, err := gomatrixserverlib.SplitID('@', ote.UserID)
if err != nil { if err != nil {

View file

@ -66,6 +66,7 @@ func AddPublicRoutes(
producer := &producers.SyncAPIProducer{ producer := &producers.SyncAPIProducer{
JetStream: js, JetStream: js,
TopicReceiptEvent: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), TopicReceiptEvent: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent),
TopicSendToDeviceEvent: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent),
} }
routing.Setup( routing.Setup(

View file

@ -16,9 +16,12 @@ package producers
import ( import (
"context" "context"
"encoding/json"
"strconv" "strconv"
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/jetstream"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -27,7 +30,10 @@ import (
// SyncAPIProducer produces events for the sync API server to consume // SyncAPIProducer produces events for the sync API server to consume
type SyncAPIProducer struct { type SyncAPIProducer struct {
TopicReceiptEvent string TopicReceiptEvent string
TopicSendToDeviceEvent string
JetStream nats.JetStreamContext JetStream nats.JetStreamContext
ServerName gomatrixserverlib.ServerName
UserAPI userapi.UserInternalAPI
} }
func (p *SyncAPIProducer) SendReceipt( func (p *SyncAPIProducer) SendReceipt(
@ -48,3 +54,75 @@ func (p *SyncAPIProducer) SendReceipt(
_, err := p.JetStream.PublishMsg(m, nats.Context(ctx)) _, err := p.JetStream.PublishMsg(m, nats.Context(ctx))
return err return err
} }
func (p *SyncAPIProducer) SendToDevice(
ctx context.Context, sender, userID, deviceID, eventType string,
message interface{},
) error {
devices := []string{}
_, domain, err := gomatrixserverlib.SplitID('@', userID)
if err != nil {
return err
}
// If the event is targeted locally then we want to expand the wildcard
// out into individual device IDs so that we can send them to each respective
// device. If the event isn't targeted locally then we can't expand the
// wildcard as we don't know about the remote devices, so instead we leave it
// as-is, so that the federation sender can send it on with the wildcard intact.
if domain == p.ServerName && deviceID == "*" {
var res userapi.QueryDevicesResponse
err = p.UserAPI.QueryDevices(context.TODO(), &userapi.QueryDevicesRequest{
UserID: userID,
}, &res)
if err != nil {
return err
}
for _, dev := range res.Devices {
devices = append(devices, dev.ID)
}
} else {
devices = append(devices, deviceID)
}
js, err := json.Marshal(message)
if err != nil {
return err
}
log.WithFields(log.Fields{
"user_id": userID,
"num_devices": len(devices),
"type": eventType,
}).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent)
for _, device := range devices {
ote := &api.OutputSendToDeviceEvent{
UserID: userID,
DeviceID: device,
SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{
Sender: sender,
Type: eventType,
Content: js,
},
}
eventJSON, err := json.Marshal(ote)
if err != nil {
log.WithError(err).Error("sendToDevice failed json.Marshal")
return err
}
m := &nats.Msg{
Subject: p.TopicSendToDeviceEvent,
Data: eventJSON,
Header: nats.Header{},
}
m.Header.Set("sender", sender)
m.Header.Set(jetstream.UserID, userID)
if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil {
log.WithError(err).Error("sendToDevice failed t.Producer.SendMessage")
return err
}
}
return nil
}

View file

@ -346,7 +346,7 @@ func (t *txnReq) processEDUs(ctx context.Context) {
for userID, byUser := range directPayload.Messages { for userID, byUser := range directPayload.Messages {
for deviceID, message := range byUser { for deviceID, message := range byUser {
// TODO: check that the user and the device actually exist here // TODO: check that the user and the device actually exist here
if err := eduserverAPI.SendToDevice(ctx, t.eduAPI, directPayload.Sender, userID, deviceID, directPayload.Type, message); err != nil { if err := t.producer.SendToDevice(ctx, directPayload.Sender, userID, deviceID, directPayload.Type, message); err != nil {
util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{ util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{
"sender": directPayload.Sender, "sender": directPayload.Sender,
"user_id": userID, "user_id": userID,

View file

@ -67,14 +67,6 @@ func (p *testEDUProducer) InputTypingEvent(
return nil return nil
} }
func (p *testEDUProducer) InputSendToDeviceEvent(
ctx context.Context,
request *eduAPI.InputSendToDeviceEventRequest,
response *eduAPI.InputSendToDeviceEventResponse,
) error {
return nil
}
func (o *testEDUProducer) InputCrossSigningKeyUpdate( func (o *testEDUProducer) InputCrossSigningKeyUpdate(
ctx context.Context, ctx context.Context,
request *eduAPI.InputCrossSigningKeyUpdateRequest, request *eduAPI.InputCrossSigningKeyUpdateRequest,

View file

@ -75,15 +75,8 @@ func (s *OutputSendToDeviceEventConsumer) Start() error {
} }
func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
var output api.OutputSendToDeviceEvent userID := msg.Header.Get(jetstream.UserID)
if err := json.Unmarshal(msg.Data, &output); err != nil { _, domain, err := gomatrixserverlib.SplitID('@', userID)
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("EDU server output log: message parse failure")
sentry.CaptureException(err)
return true
}
_, domain, err := gomatrixserverlib.SplitID('@', output.UserID)
if err != nil { if err != nil {
sentry.CaptureException(err) sentry.CaptureException(err)
return true return true
@ -92,6 +85,14 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msg *na
return true return true
} }
var output api.OutputSendToDeviceEvent
if err := json.Unmarshal(msg.Data, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("EDU server output log: message parse failure")
sentry.CaptureException(err)
return true
}
util.GetLogger(context.TODO()).WithFields(log.Fields{ util.GetLogger(context.TODO()).WithFields(log.Fields{
"sender": output.Sender, "sender": output.Sender,
"user_id": output.UserID, "user_id": output.UserID,