diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go index 1a2bf3833..cb4094453 100644 --- a/clientapi/clientapi.go +++ b/clientapi/clientapi.go @@ -54,9 +54,11 @@ func AddPublicRoutes( js, _ := jetstream.Prepare(process, &cfg.Matrix.JetStream) syncProducer := &producers.SyncAPIProducer{ - JetStream: js, - TopicClientData: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData), - TopicReceiptEvent: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), + JetStream: js, + TopicClientData: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData), + TopicReceiptEvent: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), + TopicSendToDeviceEvent: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent), + UserAPI: userAPI, } routing.Setup( diff --git a/clientapi/producers/syncapi.go b/clientapi/producers/syncapi.go index 034fbb317..128eaeb35 100644 --- a/clientapi/producers/syncapi.go +++ b/clientapi/producers/syncapi.go @@ -19,8 +19,10 @@ import ( "encoding/json" "strconv" + "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/setup/jetstream" + userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" "github.com/nats-io/nats.go" log "github.com/sirupsen/logrus" @@ -28,9 +30,12 @@ import ( // SyncAPIProducer produces events for the sync API server to consume type SyncAPIProducer struct { - TopicClientData string - TopicReceiptEvent string - JetStream nats.JetStreamContext + TopicClientData string + TopicReceiptEvent string + TopicSendToDeviceEvent string + JetStream nats.JetStreamContext + ServerName gomatrixserverlib.ServerName + UserAPI userapi.UserInternalAPI } // SendData sends account data to the sync API server @@ -80,3 +85,74 @@ func (p *SyncAPIProducer) SendReceipt( _, err := p.JetStream.PublishMsg(m, nats.Context(ctx)) return err } + +func (p *SyncAPIProducer) SendToDevice( + ctx context.Context, sender, userID, deviceID, eventType string, + message interface{}, +) error { + devices := []string{} + _, domain, err := gomatrixserverlib.SplitID('@', userID) + if err != nil { + return err + } + + // If the event is targeted locally then we want to expand the wildcard + // out into individual device IDs so that we can send them to each respective + // device. If the event isn't targeted locally then we can't expand the + // wildcard as we don't know about the remote devices, so instead we leave it + // as-is, so that the federation sender can send it on with the wildcard intact. + if domain == p.ServerName && deviceID == "*" { + var res userapi.QueryDevicesResponse + err = p.UserAPI.QueryDevices(context.TODO(), &userapi.QueryDevicesRequest{ + UserID: userID, + }, &res) + if err != nil { + return err + } + for _, dev := range res.Devices { + devices = append(devices, dev.ID) + } + } else { + devices = append(devices, deviceID) + } + + js, err := json.Marshal(message) + if err != nil { + return err + } + + log.WithFields(log.Fields{ + "user_id": userID, + "num_devices": len(devices), + "type": eventType, + }).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent) + for _, device := range devices { + ote := &api.OutputSendToDeviceEvent{ + UserID: userID, + DeviceID: device, + SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{ + Sender: sender, + Type: eventType, + Content: js, + }, + } + + eventJSON, err := json.Marshal(ote) + if err != nil { + log.WithError(err).Error("sendToDevice failed json.Marshal") + return err + } + m := &nats.Msg{ + Subject: p.TopicSendToDeviceEvent, + Data: eventJSON, + Header: nats.Header{}, + } + m.Header.Set("sender", sender) + m.Header.Set(jetstream.UserID, userID) + if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil { + log.WithError(err).Error("sendToDevice failed t.Producer.SendMessage") + return err + } + } + return nil +} diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go index a53328ec6..b2178f2de 100644 --- a/clientapi/routing/routing.go +++ b/clientapi/routing/routing.go @@ -497,7 +497,7 @@ func Setup( return util.ErrorResponse(err) } txnID := vars["txnID"] - return SendToDevice(req, device, eduAPI, transactionsCache, vars["eventType"], &txnID) + return SendToDevice(req, device, syncProducer, transactionsCache, vars["eventType"], &txnID) }), ).Methods(http.MethodPut, http.MethodOptions) @@ -511,7 +511,7 @@ func Setup( return util.ErrorResponse(err) } txnID := vars["txnID"] - return SendToDevice(req, device, eduAPI, transactionsCache, vars["eventType"], &txnID) + return SendToDevice(req, device, syncProducer, transactionsCache, vars["eventType"], &txnID) }), ).Methods(http.MethodPut, http.MethodOptions) diff --git a/clientapi/routing/sendtodevice.go b/clientapi/routing/sendtodevice.go index 768e8e0e7..9aa15c1be 100644 --- a/clientapi/routing/sendtodevice.go +++ b/clientapi/routing/sendtodevice.go @@ -18,7 +18,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" - "github.com/matrix-org/dendrite/eduserver/api" + "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/internal/transactions" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/util" @@ -28,7 +28,7 @@ import ( // sends the device events to the EDU Server func SendToDevice( req *http.Request, device *userapi.Device, - eduAPI api.EDUServerInputAPI, + syncProducer *producers.SyncAPIProducer, txnCache *transactions.Cache, eventType string, txnID *string, ) util.JSONResponse { @@ -48,8 +48,8 @@ func SendToDevice( for userID, byUser := range httpReq.Messages { for deviceID, message := range byUser { - if err := api.SendToDevice( - req.Context(), eduAPI, device.UserID, userID, deviceID, eventType, message, + if err := syncProducer.SendToDevice( + req.Context(), device.UserID, userID, deviceID, eventType, message, ); err != nil { util.GetLogger(req.Context()).WithError(err).Error("eduProducer.SendToDevice failed") return jsonerror.InternalServerError() diff --git a/eduserver/api/input.go b/eduserver/api/input.go index 85c367d78..19ddc87ad 100644 --- a/eduserver/api/input.go +++ b/eduserver/api/input.go @@ -37,12 +37,6 @@ type InputTypingEvent struct { OriginServerTS gomatrixserverlib.Timestamp `json:"origin_server_ts"` } -type InputSendToDeviceEvent struct { - UserID string `json:"user_id"` - DeviceID string `json:"device_id"` - gomatrixserverlib.SendToDeviceEvent -} - // InputTypingEventRequest is a request to EDUServerInputAPI type InputTypingEventRequest struct { InputTypingEvent InputTypingEvent `json:"input_typing_event"` @@ -51,14 +45,6 @@ type InputTypingEventRequest struct { // InputTypingEventResponse is a response to InputTypingEvents type InputTypingEventResponse struct{} -// InputSendToDeviceEventRequest is a request to EDUServerInputAPI -type InputSendToDeviceEventRequest struct { - InputSendToDeviceEvent InputSendToDeviceEvent `json:"input_send_to_device_event"` -} - -// InputSendToDeviceEventResponse is a response to InputSendToDeviceEventRequest -type InputSendToDeviceEventResponse struct{} - type InputCrossSigningKeyUpdateRequest struct { CrossSigningKeyUpdate `json:"signing_keys"` } @@ -72,10 +58,4 @@ type EDUServerInputAPI interface { request *InputTypingEventRequest, response *InputTypingEventResponse, ) error - - InputSendToDeviceEvent( - ctx context.Context, - request *InputSendToDeviceEventRequest, - response *InputSendToDeviceEventResponse, - ) error } diff --git a/eduserver/api/wrapper.go b/eduserver/api/wrapper.go index c2c4596de..a77983b49 100644 --- a/eduserver/api/wrapper.go +++ b/eduserver/api/wrapper.go @@ -16,7 +16,6 @@ package api import ( "context" - "encoding/json" "time" "github.com/matrix-org/gomatrixserverlib" @@ -42,28 +41,3 @@ func SendTyping( return err } - -// SendToDevice sends a typing event to EDU server -func SendToDevice( - ctx context.Context, eduAPI EDUServerInputAPI, sender, userID, deviceID, eventType string, - message interface{}, -) error { - js, err := json.Marshal(message) - if err != nil { - return err - } - requestData := InputSendToDeviceEvent{ - UserID: userID, - DeviceID: deviceID, - SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{ - Sender: sender, - Type: eventType, - Content: js, - }, - } - request := InputSendToDeviceEventRequest{ - InputSendToDeviceEvent: requestData, - } - response := InputSendToDeviceEventResponse{} - return eduAPI.InputSendToDeviceEvent(ctx, &request, &response) -} diff --git a/eduserver/eduserver.go b/eduserver/eduserver.go index 88f6ebef7..6ba1b462a 100644 --- a/eduserver/eduserver.go +++ b/eduserver/eduserver.go @@ -45,11 +45,10 @@ func NewInternalAPI( js, _ := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) return &input.EDUServerInputAPI{ - Cache: eduCache, - UserAPI: userAPI, - JetStream: js, - OutputTypingEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent), - OutputSendToDeviceEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent), - ServerName: cfg.Matrix.ServerName, + Cache: eduCache, + UserAPI: userAPI, + JetStream: js, + OutputTypingEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent), + ServerName: cfg.Matrix.ServerName, } } diff --git a/eduserver/input/input.go b/eduserver/input/input.go index ad8ac320c..9f410ffd4 100644 --- a/eduserver/input/input.go +++ b/eduserver/input/input.go @@ -35,8 +35,6 @@ type EDUServerInputAPI struct { Cache *cache.EDUCache // The kafka topic to output new typing events to. OutputTypingEventTopic string - // The kafka topic to output new send to device events to. - OutputSendToDeviceEventTopic string // kafka producer JetStream nats.JetStreamContext // Internal user query API @@ -65,16 +63,6 @@ func (t *EDUServerInputAPI) InputTypingEvent( return t.sendTypingEvent(ite) } -// InputTypingEvent implements api.EDUServerInputAPI -func (t *EDUServerInputAPI) InputSendToDeviceEvent( - ctx context.Context, - request *api.InputSendToDeviceEventRequest, - response *api.InputSendToDeviceEventResponse, -) error { - ise := &request.InputSendToDeviceEvent - return t.sendToDeviceEvent(ise) -} - func (t *EDUServerInputAPI) sendTypingEvent(ite *api.InputTypingEvent) error { ev := &api.TypingEvent{ Type: gomatrixserverlib.MTyping, @@ -110,60 +98,3 @@ func (t *EDUServerInputAPI) sendTypingEvent(ite *api.InputTypingEvent) error { }) return err } - -func (t *EDUServerInputAPI) sendToDeviceEvent(ise *api.InputSendToDeviceEvent) error { - devices := []string{} - _, domain, err := gomatrixserverlib.SplitID('@', ise.UserID) - if err != nil { - return err - } - - // If the event is targeted locally then we want to expand the wildcard - // out into individual device IDs so that we can send them to each respective - // device. If the event isn't targeted locally then we can't expand the - // wildcard as we don't know about the remote devices, so instead we leave it - // as-is, so that the federation sender can send it on with the wildcard intact. - if domain == t.ServerName && ise.DeviceID == "*" { - var res userapi.QueryDevicesResponse - err = t.UserAPI.QueryDevices(context.TODO(), &userapi.QueryDevicesRequest{ - UserID: ise.UserID, - }, &res) - if err != nil { - return err - } - for _, dev := range res.Devices { - devices = append(devices, dev.ID) - } - } else { - devices = append(devices, ise.DeviceID) - } - - logrus.WithFields(logrus.Fields{ - "user_id": ise.UserID, - "num_devices": len(devices), - "type": ise.Type, - }).Tracef("Producing to topic '%s'", t.OutputSendToDeviceEventTopic) - for _, device := range devices { - ote := &api.OutputSendToDeviceEvent{ - UserID: ise.UserID, - DeviceID: device, - SendToDeviceEvent: ise.SendToDeviceEvent, - } - - eventJSON, err := json.Marshal(ote) - if err != nil { - logrus.WithError(err).Error("sendToDevice failed json.Marshal") - return err - } - - if _, err = t.JetStream.PublishMsg(&nats.Msg{ - Subject: t.OutputSendToDeviceEventTopic, - Data: eventJSON, - }); err != nil { - logrus.WithError(err).Error("sendToDevice failed t.Producer.SendMessage") - return err - } - } - - return nil -} diff --git a/eduserver/inthttp/client.go b/eduserver/inthttp/client.go index 7d0bc1603..0e312e960 100644 --- a/eduserver/inthttp/client.go +++ b/eduserver/inthttp/client.go @@ -12,8 +12,7 @@ import ( // HTTP paths for the internal HTTP APIs const ( - EDUServerInputTypingEventPath = "/eduserver/input" - EDUServerInputSendToDeviceEventPath = "/eduserver/sendToDevice" + EDUServerInputTypingEventPath = "/eduserver/input" ) // NewEDUServerClient creates a EDUServerInputAPI implemented by talking to a HTTP POST API. @@ -41,16 +40,3 @@ func (h *httpEDUServerInputAPI) InputTypingEvent( apiURL := h.eduServerURL + EDUServerInputTypingEventPath return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) } - -// InputSendToDeviceEvent implements EDUServerInputAPI -func (h *httpEDUServerInputAPI) InputSendToDeviceEvent( - ctx context.Context, - request *api.InputSendToDeviceEventRequest, - response *api.InputSendToDeviceEventResponse, -) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "InputSendToDeviceEvent") - defer span.Finish() - - apiURL := h.eduServerURL + EDUServerInputSendToDeviceEventPath - return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) -} diff --git a/eduserver/inthttp/server.go b/eduserver/inthttp/server.go index e374513a3..85b9abba4 100644 --- a/eduserver/inthttp/server.go +++ b/eduserver/inthttp/server.go @@ -25,17 +25,4 @@ func AddRoutes(t api.EDUServerInputAPI, internalAPIMux *mux.Router) { return util.JSONResponse{Code: http.StatusOK, JSON: &response} }), ) - internalAPIMux.Handle(EDUServerInputSendToDeviceEventPath, - httputil.MakeInternalAPI("inputSendToDeviceEvents", func(req *http.Request) util.JSONResponse { - var request api.InputSendToDeviceEventRequest - var response api.InputSendToDeviceEventResponse - if err := json.NewDecoder(req.Body).Decode(&request); err != nil { - return util.MessageResponse(http.StatusBadRequest, err.Error()) - } - if err := t.InputSendToDeviceEvent(req.Context(), &request, &response); err != nil { - return util.ErrorResponse(err) - } - return util.JSONResponse{Code: http.StatusOK, JSON: &response} - }), - ) } diff --git a/federationapi/consumers/eduserver.go b/federationapi/consumers/eduserver.go index 8ea31b68a..c121d21a5 100644 --- a/federationapi/consumers/eduserver.go +++ b/federationapi/consumers/eduserver.go @@ -92,23 +92,23 @@ func (t *OutputEDUConsumer) Start() error { // onSendToDeviceEvent is called in response to a message received on the // send-to-device events topic from the EDU server. func (t *OutputEDUConsumer) onSendToDeviceEvent(ctx context.Context, msg *nats.Msg) bool { - // Extract the send-to-device event from msg. - var ote api.OutputSendToDeviceEvent - if err := json.Unmarshal(msg.Data, &ote); err != nil { - log.WithError(err).Errorf("eduserver output log: message parse failed (expected send-to-device)") - return true - } - + sender := msg.Header.Get("sender") // only send send-to-device events which originated from us - _, originServerName, err := gomatrixserverlib.SplitID('@', ote.Sender) + _, originServerName, err := gomatrixserverlib.SplitID('@', sender) if err != nil { - log.WithError(err).WithField("user_id", ote.Sender).Error("Failed to extract domain from send-to-device sender") + log.WithError(err).WithField("user_id", sender).Error("Failed to extract domain from send-to-device sender") return true } if originServerName != t.ServerName { log.WithField("other_server", originServerName).Info("Suppressing send-to-device: originated elsewhere") return true } + // Extract the send-to-device event from msg. + var ote api.OutputSendToDeviceEvent + if err := json.Unmarshal(msg.Data, &ote); err != nil { + log.WithError(err).Errorf("eduserver output log: message parse failed (expected send-to-device)") + return true + } _, destServerName, err := gomatrixserverlib.SplitID('@', ote.UserID) if err != nil { diff --git a/federationapi/federationapi.go b/federationapi/federationapi.go index 057d76e8d..22905d374 100644 --- a/federationapi/federationapi.go +++ b/federationapi/federationapi.go @@ -64,8 +64,9 @@ func AddPublicRoutes( js, _ := jetstream.Prepare(process, &cfg.Matrix.JetStream) producer := &producers.SyncAPIProducer{ - JetStream: js, - TopicReceiptEvent: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), + JetStream: js, + TopicReceiptEvent: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), + TopicSendToDeviceEvent: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent), } routing.Setup( diff --git a/federationapi/producers/syncapi.go b/federationapi/producers/syncapi.go index 5d05f36fd..61bc050d9 100644 --- a/federationapi/producers/syncapi.go +++ b/federationapi/producers/syncapi.go @@ -16,9 +16,12 @@ package producers import ( "context" + "encoding/json" "strconv" + "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/setup/jetstream" + userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" "github.com/nats-io/nats.go" log "github.com/sirupsen/logrus" @@ -26,8 +29,11 @@ import ( // SyncAPIProducer produces events for the sync API server to consume type SyncAPIProducer struct { - TopicReceiptEvent string - JetStream nats.JetStreamContext + TopicReceiptEvent string + TopicSendToDeviceEvent string + JetStream nats.JetStreamContext + ServerName gomatrixserverlib.ServerName + UserAPI userapi.UserInternalAPI } func (p *SyncAPIProducer) SendReceipt( @@ -48,3 +54,75 @@ func (p *SyncAPIProducer) SendReceipt( _, err := p.JetStream.PublishMsg(m, nats.Context(ctx)) return err } + +func (p *SyncAPIProducer) SendToDevice( + ctx context.Context, sender, userID, deviceID, eventType string, + message interface{}, +) error { + devices := []string{} + _, domain, err := gomatrixserverlib.SplitID('@', userID) + if err != nil { + return err + } + + // If the event is targeted locally then we want to expand the wildcard + // out into individual device IDs so that we can send them to each respective + // device. If the event isn't targeted locally then we can't expand the + // wildcard as we don't know about the remote devices, so instead we leave it + // as-is, so that the federation sender can send it on with the wildcard intact. + if domain == p.ServerName && deviceID == "*" { + var res userapi.QueryDevicesResponse + err = p.UserAPI.QueryDevices(context.TODO(), &userapi.QueryDevicesRequest{ + UserID: userID, + }, &res) + if err != nil { + return err + } + for _, dev := range res.Devices { + devices = append(devices, dev.ID) + } + } else { + devices = append(devices, deviceID) + } + + js, err := json.Marshal(message) + if err != nil { + return err + } + + log.WithFields(log.Fields{ + "user_id": userID, + "num_devices": len(devices), + "type": eventType, + }).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent) + for _, device := range devices { + ote := &api.OutputSendToDeviceEvent{ + UserID: userID, + DeviceID: device, + SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{ + Sender: sender, + Type: eventType, + Content: js, + }, + } + + eventJSON, err := json.Marshal(ote) + if err != nil { + log.WithError(err).Error("sendToDevice failed json.Marshal") + return err + } + m := &nats.Msg{ + Subject: p.TopicSendToDeviceEvent, + Data: eventJSON, + Header: nats.Header{}, + } + m.Header.Set("sender", sender) + m.Header.Set(jetstream.UserID, userID) + + if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil { + log.WithError(err).Error("sendToDevice failed t.Producer.SendMessage") + return err + } + } + return nil +} diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 95f2bc4cc..0dea23f67 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -346,7 +346,7 @@ func (t *txnReq) processEDUs(ctx context.Context) { for userID, byUser := range directPayload.Messages { for deviceID, message := range byUser { // TODO: check that the user and the device actually exist here - if err := eduserverAPI.SendToDevice(ctx, t.eduAPI, directPayload.Sender, userID, deviceID, directPayload.Type, message); err != nil { + if err := t.producer.SendToDevice(ctx, directPayload.Sender, userID, deviceID, directPayload.Type, message); err != nil { util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{ "sender": directPayload.Sender, "user_id": userID, diff --git a/federationapi/routing/send_test.go b/federationapi/routing/send_test.go index cd9f8e454..c41be548a 100644 --- a/federationapi/routing/send_test.go +++ b/federationapi/routing/send_test.go @@ -67,14 +67,6 @@ func (p *testEDUProducer) InputTypingEvent( return nil } -func (p *testEDUProducer) InputSendToDeviceEvent( - ctx context.Context, - request *eduAPI.InputSendToDeviceEventRequest, - response *eduAPI.InputSendToDeviceEventResponse, -) error { - return nil -} - func (o *testEDUProducer) InputCrossSigningKeyUpdate( ctx context.Context, request *eduAPI.InputCrossSigningKeyUpdateRequest, diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/sendtodevice.go similarity index 97% rename from syncapi/consumers/eduserver_sendtodevice.go rename to syncapi/consumers/sendtodevice.go index b0beef063..75c36bf09 100644 --- a/syncapi/consumers/eduserver_sendtodevice.go +++ b/syncapi/consumers/sendtodevice.go @@ -75,15 +75,8 @@ func (s *OutputSendToDeviceEventConsumer) Start() error { } func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { - var output api.OutputSendToDeviceEvent - if err := json.Unmarshal(msg.Data, &output); err != nil { - // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Errorf("EDU server output log: message parse failure") - sentry.CaptureException(err) - return true - } - - _, domain, err := gomatrixserverlib.SplitID('@', output.UserID) + userID := msg.Header.Get(jetstream.UserID) + _, domain, err := gomatrixserverlib.SplitID('@', userID) if err != nil { sentry.CaptureException(err) return true @@ -92,6 +85,14 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msg *na return true } + var output api.OutputSendToDeviceEvent + if err := json.Unmarshal(msg.Data, &output); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("EDU server output log: message parse failure") + sentry.CaptureException(err) + return true + } + util.GetLogger(context.TODO()).WithFields(log.Fields{ "sender": output.Sender, "user_id": output.UserID,