Per-room consumers (#2293)

* Roomserver input refactoring — again!

* Ensure the actor runs again

* Preserve consumer after unsubscribe

* Another sprinkling of magic

* Rename `TopicFor` to `Prefixed`

* Recreate the stream if the config is bad

* Check streams too

* Prefix subjects, preserve inboxes

* Recreate if subjects wrong

* Remove stream subject

* Reconstruct properly

* Fix mutex unlock

* Comments

* Fix tests

* Don't drop events

* Review comments

* Separate `queueInputRoomEvents` function

* Re-jig control flow a bit
This commit is contained in:
Neil Alexander 2022-03-23 10:20:18 +00:00 committed by GitHub
parent 9572f5ed19
commit 98a5e410d7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 345 additions and 152 deletions

View file

@ -56,7 +56,7 @@ func NewOutputRoomEventConsumer(
ctx: process.Context(), ctx: process.Context(),
jetstream: js, jetstream: js,
durable: cfg.Global.JetStream.Durable("AppserviceRoomserverConsumer"), durable: cfg.Global.JetStream.Durable("AppserviceRoomserverConsumer"),
topic: cfg.Global.JetStream.TopicFor(jetstream.OutputRoomEvent), topic: cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent),
asDB: appserviceDB, asDB: appserviceDB,
rsAPI: rsAPI, rsAPI: rsAPI,
serverName: string(cfg.Global.ServerName), serverName: string(cfg.Global.ServerName),

View file

@ -55,7 +55,7 @@ func AddPublicRoutes(
syncProducer := &producers.SyncAPIProducer{ syncProducer := &producers.SyncAPIProducer{
JetStream: js, JetStream: js,
Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData), Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData),
} }
routing.Setup( routing.Setup(

View file

@ -48,9 +48,9 @@ func NewInternalAPI(
Cache: eduCache, Cache: eduCache,
UserAPI: userAPI, UserAPI: userAPI,
JetStream: js, JetStream: js,
OutputTypingEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent), OutputTypingEventTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
OutputSendToDeviceEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent), OutputSendToDeviceEventTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
OutputReceiptEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), OutputReceiptEventTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
ServerName: cfg.Matrix.ServerName, ServerName: cfg.Matrix.ServerName,
} }
} }

View file

@ -58,9 +58,9 @@ func NewOutputEDUConsumer(
db: store, db: store,
ServerName: cfg.Matrix.ServerName, ServerName: cfg.Matrix.ServerName,
durable: cfg.Matrix.JetStream.Durable("FederationAPIEDUServerConsumer"), durable: cfg.Matrix.JetStream.Durable("FederationAPIEDUServerConsumer"),
typingTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent), typingTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
sendToDeviceTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent), sendToDeviceTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
receiptTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), receiptTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
} }
} }

View file

@ -55,8 +55,8 @@ func NewKeyChangeConsumer(
return &KeyChangeConsumer{ return &KeyChangeConsumer{
ctx: process.Context(), ctx: process.Context(),
jetstream: js, jetstream: js,
durable: cfg.Matrix.JetStream.TopicFor("FederationAPIKeyChangeConsumer"), durable: cfg.Matrix.JetStream.Prefixed("FederationAPIKeyChangeConsumer"),
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent), topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent),
queues: queues, queues: queues,
db: store, db: store,
serverName: cfg.Matrix.ServerName, serverName: cfg.Matrix.ServerName,

View file

@ -61,7 +61,7 @@ func NewOutputRoomEventConsumer(
queues: queues, queues: queues,
rsAPI: rsAPI, rsAPI: rsAPI,
durable: cfg.Matrix.JetStream.Durable("FederationAPIRoomServerConsumer"), durable: cfg.Matrix.JetStream.Durable("FederationAPIRoomServerConsumer"),
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent), topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent),
} }
} }

View file

