From 4664f02b4d43c94441fa9822409bcd125f6b6363 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 29 May 2020 16:59:32 +0100 Subject: [PATCH] Implement wildcard --- syncapi/consumers/eduserver_sendtodevice.go | 21 +++++++++++++++++++-- syncapi/syncapi.go | 2 +- sytest-whitelist | 1 + 3 files changed, 21 insertions(+), 3 deletions(-) diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/eduserver_sendtodevice.go index 40257ed56..ab91cc119 100644 --- a/syncapi/consumers/eduserver_sendtodevice.go +++ b/syncapi/consumers/eduserver_sendtodevice.go @@ -19,6 +19,7 @@ import ( "encoding/json" "github.com/Shopify/sarama" + "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/config" @@ -34,6 +35,7 @@ import ( type OutputSendToDeviceEventConsumer struct { sendToDeviceConsumer *internal.ContinualConsumer db storage.Database + deviceDB devices.Database serverName gomatrixserverlib.ServerName // our server name notifier *sync.Notifier } @@ -45,6 +47,7 @@ func NewOutputSendToDeviceEventConsumer( kafkaConsumer sarama.Consumer, n *sync.Notifier, store storage.Database, + deviceDB devices.Database, ) *OutputSendToDeviceEventConsumer { consumer := internal.ContinualConsumer{ @@ -56,6 +59,7 @@ func NewOutputSendToDeviceEventConsumer( s := &OutputSendToDeviceEventConsumer{ sendToDeviceConsumer: &consumer, db: store, + deviceDB: deviceDB, serverName: cfg.Matrix.ServerName, notifier: n, } @@ -78,7 +82,7 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage) return err } - _, domain, err := gomatrixserverlib.SplitID('@', output.UserID) + localpart, domain, err := gomatrixserverlib.SplitID('@', output.UserID) if err != nil { return err } @@ -103,9 +107,22 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage) return err } + devices := []string{} + if output.DeviceID == "*" { + devs, err := s.deviceDB.GetDevicesByLocalpart(context.TODO(), localpart) + if err != nil { + return err + } + for _, dev := range devs { + devices = append(devices, dev.ID) + } + } else { + devices = append(devices, output.DeviceID) + } + s.notifier.OnNewSendToDevice( output.UserID, - []string{output.DeviceID}, // TODO: support wildcard here as per spec + devices, types.NewStreamToken(0, streamPos), ) diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 762f4e9d3..a7b0a1045 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -82,7 +82,7 @@ func SetupSyncAPIComponent( } sendToDeviceConsumer := consumers.NewOutputSendToDeviceEventConsumer( - base.Cfg, base.KafkaConsumer, notifier, syncDB, + base.Cfg, base.KafkaConsumer, notifier, syncDB, deviceDB, ) if err = sendToDeviceConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start send-to-device consumer") diff --git a/sytest-whitelist b/sytest-whitelist index 56ed2c183..22bb0b27c 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -295,3 +295,4 @@ Can send a to-device message to two users which both receive it using /sync Can recv device messages until they are acknowledged Device messages wake up /sync Device messages over federation wake up /sync +Wildcard device messages wake up /sync