From 5cf900428c51575b01e9cbe44fde36050eb6c5e9 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 29 May 2020 17:37:13 +0100 Subject: [PATCH] Only care about wildcard when targeted locally --- cmd/dendrite-client-api-server/main.go | 2 +- cmd/dendrite-demo-libp2p/main.go | 2 +- cmd/dendrite-edu-server/main.go | 3 +- cmd/dendrite-federation-api-server/main.go | 2 +- cmd/dendrite-monolith-server/main.go | 2 +- eduserver/eduserver.go | 4 ++ eduserver/input/input.go | 71 ++++++++++++++------- syncapi/consumers/eduserver_sendtodevice.go | 21 +----- syncapi/syncapi.go | 2 +- sytest-whitelist | 1 + 10 files changed, 63 insertions(+), 47 deletions(-) diff --git a/cmd/dendrite-client-api-server/main.go b/cmd/dendrite-client-api-server/main.go index e06adf8ff..f919243de 100644 --- a/cmd/dendrite-client-api-server/main.go +++ b/cmd/dendrite-client-api-server/main.go @@ -39,7 +39,7 @@ func main() { rsAPI := base.CreateHTTPRoomserverAPIs() fsAPI := base.CreateHTTPFederationSenderAPIs() rsAPI.SetFederationSenderAPI(fsAPI) - eduInputAPI := eduserver.SetupEDUServerComponent(base, cache.New()) + eduInputAPI := eduserver.SetupEDUServerComponent(base, cache.New(), deviceDB) clientapi.SetupClientAPIComponent( base, deviceDB, accountDB, federation, keyRing, diff --git a/cmd/dendrite-demo-libp2p/main.go b/cmd/dendrite-demo-libp2p/main.go index fc56b9bbf..e9d01fd95 100644 --- a/cmd/dendrite-demo-libp2p/main.go +++ b/cmd/dendrite-demo-libp2p/main.go @@ -148,7 +148,7 @@ func main() { &base.Base, keyRing, federation, ) eduInputAPI := eduserver.SetupEDUServerComponent( - &base.Base, cache.New(), + &base.Base, cache.New(), deviceDB, ) asAPI := appservice.SetupAppServiceAPIComponent( &base.Base, accountDB, deviceDB, federation, rsAPI, transactions.New(), diff --git a/cmd/dendrite-edu-server/main.go b/cmd/dendrite-edu-server/main.go index 66e17e577..ca0460f8d 100644 --- a/cmd/dendrite-edu-server/main.go +++ b/cmd/dendrite-edu-server/main.go @@ -29,8 +29,9 @@ func main() { logrus.WithError(err).Warn("BaseDendrite close failed") } }() + deviceDB := base.CreateDeviceDB() - eduserver.SetupEDUServerComponent(base, cache.New()) + eduserver.SetupEDUServerComponent(base, cache.New(), deviceDB) base.SetupAndServeHTTP(string(base.Cfg.Bind.EDUServer), string(base.Cfg.Listen.EDUServer)) diff --git a/cmd/dendrite-federation-api-server/main.go b/cmd/dendrite-federation-api-server/main.go index 5425d117f..af63b549a 100644 --- a/cmd/dendrite-federation-api-server/main.go +++ b/cmd/dendrite-federation-api-server/main.go @@ -39,7 +39,7 @@ func main() { rsAPI := base.CreateHTTPRoomserverAPIs() asAPI := base.CreateHTTPAppServiceAPIs() rsAPI.SetFederationSenderAPI(fsAPI) - eduInputAPI := eduserver.SetupEDUServerComponent(base, cache.New()) + eduInputAPI := eduserver.SetupEDUServerComponent(base, cache.New(), deviceDB) eduProducer := producers.NewEDUServerProducer(eduInputAPI) federationapi.SetupFederationAPIComponent( diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go index 8367cd9da..ef114ccd1 100644 --- a/cmd/dendrite-monolith-server/main.go +++ b/cmd/dendrite-monolith-server/main.go @@ -87,7 +87,7 @@ func main() { } eduInputAPI := eduserver.SetupEDUServerComponent( - base, cache.New(), + base, cache.New(), deviceDB, ) if base.EnableHTTPAPIs { eduInputAPI = base.CreateHTTPEDUServerAPIs() diff --git a/eduserver/eduserver.go b/eduserver/eduserver.go index d8faaa3e1..c78ee12b6 100644 --- a/eduserver/eduserver.go +++ b/eduserver/eduserver.go @@ -13,6 +13,7 @@ package eduserver import ( + "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/eduserver/input" @@ -26,12 +27,15 @@ import ( func SetupEDUServerComponent( base *basecomponent.BaseDendrite, eduCache *cache.EDUCache, + deviceDB devices.Database, ) api.EDUServerInputAPI { inputAPI := &input.EDUServerInputAPI{ Cache: eduCache, + DeviceDB: deviceDB, Producer: base.KafkaProducer, OutputTypingEventTopic: string(base.Cfg.Kafka.Topics.OutputTypingEvent), OutputSendToDeviceEventTopic: string(base.Cfg.Kafka.Topics.OutputSendToDeviceEventTopic), + ServerName: base.Cfg.Matrix.ServerName, } inputAPI.SetupHTTP(base.InternalAPIMux) diff --git a/eduserver/input/input.go b/eduserver/input/input.go index a90461672..e5ee1af58 100644 --- a/eduserver/input/input.go +++ b/eduserver/input/input.go @@ -20,6 +20,7 @@ import ( "github.com/Shopify/sarama" "github.com/gorilla/mux" + "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/internal" @@ -38,6 +39,10 @@ type EDUServerInputAPI struct { OutputSendToDeviceEventTopic string // kafka producer Producer sarama.SyncProducer + // device database + DeviceDB devices.Database + // our server name + ServerName gomatrixserverlib.ServerName } // InputTypingEvent implements api.EDUServerInputAPI @@ -104,35 +109,57 @@ func (t *EDUServerInputAPI) sendTypingEvent(ite *api.InputTypingEvent) error { } func (t *EDUServerInputAPI) sendToDeviceEvent(ise *api.InputSendToDeviceEvent) error { - ote := &api.OutputSendToDeviceEvent{ - UserID: ise.UserID, - DeviceID: ise.DeviceID, - SendToDeviceEvent: ise.SendToDeviceEvent, - } - - logrus.WithFields(logrus.Fields{ - "user_id": ise.UserID, - "device_id": ise.DeviceID, - "event_type": ise.Type, - }).Info("handling send-to-device message") - - eventJSON, err := json.Marshal(ote) + devices := []string{} + localpart, domain, err := gomatrixserverlib.SplitID('@', ise.UserID) if err != nil { - logrus.WithError(err).Error("sendToDevice failed json.Marshal") return err } - m := &sarama.ProducerMessage{ - Topic: string(t.OutputSendToDeviceEventTopic), - Key: sarama.StringEncoder(ote.UserID), - Value: sarama.ByteEncoder(eventJSON), + if domain == t.ServerName && ise.DeviceID == "*" { + devs, err := t.DeviceDB.GetDevicesByLocalpart(context.TODO(), localpart) + if err != nil { + return err + } + for _, dev := range devs { + devices = append(devices, dev.ID) + } + } else { + devices = append(devices, ise.DeviceID) } - _, _, err = t.Producer.SendMessage(m) - if err != nil { - logrus.WithError(err).Error("sendToDevice failed t.Producer.SendMessage") + for _, device := range devices { + ote := &api.OutputSendToDeviceEvent{ + UserID: ise.UserID, + DeviceID: device, + SendToDeviceEvent: ise.SendToDeviceEvent, + } + + logrus.WithFields(logrus.Fields{ + "user_id": ise.UserID, + "device_id": ise.DeviceID, + "event_type": ise.Type, + }).Info("handling send-to-device message") + + eventJSON, err := json.Marshal(ote) + if err != nil { + logrus.WithError(err).Error("sendToDevice failed json.Marshal") + return err + } + + m := &sarama.ProducerMessage{ + Topic: string(t.OutputSendToDeviceEventTopic), + Key: sarama.StringEncoder(ote.UserID), + Value: sarama.ByteEncoder(eventJSON), + } + + _, _, err = t.Producer.SendMessage(m) + if err != nil { + logrus.WithError(err).Error("sendToDevice failed t.Producer.SendMessage") + return err + } } - return err + + return nil } // SetupHTTP adds the EDUServerInputAPI handlers to the http.ServeMux. diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/eduserver_sendtodevice.go index ab91cc119..be503f20a 100644 --- a/syncapi/consumers/eduserver_sendtodevice.go +++ b/syncapi/consumers/eduserver_sendtodevice.go @@ -19,7 +19,6 @@ import ( "encoding/json" "github.com/Shopify/sarama" - "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/config" @@ -35,7 +34,6 @@ import ( type OutputSendToDeviceEventConsumer struct { sendToDeviceConsumer *internal.ContinualConsumer db storage.Database - deviceDB devices.Database serverName gomatrixserverlib.ServerName // our server name notifier *sync.Notifier } @@ -47,7 +45,6 @@ func NewOutputSendToDeviceEventConsumer( kafkaConsumer sarama.Consumer, n *sync.Notifier, store storage.Database, - deviceDB devices.Database, ) *OutputSendToDeviceEventConsumer { consumer := internal.ContinualConsumer{ @@ -59,7 +56,6 @@ func NewOutputSendToDeviceEventConsumer( s := &OutputSendToDeviceEventConsumer{ sendToDeviceConsumer: &consumer, db: store, - deviceDB: deviceDB, serverName: cfg.Matrix.ServerName, notifier: n, } @@ -82,7 +78,7 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage) return err } - localpart, domain, err := gomatrixserverlib.SplitID('@', output.UserID) + _, domain, err := gomatrixserverlib.SplitID('@', output.UserID) if err != nil { return err } @@ -107,22 +103,9 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage) return err } - devices := []string{} - if output.DeviceID == "*" { - devs, err := s.deviceDB.GetDevicesByLocalpart(context.TODO(), localpart) - if err != nil { - return err - } - for _, dev := range devs { - devices = append(devices, dev.ID) - } - } else { - devices = append(devices, output.DeviceID) - } - s.notifier.OnNewSendToDevice( output.UserID, - devices, + []string{output.DeviceID}, types.NewStreamToken(0, streamPos), ) diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index a7b0a1045..762f4e9d3 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -82,7 +82,7 @@ func SetupSyncAPIComponent( } sendToDeviceConsumer := consumers.NewOutputSendToDeviceEventConsumer( - base.Cfg, base.KafkaConsumer, notifier, syncDB, deviceDB, + base.Cfg, base.KafkaConsumer, notifier, syncDB, ) if err = sendToDeviceConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start send-to-device consumer") diff --git a/sytest-whitelist b/sytest-whitelist index bfaba622a..0946758f4 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -300,3 +300,4 @@ Can send messages with a wildcard device id Can send messages with a wildcard device id to two devices Wildcard device messages wake up /sync Wildcard device messages over federation wake up /sync +Can send a to-device message to two users which both receive it using /sync