@ -46,7 +46,7 @@ func NewInternalAPI(
logrus.WithError(err).Panicf("failed to connect to key server database") logrus.WithError(err).Panicf("failed to connect to key server database")
} }
keyChangeProducer := &producers.KeyChange{ keyChangeProducer := &producers.KeyChange{
Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent)), Topic: string(cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent)),
JetStream: js, JetStream: js,
DB: db, DB: db,
} }

View file

@ -90,6 +90,7 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.FederationInternalA
r.KeyRing = keyRing r.KeyRing = keyRing
r.Inputer = &input.Inputer{ r.Inputer = &input.Inputer{
Cfg: r.Cfg,
ProcessContext: r.ProcessContext, ProcessContext: r.ProcessContext,
DB: r.DB, DB: r.DB,
InputRoomEventTopic: r.InputRoomEventTopic, InputRoomEventTopic: r.InputRoomEventTopic,

View file

@ -19,6 +19,7 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt"
"sync" "sync"
"time" "time"
@ -29,6 +30,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/internal/query" "github.com/matrix-org/dendrite/roomserver/internal/query"
"github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -45,7 +47,35 @@ var keyContentFields = map[string]string{
"m.room.member": "membership", "m.room.member": "membership",
} }
// Inputer is responsible for consuming from the roomserver input
// streams and processing the events. All input events are queued
// into a single NATS stream and the order is preserved strictly.
// The `room_id` message header will contain the room ID which will
// be used to assign the pending event to a per-room worker.
//
// The input API maintains an ephemeral headers-only consumer. It
// will speed through the stream working out which room IDs are
// pending and create durable consumers for them. The durable
// consumer will then be used for each room worker goroutine to
// fetch events one by one and process them. Each room having a
// durable consumer of its own means there is no head-of-line
// blocking between rooms. Filtering ensures that each durable
// consumer only receives events for the room it is interested in.
//
// The ephemeral consumer closely tracks the newest events. The
// per-room durable consumers will only progress through the stream
// as events are processed.
//
// A BC * -> positions of each consumer (* = ephemeral)
// ⌄ ⌄⌄ ⌄
// ABAABCAABCAA -> newest (letter = subject for each message)
//
// In this example, A is still processing an event but has two
// pending events to process afterwards. Both B and C are caught
// up, so they will do nothing until a new event comes in for B
// or C.
type Inputer struct { type Inputer struct {
Cfg *config.RoomServer
ProcessContext *process.ProcessContext ProcessContext *process.ProcessContext
DB storage.Database DB storage.Database
NATSClient *nats.Conn NATSClient *nats.Conn
@ -57,147 +87,275 @@ type Inputer struct {
ACLs *acls.ServerACLs ACLs *acls.ServerACLs
InputRoomEventTopic string InputRoomEventTopic string
OutputRoomEventTopic string OutputRoomEventTopic string
workers sync.Map // room ID -> *phony.Inbox workers sync.Map // room ID -> *worker
Queryer *query.Queryer Queryer *query.Queryer
} }
func (r *Inputer) workerForRoom(roomID string) *phony.Inbox { type worker struct {
inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{}) phony.Inbox
return inbox.(*phony.Inbox) sync.Mutex
r *Inputer
roomID string
subscription *nats.Subscription
} }
// eventsInProgress is an in-memory map to keep a track of which events we have func (r *Inputer) startWorkerForRoom(roomID string) {
// queued up for processing. If we get a redelivery from NATS and we still have v, loaded := r.workers.LoadOrStore(roomID, &worker{
// the queued up item then we won't do anything with the redelivered message. If r: r,
// we've restarted Dendrite and now this map is empty then it means that we will roomID: roomID,
// reload pending work from NATS. })
var eventsInProgress sync.Map w := v.(*worker)
w.Lock()
defer w.Unlock()
if !loaded || w.subscription == nil {
consumer := r.Cfg.Matrix.JetStream.Prefixed("RoomInput" + jetstream.Tokenise(w.roomID))
subject := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEventSubj(w.roomID))
// onMessage is called when a new event arrives in the roomserver input stream. // Create the consumer. We do this as a specific step rather than
// letting PullSubscribe create it for us because we need the consumer
// to outlive the subscription. If we do it this way, we can Bind in the
// next step, and when we Unsubscribe, the consumer continues to live. If
// we leave PullSubscribe to create the durable consumer, Unsubscribe will
// delete it because it thinks it "owns" it, which in turn breaks the
// interest-based retention storage policy.
// If the durable consumer already exists, this is effectively a no-op.
// Another interesting tid-bit here: the ACK policy is set to "all" so that
// if we acknowledge a message, we also acknowledge everything that comes
// before it. This is necessary because otherwise our consumer will never
// acknowledge things we filtered out for other subjects and therefore they
// will linger around forever.
if _, err := w.r.JetStream.AddConsumer(
r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent),
&nats.ConsumerConfig{
Durable: consumer,
AckPolicy: nats.AckAllPolicy,
DeliverPolicy: nats.DeliverAllPolicy,
FilterSubject: subject,
AckWait: MaximumMissingProcessingTime + (time.Second * 10),
},
); err != nil {
logrus.WithError(err).Errorf("Failed to create consumer for room %q", w.roomID)
return
}
// Bind to our durable consumer. We want to receive all messages waiting
// for this subject and we want to manually acknowledge them, so that we
// can ensure they are only cleaned up when we are done processing them.
sub, err := w.r.JetStream.PullSubscribe(
subject, consumer,
nats.ManualAck(),
nats.DeliverAll(),
nats.AckWait(MaximumMissingProcessingTime+(time.Second*10)),
nats.Bind(r.InputRoomEventTopic, consumer),
)
if err != nil {
logrus.WithError(err).Errorf("Failed to subscribe to stream for room %q", w.roomID)
return
}
// Go and start pulling messages off the queue.
w.subscription = sub
w.Act(nil, w._next)
}
}
// Start creates an ephemeral non-durable consumer on the roomserver
// input topic. It is configured to deliver us headers only because we
// don't actually care about the contents of the message at this point,
// we only care about the `room_id` field. Once a message arrives, we
// will look to see if we have a worker for that room which has its
// own consumer. If we don't, we'll start one.
func (r *Inputer) Start() error { func (r *Inputer) Start() error {
_, err := r.JetStream.Subscribe( _, err := r.JetStream.Subscribe(
r.InputRoomEventTopic, "", // This is blank because we specified it in BindStream.
// We specifically don't use jetstream.WithJetStreamMessage here because we func(m *nats.Msg) {
// queue the task off to a room-specific queue and the ACK needs to be sent roomID := m.Header.Get(jetstream.RoomID)
// later, possibly with an error response to the inputter if synchronous. r.startWorkerForRoom(roomID)
func(msg *nats.Msg) { _ = m.Ack()
roomID := msg.Header.Get("room_id")
var inputRoomEvent api.InputRoomEvent
if err := json.Unmarshal(msg.Data, &inputRoomEvent); err != nil {
_ = msg.Term()
return
}
_ = msg.InProgress()
index := roomID + "\000" + inputRoomEvent.Event.EventID()
if _, ok := eventsInProgress.LoadOrStore(index, struct{}{}); ok {
// We're already waiting to deal with this event, so there's no
// point in queuing it up again. We've notified NATS that we're
// working on the message still, so that will have deferred the
// redelivery by a bit.
return
}
roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc()
r.workerForRoom(roomID).Act(nil, func() {
_ = msg.InProgress() // resets the acknowledgement wait timer
defer eventsInProgress.Delete(index)
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
var errString string
if err := r.processRoomEvent(r.ProcessContext.Context(), &inputRoomEvent); err != nil {
if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
sentry.CaptureException(err)
}
logrus.WithError(err).WithFields(logrus.Fields{
"room_id": roomID,
"event_id": inputRoomEvent.Event.EventID(),
"type": inputRoomEvent.Event.Type(),
}).Warn("Roomserver failed to process async event")
_ = msg.Term()
errString = err.Error()
} else {
_ = msg.Ack()
}
if replyTo := msg.Header.Get("sync"); replyTo != "" {
if err := r.NATSClient.Publish(replyTo, []byte(errString)); err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"room_id": roomID,
"event_id": inputRoomEvent.Event.EventID(),
"type": inputRoomEvent.Event.Type(),
}).Warn("Roomserver failed to respond for sync event")
}
}
})
}, },
// NATS wants to acknowledge automatically by default when the message is nats.HeadersOnly(),
// read from the stream, but we want to override that behaviour by making
// sure that we only acknowledge when we're happy we've done everything we
// can. This ensures we retry things when it makes sense to do so.
nats.ManualAck(),
// Use a durable named consumer.
r.Durable,
// If we've missed things in the stream, e.g. we restarted, then replay
// all of the queued messages that were waiting for us.
nats.DeliverAll(), nats.DeliverAll(),
// Ensure that NATS doesn't try to resend us something that wasn't done nats.AckAll(),
// within the period of time that we might still be processing it. nats.BindStream(r.InputRoomEventTopic),
nats.AckWait(MaximumMissingProcessingTime+(time.Second*10)),
// It is recommended to disable this for pull consumers as per the docs:
// https://docs.nats.io/nats-concepts/jetstream/consumers#note-about-push-and-pull-consumers
nats.MaxAckPending(-1),
) )
return err return err
} }
// _next is called by the worker for the room. It must only be called
// by the actor embedded into the worker.
func (w *worker) _next() {
// Look up what the next event is that's waiting to be processed.
ctx, cancel := context.WithTimeout(w.r.ProcessContext.Context(), time.Minute)
defer cancel()
msgs, err := w.subscription.Fetch(1, nats.Context(ctx))
switch err {
case nil:
// Make sure that once we're done here, we queue up another call
// to _next in the inbox.
defer w.Act(nil, w._next)
// If no error was reported, but we didn't get exactly one message,
// then skip over this and try again on the next iteration.
if len(msgs) != 1 {
return
}
case context.DeadlineExceeded:
// The context exceeded, so we've been waiting for more than a
// minute for activity in this room. At this point we will shut
// down the subscriber to free up resources. It'll get started
// again if new activity happens.
if err = w.subscription.Unsubscribe(); err != nil {
logrus.WithError(err).Errorf("Failed to unsubscribe to stream for room %q", w.roomID)
}
w.Lock()
w.subscription = nil
w.Unlock()
return
default:
// Something went wrong while trying to fetch the next event
// from the queue. In which case, we'll shut down the subscriber
// and wait to be notified about new room activity again. Maybe
// the problem will be corrected by then.
logrus.WithError(err).Errorf("Failed to get next stream message for room %q", w.roomID)
if err = w.subscription.Unsubscribe(); err != nil {
logrus.WithError(err).Errorf("Failed to unsubscribe to stream for room %q", w.roomID)
}
w.Lock()
w.subscription = nil
w.Unlock()
return
}
// Try to unmarshal the input room event. If the JSON unmarshalling
// fails then we'll terminate the message — this notifies NATS that
// we are done with the message and never want to see it again.
msg := msgs[0]
var inputRoomEvent api.InputRoomEvent
if err = json.Unmarshal(msg.Data, &inputRoomEvent); err != nil {
_ = msg.Term()
return
}
roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Inc()
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Dec()
// Process the room event. If something goes wrong then we'll tell
// NATS to terminate the message. We'll store the error result as
// a string, because we might want to return that to the caller if
// it was a synchronous request.
var errString string
if err = w.r.processRoomEvent(w.r.ProcessContext.Context(), &inputRoomEvent); err != nil {
if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
sentry.CaptureException(err)
}
logrus.WithError(err).WithFields(logrus.Fields{
"room_id": w.roomID,
"event_id": inputRoomEvent.Event.EventID(),
"type": inputRoomEvent.Event.Type(),
}).Warn("Roomserver failed to process async event")
_ = msg.Term()
errString = err.Error()
} else {
_ = msg.Ack()
}
// If it was a synchronous input request then the "sync" field
// will be present in the message. That means that someone is
// waiting for a response. The temporary inbox name is present in
// that field, so send back the error string (if any). If there
// was no error then we'll return a blank message, which means
// that everything was OK.
if replyTo := msg.Header.Get("sync"); replyTo != "" {
if err = w.r.NATSClient.Publish(replyTo, []byte(errString)); err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"room_id": w.roomID,
"event_id": inputRoomEvent.Event.EventID(),
"type": inputRoomEvent.Event.Type(),
}).Warn("Roomserver failed to respond for sync event")
}
}
}
// queueInputRoomEvents queues events into the roomserver input
// stream in NATS.
func (r *Inputer) queueInputRoomEvents(
ctx context.Context,
request *api.InputRoomEventsRequest,
) (replySub *nats.Subscription, err error) {
// If the request is synchronous then we need to create a
// temporary inbox to wait for responses on, and then create
// a subscription to it. If it's asynchronous then we won't
// bother, so these values will remain empty.
var replyTo string
if !request.Asynchronous {
replyTo = nats.NewInbox()
replySub, err = r.NATSClient.SubscribeSync(replyTo)
if err != nil {
return nil, fmt.Errorf("r.NATSClient.SubscribeSync: %w", err)
}
if replySub == nil {
// This shouldn't ever happen, but it doesn't hurt to check
// because we can potentially avoid a nil pointer panic later
// if it did for some reason.
return nil, fmt.Errorf("expected a subscription to the temporary inbox")
}
}
// For each event, marshal the input room event and then
// send it into the input queue.
for _, e := range request.InputRoomEvents {
roomID := e.Event.RoomID()
subj := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEventSubj(roomID))
msg := &nats.Msg{
Subject: subj,
Header: nats.Header{},
}
msg.Header.Set("room_id", roomID)
if replyTo != "" {
msg.Header.Set("sync", replyTo)
}
msg.Data, err = json.Marshal(e)
if err != nil {
return nil, fmt.Errorf("json.Marshal: %w", err)
}
if _, err = r.JetStream.PublishMsg(msg, nats.Context(ctx)); err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"room_id": roomID,
"event_id": e.Event.EventID(),
"subj": subj,
}).Error("Roomserver failed to queue async event")
return nil, fmt.Errorf("r.JetStream.PublishMsg: %w", err)
}
}
return
}
// InputRoomEvents implements api.RoomserverInternalAPI // InputRoomEvents implements api.RoomserverInternalAPI
func (r *Inputer) InputRoomEvents( func (r *Inputer) InputRoomEvents(
ctx context.Context, ctx context.Context,
request *api.InputRoomEventsRequest, request *api.InputRoomEventsRequest,
response *api.InputRoomEventsResponse, response *api.InputRoomEventsResponse,
) { ) {
var replyTo string // Queue up the event into the roomserver.
var replySub *nats.Subscription replySub, err := r.queueInputRoomEvents(ctx, request)
if !request.Asynchronous { if err != nil {
var err error response.ErrMsg = err.Error()
replyTo = nats.NewInbox()
replySub, err = r.NATSClient.SubscribeSync(replyTo)
if err != nil {
response.ErrMsg = err.Error()
return
}
}
var err error
for _, e := range request.InputRoomEvents {
msg := &nats.Msg{
Subject: r.InputRoomEventTopic,
Header: nats.Header{},
Reply: replyTo,
}
roomID := e.Event.RoomID()
msg.Header.Set("room_id", roomID)
if replyTo != "" {
msg.Header.Set("sync", replyTo)
}
msg.Data, err = json.Marshal(e)
if err != nil {
response.ErrMsg = err.Error()
return
}
if _, err = r.JetStream.PublishMsg(msg); err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"room_id": roomID,
"event_id": e.Event.EventID(),
}).Error("Roomserver failed to queue async event")
return
}
}
if request.Asynchronous || replySub == nil {
return return
} }
// If we aren't waiting for synchronous responses then we can
// give up here, there is nothing further to do.
if replySub == nil {
return
}
// Otherwise, we'll want to sit and wait for the responses
// from the roomserver. There will be one response for every
// input we submitted. The last error value we receive will
// be the one returned as the error string.
defer replySub.Drain() // nolint:errcheck defer replySub.Drain() // nolint:errcheck
for i := 0; i < len(request.InputRoomEvents); i++ { for i := 0; i < len(request.InputRoomEvents); i++ {
msg, err := replySub.NextMsgWithContext(ctx) msg, err := replySub.NextMsgWithContext(ctx)
@ -207,7 +365,6 @@ func (r *Inputer) InputRoomEvents(
} }
if len(msg.Data) > 0 { if len(msg.Data) > 0 {
response.ErrMsg = string(msg.Data) response.ErrMsg = string(msg.Data)
return
} }
} }
} }

