From 4fff7bf2463cf95ad846923fd071138a5d3be24e Mon Sep 17 00:00:00 2001 From: Till Faelligen Date: Thu, 6 Jan 2022 10:00:21 +0100 Subject: [PATCH] Add required changes for JetStream --- eduserver/input/input.go | 10 +-- federationapi/consumers/eduserver.go | 97 ++++++++++++++----------- federationapi/federationapi.go | 2 +- setup/jetstream/streams.go | 6 ++ syncapi/consumers/eduserver_presence.go | 71 +++++++++--------- syncapi/syncapi.go | 2 +- 6 files changed, 98 insertions(+), 90 deletions(-) diff --git a/eduserver/input/input.go b/eduserver/input/input.go index 63788f117..fd5184eb2 100644 --- a/eduserver/input/input.go +++ b/eduserver/input/input.go @@ -256,11 +256,9 @@ func (t *EDUServerInputAPI) InputPresence( if err != nil { return err } - m := &sarama.ProducerMessage{ - Topic: t.OutputPresenceTopic, - Key: sarama.StringEncoder(request.UserID), - Value: sarama.ByteEncoder(js), - } - _, _, err = t.Producer.SendMessage(m) + _, err = t.JetStream.PublishMsg(&nats.Msg{ + Subject: t.OutputPresenceTopic, + Data: js, + }) return err } diff --git a/federationapi/consumers/eduserver.go b/federationapi/consumers/eduserver.go index 9f14ca88b..e600e9a5e 100644 --- a/federationapi/consumers/eduserver.go +++ b/federationapi/consumers/eduserver.go @@ -17,6 +17,7 @@ package consumers import ( "context" "encoding/json" + "time" "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/federationapi/queue" @@ -40,6 +41,7 @@ type OutputEDUConsumer struct { typingTopic string sendToDeviceTopic string receiptTopic string + presenceTopic string } // NewOutputEDUConsumer creates a new OutputEDUConsumer. Call Start() to begin consuming from EDU servers. @@ -59,6 +61,7 @@ func NewOutputEDUConsumer( typingTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent), sendToDeviceTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent), receiptTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), + presenceTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputPresenceData), } } @@ -73,8 +76,8 @@ func (t *OutputEDUConsumer) Start() error { if _, err := t.jetstream.Subscribe(t.receiptTopic, t.onReceiptEvent); err != nil { return err } - if err := t.presenceConsumer.Start(); err != nil { - return fmt.Errorf("t.presenceConsumer.Start: %w", err) + if _, err := t.jetstream.Subscribe(t.presenceTopic, t.onPresenceData); err != nil { + return err } return nil } @@ -256,52 +259,58 @@ func (t *OutputEDUConsumer) onReceiptEvent(msg *nats.Msg) { // onPresenceData is called in response to a message received on the presence // data topic from the EDU server. -func (t *OutputEDUConsumer) onPresenceData(msg *sarama.ConsumerMessage) error { - // Extract the presence data from msg. - var presence api.OutputPresenceData - if err := json.Unmarshal(msg.Value, &presence); err != nil { - // Skip this msg but continue processing messages. - log.WithError(err).Errorf("eduserver output log: message parse failed (expected presence)") - return nil - } +func (t *OutputEDUConsumer) onPresenceData(msg *nats.Msg) { + jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool { + // Extract the presence data from msg. + var presence api.OutputPresenceData + if err := json.Unmarshal(msg.Data, &presence); err != nil { + // Skip this msg but continue processing messages. + log.WithError(err).Errorf("eduserver output log: message parse failed (expected presence)") + return true + } - // only send presence events which originated from us - _, senderServerName, err := gomatrixserverlib.SplitID('@', presence.UserID) - if err != nil { - log.WithError(err).WithField("user_id", presence.UserID).Error("Failed to extract domain from presence sender") - return nil - } - if senderServerName != t.ServerName { - return nil // don't log, very spammy as it logs for each remote presence - } + // only send presence events which originated from us + _, senderServerName, err := gomatrixserverlib.SplitID('@', presence.UserID) + if err != nil { + log.WithError(err).WithField("user_id", presence.UserID).Error("Failed to extract domain from presence sender") + return true + } + if senderServerName != t.ServerName { + return true // don't log, very spammy as it logs for each remote presence + } - joined, err := t.db.GetAllJoinedHosts(context.TODO()) - if err != nil { - return err - } + joined, err := t.db.GetAllJoinedHosts(context.TODO()) + if err != nil { + log.WithError(err).Error("failed to get all joined hosts") + return true + } - lastActiveTS := time.Since(presence.LastActiveTS.Time()) - content := api.FederationPresenceData{ - Push: []api.FederationPresenceSingle{ - { - CurrentlyActive: lastActiveTS < time.Minute*5, - LastActiveAgo: int(lastActiveTS.Milliseconds()), - Presence: presence.Presence.String(), - UserID: presence.UserID, - StatusMsg: presence.StatusMsg, + lastActiveTS := time.Since(presence.LastActiveTS.Time()) + content := api.FederationPresenceData{ + Push: []api.FederationPresenceSingle{ + { + CurrentlyActive: lastActiveTS < time.Minute*5, + LastActiveAgo: int(lastActiveTS.Milliseconds()), + Presence: presence.Presence.String(), + UserID: presence.UserID, + StatusMsg: presence.StatusMsg, + }, }, - }, - } + } - edu := &gomatrixserverlib.EDU{ - Type: gomatrixserverlib.MPresence, - Origin: string(t.ServerName), - } - if edu.Content, err = json.Marshal(content); err != nil { - return err - } + edu := &gomatrixserverlib.EDU{ + Type: gomatrixserverlib.MPresence, + Origin: string(t.ServerName), + } + if edu.Content, err = json.Marshal(content); err != nil { + return true + } - log.Debugf("Sending edu to federated servers: %s", string(edu.Content)) - - return t.queues.SendEDU(edu, t.ServerName, joined) + log.Debugf("Sending edu to federated servers: %s", string(edu.Content)) + if err := t.queues.SendEDU(edu, t.ServerName, joined); err != nil { + log.WithError(err).Error("failed to send EDU") + return false + } + return true + }) } diff --git a/federationapi/federationapi.go b/federationapi/federationapi.go index 65eef5b36..a26ff5879 100644 --- a/federationapi/federationapi.go +++ b/federationapi/federationapi.go @@ -31,7 +31,7 @@ import ( "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/jetstream" - userapi "github.com/matrix-org/dendrite/userapi/api" + userAPI "github.com/matrix-org/dendrite/userapi/api" "github.com/sirupsen/logrus" "github.com/matrix-org/dendrite/federationapi/routing" diff --git a/setup/jetstream/streams.go b/setup/jetstream/streams.go index 0fd31082c..ab22e768a 100644 --- a/setup/jetstream/streams.go +++ b/setup/jetstream/streams.go @@ -19,6 +19,7 @@ var ( OutputTypingEvent = "OutputTypingEvent" OutputClientData = "OutputClientData" OutputReceiptEvent = "OutputReceiptEvent" + OutputPresenceData = "OutputPresenceData" ) var streams = []*nats.StreamConfig{ @@ -58,4 +59,9 @@ var streams = []*nats.StreamConfig{ Retention: nats.InterestPolicy, Storage: nats.FileStorage, }, + { + Name: OutputPresenceData, + Retention: nats.InterestPolicy, + Storage: nats.FileStorage, + }, } diff --git a/syncapi/consumers/eduserver_presence.go b/syncapi/consumers/eduserver_presence.go index c2305a90b..7f60d2def 100644 --- a/syncapi/consumers/eduserver_presence.go +++ b/syncapi/consumers/eduserver_presence.go @@ -15,26 +15,29 @@ package consumers import ( + "context" "encoding/json" - "github.com/Shopify/sarama" "github.com/getsentry/sentry-go" "github.com/matrix-org/dendrite/eduserver/api" - "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" + "github.com/nats-io/nats.go" log "github.com/sirupsen/logrus" ) // OutputPresenceDataConsumer consumes events that originated in the EDU server. type OutputPresenceDataConsumer struct { - presenceConsumer *internal.ContinualConsumer - db storage.Database - stream types.StreamProvider - notifier *notifier.Notifier + ctx context.Context + jetstream nats.JetStreamContext + topic string + db storage.Database + stream types.StreamProvider + notifier *notifier.Notifier } // NewOutputPresenceDataConsumer creates a new OutputPresenceDataConsumer. @@ -42,49 +45,41 @@ type OutputPresenceDataConsumer struct { func NewOutputPresenceDataConsumer( process *process.ProcessContext, cfg *config.SyncAPI, - kafkaConsumer sarama.Consumer, + js nats.JetStreamContext, store storage.Database, notifier *notifier.Notifier, stream types.StreamProvider, ) *OutputPresenceDataConsumer { - - consumer := internal.ContinualConsumer{ - Process: process, - ComponentName: "syncapi/eduserver/presence", - Topic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputPresenceData), - Consumer: kafkaConsumer, - PartitionStore: store, + return &OutputPresenceDataConsumer{ + ctx: process.Context(), + jetstream: js, + topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputPresenceData), + db: store, + notifier: notifier, + stream: stream, } - s := &OutputPresenceDataConsumer{ - presenceConsumer: &consumer, - db: store, - notifier: notifier, - stream: stream, - } - - consumer.ProcessMessage = s.onMessage - - return s } // Start consuming from EDU api func (s *OutputPresenceDataConsumer) Start() error { - return s.presenceConsumer.Start() + _, err := s.jetstream.Subscribe(s.topic, s.onMessage) + return err } -func (s *OutputPresenceDataConsumer) onMessage(msg *sarama.ConsumerMessage) error { - var output api.OutputPresenceData - if err := json.Unmarshal(msg.Value, &output); err != nil { - // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Errorf("EDU server output log: message parse failure") - sentry.CaptureException(err) - return nil - } - log.Debugf("presence received by sync api! %+v", output) +func (s *OutputPresenceDataConsumer) onMessage(msg *nats.Msg) { + jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool { + var output api.OutputPresenceData + if err := json.Unmarshal(msg.Data, &output); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("EDU server output log: message parse failure") + sentry.CaptureException(err) + return true + } + log.Debugf("presence received by sync api! %+v", output) - s.stream.Advance(output.StreamPos) - s.notifier.OnNewPresence(types.StreamingToken{PresenceDataPosition: output.StreamPos}, output.UserID) - - return nil + s.stream.Advance(output.StreamPos) + s.notifier.OnNewPresence(types.StreamingToken{PresenceDataPosition: output.StreamPos}, output.UserID) + return true + }) } diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 96eccba27..317733130 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -110,7 +110,7 @@ func AddPublicRoutes( } presenceConsumer := consumers.NewOutputPresenceDataConsumer( - process, cfg, consumer, syncDB, notifier, streams.PresenceDataStreamProvider, + process, cfg, js, syncDB, notifier, streams.PresenceDataStreamProvider, ) if err = presenceConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start presence consumer")