From 09d754cfbf9268044d0f59fbe509640b8d71e011 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 9 May 2022 14:15:24 +0100 Subject: [PATCH] One NATS instance per `BaseDendrite` (#2438) * One NATS instance per `BaseDendrite` * Fix roomserver --- appservice/appservice.go | 3 +-- clientapi/clientapi.go | 2 +- federationapi/federationapi.go | 4 +-- keyserver/keyserver.go | 2 +- roomserver/internal/input/input_test.go | 12 ++++----- roomserver/roomserver.go | 2 +- setup/base/base.go | 3 +++ setup/jetstream/nats.go | 36 ++++++++++--------------- syncapi/syncapi.go | 2 +- test/base.go | 18 +++++++++++++ userapi/userapi.go | 2 +- 11 files changed, 49 insertions(+), 37 deletions(-) create mode 100644 test/base.go diff --git a/appservice/appservice.go b/appservice/appservice.go index bd292767b..c5ae9ceb2 100644 --- a/appservice/appservice.go +++ b/appservice/appservice.go @@ -32,7 +32,6 @@ import ( roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/setup/jetstream" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" ) @@ -55,7 +54,7 @@ func NewInternalAPI( gomatrixserverlib.WithSkipVerify(base.Cfg.AppServiceAPI.DisableTLSValidation), ) - js, _ := jetstream.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) + js, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) // Create a connection to the appservice postgres DB appserviceDB, err := storage.NewDatabase(base, &base.Cfg.AppServiceAPI.Database) diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go index c1e86114b..f550c29bb 100644 --- a/clientapi/clientapi.go +++ b/clientapi/clientapi.go @@ -44,7 +44,7 @@ func AddPublicRoutes( ) { cfg := &base.Cfg.ClientAPI mscCfg := &base.Cfg.MSCs - js, natsClient := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) + js, natsClient := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) syncProducer := &producers.SyncAPIProducer{ JetStream: js, diff --git a/federationapi/federationapi.go b/federationapi/federationapi.go index e52377c94..bec9ac777 100644 --- a/federationapi/federationapi.go +++ b/federationapi/federationapi.go @@ -56,7 +56,7 @@ func AddPublicRoutes( ) { cfg := &base.Cfg.FederationAPI mscCfg := &base.Cfg.MSCs - js, _ := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) + js, _ := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) producer := &producers.SyncAPIProducer{ JetStream: js, TopicReceiptEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent), @@ -115,7 +115,7 @@ func NewInternalAPI( FailuresUntilBlacklist: cfg.FederationMaxRetries, } - js, _ := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) + js, _ := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) queues := queue.NewOutgoingQueues( federationDB, base.ProcessContext, diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go index 007a48a55..47d7f57f9 100644 --- a/keyserver/keyserver.go +++ b/keyserver/keyserver.go @@ -39,7 +39,7 @@ func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) { func NewInternalAPI( base *base.BaseDendrite, cfg *config.KeyServer, fedClient fedsenderapi.FederationClient, ) api.KeyInternalAPI { - js, _ := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) + js, _ := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) db, err := storage.NewDatabase(base, &cfg.Database) if err != nil { diff --git a/roomserver/internal/input/input_test.go b/roomserver/internal/input/input_test.go index 5d34842bf..a95c13550 100644 --- a/roomserver/internal/input/input_test.go +++ b/roomserver/internal/input/input_test.go @@ -10,9 +10,9 @@ import ( "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/internal/input" "github.com/matrix-org/dendrite/roomserver/storage" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/setup/jetstream" - "github.com/matrix-org/dendrite/setup/process" + "github.com/matrix-org/dendrite/test" "github.com/matrix-org/gomatrixserverlib" "github.com/nats-io/nats.go" ) @@ -21,11 +21,11 @@ var js nats.JetStreamContext var jc *nats.Conn func TestMain(m *testing.M) { - var pc *process.ProcessContext - pc, js, jc = jetstream.PrepareForTests() + var b *base.BaseDendrite + b, js, jc = test.Base(nil) code := m.Run() - pc.ShutdownDendrite() - pc.WaitForComponentsToFinish() + b.ShutdownDendrite() + b.WaitForComponentsToFinish() os.Exit(code) } diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go index 46261eb3e..1480e8942 100644 --- a/roomserver/roomserver.go +++ b/roomserver/roomserver.go @@ -50,7 +50,7 @@ func NewInternalAPI( logrus.WithError(err).Panicf("failed to connect to room server db") } - js, nc := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) + js, nc := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) return internal.NewRoomserverAPI( base.ProcessContext, cfg, roomserverDB, js, nc, diff --git a/setup/base/base.go b/setup/base/base.go index ef449cc35..0e7528a03 100644 --- a/setup/base/base.go +++ b/setup/base/base.go @@ -41,6 +41,7 @@ import ( "golang.org/x/net/http2/h2c" "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/gorilla/mux" @@ -77,6 +78,7 @@ type BaseDendrite struct { InternalAPIMux *mux.Router DendriteAdminMux *mux.Router SynapseAdminMux *mux.Router + NATS *jetstream.NATSInstance UseHTTPAPIs bool apiHttpClient *http.Client Cfg *config.Dendrite @@ -240,6 +242,7 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, options ...Base InternalAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.InternalPathPrefix).Subrouter().UseEncodedPath(), DendriteAdminMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.DendriteAdminPathPrefix).Subrouter().UseEncodedPath(), SynapseAdminMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.SynapseAdminPathPrefix).Subrouter().UseEncodedPath(), + NATS: &jetstream.NATSInstance{}, apiHttpClient: &apiClient, Database: db, // set if monolith with global connection pool only DatabaseWriter: writer, // set if monolith with global connection pool only diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go index 8d5289697..426f02bb6 100644 --- a/setup/jetstream/nats.go +++ b/setup/jetstream/nats.go @@ -13,31 +13,23 @@ import ( "github.com/sirupsen/logrus" natsserver "github.com/nats-io/nats-server/v2/server" - "github.com/nats-io/nats.go" natsclient "github.com/nats-io/nats.go" ) -var natsServer *natsserver.Server -var natsServerMutex sync.Mutex - -func PrepareForTests() (*process.ProcessContext, nats.JetStreamContext, *nats.Conn) { - cfg := &config.Dendrite{} - cfg.Defaults(true) - cfg.Global.JetStream.InMemory = true - pc := process.NewProcessContext() - js, jc := Prepare(pc, &cfg.Global.JetStream) - return pc, js, jc +type NATSInstance struct { + *natsserver.Server + sync.Mutex } -func Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) { +func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) { // check if we need an in-process NATS Server if len(cfg.Addresses) != 0 { return setupNATS(process, cfg, nil) } - natsServerMutex.Lock() - if natsServer == nil { + s.Lock() + if s.Server == nil { var err error - natsServer, err = natsserver.NewServer(&natsserver.Options{ + s.Server, err = natsserver.NewServer(&natsserver.Options{ ServerName: "monolith", DontListen: true, JetStream: true, @@ -49,23 +41,23 @@ func Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient if err != nil { panic(err) } - natsServer.ConfigureLogger() + s.ConfigureLogger() go func() { process.ComponentStarted() - natsServer.Start() + s.Start() }() go func() { <-process.WaitForShutdown() - natsServer.Shutdown() - natsServer.WaitForShutdown() + s.Shutdown() + s.WaitForShutdown() process.ComponentFinished() }() } - natsServerMutex.Unlock() - if !natsServer.ReadyForConnections(time.Second * 10) { + s.Unlock() + if !s.ReadyForConnections(time.Second * 10) { logrus.Fatalln("NATS did not start in time") } - nc, err := natsclient.Connect("", natsclient.InProcessServer(natsServer)) + nc, err := natsclient.Connect("", natsclient.InProcessServer(s)) if err != nil { logrus.Fatalln("Failed to create NATS client") } diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 6da8ce6d1..dbc6e240c 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -45,7 +45,7 @@ func AddPublicRoutes( ) { cfg := &base.Cfg.SyncAPI - js, natsClient := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) + js, natsClient := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) syncDB, err := storage.NewSyncServerDatasource(base, &cfg.Database) if err != nil { diff --git a/test/base.go b/test/base.go new file mode 100644 index 000000000..32fc8dc53 --- /dev/null +++ b/test/base.go @@ -0,0 +1,18 @@ +package test + +import ( + "github.com/matrix-org/dendrite/setup/base" + "github.com/matrix-org/dendrite/setup/config" + "github.com/nats-io/nats.go" +) + +func Base(cfg *config.Dendrite) (*base.BaseDendrite, nats.JetStreamContext, *nats.Conn) { + if cfg == nil { + cfg = &config.Dendrite{} + cfg.Defaults(true) + } + cfg.Global.JetStream.InMemory = true + base := base.NewBaseDendrite(cfg, "Tests") + js, jc := base.NATS.Prepare(base.ProcessContext, &cfg.Global.JetStream) + return base, js, jc +} diff --git a/userapi/userapi.go b/userapi/userapi.go index 03a46807f..603b416bf 100644 --- a/userapi/userapi.go +++ b/userapi/userapi.go @@ -47,7 +47,7 @@ func NewInternalAPI( appServices []config.ApplicationService, keyAPI keyapi.UserKeyAPI, rsAPI rsapi.UserRoomserverAPI, pgClient pushgateway.Client, ) api.UserInternalAPI { - js, _ := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) + js, _ := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) db, err := storage.NewUserAPIDatabase( base,