View file

@ -54,8 +54,8 @@ func NewInternalAPI(
return internal.NewRoomserverAPI( return internal.NewRoomserverAPI(
base.ProcessContext, cfg, roomserverDB, js, nc, base.ProcessContext, cfg, roomserverDB, js, nc,
cfg.Matrix.JetStream.TopicFor(jetstream.InputRoomEvent), cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent),
cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent), cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent),
base.Caches, perspectiveServerNames, base.Caches, perspectiveServerNames,
) )
} }

View file

@ -19,12 +19,12 @@ type JetStream struct {
InMemory bool `yaml:"in_memory"` InMemory bool `yaml:"in_memory"`
} }
func (c *JetStream) TopicFor(name string) string { func (c *JetStream) Prefixed(name string) string {
return fmt.Sprintf("%s%s", c.TopicPrefix, name) return fmt.Sprintf("%s%s", c.TopicPrefix, name)
} }
func (c *JetStream) Durable(name string) string { func (c *JetStream) Durable(name string) string {
return c.TopicFor(name) return c.Prefixed(name)
} }
func (c *JetStream) Defaults(generate bool) { func (c *JetStream) Defaults(generate bool) {

View file

@ -1,6 +1,7 @@
package jetstream package jetstream
import ( import (
"reflect"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -75,14 +76,35 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStream
} }
for _, stream := range streams { // streams are defined in streams.go for _, stream := range streams { // streams are defined in streams.go
name := cfg.TopicFor(stream.Name) name := cfg.Prefixed(stream.Name)
info, err := s.StreamInfo(name) info, err := s.StreamInfo(name)
if err != nil && err != natsclient.ErrStreamNotFound { if err != nil && err != natsclient.ErrStreamNotFound {
logrus.WithError(err).Fatal("Unable to get stream info") logrus.WithError(err).Fatal("Unable to get stream info")
} }
subjects := stream.Subjects
if len(subjects) == 0 {
// By default we want each stream to listen for the subjects
// that are either an exact match for the stream name, or where
// the first part of the subject is the stream name. ">" is a
// wildcard in NATS for one or more subject tokens. In the case
// that the stream is called "Foo", this will match any message
// with the subject "Foo", "Foo.Bar" or "Foo.Bar.Baz" etc.
subjects = []string{name, name + ".>"}
}
if info != nil {
switch {
case !reflect.DeepEqual(info.Config.Subjects, subjects):
fallthrough
case info.Config.Retention != stream.Retention:
fallthrough
case info.Config.Storage != stream.Storage:
if err = s.DeleteStream(name); err != nil {
logrus.WithError(err).Fatal("Unable to delete stream")
}
info = nil
}
}
if info == nil { if info == nil {
stream.Subjects = []string{name}
// If we're trying to keep everything in memory (e.g. unit tests) // If we're trying to keep everything in memory (e.g. unit tests)
// then overwrite the storage policy. // then overwrite the storage policy.
if cfg.InMemory { if cfg.InMemory {
@ -93,8 +115,9 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStream
// array, otherwise we end up with namespaces on namespaces. // array, otherwise we end up with namespaces on namespaces.
namespaced := *stream namespaced := *stream
namespaced.Name = name namespaced.Name = name
namespaced.Subjects = subjects
if _, err = s.AddStream(&namespaced); err != nil { if _, err = s.AddStream(&namespaced); err != nil {
logrus.WithError(err).WithField("stream", name).Fatal("Unable to add stream") logrus.WithError(err).WithField("stream", name).WithField("subjects", subjects).Fatal("Unable to add stream")
} }
} }
} }

View file

@ -1,6 +1,8 @@
package jetstream package jetstream
import ( import (
"fmt"
"regexp"
"time" "time"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
@ -24,10 +26,20 @@ var (
OutputReadUpdate = "OutputReadUpdate" OutputReadUpdate = "OutputReadUpdate"
) )
var safeCharacters = regexp.MustCompile("[^A-Za-z0-9$]+")
func Tokenise(str string) string {
return safeCharacters.ReplaceAllString(str, "_")
}
func InputRoomEventSubj(roomID string) string {
return fmt.Sprintf("%s.%s", InputRoomEvent, Tokenise(roomID))
}
var streams = []*nats.StreamConfig{ var streams = []*nats.StreamConfig{
{ {
Name: InputRoomEvent, Name: InputRoomEvent,
Retention: nats.WorkQueuePolicy, Retention: nats.InterestPolicy,
Storage: nats.FileStorage, Storage: nats.FileStorage,
}, },
{ {

View file

@ -61,7 +61,7 @@ func NewOutputClientDataConsumer(
return &OutputClientDataConsumer{ return &OutputClientDataConsumer{
ctx: process.Context(), ctx: process.Context(),
jetstream: js, jetstream: js,
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData), topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData),
durable: cfg.Matrix.JetStream.Durable("SyncAPIClientAPIConsumer"), durable: cfg.Matrix.JetStream.Durable("SyncAPIClientAPIConsumer"),
db: store, db: store,
notifier: notifier, notifier: notifier,

View file

@ -62,7 +62,7 @@ func NewOutputReceiptEventConsumer(
return &OutputReceiptEventConsumer{ return &OutputReceiptEventConsumer{
ctx: process.Context(), ctx: process.Context(),
jetstream: js, jetstream: js,
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerReceiptConsumer"), durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerReceiptConsumer"),
db: store, db: store,
notifier: notifier, notifier: notifier,

View file

@ -57,7 +57,7 @@ func NewOutputSendToDeviceEventConsumer(
return &OutputSendToDeviceEventConsumer{ return &OutputSendToDeviceEventConsumer{
ctx: process.Context(), ctx: process.Context(),
jetstream: js, jetstream: js,
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent), topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerSendToDeviceConsumer"), durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerSendToDeviceConsumer"),
db: store, db: store,
serverName: cfg.Matrix.ServerName, serverName: cfg.Matrix.ServerName,

View file

@ -56,7 +56,7 @@ func NewOutputTypingEventConsumer(
return &OutputTypingEventConsumer{ return &OutputTypingEventConsumer{
ctx: process.Context(), ctx: process.Context(),
jetstream: js, jetstream: js,
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent), topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerTypingConsumer"), durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerTypingConsumer"),
eduCache: eduCache, eduCache: eduCache,
notifier: notifier, notifier: notifier,

View file

@ -65,7 +65,7 @@ func NewOutputRoomEventConsumer(
ctx: process.Context(), ctx: process.Context(),
cfg: cfg, cfg: cfg,
jetstream: js, jetstream: js,
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent), topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent),
durable: cfg.Matrix.JetStream.Durable("SyncAPIRoomServerConsumer"), durable: cfg.Matrix.JetStream.Durable("SyncAPIRoomServerConsumer"),
db: store, db: store,
notifier: notifier, notifier: notifier,

View file

@ -56,7 +56,7 @@ func NewOutputNotificationDataConsumer(
ctx: process.Context(), ctx: process.Context(),
jetstream: js, jetstream: js,
durable: cfg.Matrix.JetStream.Durable("SyncAPINotificationDataConsumer"), durable: cfg.Matrix.JetStream.Durable("SyncAPINotificationDataConsumer"),
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputNotificationData), topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputNotificationData),
db: store, db: store,
notifier: notifier, notifier: notifier,
stream: stream, stream: stream,

View file

@ -67,18 +67,18 @@ func AddPublicRoutes(
userAPIStreamEventProducer := &producers.UserAPIStreamEventProducer{ userAPIStreamEventProducer := &producers.UserAPIStreamEventProducer{
JetStream: js, JetStream: js,
Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputStreamEvent), Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputStreamEvent),
} }
userAPIReadUpdateProducer := &producers.UserAPIReadProducer{ userAPIReadUpdateProducer := &producers.UserAPIReadProducer{
JetStream: js, JetStream: js,
Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReadUpdate), Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReadUpdate),
} }
_ = userAPIReadUpdateProducer _ = userAPIReadUpdateProducer
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer( keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
process, cfg, cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent), process, cfg, cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent),
js, keyAPI, rsAPI, syncDB, notifier, js, keyAPI, rsAPI, syncDB, notifier,
streams.DeviceListStreamProvider, streams.DeviceListStreamProvider,
) )

View file

@ -47,7 +47,7 @@ func NewOutputReadUpdateConsumer(
db: store, db: store,
ServerName: cfg.Matrix.ServerName, ServerName: cfg.Matrix.ServerName,
durable: cfg.Matrix.JetStream.Durable("UserAPISyncAPIReadUpdateConsumer"), durable: cfg.Matrix.JetStream.Durable("UserAPISyncAPIReadUpdateConsumer"),
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReadUpdate), topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReadUpdate),
pgClient: pgClient, pgClient: pgClient,
userAPI: userAPI, userAPI: userAPI,
syncProducer: syncProducer, syncProducer: syncProducer,

View file

@ -54,7 +54,7 @@ func NewOutputStreamEventConsumer(
jetstream: js, jetstream: js,
db: store, db: store,
durable: cfg.Matrix.JetStream.Durable("UserAPISyncAPIStreamEventConsumer"), durable: cfg.Matrix.JetStream.Durable("UserAPISyncAPIStreamEventConsumer"),
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputStreamEvent), topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputStreamEvent),
pgClient: pgClient, pgClient: pgClient,
userAPI: userAPI, userAPI: userAPI,
rsAPI: rsAPI, rsAPI: rsAPI,

View file

@ -54,8 +54,8 @@ func NewInternalAPI(
// it's handled by clientapi, and hence uses its topic. When user // it's handled by clientapi, and hence uses its topic. When user
// API handles it for all account data, we can remove it from // API handles it for all account data, we can remove it from
// here. // here.
cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData), cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData),
cfg.Matrix.JetStream.TopicFor(jetstream.OutputNotificationData), cfg.Matrix.JetStream.Prefixed(jetstream.OutputNotificationData),
) )
userAPI := &internal.UserInternalAPI{ userAPI := &internal.UserInternalAPI{