Rename TopicFor to Prefixed

This commit is contained in:
Neil Alexander 2022-03-22 13:56:02 +00:00
parent c2501c6937
commit b49160f1f1
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
21 changed files with 43 additions and 48 deletions

View file

@ -56,7 +56,7 @@ func NewOutputRoomEventConsumer(
ctx: process.Context(), ctx: process.Context(),
jetstream: js, jetstream: js,
durable: cfg.Global.JetStream.Durable("AppserviceRoomserverConsumer"), durable: cfg.Global.JetStream.Durable("AppserviceRoomserverConsumer"),
topic: cfg.Global.JetStream.TopicFor(jetstream.OutputRoomEvent), topic: cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent),
asDB: appserviceDB, asDB: appserviceDB,
rsAPI: rsAPI, rsAPI: rsAPI,
serverName: string(cfg.Global.ServerName), serverName: string(cfg.Global.ServerName),

View file

@ -55,7 +55,7 @@ func AddPublicRoutes(
syncProducer := &producers.SyncAPIProducer{ syncProducer := &producers.SyncAPIProducer{
JetStream: js, JetStream: js,
Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData), Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData),
} }
routing.Setup( routing.Setup(

View file

@ -48,9 +48,9 @@ func NewInternalAPI(
Cache: eduCache, Cache: eduCache,
UserAPI: userAPI, UserAPI: userAPI,
JetStream: js, JetStream: js,
OutputTypingEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent), OutputTypingEventTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
OutputSendToDeviceEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent), OutputSendToDeviceEventTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
OutputReceiptEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), OutputReceiptEventTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
ServerName: cfg.Matrix.ServerName, ServerName: cfg.Matrix.ServerName,
} }
} }

View file

@ -58,9 +58,9 @@ func NewOutputEDUConsumer(
db: store, db: store,
ServerName: cfg.Matrix.ServerName, ServerName: cfg.Matrix.ServerName,
durable: cfg.Matrix.JetStream.Durable("FederationAPIEDUServerConsumer"), durable: cfg.Matrix.JetStream.Durable("FederationAPIEDUServerConsumer"),
typingTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent), typingTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
sendToDeviceTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent), sendToDeviceTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
receiptTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), receiptTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
} }
} }

View file

@ -55,8 +55,8 @@ func NewKeyChangeConsumer(
return &KeyChangeConsumer{ return &KeyChangeConsumer{
ctx: process.Context(), ctx: process.Context(),
jetstream: js, jetstream: js,
durable: cfg.Matrix.JetStream.TopicFor("FederationAPIKeyChangeConsumer"), durable: cfg.Matrix.JetStream.Prefixed("FederationAPIKeyChangeConsumer"),
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent), topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent),
queues: queues, queues: queues,
db: store, db: store,
serverName: cfg.Matrix.ServerName, serverName: cfg.Matrix.ServerName,

View file

@ -61,7 +61,7 @@ func NewOutputRoomEventConsumer(
queues: queues, queues: queues,
rsAPI: rsAPI, rsAPI: rsAPI,
durable: cfg.Matrix.JetStream.Durable("FederationAPIRoomServerConsumer"), durable: cfg.Matrix.JetStream.Durable("FederationAPIRoomServerConsumer"),
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent), topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent),
} }
} }

View file

@ -46,7 +46,7 @@ func NewInternalAPI(
logrus.WithError(err).Panicf("failed to connect to key server database") logrus.WithError(err).Panicf("failed to connect to key server database")
} }
keyChangeProducer := &producers.KeyChange{ keyChangeProducer := &producers.KeyChange{
Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent)), Topic: string(cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent)),
JetStream: js, JetStream: js,
DB: db, DB: db,
} }

View file

@ -78,18 +78,14 @@ func (r *Inputer) startWorkerForRoom(roomID string) {
}) })
w := v.(*worker) w := v.(*worker)
if !loaded { if !loaded {
consumer := "DendriteRoomInputConsumerPull" + jetstream.Tokenise(w.roomID) consumer := r.Cfg.Matrix.JetStream.Prefixed("RoomInput" + jetstream.Tokenise(w.roomID))
subject := jetstream.InputRoomEventSubj(w.roomID) subject := jetstream.InputRoomEventSubj(w.roomID)
if info, err := w.r.JetStream.ConsumerInfo(
jetstream.InputRoomEvent,
consumer,
); err != nil || info == nil {
if _, err := w.r.JetStream.AddConsumer( if _, err := w.r.JetStream.AddConsumer(
r.Cfg.Matrix.JetStream.TopicFor(jetstream.InputRoomEvent), r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent),
&nats.ConsumerConfig{ &nats.ConsumerConfig{
Durable: consumer, Durable: consumer,
AckPolicy: nats.AckExplicitPolicy, AckPolicy: nats.AckAllPolicy,
DeliverPolicy: nats.DeliverAllPolicy, DeliverPolicy: nats.DeliverAllPolicy,
FilterSubject: subject, FilterSubject: subject,
AckWait: MaximumMissingProcessingTime + (time.Second * 10), AckWait: MaximumMissingProcessingTime + (time.Second * 10),
@ -98,7 +94,6 @@ func (r *Inputer) startWorkerForRoom(roomID string) {
logrus.WithError(err).Errorf("Failed to create consumer for room %q", w.roomID) logrus.WithError(err).Errorf("Failed to create consumer for room %q", w.roomID)
return return
} }
}
sub, err := w.r.JetStream.PullSubscribe( sub, err := w.r.JetStream.PullSubscribe(
subject, consumer, subject, consumer,

View file

@ -54,8 +54,8 @@ func NewInternalAPI(
return internal.NewRoomserverAPI( return internal.NewRoomserverAPI(
base.ProcessContext, cfg, roomserverDB, js, nc, base.ProcessContext, cfg, roomserverDB, js, nc,
cfg.Matrix.JetStream.TopicFor(jetstream.InputRoomEvent), cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent),
cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent), cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent),
base.Caches, perspectiveServerNames, base.Caches, perspectiveServerNames,
) )
} }

View file

@ -19,12 +19,12 @@ type JetStream struct {
InMemory bool `yaml:"in_memory"` InMemory bool `yaml:"in_memory"`
} }
func (c *JetStream) TopicFor(name string) string { func (c *JetStream) Prefixed(name string) string {
return fmt.Sprintf("%s%s", c.TopicPrefix, name) return fmt.Sprintf("%s%s", c.TopicPrefix, name)
} }
func (c *JetStream) Durable(name string) string { func (c *JetStream) Durable(name string) string {
return c.TopicFor(name) return c.Prefixed(name)
} }
func (c *JetStream) Defaults(generate bool) { func (c *JetStream) Defaults(generate bool) {

View file

@ -75,7 +75,7 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStream
} }
for _, stream := range streams { // streams are defined in streams.go for _, stream := range streams { // streams are defined in streams.go
name := cfg.TopicFor(stream.Name) name := cfg.Prefixed(stream.Name)
info, err := s.StreamInfo(name) info, err := s.StreamInfo(name)
if err != nil && err != natsclient.ErrStreamNotFound { if err != nil && err != natsclient.ErrStreamNotFound {
logrus.WithError(err).Fatal("Unable to get stream info") logrus.WithError(err).Fatal("Unable to get stream info")

View file

@ -61,7 +61,7 @@ func NewOutputClientDataConsumer(
return &OutputClientDataConsumer{ return &OutputClientDataConsumer{
ctx: process.Context(), ctx: process.Context(),
jetstream: js, jetstream: js,
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData), topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData),
durable: cfg.Matrix.JetStream.Durable("SyncAPIClientAPIConsumer"), durable: cfg.Matrix.JetStream.Durable("SyncAPIClientAPIConsumer"),
db: store, db: store,
notifier: notifier, notifier: notifier,

View file

@ -62,7 +62,7 @@ func NewOutputReceiptEventConsumer(
return &OutputReceiptEventConsumer{ return &OutputReceiptEventConsumer{
ctx: process.Context(), ctx: process.Context(),
jetstream: js, jetstream: js,
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerReceiptConsumer"), durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerReceiptConsumer"),
db: store, db: store,
notifier: notifier, notifier: notifier,

View file

@ -57,7 +57,7 @@ func NewOutputSendToDeviceEventConsumer(
return &OutputSendToDeviceEventConsumer{ return &OutputSendToDeviceEventConsumer{
ctx: process.Context(), ctx: process.Context(),
jetstream: js, jetstream: js,
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent), topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerSendToDeviceConsumer"), durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerSendToDeviceConsumer"),
db: store, db: store,
serverName: cfg.Matrix.ServerName, serverName: cfg.Matrix.ServerName,

View file

@ -56,7 +56,7 @@ func NewOutputTypingEventConsumer(
return &OutputTypingEventConsumer{ return &OutputTypingEventConsumer{
ctx: process.Context(), ctx: process.Context(),
jetstream: js, jetstream: js,
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent), topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerTypingConsumer"), durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerTypingConsumer"),
eduCache: eduCache, eduCache: eduCache,
notifier: notifier, notifier: notifier,

View file

@ -65,7 +65,7 @@ func NewOutputRoomEventConsumer(
ctx: process.Context(), ctx: process.Context(),
cfg: cfg, cfg: cfg,
jetstream: js, jetstream: js,
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent), topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent),
durable: cfg.Matrix.JetStream.Durable("SyncAPIRoomServerConsumer"), durable: cfg.Matrix.JetStream.Durable("SyncAPIRoomServerConsumer"),
db: store, db: store,
notifier: notifier, notifier: notifier,

View file

@ -56,7 +56,7 @@ func NewOutputNotificationDataConsumer(
ctx: process.Context(), ctx: process.Context(),
jetstream: js, jetstream: js,
durable: cfg.Matrix.JetStream.Durable("SyncAPINotificationDataConsumer"), durable: cfg.Matrix.JetStream.Durable("SyncAPINotificationDataConsumer"),
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputNotificationData), topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputNotificationData),
db: store, db: store,
notifier: notifier, notifier: notifier,
stream: stream, stream: stream,

View file

@ -67,18 +67,18 @@ func AddPublicRoutes(
userAPIStreamEventProducer := &producers.UserAPIStreamEventProducer{ userAPIStreamEventProducer := &producers.UserAPIStreamEventProducer{
JetStream: js, JetStream: js,
Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputStreamEvent), Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputStreamEvent),
} }
userAPIReadUpdateProducer := &producers.UserAPIReadProducer{ userAPIReadUpdateProducer := &producers.UserAPIReadProducer{
JetStream: js, JetStream: js,
Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReadUpdate), Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReadUpdate),
} }
_ = userAPIReadUpdateProducer _ = userAPIReadUpdateProducer
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer( keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
process, cfg, cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent), process, cfg, cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent),
js, keyAPI, rsAPI, syncDB, notifier, js, keyAPI, rsAPI, syncDB, notifier,
streams.DeviceListStreamProvider, streams.DeviceListStreamProvider,
) )

View file

@ -47,7 +47,7 @@ func NewOutputReadUpdateConsumer(
db: store, db: store,
ServerName: cfg.Matrix.ServerName, ServerName: cfg.Matrix.ServerName,
durable: cfg.Matrix.JetStream.Durable("UserAPISyncAPIReadUpdateConsumer"), durable: cfg.Matrix.JetStream.Durable("UserAPISyncAPIReadUpdateConsumer"),
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReadUpdate), topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReadUpdate),
pgClient: pgClient, pgClient: pgClient,
userAPI: userAPI, userAPI: userAPI,
syncProducer: syncProducer, syncProducer: syncProducer,

View file

@ -54,7 +54,7 @@ func NewOutputStreamEventConsumer(
jetstream: js, jetstream: js,
db: store, db: store,
durable: cfg.Matrix.JetStream.Durable("UserAPISyncAPIStreamEventConsumer"), durable: cfg.Matrix.JetStream.Durable("UserAPISyncAPIStreamEventConsumer"),
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputStreamEvent), topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputStreamEvent),
pgClient: pgClient, pgClient: pgClient,
userAPI: userAPI, userAPI: userAPI,
rsAPI: rsAPI, rsAPI: rsAPI,

View file

@ -54,8 +54,8 @@ func NewInternalAPI(
// it's handled by clientapi, and hence uses its topic. When user // it's handled by clientapi, and hence uses its topic. When user
// API handles it for all account data, we can remove it from // API handles it for all account data, we can remove it from
// here. // here.
cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData), cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData),
cfg.Matrix.JetStream.TopicFor(jetstream.OutputNotificationData), cfg.Matrix.JetStream.Prefixed(jetstream.OutputNotificationData),
) )
userAPI := &internal.UserInternalAPI{ userAPI := &internal.UserInternalAPI{