Introduce a new stream for the appservice consumer

This commit is contained in:
Till Faelligen 2023-12-01 08:26:06 +01:00
parent fd11e65a9d
commit 2188f4f32e
No known key found for this signature in database
GPG key ID: 3DF82D8AB9211D4E
5 changed files with 61 additions and 3 deletions

View file

@ -71,7 +71,7 @@ func NewOutputRoomEventConsumer(
ctx: process.Context(), ctx: process.Context(),
cfg: cfg, cfg: cfg,
jetstream: js, jetstream: js,
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent), topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputAppserviceEvent),
rsAPI: rsAPI, rsAPI: rsAPI,
} }
} }
@ -109,7 +109,7 @@ func (s *OutputRoomEventConsumer) onMessage(
for _, msg := range msgs { for _, msg := range msgs {
// Only handle events we care about // Only handle events we care about
receivedType := api.OutputType(msg.Header.Get(jetstream.RoomEventType)) receivedType := api.OutputType(msg.Header.Get(jetstream.RoomEventType))
if receivedType != api.OutputTypeNewRoomEvent && receivedType != api.OutputTypeNewInviteEvent { if receivedType != api.OutputTypeNewRoomEvent {
continue continue
} }
// Parse out the event JSON // Parse out the event JSON

View file

@ -20,6 +20,7 @@ var (
InputDeviceListUpdate = "InputDeviceListUpdate" InputDeviceListUpdate = "InputDeviceListUpdate"
InputSigningKeyUpdate = "InputSigningKeyUpdate" InputSigningKeyUpdate = "InputSigningKeyUpdate"
OutputRoomEvent = "OutputRoomEvent" OutputRoomEvent = "OutputRoomEvent"
OutputAppserviceEvent = "OutputAppserviceEvent"
OutputSendToDeviceEvent = "OutputSendToDeviceEvent" OutputSendToDeviceEvent = "OutputSendToDeviceEvent"
OutputKeyChangeEvent = "OutputKeyChangeEvent" OutputKeyChangeEvent = "OutputKeyChangeEvent"
OutputTypingEvent = "OutputTypingEvent" OutputTypingEvent = "OutputTypingEvent"
@ -65,6 +66,11 @@ var streams = []*nats.StreamConfig{
Retention: nats.InterestPolicy, Retention: nats.InterestPolicy,
Storage: nats.FileStorage, Storage: nats.FileStorage,
}, },
{
Name: OutputAppserviceEvent,
Retention: nats.InterestPolicy,
Storage: nats.FileStorage,
},
{ {
Name: OutputSendToDeviceEvent, Name: OutputSendToDeviceEvent,
Retention: nats.InterestPolicy, Retention: nats.InterestPolicy,

View file

@ -31,6 +31,7 @@ import (
"github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/producers"
"github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/streams" "github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/dendrite/syncapi/synctypes" "github.com/matrix-org/dendrite/syncapi/synctypes"
@ -55,6 +56,7 @@ type OutputRoomEventConsumer struct {
inviteStream streams.StreamProvider inviteStream streams.StreamProvider
notifier *notifier.Notifier notifier *notifier.Notifier
fts fulltext.Indexer fts fulltext.Indexer
asProducer *producers.AppserviceEventProducer
} }
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers. // NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
@ -68,6 +70,7 @@ func NewOutputRoomEventConsumer(
inviteStream streams.StreamProvider, inviteStream streams.StreamProvider,
rsAPI api.SyncRoomserverAPI, rsAPI api.SyncRoomserverAPI,
fts *fulltext.Search, fts *fulltext.Search,
asProducer *producers.AppserviceEventProducer,
) *OutputRoomEventConsumer { ) *OutputRoomEventConsumer {
return &OutputRoomEventConsumer{ return &OutputRoomEventConsumer{
ctx: process.Context(), ctx: process.Context(),
@ -81,6 +84,7 @@ func NewOutputRoomEventConsumer(
inviteStream: inviteStream, inviteStream: inviteStream,
rsAPI: rsAPI, rsAPI: rsAPI,
fts: fts, fts: fts,
asProducer: asProducer,
} }
} }
@ -119,6 +123,11 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms
} }
} }
err = s.onNewRoomEvent(s.ctx, *output.NewRoomEvent) err = s.onNewRoomEvent(s.ctx, *output.NewRoomEvent)
if err == nil && s.asProducer != nil {
if err = s.asProducer.ProduceRoomEvents(msg); err != nil {
log.WithError(err).Warn("failed to produce OutputAppserviceEvent")
}
}
case api.OutputTypeOldRoomEvent: case api.OutputTypeOldRoomEvent:
err = s.onOldRoomEvent(s.ctx, *output.OldRoomEvent) err = s.onOldRoomEvent(s.ctx, *output.OldRoomEvent)
case api.OutputTypeNewInviteEvent: case api.OutputTypeNewInviteEvent:

View file

@ -0,0 +1,36 @@
// Copyright 2023 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package producers
import (
"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
)
// AppserviceEventProducer produces events for the appservice API to consume
type AppserviceEventProducer struct {
Topic string
JetStream nats.JetStreamContext
}
func (a *AppserviceEventProducer) ProduceRoomEvents(
msg *nats.Msg,
) error {
if _, err := a.JetStream.PublishMsg(msg); err != nil {
logrus.WithError(err).Errorf("Failed to produce to topic '%s': %s", a.Topic, err)
return err
}
return nil
}

View file

@ -100,9 +100,16 @@ func AddPublicRoutes(
logrus.WithError(err).Panicf("failed to start key change consumer") logrus.WithError(err).Panicf("failed to start key change consumer")
} }
var asProducer *producers.AppserviceEventProducer
if len(dendriteCfg.AppServiceAPI.Derived.ApplicationServices) > 0 {
asProducer = &producers.AppserviceEventProducer{
JetStream: js, Topic: dendriteCfg.Global.JetStream.Prefixed(jetstream.OutputAppserviceEvent),
}
}
roomConsumer := consumers.NewOutputRoomEventConsumer( roomConsumer := consumers.NewOutputRoomEventConsumer(
processContext, &dendriteCfg.SyncAPI, js, syncDB, notifier, streams.PDUStreamProvider, processContext, &dendriteCfg.SyncAPI, js, syncDB, notifier, streams.PDUStreamProvider,
streams.InviteStreamProvider, rsAPI, fts, streams.InviteStreamProvider, rsAPI, fts, asProducer,
) )
if err = roomConsumer.Start(); err != nil { if err = roomConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start room server consumer") logrus.WithError(err).Panicf("failed to start room server consumer")