mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-15 10:03:09 -06:00
Use json.RawMessage directly; try more devices, if available
This commit is contained in:
parent
af9a204cc0
commit
0796e170bd
|
|
@ -21,12 +21,13 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||||
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
)
|
)
|
||||||
|
|
||||||
// SyncAPIProducer produces events for the sync API server to consume
|
// SyncAPIProducer produces events for the sync API server to consume
|
||||||
|
|
@ -61,7 +62,7 @@ func (p *SyncAPIProducer) SendReceipt(
|
||||||
|
|
||||||
func (p *SyncAPIProducer) SendToDevice(
|
func (p *SyncAPIProducer) SendToDevice(
|
||||||
ctx context.Context, sender, userID, deviceID, eventType string,
|
ctx context.Context, sender, userID, deviceID, eventType string,
|
||||||
message interface{},
|
message json.RawMessage,
|
||||||
) error {
|
) error {
|
||||||
devices := []string{}
|
devices := []string{}
|
||||||
_, domain, err := gomatrixserverlib.SplitID('@', userID)
|
_, domain, err := gomatrixserverlib.SplitID('@', userID)
|
||||||
|
|
@ -89,24 +90,19 @@ func (p *SyncAPIProducer) SendToDevice(
|
||||||
devices = append(devices, deviceID)
|
devices = append(devices, deviceID)
|
||||||
}
|
}
|
||||||
|
|
||||||
js, err := json.Marshal(message)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
"user_id": userID,
|
"user_id": userID,
|
||||||
"num_devices": len(devices),
|
"num_devices": len(devices),
|
||||||
"type": eventType,
|
"type": eventType,
|
||||||
}).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent)
|
}).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent)
|
||||||
for _, device := range devices {
|
for i, device := range devices {
|
||||||
ote := &types.OutputSendToDeviceEvent{
|
ote := &types.OutputSendToDeviceEvent{
|
||||||
UserID: userID,
|
UserID: userID,
|
||||||
DeviceID: device,
|
DeviceID: device,
|
||||||
SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{
|
SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{
|
||||||
Sender: sender,
|
Sender: sender,
|
||||||
Type: eventType,
|
Type: eventType,
|
||||||
Content: js,
|
Content: message,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -115,15 +111,17 @@ func (p *SyncAPIProducer) SendToDevice(
|
||||||
log.WithError(err).Error("sendToDevice failed json.Marshal")
|
log.WithError(err).Error("sendToDevice failed json.Marshal")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
m := &nats.Msg{
|
m := nats.NewMsg(p.TopicSendToDeviceEvent)
|
||||||
Subject: p.TopicSendToDeviceEvent,
|
m.Data = eventJSON
|
||||||
Data: eventJSON,
|
|
||||||
Header: nats.Header{},
|
|
||||||
}
|
|
||||||
m.Header.Set("sender", sender)
|
m.Header.Set("sender", sender)
|
||||||
m.Header.Set(jetstream.UserID, userID)
|
m.Header.Set(jetstream.UserID, userID)
|
||||||
|
|
||||||
if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil {
|
if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil {
|
||||||
log.WithError(err).Error("sendToDevice failed t.Producer.SendMessage")
|
if i < len(devices)-1 {
|
||||||
|
log.WithError(err).Warn("sendToDevice failed to PublishMsg, trying further devices")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
log.WithError(err).Error("sendToDevice failed to PublishMsg for all devices")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -64,7 +64,7 @@ func (p *SyncAPIProducer) SendReceipt(
|
||||||
|
|
||||||
func (p *SyncAPIProducer) SendToDevice(
|
func (p *SyncAPIProducer) SendToDevice(
|
||||||
ctx context.Context, sender, userID, deviceID, eventType string,
|
ctx context.Context, sender, userID, deviceID, eventType string,
|
||||||
message interface{},
|
message json.RawMessage,
|
||||||
) error {
|
) error {
|
||||||
devices := []string{}
|
devices := []string{}
|
||||||
_, domain, err := gomatrixserverlib.SplitID('@', userID)
|
_, domain, err := gomatrixserverlib.SplitID('@', userID)
|
||||||
|
|
@ -92,24 +92,19 @@ func (p *SyncAPIProducer) SendToDevice(
|
||||||
devices = append(devices, deviceID)
|
devices = append(devices, deviceID)
|
||||||
}
|
}
|
||||||
|
|
||||||
js, err := json.Marshal(message)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
"user_id": userID,
|
"user_id": userID,
|
||||||
"num_devices": len(devices),
|
"num_devices": len(devices),
|
||||||
"type": eventType,
|
"type": eventType,
|
||||||
}).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent)
|
}).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent)
|
||||||
for _, device := range devices {
|
for i, device := range devices {
|
||||||
ote := &types.OutputSendToDeviceEvent{
|
ote := &types.OutputSendToDeviceEvent{
|
||||||
UserID: userID,
|
UserID: userID,
|
||||||
DeviceID: device,
|
DeviceID: device,
|
||||||
SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{
|
SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{
|
||||||
Sender: sender,
|
Sender: sender,
|
||||||
Type: eventType,
|
Type: eventType,
|
||||||
Content: js,
|
Content: message,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -118,16 +113,17 @@ func (p *SyncAPIProducer) SendToDevice(
|
||||||
log.WithError(err).Error("sendToDevice failed json.Marshal")
|
log.WithError(err).Error("sendToDevice failed json.Marshal")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
m := &nats.Msg{
|
m := nats.NewMsg(p.TopicSendToDeviceEvent)
|
||||||
Subject: p.TopicSendToDeviceEvent,
|
m.Data = eventJSON
|
||||||
Data: eventJSON,
|
|
||||||
Header: nats.Header{},
|
|
||||||
}
|
|
||||||
m.Header.Set("sender", sender)
|
m.Header.Set("sender", sender)
|
||||||
m.Header.Set(jetstream.UserID, userID)
|
m.Header.Set(jetstream.UserID, userID)
|
||||||
|
|
||||||
if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil {
|
if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil {
|
||||||
log.WithError(err).Error("sendToDevice failed t.Producer.SendMessage")
|
if i < len(devices)-1 {
|
||||||
|
log.WithError(err).Warn("sendToDevice failed to PublishMsg, trying further devices")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
log.WithError(err).Error("sendToDevice failed to PublishMsg for all devices")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue