Another sprinkling of magic

This commit is contained in:
Neil Alexander 2022-03-22 13:24:16 +00:00
parent a0116959c3
commit c2501c6937
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
2 changed files with 30 additions and 7 deletions

View file

@ -90,6 +90,7 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.FederationInternalA
r.KeyRing = keyRing r.KeyRing = keyRing
r.Inputer = &input.Inputer{ r.Inputer = &input.Inputer{
Cfg: r.Cfg,
ProcessContext: r.ProcessContext, ProcessContext: r.ProcessContext,
DB: r.DB, DB: r.DB,
InputRoomEventTopic: r.InputRoomEventTopic, InputRoomEventTopic: r.InputRoomEventTopic,

View file

@ -29,6 +29,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/internal/query" "github.com/matrix-org/dendrite/roomserver/internal/query"
"github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -46,6 +47,7 @@ var keyContentFields = map[string]string{
} }
type Inputer struct { type Inputer struct {
Cfg *config.RoomServer
ProcessContext *process.ProcessContext ProcessContext *process.ProcessContext
DB storage.Database DB storage.Database
NATSClient *nats.Conn NATSClient *nats.Conn
@ -69,7 +71,7 @@ type worker struct {
subscription *nats.Subscription subscription *nats.Subscription
} }
func (r *Inputer) workerForRoom(roomID string) *worker { func (r *Inputer) startWorkerForRoom(roomID string) {
v, loaded := r.workers.LoadOrStore(roomID, &worker{ v, loaded := r.workers.LoadOrStore(roomID, &worker{
r: r, r: r,
roomID: roomID, roomID: roomID,
@ -77,23 +79,43 @@ func (r *Inputer) workerForRoom(roomID string) *worker {
w := v.(*worker) w := v.(*worker)
if !loaded { if !loaded {
consumer := "DendriteRoomInputConsumerPull" + jetstream.Tokenise(w.roomID) consumer := "DendriteRoomInputConsumerPull" + jetstream.Tokenise(w.roomID)
subject := jetstream.InputRoomEventSubj(w.roomID)
if info, err := w.r.JetStream.ConsumerInfo(
jetstream.InputRoomEvent,
consumer,
); err != nil || info == nil {
if _, err := w.r.JetStream.AddConsumer(
r.Cfg.Matrix.JetStream.TopicFor(jetstream.InputRoomEvent),
&nats.ConsumerConfig{
Durable: consumer,
AckPolicy: nats.AckExplicitPolicy,
DeliverPolicy: nats.DeliverAllPolicy,
FilterSubject: subject,
AckWait: MaximumMissingProcessingTime + (time.Second * 10),
},
); err != nil {
logrus.WithError(err).Errorf("Failed to create consumer for room %q", w.roomID)
return
}
}
sub, err := w.r.JetStream.PullSubscribe( sub, err := w.r.JetStream.PullSubscribe(
jetstream.InputRoomEventSubj(w.roomID), consumer, subject, consumer,
nats.ManualAck(), nats.ManualAck(),
nats.DeliverAll(), nats.DeliverAll(),
nats.MaxAckPending(-1),
nats.AckWait(MaximumMissingProcessingTime+(time.Second*10)), nats.AckWait(MaximumMissingProcessingTime+(time.Second*10)),
nats.Bind(r.InputRoomEventTopic, consumer), nats.Bind(r.InputRoomEventTopic, consumer),
) )
if err != nil { if err != nil {
logrus.WithError(err).Errorf("Failed to subscribe to stream for room %q", w.roomID) logrus.WithError(err).Errorf("Failed to subscribe to stream for room %q", w.roomID)
return nil return
} }
logrus.Infof("Subscribed to stream for room %q", w.roomID)
logrus.Infof("Started stream for room %q", w.roomID)
w.subscription = sub w.subscription = sub
w.Act(nil, w.next) w.Act(nil, w.next)
} }
return w
} }
// onMessage is called when a new event arrives in the roomserver input stream. // onMessage is called when a new event arrives in the roomserver input stream.
@ -102,7 +124,7 @@ func (r *Inputer) Start() error {
"", // don't supply a subject because we're using BindStream "", // don't supply a subject because we're using BindStream
func(m *nats.Msg) { func(m *nats.Msg) {
roomID := m.Header.Get(jetstream.RoomID) roomID := m.Header.Get(jetstream.RoomID)
r.workerForRoom(roomID) // start the room worker r.startWorkerForRoom(roomID)
}, },
nats.DeliverAll(), nats.DeliverAll(),
nats.AckNone(), nats.AckNone(),