mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-01-18 01:44:27 -06:00
Send-to-device consumer/producer tweaks (#2713)
Some tweaks for the send-to-device consumers/producers: - use `json.RawMessage` without marshalling it first - try further devices (if available) if we failed to `PublishMsg` in the producers - some logging changes (to better debug E2EE issues)
This commit is contained in:
parent
100fa9b235
commit
c366ccdfca
|
@ -21,12 +21,13 @@ import (
|
|||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/nats-io/nats.go"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||
)
|
||||
|
||||
// SyncAPIProducer produces events for the sync API server to consume
|
||||
|
@ -61,7 +62,7 @@ func (p *SyncAPIProducer) SendReceipt(
|
|||
|
||||
func (p *SyncAPIProducer) SendToDevice(
|
||||
ctx context.Context, sender, userID, deviceID, eventType string,
|
||||
message interface{},
|
||||
message json.RawMessage,
|
||||
) error {
|
||||
devices := []string{}
|
||||
_, domain, err := gomatrixserverlib.SplitID('@', userID)
|
||||
|
@ -89,24 +90,19 @@ func (p *SyncAPIProducer) SendToDevice(
|
|||
devices = append(devices, deviceID)
|
||||
}
|
||||
|
||||
js, err := json.Marshal(message)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"user_id": userID,
|
||||
"num_devices": len(devices),
|
||||
"type": eventType,
|
||||
}).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent)
|
||||
for _, device := range devices {
|
||||
for i, device := range devices {
|
||||
ote := &types.OutputSendToDeviceEvent{
|
||||
UserID: userID,
|
||||
DeviceID: device,
|
||||
SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{
|
||||
Sender: sender,
|
||||
Type: eventType,
|
||||
Content: js,
|
||||
Content: message,
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -115,15 +111,17 @@ func (p *SyncAPIProducer) SendToDevice(
|
|||
log.WithError(err).Error("sendToDevice failed json.Marshal")
|
||||
return err
|
||||
}
|
||||
m := &nats.Msg{
|
||||
Subject: p.TopicSendToDeviceEvent,
|
||||
Data: eventJSON,
|
||||
Header: nats.Header{},
|
||||
}
|
||||
m := nats.NewMsg(p.TopicSendToDeviceEvent)
|
||||
m.Data = eventJSON
|
||||
m.Header.Set("sender", sender)
|
||||
m.Header.Set(jetstream.UserID, userID)
|
||||
|
||||
if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil {
|
||||
log.WithError(err).Error("sendToDevice failed t.Producer.SendMessage")
|
||||
if i < len(devices)-1 {
|
||||
log.WithError(err).Warn("sendToDevice failed to PublishMsg, trying further devices")
|
||||
continue
|
||||
}
|
||||
log.WithError(err).Error("sendToDevice failed to PublishMsg for all devices")
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
|
|
@ -64,7 +64,7 @@ func (p *SyncAPIProducer) SendReceipt(
|
|||
|
||||
func (p *SyncAPIProducer) SendToDevice(
|
||||
ctx context.Context, sender, userID, deviceID, eventType string,
|
||||
message interface{},
|
||||
message json.RawMessage,
|
||||
) error {
|
||||
devices := []string{}
|
||||
_, domain, err := gomatrixserverlib.SplitID('@', userID)
|
||||
|
@ -92,24 +92,19 @@ func (p *SyncAPIProducer) SendToDevice(
|
|||
devices = append(devices, deviceID)
|
||||
}
|
||||
|
||||
js, err := json.Marshal(message)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"user_id": userID,
|
||||
"num_devices": len(devices),
|
||||
"type": eventType,
|
||||
}).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent)
|
||||
for _, device := range devices {
|
||||
for i, device := range devices {
|
||||
ote := &types.OutputSendToDeviceEvent{
|
||||
UserID: userID,
|
||||
DeviceID: device,
|
||||
SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{
|
||||
Sender: sender,
|
||||
Type: eventType,
|
||||
Content: js,
|
||||
Content: message,
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -118,16 +113,17 @@ func (p *SyncAPIProducer) SendToDevice(
|
|||
log.WithError(err).Error("sendToDevice failed json.Marshal")
|
||||
return err
|
||||
}
|
||||
m := &nats.Msg{
|
||||
Subject: p.TopicSendToDeviceEvent,
|
||||
Data: eventJSON,
|
||||
Header: nats.Header{},
|
||||
}
|
||||
m := nats.NewMsg(p.TopicSendToDeviceEvent)
|
||||
m.Data = eventJSON
|
||||
m.Header.Set("sender", sender)
|
||||
m.Header.Set(jetstream.UserID, userID)
|
||||
|
||||
if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil {
|
||||
log.WithError(err).Error("sendToDevice failed t.Producer.SendMessage")
|
||||
if i < len(devices)-1 {
|
||||
log.WithError(err).Warn("sendToDevice failed to PublishMsg, trying further devices")
|
||||
continue
|
||||
}
|
||||
log.WithError(err).Error("sendToDevice failed to PublishMsg for all devices")
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,16 +19,17 @@ import (
|
|||
"encoding/json"
|
||||
|
||||
"github.com/getsentry/sentry-go"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
"github.com/nats-io/nats.go"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||
"github.com/matrix-org/dendrite/setup/process"
|
||||
"github.com/matrix-org/dendrite/syncapi/notifier"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
"github.com/nats-io/nats.go"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// OutputSendToDeviceEventConsumer consumes events that originated in the EDU server.
|
||||
|
@ -79,16 +80,18 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msgs []
|
|||
_, domain, err := gomatrixserverlib.SplitID('@', userID)
|
||||
if err != nil {
|
||||
sentry.CaptureException(err)
|
||||
log.WithError(err).Errorf("send-to-device: failed to split user id, dropping message")
|
||||
return true
|
||||
}
|
||||
if domain != s.serverName {
|
||||
log.Tracef("ignoring send-to-device event with destination %s", domain)
|
||||
return true
|
||||
}
|
||||
|
||||
var output types.OutputSendToDeviceEvent
|
||||
if err = json.Unmarshal(msg.Data, &output); err != nil {
|
||||
// If the message was invalid, log it and move on to the next message in the stream
|
||||
log.WithError(err).Errorf("output log: message parse failure")
|
||||
log.WithError(err).Errorf("send-to-device: message parse failure")
|
||||
sentry.CaptureException(err)
|
||||
return true
|
||||
}
|
||||
|
@ -105,7 +108,7 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msgs []
|
|||
)
|
||||
if err != nil {
|
||||
sentry.CaptureException(err)
|
||||
log.WithError(err).Errorf("failed to store send-to-device message")
|
||||
log.WithError(err).Errorf("send-to-device: failed to store message")
|
||||
return false
|
||||
}
|
||||
|
||||
|
|
|
@ -624,9 +624,7 @@ func testSendToDevice(t *testing.T, dbType test.DBType) {
|
|||
// Send to-device messages of type "m.dendrite.test" with content `{"dummy":"message $counter"}`
|
||||
for i := 0; i < tc.sendMessagesCount; i++ {
|
||||
msgCounter++
|
||||
msg := map[string]string{
|
||||
"dummy": fmt.Sprintf("message %d", msgCounter),
|
||||
}
|
||||
msg := json.RawMessage(fmt.Sprintf(`{"dummy":"message %d"}`, msgCounter))
|
||||
if err := producer.SendToDevice(ctx, user.ID, user.ID, alice.ID, "m.dendrite.test", msg); err != nil {
|
||||
t.Fatalf("unable to send to device message: %v", err)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue