From 7f6764aac6e158ef8bd31f806434e962bb754aa0 Mon Sep 17 00:00:00 2001 From: Anant Prakash Date: Sat, 4 Aug 2018 18:58:21 +0530 Subject: [PATCH] Add a typing server consumer to federation sender --- .../consumers/typingserver.go | 85 +++++++++++++++++++ .../federationsender/federationsender.go | 9 +- 2 files changed, 92 insertions(+), 2 deletions(-) create mode 100644 src/github.com/matrix-org/dendrite/federationsender/consumers/typingserver.go diff --git a/src/github.com/matrix-org/dendrite/federationsender/consumers/typingserver.go b/src/github.com/matrix-org/dendrite/federationsender/consumers/typingserver.go new file mode 100644 index 000000000..20ba350b2 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/federationsender/consumers/typingserver.go @@ -0,0 +1,85 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumers + +import ( + "context" + "encoding/json" + + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/dendrite/federationsender/queue" + "github.com/matrix-org/dendrite/federationsender/storage" + "github.com/matrix-org/dendrite/typingserver/dummy/api" + "github.com/matrix-org/gomatrixserverlib" + log "github.com/sirupsen/logrus" + "gopkg.in/Shopify/sarama.v1" +) + +// OutputTypingEventConsumer consumes events that originate in typing server. +type OutputTypingEventConsumer struct { + consumer *common.ContinualConsumer + db *storage.Database + queues *queue.OutgoingQueues + ServerName gomatrixserverlib.ServerName +} + +// NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer. Call Start() to begin consuming from typing servers. +func NewOutputTypingEventConsumer( + cfg *config.Dendrite, + kafkaConsumer sarama.Consumer, + queues *queue.OutgoingQueues, + store *storage.Database, +) *OutputTypingEventConsumer { + consumer := common.ContinualConsumer{ + Topic: string(cfg.Kafka.Topics.OutputTypingEvent), + Consumer: kafkaConsumer, + PartitionStore: store, + } + c := &OutputTypingEventConsumer{ + consumer: &consumer, + queues: queues, + db: store, + ServerName: cfg.Matrix.ServerName, + } + consumer.ProcessMessage = c.onMessage + + return c +} + +// Start consuming from typing servers +func (t *OutputTypingEventConsumer) Start() error { + return t.consumer.Start() +} + +func (t *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { + // Extract the typing event from msg. + var ote api.OutputTypingEvent + if err := json.Unmarshal(msg.Value, &ote); err != nil { + // Skip this msg but continue processing messages. + log.WithError(err).Errorf("typingserver output log: message parse failed") + return nil + } + + joined, err := t.db.GetJoinedHosts(context.TODO(), ote.Event.RoomID()) + if err != nil { + return err + } + + names := make([]gomatrixserverlib.ServerName, len(joined)) + for i := range joined { + names[i] = joined[i].ServerName + } + + return t.queues.SendEvent(&ote.Event, t.ServerName, names) +} diff --git a/src/github.com/matrix-org/dendrite/federationsender/federationsender.go b/src/github.com/matrix-org/dendrite/federationsender/federationsender.go index fa54a05c6..ed3a1a1f8 100644 --- a/src/github.com/matrix-org/dendrite/federationsender/federationsender.go +++ b/src/github.com/matrix-org/dendrite/federationsender/federationsender.go @@ -38,11 +38,16 @@ func SetupFederationSenderComponent( queues := queue.NewOutgoingQueues(base.Cfg.Matrix.ServerName, federation) - consumer := consumers.NewOutputRoomEventConsumer( + rsConsumer := consumers.NewOutputRoomEventConsumer( base.Cfg, base.KafkaConsumer, queues, federationSenderDB, queryAPI, ) - if err = consumer.Start(); err != nil { + if err = rsConsumer.Start(); err != nil { logrus.WithError(err).Panic("failed to start room server consumer") } + + tsConsumer := consumers.NewOutputTypingEventConsumer(cfg, kafkaConsumer, queues, store) + if err := tsConsumer.Start(); err != nil { + logrus.WithError(err).Panic("failed to start typing server consumer") + } }