diff --git a/clientapi/producers/syncapi.go b/clientapi/producers/syncapi.go index 5933ce1a8..2dc0c4843 100644 --- a/clientapi/producers/syncapi.go +++ b/clientapi/producers/syncapi.go @@ -21,12 +21,13 @@ import ( "strconv" "time" - "github.com/matrix-org/dendrite/setup/jetstream" - "github.com/matrix-org/dendrite/syncapi/types" - userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" "github.com/nats-io/nats.go" log "github.com/sirupsen/logrus" + + "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/matrix-org/dendrite/syncapi/types" + userapi "github.com/matrix-org/dendrite/userapi/api" ) // SyncAPIProducer produces events for the sync API server to consume @@ -61,7 +62,7 @@ func (p *SyncAPIProducer) SendReceipt( func (p *SyncAPIProducer) SendToDevice( ctx context.Context, sender, userID, deviceID, eventType string, - message interface{}, + message json.RawMessage, ) error { devices := []string{} _, domain, err := gomatrixserverlib.SplitID('@', userID) @@ -89,24 +90,19 @@ func (p *SyncAPIProducer) SendToDevice( devices = append(devices, deviceID) } - js, err := json.Marshal(message) - if err != nil { - return err - } - log.WithFields(log.Fields{ "user_id": userID, "num_devices": len(devices), "type": eventType, }).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent) - for _, device := range devices { + for i, device := range devices { ote := &types.OutputSendToDeviceEvent{ UserID: userID, DeviceID: device, SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{ Sender: sender, Type: eventType, - Content: js, + Content: message, }, } @@ -115,15 +111,17 @@ func (p *SyncAPIProducer) SendToDevice( log.WithError(err).Error("sendToDevice failed json.Marshal") return err } - m := &nats.Msg{ - Subject: p.TopicSendToDeviceEvent, - Data: eventJSON, - Header: nats.Header{}, - } + m := nats.NewMsg(p.TopicSendToDeviceEvent) + m.Data = eventJSON m.Header.Set("sender", sender) m.Header.Set(jetstream.UserID, userID) + if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil { - log.WithError(err).Error("sendToDevice failed t.Producer.SendMessage") + if i < len(devices)-1 { + log.WithError(err).Warn("sendToDevice failed to PublishMsg, trying further devices") + continue + } + log.WithError(err).Error("sendToDevice failed to PublishMsg for all devices") return err } } diff --git a/federationapi/producers/syncapi.go b/federationapi/producers/syncapi.go index 86c8c10a3..4abd3fbe5 100644 --- a/federationapi/producers/syncapi.go +++ b/federationapi/producers/syncapi.go @@ -64,7 +64,7 @@ func (p *SyncAPIProducer) SendReceipt( func (p *SyncAPIProducer) SendToDevice( ctx context.Context, sender, userID, deviceID, eventType string, - message interface{}, + message json.RawMessage, ) error { devices := []string{} _, domain, err := gomatrixserverlib.SplitID('@', userID) @@ -92,24 +92,19 @@ func (p *SyncAPIProducer) SendToDevice( devices = append(devices, deviceID) } - js, err := json.Marshal(message) - if err != nil { - return err - } - log.WithFields(log.Fields{ "user_id": userID, "num_devices": len(devices), "type": eventType, }).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent) - for _, device := range devices { + for i, device := range devices { ote := &types.OutputSendToDeviceEvent{ UserID: userID, DeviceID: device, SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{ Sender: sender, Type: eventType, - Content: js, + Content: message, }, } @@ -118,16 +113,17 @@ func (p *SyncAPIProducer) SendToDevice( log.WithError(err).Error("sendToDevice failed json.Marshal") return err } - m := &nats.Msg{ - Subject: p.TopicSendToDeviceEvent, - Data: eventJSON, - Header: nats.Header{}, - } + m := nats.NewMsg(p.TopicSendToDeviceEvent) + m.Data = eventJSON m.Header.Set("sender", sender) m.Header.Set(jetstream.UserID, userID) if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil { - log.WithError(err).Error("sendToDevice failed t.Producer.SendMessage") + if i < len(devices)-1 { + log.WithError(err).Warn("sendToDevice failed to PublishMsg, trying further devices") + continue + } + log.WithError(err).Error("sendToDevice failed to PublishMsg for all devices") return err } } diff --git a/syncapi/consumers/sendtodevice.go b/syncapi/consumers/sendtodevice.go index 89b01d7e5..7d6aae597 100644 --- a/syncapi/consumers/sendtodevice.go +++ b/syncapi/consumers/sendtodevice.go @@ -19,16 +19,17 @@ import ( "encoding/json" "github.com/getsentry/sentry-go" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" + "github.com/nats-io/nats.go" + log "github.com/sirupsen/logrus" + "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" - "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/util" - "github.com/nats-io/nats.go" - log "github.com/sirupsen/logrus" ) // OutputSendToDeviceEventConsumer consumes events that originated in the EDU server. @@ -79,16 +80,18 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msgs [] _, domain, err := gomatrixserverlib.SplitID('@', userID) if err != nil { sentry.CaptureException(err) + log.WithError(err).Errorf("send-to-device: failed to split user id, dropping message") return true } if domain != s.serverName { + log.Tracef("ignoring send-to-device event with destination %s", domain) return true } var output types.OutputSendToDeviceEvent if err = json.Unmarshal(msg.Data, &output); err != nil { // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Errorf("output log: message parse failure") + log.WithError(err).Errorf("send-to-device: message parse failure") sentry.CaptureException(err) return true } @@ -105,7 +108,7 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msgs [] ) if err != nil { sentry.CaptureException(err) - log.WithError(err).Errorf("failed to store send-to-device message") + log.WithError(err).Errorf("send-to-device: failed to store message") return false } diff --git a/syncapi/syncapi_test.go b/syncapi/syncapi_test.go index c81256aa7..a4985dbf4 100644 --- a/syncapi/syncapi_test.go +++ b/syncapi/syncapi_test.go @@ -624,9 +624,7 @@ func testSendToDevice(t *testing.T, dbType test.DBType) { // Send to-device messages of type "m.dendrite.test" with content `{"dummy":"message $counter"}` for i := 0; i < tc.sendMessagesCount; i++ { msgCounter++ - msg := map[string]string{ - "dummy": fmt.Sprintf("message %d", msgCounter), - } + msg := json.RawMessage(fmt.Sprintf(`{"dummy":"message %d"}`, msgCounter)) if err := producer.SendToDevice(ctx, user.ID, user.ID, alice.ID, "m.dendrite.test", msg); err != nil { t.Fatalf("unable to send to device message: %v", err) }