Build fixes

This commit is contained in:
Neil Alexander 2022-01-07 16:24:27 +00:00
parent c73a0d9a07
commit a96c0cfc00
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
11 changed files with 27 additions and 21 deletions

View file

@ -55,8 +55,8 @@ func NewOutputRoomEventConsumer(
return &OutputRoomEventConsumer{ return &OutputRoomEventConsumer{
ctx: process.Context(), ctx: process.Context(),
jetstream: js, jetstream: js,
durable: nats.Durable(cfg.Global.JetStream.Namespaced("AppserviceRoomserverConsumer")), durable: cfg.Global.JetStream.Durable("AppserviceRoomserverConsumer"),
topic: cfg.Global.JetStream.Namespaced(jetstream.OutputRoomEvent), topic: cfg.Global.JetStream.TopicFor(jetstream.OutputRoomEvent),
asDB: appserviceDB, asDB: appserviceDB,
rsAPI: rsAPI, rsAPI: rsAPI,
serverName: string(cfg.Global.ServerName), serverName: string(cfg.Global.ServerName),

View file

@ -57,10 +57,10 @@ func NewOutputEDUConsumer(
queues: queues, queues: queues,
db: store, db: store,
ServerName: cfg.Matrix.ServerName, ServerName: cfg.Matrix.ServerName,
durable: nats.Durable(cfg.Matrix.JetStream.Namespaced("FederationAPIEDUServerConsumer")), durable: cfg.Matrix.JetStream.Durable("FederationAPIEDUServerConsumer"),
typingTopic: cfg.Matrix.JetStream.Namespaced(jetstream.OutputTypingEvent), typingTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent),
sendToDeviceTopic: cfg.Matrix.JetStream.Namespaced(jetstream.OutputSendToDeviceEvent), sendToDeviceTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent),
receiptTopic: cfg.Matrix.JetStream.Namespaced(jetstream.OutputReceiptEvent), receiptTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent),
} }
} }

View file

@ -57,7 +57,7 @@ func NewKeyChangeConsumer(
consumer: &internal.ContinualConsumer{ consumer: &internal.ContinualConsumer{
Process: process, Process: process,
ComponentName: "federationapi/keychange", ComponentName: "federationapi/keychange",
Topic: string(cfg.Matrix.JetStream.Namespaced(jetstream.OutputKeyChangeEvent)), Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent)),
Consumer: kafkaConsumer, Consumer: kafkaConsumer,
PartitionStore: store, PartitionStore: store,
}, },

View file

@ -59,8 +59,8 @@ func NewOutputRoomEventConsumer(
db: store, db: store,
queues: queues, queues: queues,
rsAPI: rsAPI, rsAPI: rsAPI,
durable: nats.Durable(cfg.Matrix.JetStream.Namespaced("FederationAPIRoomServerConsumer")), durable: cfg.Matrix.JetStream.Durable("FederationAPIRoomServerConsumer"),
topic: cfg.Matrix.JetStream.Namespaced(jetstream.OutputRoomEvent), topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent),
} }
} }

View file

@ -67,7 +67,7 @@ func NewRoomserverAPI(
InputRoomEventTopic: inputRoomEventTopic, InputRoomEventTopic: inputRoomEventTopic,
OutputRoomEventTopic: outputRoomEventTopic, OutputRoomEventTopic: outputRoomEventTopic,
JetStream: consumer, JetStream: consumer,
Durable: cfg.Matrix.JetStream.Namespaced("RoomserverInputConsumer"), Durable: cfg.Matrix.JetStream.Durable("RoomserverInputConsumer"),
ServerName: cfg.Matrix.ServerName, ServerName: cfg.Matrix.ServerName,
ACLs: serverACLs, ACLs: serverACLs,
}, },

View file

