diff --git a/appservice/appservice.go b/appservice/appservice.go index 8fe1b2fc4..9cbd47459 100644 --- a/appservice/appservice.go +++ b/appservice/appservice.go @@ -98,7 +98,7 @@ func NewInternalAPI( // We can't add ASes at runtime so this is safe to do. if len(workerStates) > 0 { consumer := consumers.NewOutputRoomEventConsumer( - base.ProcessContext, base.Cfg, js, appserviceDB, + base.ProcessContext, &base.Cfg.AppServiceAPI, js, appserviceDB, rsAPI, workerStates, ) if err := consumer.Start(); err != nil { diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index d567408be..e47d6839f 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -17,6 +17,7 @@ package consumers import ( "context" "encoding/json" + "fmt" "github.com/matrix-org/gomatrixserverlib" "github.com/nats-io/nats.go" @@ -34,8 +35,8 @@ import ( // OutputRoomEventConsumer consumes events that originated in the room server. type OutputRoomEventConsumer struct { ctx context.Context + cfg *config.AppServiceAPI jetstream nats.JetStreamContext - durable string topic string asDB storage.Database rsAPI api.AppserviceRoomserverAPI @@ -47,7 +48,7 @@ type OutputRoomEventConsumer struct { // Start() to begin consuming from room servers. func NewOutputRoomEventConsumer( process *process.ProcessContext, - cfg *config.Dendrite, + cfg *config.AppServiceAPI, js nats.JetStreamContext, appserviceDB storage.Database, rsAPI api.AppserviceRoomserverAPI, @@ -55,27 +56,37 @@ func NewOutputRoomEventConsumer( ) *OutputRoomEventConsumer { return &OutputRoomEventConsumer{ ctx: process.Context(), + cfg: cfg, jetstream: js, - durable: cfg.Global.JetStream.Durable("AppserviceRoomserverConsumer"), - topic: cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent), + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent), asDB: appserviceDB, rsAPI: rsAPI, - serverName: string(cfg.Global.ServerName), + serverName: string(cfg.Matrix.ServerName), workerStates: workerStates, } } // Start consuming from room servers func (s *OutputRoomEventConsumer) Start() error { - return jetstream.JetStreamConsumer( - s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, - nats.DeliverAll(), nats.ManualAck(), - ) + for _, as := range s.cfg.Derived.ApplicationServices { + appsvc, token := as, jetstream.Tokenise(as.ID) + if err := jetstream.JetStreamConsumer( + s.ctx, s.jetstream, s.topic, + s.cfg.Matrix.JetStream.Durable("Appservice_"+token), + func(ctx context.Context, msg *nats.Msg) bool { + return s.onMessage(ctx, &appsvc, msg) + }, + nats.DeliverAll(), nats.ManualAck(), + ); err != nil { + return fmt.Errorf("failed to create %q consumer: %w", token, err) + } + } + return nil } // onMessage is called when the appservice component receives a new event from // the room server output log. -func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { +func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, as *config.ApplicationService, msg *nats.Msg) bool { // Parse out the event JSON var output api.OutputEvent if err := json.Unmarshal(msg.Data, &output); err != nil { @@ -120,7 +131,7 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) } // Send event to any relevant application services - if err := s.filterRoomserverEvents(context.TODO(), events); err != nil { + if err := s.filterRoomserverEvents(ctx, events); err != nil { log.WithError(err).Errorf("roomserver output log: filter error") return true }