Add required changes for JetStream

This commit is contained in:
Till Faelligen 2022-01-06 10:00:21 +01:00
parent 49311d83a3
commit 4fff7bf246
6 changed files with 98 additions and 90 deletions

View file

@ -256,11 +256,9 @@ func (t *EDUServerInputAPI) InputPresence(
if err != nil {
return err
}
m := &sarama.ProducerMessage{
Topic: t.OutputPresenceTopic,
Key: sarama.StringEncoder(request.UserID),
Value: sarama.ByteEncoder(js),
}
_, _, err = t.Producer.SendMessage(m)
_, err = t.JetStream.PublishMsg(&nats.Msg{
Subject: t.OutputPresenceTopic,
Data: js,
})
return err
}

View file

@ -17,6 +17,7 @@ package consumers
import (
"context"
"encoding/json"
"time"
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/federationapi/queue"
@ -40,6 +41,7 @@ type OutputEDUConsumer struct {
typingTopic string
sendToDeviceTopic string
receiptTopic string
presenceTopic string
}
// NewOutputEDUConsumer creates a new OutputEDUConsumer. Call Start() to begin consuming from EDU servers.
@ -59,6 +61,7 @@ func NewOutputEDUConsumer(
typingTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent),
sendToDeviceTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent),
receiptTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent),
presenceTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputPresenceData),
}
}
@ -73,8 +76,8 @@ func (t *OutputEDUConsumer) Start() error {
if _, err := t.jetstream.Subscribe(t.receiptTopic, t.onReceiptEvent); err != nil {
return err
}
if err := t.presenceConsumer.Start(); err != nil {
return fmt.Errorf("t.presenceConsumer.Start: %w", err)
if _, err := t.jetstream.Subscribe(t.presenceTopic, t.onPresenceData); err != nil {
return err
}
return nil
}
@ -256,52 +259,58 @@ func (t *OutputEDUConsumer) onReceiptEvent(msg *nats.Msg) {
// onPresenceData is called in response to a message received on the presence
// data topic from the EDU server.
func (t *OutputEDUConsumer) onPresenceData(msg *sarama.ConsumerMessage) error {
// Extract the presence data from msg.
var presence api.OutputPresenceData
if err := json.Unmarshal(msg.Value, &presence); err != nil {
// Skip this msg but continue processing messages.
log.WithError(err).Errorf("eduserver output log: message parse failed (expected presence)")
return nil
}
func (t *OutputEDUConsumer) onPresenceData(msg *nats.Msg) {
jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool {
// Extract the presence data from msg.
var presence api.OutputPresenceData
if err := json.Unmarshal(msg.Data, &presence); err != nil {
// Skip this msg but continue processing messages.
log.WithError(err).Errorf("eduserver output log: message parse failed (expected presence)")
return true
}
// only send presence events which originated from us
_, senderServerName, err := gomatrixserverlib.SplitID('@', presence.UserID)
if err != nil {
log.WithError(err).WithField("user_id", presence.UserID).Error("Failed to extract domain from presence sender")
return nil
}
if senderServerName != t.ServerName {
return nil // don't log, very spammy as it logs for each remote presence
}
// only send presence events which originated from us
_, senderServerName, err := gomatrixserverlib.SplitID('@', presence.UserID)
if err != nil {
log.WithError(err).WithField("user_id", presence.UserID).Error("Failed to extract domain from presence sender")
return true
}
if senderServerName != t.ServerName {
return true // don't log, very spammy as it logs for each remote presence
}
joined, err := t.db.GetAllJoinedHosts(context.TODO())
if err != nil {
return err
}
joined, err := t.db.GetAllJoinedHosts(context.TODO())
if err != nil {
log.WithError(err).Error("failed to get all joined hosts")
return true
}
lastActiveTS := time.Since(presence.LastActiveTS.Time())
content := api.FederationPresenceData{
Push: []api.FederationPresenceSingle{
{
CurrentlyActive: lastActiveTS < time.Minute*5,
LastActiveAgo: int(lastActiveTS.Milliseconds()),
Presence: presence.Presence.String(),
UserID: presence.UserID,
StatusMsg: presence.StatusMsg,
lastActiveTS := time.Since(presence.LastActiveTS.Time())
content := api.FederationPresenceData{
Push: []api.FederationPresenceSingle{
{
CurrentlyActive: lastActiveTS < time.Minute*5,
LastActiveAgo: int(lastActiveTS.Milliseconds()),
Presence: presence.Presence.String(),
UserID: presence.UserID,
StatusMsg: presence.StatusMsg,
},
},
},
}
}
edu := &gomatrixserverlib.EDU{
Type: gomatrixserverlib.MPresence,
Origin: string(t.ServerName),
}
if edu.Content, err = json.Marshal(content); err != nil {
return err
}
edu := &gomatrixserverlib.EDU{
Type: gomatrixserverlib.MPresence,
Origin: string(t.ServerName),
}
if edu.Content, err = json.Marshal(content); err != nil {
return true
}
log.Debugf("Sending edu to federated servers: %s", string(edu.Content))
return t.queues.SendEDU(edu, t.ServerName, joined)
log.Debugf("Sending edu to federated servers: %s", string(edu.Content))
if err := t.queues.SendEDU(edu, t.ServerName, joined); err != nil {
log.WithError(err).Error("failed to send EDU")
return false
}
return true
})
}

View file

@ -31,7 +31,7 @@ import (
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
userapi "github.com/matrix-org/dendrite/userapi/api"
userAPI "github.com/matrix-org/dendrite/userapi/api"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/federationapi/routing"

View file

@ -19,6 +19,7 @@ var (
OutputTypingEvent = "OutputTypingEvent"
OutputClientData = "OutputClientData"
OutputReceiptEvent = "OutputReceiptEvent"
OutputPresenceData = "OutputPresenceData"
)
var streams = []*nats.StreamConfig{
@ -58,4 +59,9 @@ var streams = []*nats.StreamConfig{
Retention: nats.InterestPolicy,
Storage: nats.FileStorage,
},
{
Name: OutputPresenceData,
Retention: nats.InterestPolicy,
Storage: nats.FileStorage,
},
}

View file

@ -15,26 +15,29 @@
package consumers
import (
"context"
"encoding/json"
"github.com/Shopify/sarama"
"github.com/getsentry/sentry-go"
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
)
// OutputPresenceDataConsumer consumes events that originated in the EDU server.
type OutputPresenceDataConsumer struct {
presenceConsumer *internal.ContinualConsumer
db storage.Database
stream types.StreamProvider
notifier *notifier.Notifier
ctx context.Context
jetstream nats.JetStreamContext
topic string
db storage.Database
stream types.StreamProvider
notifier *notifier.Notifier
}
// NewOutputPresenceDataConsumer creates a new OutputPresenceDataConsumer.
@ -42,49 +45,41 @@ type OutputPresenceDataConsumer struct {
func NewOutputPresenceDataConsumer(
process *process.ProcessContext,
cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
js nats.JetStreamContext,
store storage.Database,
notifier *notifier.Notifier,
stream types.StreamProvider,
) *OutputPresenceDataConsumer {
consumer := internal.ContinualConsumer{
Process: process,
ComponentName: "syncapi/eduserver/presence",
Topic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputPresenceData),
Consumer: kafkaConsumer,
PartitionStore: store,
return &OutputPresenceDataConsumer{
ctx: process.Context(),
jetstream: js,
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputPresenceData),
db: store,
notifier: notifier,
stream: stream,
}
s := &OutputPresenceDataConsumer{
presenceConsumer: &consumer,
db: store,
notifier: notifier,
stream: stream,
}
consumer.ProcessMessage = s.onMessage
return s
}
// Start consuming from EDU api
func (s *OutputPresenceDataConsumer) Start() error {
return s.presenceConsumer.Start()
_, err := s.jetstream.Subscribe(s.topic, s.onMessage)
return err
}
func (s *OutputPresenceDataConsumer) onMessage(msg *sarama.ConsumerMessage) error {
var output api.OutputPresenceData
if err := json.Unmarshal(msg.Value, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("EDU server output log: message parse failure")
sentry.CaptureException(err)
return nil
}
log.Debugf("presence received by sync api! %+v", output)
func (s *OutputPresenceDataConsumer) onMessage(msg *nats.Msg) {
jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool {
var output api.OutputPresenceData
if err := json.Unmarshal(msg.Data, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("EDU server output log: message parse failure")
sentry.CaptureException(err)
return true
}
log.Debugf("presence received by sync api! %+v", output)
s.stream.Advance(output.StreamPos)
s.notifier.OnNewPresence(types.StreamingToken{PresenceDataPosition: output.StreamPos}, output.UserID)
return nil
s.stream.Advance(output.StreamPos)
s.notifier.OnNewPresence(types.StreamingToken{PresenceDataPosition: output.StreamPos}, output.UserID)
return true
})
}

View file

@ -110,7 +110,7 @@ func AddPublicRoutes(
}
presenceConsumer := consumers.NewOutputPresenceDataConsumer(
process, cfg, consumer, syncDB, notifier, streams.PresenceDataStreamProvider,
process, cfg, js, syncDB, notifier, streams.PresenceDataStreamProvider,
)
if err = presenceConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start presence consumer")