diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index e8b9211c4..f677fe560 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -71,7 +71,7 @@ func NewOutputRoomEventConsumer( ctx: process.Context(), cfg: cfg, jetstream: js, - topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent), + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputAppserviceEvent), rsAPI: rsAPI, } } @@ -109,7 +109,7 @@ func (s *OutputRoomEventConsumer) onMessage( for _, msg := range msgs { // Only handle events we care about receivedType := api.OutputType(msg.Header.Get(jetstream.RoomEventType)) - if receivedType != api.OutputTypeNewRoomEvent && receivedType != api.OutputTypeNewInviteEvent { + if receivedType != api.OutputTypeNewRoomEvent { continue } // Parse out the event JSON diff --git a/setup/jetstream/streams.go b/setup/jetstream/streams.go index 741407926..1dc9f4cec 100644 --- a/setup/jetstream/streams.go +++ b/setup/jetstream/streams.go @@ -20,6 +20,7 @@ var ( InputDeviceListUpdate = "InputDeviceListUpdate" InputSigningKeyUpdate = "InputSigningKeyUpdate" OutputRoomEvent = "OutputRoomEvent" + OutputAppserviceEvent = "OutputAppserviceEvent" OutputSendToDeviceEvent = "OutputSendToDeviceEvent" OutputKeyChangeEvent = "OutputKeyChangeEvent" OutputTypingEvent = "OutputTypingEvent" @@ -65,6 +66,11 @@ var streams = []*nats.StreamConfig{ Retention: nats.InterestPolicy, Storage: nats.FileStorage, }, + { + Name: OutputAppserviceEvent, + Retention: nats.InterestPolicy, + Storage: nats.FileStorage, + }, { Name: OutputSendToDeviceEvent, Retention: nats.InterestPolicy, diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 666f900d7..81c532f19 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -31,6 +31,7 @@ import ( "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/syncapi/notifier" + "github.com/matrix-org/dendrite/syncapi/producers" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/streams" "github.com/matrix-org/dendrite/syncapi/synctypes" @@ -55,6 +56,7 @@ type OutputRoomEventConsumer struct { inviteStream streams.StreamProvider notifier *notifier.Notifier fts fulltext.Indexer + asProducer *producers.AppserviceEventProducer } // NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers. @@ -68,6 +70,7 @@ func NewOutputRoomEventConsumer( inviteStream streams.StreamProvider, rsAPI api.SyncRoomserverAPI, fts *fulltext.Search, + asProducer *producers.AppserviceEventProducer, ) *OutputRoomEventConsumer { return &OutputRoomEventConsumer{ ctx: process.Context(), @@ -81,6 +84,7 @@ func NewOutputRoomEventConsumer( inviteStream: inviteStream, rsAPI: rsAPI, fts: fts, + asProducer: asProducer, } } @@ -119,6 +123,11 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms } } err = s.onNewRoomEvent(s.ctx, *output.NewRoomEvent) + if err == nil && s.asProducer != nil { + if err = s.asProducer.ProduceRoomEvents(msg); err != nil { + log.WithError(err).Warn("failed to produce OutputAppserviceEvent") + } + } case api.OutputTypeOldRoomEvent: err = s.onOldRoomEvent(s.ctx, *output.OldRoomEvent) case api.OutputTypeNewInviteEvent: diff --git a/syncapi/producers/appservices.go b/syncapi/producers/appservices.go new file mode 100644 index 000000000..b34fdc43e --- /dev/null +++ b/syncapi/producers/appservices.go @@ -0,0 +1,36 @@ +// Copyright 2023 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package producers + +import ( + "github.com/nats-io/nats.go" + "github.com/sirupsen/logrus" +) + +// AppserviceEventProducer produces events for the appservice API to consume +type AppserviceEventProducer struct { + Topic string + JetStream nats.JetStreamContext +} + +func (a *AppserviceEventProducer) ProduceRoomEvents( + msg *nats.Msg, +) error { + if _, err := a.JetStream.PublishMsg(msg); err != nil { + logrus.WithError(err).Errorf("Failed to produce to topic '%s': %s", a.Topic, err) + return err + } + return nil +} diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 091e3db41..0418ffc05 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -100,9 +100,16 @@ func AddPublicRoutes( logrus.WithError(err).Panicf("failed to start key change consumer") } + var asProducer *producers.AppserviceEventProducer + if len(dendriteCfg.AppServiceAPI.Derived.ApplicationServices) > 0 { + asProducer = &producers.AppserviceEventProducer{ + JetStream: js, Topic: dendriteCfg.Global.JetStream.Prefixed(jetstream.OutputAppserviceEvent), + } + } + roomConsumer := consumers.NewOutputRoomEventConsumer( processContext, &dendriteCfg.SyncAPI, js, syncDB, notifier, streams.PDUStreamProvider, - streams.InviteStreamProvider, rsAPI, fts, + streams.InviteStreamProvider, rsAPI, fts, asProducer, ) if err = roomConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start room server consumer")