Use process context for roomserver input (#2198)

This commit is contained in:
Neil Alexander 2022-02-17 15:58:54 +00:00 committed by GitHub
parent 140077265e
commit 0b123b29f5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 11 additions and 5 deletions

View file

@ -14,6 +14,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/internal/query" "github.com/matrix-org/dendrite/roomserver/internal/query"
"github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -32,6 +33,7 @@ type RoomserverInternalAPI struct {
*perform.Publisher *perform.Publisher
*perform.Backfiller *perform.Backfiller
*perform.Forgetter *perform.Forgetter
ProcessContext *process.ProcessContext
DB storage.Database DB storage.Database
Cfg *config.RoomServer Cfg *config.RoomServer
Cache caching.RoomServerCaches Cache caching.RoomServerCaches
@ -48,12 +50,13 @@ type RoomserverInternalAPI struct {
} }
func NewRoomserverAPI( func NewRoomserverAPI(
cfg *config.RoomServer, roomserverDB storage.Database, consumer nats.JetStreamContext, processCtx *process.ProcessContext, cfg *config.RoomServer, roomserverDB storage.Database,
inputRoomEventTopic, outputRoomEventTopic string, caches caching.RoomServerCaches, consumer nats.JetStreamContext, inputRoomEventTopic, outputRoomEventTopic string,
perspectiveServerNames []gomatrixserverlib.ServerName, caches caching.RoomServerCaches, perspectiveServerNames []gomatrixserverlib.ServerName,
) *RoomserverInternalAPI { ) *RoomserverInternalAPI {
serverACLs := acls.NewServerACLs(roomserverDB) serverACLs := acls.NewServerACLs(roomserverDB)
a := &RoomserverInternalAPI{ a := &RoomserverInternalAPI{
ProcessContext: processCtx,
DB: roomserverDB, DB: roomserverDB,
Cfg: cfg, Cfg: cfg,
Cache: caches, Cache: caches,
@ -83,6 +86,7 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.FederationInternalA
r.KeyRing = keyRing r.KeyRing = keyRing
r.Inputer = &input.Inputer{ r.Inputer = &input.Inputer{
ProcessContext: r.ProcessContext,
DB: r.DB, DB: r.DB,
InputRoomEventTopic: r.InputRoomEventTopic, InputRoomEventTopic: r.InputRoomEventTopic,
OutputRoomEventTopic: r.OutputRoomEventTopic, OutputRoomEventTopic: r.OutputRoomEventTopic,

View file

@ -31,6 +31,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/internal/query" "github.com/matrix-org/dendrite/roomserver/internal/query"
"github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
@ -59,6 +60,7 @@ var keyContentFields = map[string]string{
} }
type Inputer struct { type Inputer struct {
ProcessContext *process.ProcessContext
DB storage.Database DB storage.Database
JetStream nats.JetStreamContext JetStream nats.JetStreamContext
Durable nats.SubOpt Durable nats.SubOpt
@ -115,7 +117,7 @@ func (r *Inputer) Start() error {
_ = msg.InProgress() // resets the acknowledgement wait timer _ = msg.InProgress() // resets the acknowledgement wait timer
defer eventsInProgress.Delete(index) defer eventsInProgress.Delete(index)
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec() defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
action, err := r.processRoomEventUsingUpdater(context.Background(), roomID, &inputRoomEvent) action, err := r.processRoomEventUsingUpdater(r.ProcessContext.Context(), roomID, &inputRoomEvent)
if err != nil { if err != nil {
if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) { if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
sentry.CaptureException(err) sentry.CaptureException(err)

View file

@ -53,7 +53,7 @@ func NewInternalAPI(
js := jetstream.Prepare(&cfg.Matrix.JetStream) js := jetstream.Prepare(&cfg.Matrix.JetStream)
return internal.NewRoomserverAPI( return internal.NewRoomserverAPI(
cfg, roomserverDB, js, base.ProcessContext, cfg, roomserverDB, js,
cfg.Matrix.JetStream.TopicFor(jetstream.InputRoomEvent), cfg.Matrix.JetStream.TopicFor(jetstream.InputRoomEvent),
cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent), cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent),
base.Caches, perspectiveServerNames, base.Caches, perspectiveServerNames,