@ -2,6 +2,8 @@ package config
import ( import (
"fmt" "fmt"
"github.com/nats-io/nats.go"
) )
type JetStream struct { type JetStream struct {
@ -19,10 +21,14 @@ type JetStream struct {
InMemory bool `yaml:"in_memory"` InMemory bool `yaml:"in_memory"`
} }
func (c *JetStream) Namespaced(name string) string { func (c *JetStream) TopicFor(name string) string {
return fmt.Sprintf("%s%s", c.TopicPrefix, name) return fmt.Sprintf("%s%s", c.TopicPrefix, name)
} }
func (c *JetStream) Durable(name string) nats.SubOpt {
return nats.Durable(name)
}
func (c *JetStream) Defaults(generate bool) { func (c *JetStream) Defaults(generate bool) {
c.Addresses = []string{} c.Addresses = []string{}
c.TopicPrefix = "Dendrite" c.TopicPrefix = "Dendrite"

View file

@ -53,8 +53,8 @@ func NewOutputClientDataConsumer(
return &OutputClientDataConsumer{ return &OutputClientDataConsumer{
ctx: process.Context(), ctx: process.Context(),
jetstream: js, jetstream: js,
topic: cfg.Matrix.JetStream.Namespaced(jetstream.OutputClientData), topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData),
durable: nats.Durable(cfg.Matrix.JetStream.Namespaced("SyncAPIClientAPIConsumer")), durable: cfg.Matrix.JetStream.Durable("SyncAPIClientAPIConsumer"),
db: store, db: store,
notifier: notifier, notifier: notifier,
stream: stream, stream: stream,

View file

@ -54,8 +54,8 @@ func NewOutputReceiptEventConsumer(
return &OutputReceiptEventConsumer{ return &OutputReceiptEventConsumer{
ctx: process.Context(), ctx: process.Context(),
jetstream: js, jetstream: js,
topic: cfg.Matrix.JetStream.Namespaced(jetstream.OutputReceiptEvent), topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent),
durable: nats.Durable(cfg.Matrix.JetStream.Namespaced("SyncAPIEDUServerReceiptConsumer")), durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerReceiptConsumer"),
db: store, db: store,
notifier: notifier, notifier: notifier,
stream: stream, stream: stream,

View file

@ -57,8 +57,8 @@ func NewOutputSendToDeviceEventConsumer(
return &OutputSendToDeviceEventConsumer{ return &OutputSendToDeviceEventConsumer{
ctx: process.Context(), ctx: process.Context(),
jetstream: js, jetstream: js,
topic: cfg.Matrix.JetStream.Namespaced(jetstream.OutputSendToDeviceEvent), topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent),
durable: nats.Durable(cfg.Matrix.JetStream.Namespaced("SyncAPIEDUServerSendToDeviceConsumer")), durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerSendToDeviceConsumer"),
db: store, db: store,
serverName: cfg.Matrix.ServerName, serverName: cfg.Matrix.ServerName,
notifier: notifier, notifier: notifier,

View file

@ -56,8 +56,8 @@ func NewOutputTypingEventConsumer(
return &OutputTypingEventConsumer{ return &OutputTypingEventConsumer{
ctx: process.Context(), ctx: process.Context(),
jetstream: js, jetstream: js,
topic: cfg.Matrix.JetStream.Namespaced(jetstream.OutputTypingEvent), topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent),
durable: nats.Durable(cfg.Matrix.JetStream.Namespaced("SyncAPIEDUServerTypingConsumer")), durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerTypingConsumer"),
eduCache: eduCache, eduCache: eduCache,
notifier: notifier, notifier: notifier,
stream: stream, stream: stream,

View file

@ -61,8 +61,8 @@ func NewOutputRoomEventConsumer(
ctx: process.Context(), ctx: process.Context(),
cfg: cfg, cfg: cfg,
jetstream: js, jetstream: js,
topic: cfg.Matrix.JetStream.Namespaced(jetstream.OutputRoomEvent), topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent),
durable: nats.Durable(cfg.Matrix.JetStream.Namespaced("SyncAPIRoomServerConsumer")), durable: cfg.Matrix.JetStream.Durable("SyncAPIRoomServerConsumer"),
db: store, db: store,
notifier: notifier, notifier: notifier,
pduStream: pduStream, pduStream: pduStream,