From 062325b6c6650284ffd17d4e16d237ab9ba3b27f Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 27 May 2020 14:37:13 +0100 Subject: [PATCH] Send to device consumer in sync API --- syncapi/consumers/eduserver_sendtodevice.go | 82 +++++++++++++++++++ .../{eduserver.go => eduserver_typing.go} | 0 syncapi/syncapi.go | 9 +- 3 files changed, 90 insertions(+), 1 deletion(-) create mode 100644 syncapi/consumers/eduserver_sendtodevice.go rename syncapi/consumers/{eduserver.go => eduserver_typing.go} (100%) diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/eduserver_sendtodevice.go new file mode 100644 index 000000000..0382c6083 --- /dev/null +++ b/syncapi/consumers/eduserver_sendtodevice.go @@ -0,0 +1,82 @@ +// Copyright 2019 Alex Chen +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumers + +import ( + "encoding/json" + + "github.com/Shopify/sarama" + "github.com/matrix-org/dendrite/eduserver/api" + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/internal/config" + "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/sync" + log "github.com/sirupsen/logrus" +) + +// OutputSendToDeviceEventConsumer consumes events that originated in the EDU server. +type OutputSendToDeviceEventConsumer struct { + sendToDeviceConsumer *internal.ContinualConsumer + db storage.Database + notifier *sync.Notifier +} + +// NewOutputSendToDeviceEventConsumer creates a new OutputSendToDeviceEventConsumer. +// Call Start() to begin consuming from the EDU server. +func NewOutputSendToDeviceEventConsumer( + cfg *config.Dendrite, + kafkaConsumer sarama.Consumer, + n *sync.Notifier, + store storage.Database, +) *OutputSendToDeviceEventConsumer { + + consumer := internal.ContinualConsumer{ + Topic: string(cfg.Kafka.Topics.OutputSendToDeviceEventTopic), + Consumer: kafkaConsumer, + PartitionStore: store, + } + + s := &OutputSendToDeviceEventConsumer{ + sendToDeviceConsumer: &consumer, + db: store, + notifier: n, + } + + consumer.ProcessMessage = s.onMessage + + return s +} + +// Start consuming from EDU api +func (s *OutputSendToDeviceEventConsumer) Start() error { + return s.sendToDeviceConsumer.Start() +} + +func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { + var output api.OutputSendToDeviceEvent + if err := json.Unmarshal(msg.Value, &output); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("EDU server output log: message parse failure") + return nil + } + + log.WithFields(log.Fields{ + "user_id": output.UserID, + "device_id": output.DeviceID, + "event_type": output.EventType, + }).Debug("received send-to-device event from EDU server") + + return nil +} diff --git a/syncapi/consumers/eduserver.go b/syncapi/consumers/eduserver_typing.go similarity index 100% rename from syncapi/consumers/eduserver.go rename to syncapi/consumers/eduserver_typing.go diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 9251f6182..1dcf3118f 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -78,7 +78,14 @@ func SetupSyncAPIComponent( base.Cfg, base.KafkaConsumer, notifier, syncDB, ) if err = typingConsumer.Start(); err != nil { - logrus.WithError(err).Panicf("failed to start typing server consumer") + logrus.WithError(err).Panicf("failed to start typing consumer") + } + + sendToDeviceConsumer := consumers.NewOutputClientDataConsumer( + base.Cfg, base.KafkaConsumer, notifier, syncDB, + ) + if err = sendToDeviceConsumer.Start(); err != nil { + logrus.WithError(err).Panicf("failed to start send-to-device consumer") } routing.Setup(base.PublicAPIMux, requestPool, syncDB, deviceDB, federation, rsAPI, cfg)