Implement wildcard

This commit is contained in:
Neil Alexander 2020-05-29 16:59:32 +01:00
parent 43a83d375e
commit 4664f02b4d
3 changed files with 21 additions and 3 deletions

View file

@ -19,6 +19,7 @@ import (
"encoding/json"
"github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/config"
@ -34,6 +35,7 @@ import (
type OutputSendToDeviceEventConsumer struct {
sendToDeviceConsumer *internal.ContinualConsumer
db storage.Database
deviceDB devices.Database
serverName gomatrixserverlib.ServerName // our server name
notifier *sync.Notifier
}
@ -45,6 +47,7 @@ func NewOutputSendToDeviceEventConsumer(
kafkaConsumer sarama.Consumer,
n *sync.Notifier,
store storage.Database,
deviceDB devices.Database,
) *OutputSendToDeviceEventConsumer {
consumer := internal.ContinualConsumer{
@ -56,6 +59,7 @@ func NewOutputSendToDeviceEventConsumer(
s := &OutputSendToDeviceEventConsumer{
sendToDeviceConsumer: &consumer,
db: store,
deviceDB: deviceDB,
serverName: cfg.Matrix.ServerName,
notifier: n,
}
@ -78,7 +82,7 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage)
return err
}
_, domain, err := gomatrixserverlib.SplitID('@', output.UserID)
localpart, domain, err := gomatrixserverlib.SplitID('@', output.UserID)
if err != nil {
return err
}
@ -103,9 +107,22 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage)
return err
}
devices := []string{}
if output.DeviceID == "*" {
devs, err := s.deviceDB.GetDevicesByLocalpart(context.TODO(), localpart)
if err != nil {
return err
}
for _, dev := range devs {
devices = append(devices, dev.ID)
}
} else {
devices = append(devices, output.DeviceID)
}
s.notifier.OnNewSendToDevice(
output.UserID,
[]string{output.DeviceID}, // TODO: support wildcard here as per spec
devices,
types.NewStreamToken(0, streamPos),
)

View file

@ -82,7 +82,7 @@ func SetupSyncAPIComponent(
}
sendToDeviceConsumer := consumers.NewOutputSendToDeviceEventConsumer(
base.Cfg, base.KafkaConsumer, notifier, syncDB,
base.Cfg, base.KafkaConsumer, notifier, syncDB, deviceDB,
)
if err = sendToDeviceConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start send-to-device consumer")

View file

@ -295,3 +295,4 @@ Can send a to-device message to two users which both receive it using /sync
Can recv device messages until they are acknowledged
Device messages wake up /sync
Device messages over federation wake up /sync
Wildcard device messages wake up /sync