One NATS instance per BaseDendrite (#2438)

* One NATS instance per `BaseDendrite`

* Fix roomserver
This commit is contained in:
Neil Alexander 2022-05-09 14:15:24 +01:00 committed by GitHub
parent 79e2fbc663
commit 09d754cfbf
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 49 additions and 37 deletions

View file

@ -32,7 +32,6 @@ import (
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
@ -55,7 +54,7 @@ func NewInternalAPI(
gomatrixserverlib.WithSkipVerify(base.Cfg.AppServiceAPI.DisableTLSValidation), gomatrixserverlib.WithSkipVerify(base.Cfg.AppServiceAPI.DisableTLSValidation),
) )
js, _ := jetstream.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) js, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
// Create a connection to the appservice postgres DB // Create a connection to the appservice postgres DB
appserviceDB, err := storage.NewDatabase(base, &base.Cfg.AppServiceAPI.Database) appserviceDB, err := storage.NewDatabase(base, &base.Cfg.AppServiceAPI.Database)

View file

@ -44,7 +44,7 @@ func AddPublicRoutes(
) { ) {
cfg := &base.Cfg.ClientAPI cfg := &base.Cfg.ClientAPI
mscCfg := &base.Cfg.MSCs mscCfg := &base.Cfg.MSCs
js, natsClient := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) js, natsClient := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
syncProducer := &producers.SyncAPIProducer{ syncProducer := &producers.SyncAPIProducer{
JetStream: js, JetStream: js,

View file

@ -56,7 +56,7 @@ func AddPublicRoutes(
) { ) {
cfg := &base.Cfg.FederationAPI cfg := &base.Cfg.FederationAPI
mscCfg := &base.Cfg.MSCs mscCfg := &base.Cfg.MSCs
js, _ := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) js, _ := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
producer := &producers.SyncAPIProducer{ producer := &producers.SyncAPIProducer{
JetStream: js, JetStream: js,
TopicReceiptEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent), TopicReceiptEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
@ -115,7 +115,7 @@ func NewInternalAPI(
FailuresUntilBlacklist: cfg.FederationMaxRetries, FailuresUntilBlacklist: cfg.FederationMaxRetries,
} }
js, _ := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) js, _ := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
queues := queue.NewOutgoingQueues( queues := queue.NewOutgoingQueues(
federationDB, base.ProcessContext, federationDB, base.ProcessContext,

View file

@ -39,7 +39,7 @@ func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) {
func NewInternalAPI( func NewInternalAPI(
base *base.BaseDendrite, cfg *config.KeyServer, fedClient fedsenderapi.FederationClient, base *base.BaseDendrite, cfg *config.KeyServer, fedClient fedsenderapi.FederationClient,
) api.KeyInternalAPI { ) api.KeyInternalAPI {
js, _ := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) js, _ := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
db, err := storage.NewDatabase(base, &cfg.Database) db, err := storage.NewDatabase(base, &cfg.Database)
if err != nil { if err != nil {

View file

@ -10,9 +10,9 @@ import (
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/internal/input" "github.com/matrix-org/dendrite/roomserver/internal/input"
"github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/test"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
) )
@ -21,11 +21,11 @@ var js nats.JetStreamContext
var jc *nats.Conn var jc *nats.Conn
func TestMain(m *testing.M) { func TestMain(m *testing.M) {
var pc *process.ProcessContext var b *base.BaseDendrite
pc, js, jc = jetstream.PrepareForTests() b, js, jc = test.Base(nil)
code := m.Run() code := m.Run()
pc.ShutdownDendrite() b.ShutdownDendrite()
pc.WaitForComponentsToFinish() b.WaitForComponentsToFinish()
os.Exit(code) os.Exit(code)
} }

View file

@ -50,7 +50,7 @@ func NewInternalAPI(
logrus.WithError(err).Panicf("failed to connect to room server db") logrus.WithError(err).Panicf("failed to connect to room server db")
} }
js, nc := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) js, nc := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
return internal.NewRoomserverAPI( return internal.NewRoomserverAPI(
base.ProcessContext, cfg, roomserverDB, js, nc, base.ProcessContext, cfg, roomserverDB, js, nc,

View file

@ -41,6 +41,7 @@ import (
"golang.org/x/net/http2/h2c" "golang.org/x/net/http2/h2c"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/setup/process"
"github.com/gorilla/mux" "github.com/gorilla/mux"
@ -77,6 +78,7 @@ type BaseDendrite struct {
InternalAPIMux *mux.Router InternalAPIMux *mux.Router
DendriteAdminMux *mux.Router DendriteAdminMux *mux.Router
SynapseAdminMux *mux.Router SynapseAdminMux *mux.Router
NATS *jetstream.NATSInstance
UseHTTPAPIs bool UseHTTPAPIs bool
apiHttpClient *http.Client apiHttpClient *http.Client
Cfg *config.Dendrite Cfg *config.Dendrite
@ -240,6 +242,7 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, options ...Base
InternalAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.InternalPathPrefix).Subrouter().UseEncodedPath(), InternalAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.InternalPathPrefix).Subrouter().UseEncodedPath(),
DendriteAdminMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.DendriteAdminPathPrefix).Subrouter().UseEncodedPath(), DendriteAdminMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.DendriteAdminPathPrefix).Subrouter().UseEncodedPath(),
SynapseAdminMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.SynapseAdminPathPrefix).Subrouter().UseEncodedPath(), SynapseAdminMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.SynapseAdminPathPrefix).Subrouter().UseEncodedPath(),
NATS: &jetstream.NATSInstance{},
apiHttpClient: &apiClient, apiHttpClient: &apiClient,
Database: db, // set if monolith with global connection pool only Database: db, // set if monolith with global connection pool only
DatabaseWriter: writer, // set if monolith with global connection pool only DatabaseWriter: writer, // set if monolith with global connection pool only

View file

@ -13,31 +13,23 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
natsserver "github.com/nats-io/nats-server/v2/server" natsserver "github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
natsclient "github.com/nats-io/nats.go" natsclient "github.com/nats-io/nats.go"
) )
var natsServer *natsserver.Server type NATSInstance struct {
var natsServerMutex sync.Mutex *natsserver.Server
sync.Mutex
func PrepareForTests() (*process.ProcessContext, nats.JetStreamContext, *nats.Conn) {
cfg := &config.Dendrite{}
cfg.Defaults(true)
cfg.Global.JetStream.InMemory = true
pc := process.NewProcessContext()
js, jc := Prepare(pc, &cfg.Global.JetStream)
return pc, js, jc
} }
func Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) { func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) {
// check if we need an in-process NATS Server // check if we need an in-process NATS Server
if len(cfg.Addresses) != 0 { if len(cfg.Addresses) != 0 {
return setupNATS(process, cfg, nil) return setupNATS(process, cfg, nil)
} }
natsServerMutex.Lock() s.Lock()
if natsServer == nil { if s.Server == nil {
var err error var err error
natsServer, err = natsserver.NewServer(&natsserver.Options{ s.Server, err = natsserver.NewServer(&natsserver.Options{
ServerName: "monolith", ServerName: "monolith",
DontListen: true, DontListen: true,
JetStream: true, JetStream: true,
@ -49,23 +41,23 @@ func Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient
if err != nil { if err != nil {
panic(err) panic(err)
} }
natsServer.ConfigureLogger() s.ConfigureLogger()
go func() { go func() {
process.ComponentStarted() process.ComponentStarted()
natsServer.Start() s.Start()
}() }()
go func() { go func() {
<-process.WaitForShutdown() <-process.WaitForShutdown()
natsServer.Shutdown() s.Shutdown()
natsServer.WaitForShutdown() s.WaitForShutdown()
process.ComponentFinished() process.ComponentFinished()
}() }()
} }
natsServerMutex.Unlock() s.Unlock()
if !natsServer.ReadyForConnections(time.Second * 10) { if !s.ReadyForConnections(time.Second * 10) {
logrus.Fatalln("NATS did not start in time") logrus.Fatalln("NATS did not start in time")
} }
nc, err := natsclient.Connect("", natsclient.InProcessServer(natsServer)) nc, err := natsclient.Connect("", natsclient.InProcessServer(s))
if err != nil { if err != nil {
logrus.Fatalln("Failed to create NATS client") logrus.Fatalln("Failed to create NATS client")
} }

View file

@ -45,7 +45,7 @@ func AddPublicRoutes(
) { ) {
cfg := &base.Cfg.SyncAPI cfg := &base.Cfg.SyncAPI
js, natsClient := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) js, natsClient := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
syncDB, err := storage.NewSyncServerDatasource(base, &cfg.Database) syncDB, err := storage.NewSyncServerDatasource(base, &cfg.Database)
if err != nil { if err != nil {

18
test/base.go Normal file
View file

@ -0,0 +1,18 @@
package test
import (
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config"
"github.com/nats-io/nats.go"
)
func Base(cfg *config.Dendrite) (*base.BaseDendrite, nats.JetStreamContext, *nats.Conn) {
if cfg == nil {
cfg = &config.Dendrite{}
cfg.Defaults(true)
}
cfg.Global.JetStream.InMemory = true
base := base.NewBaseDendrite(cfg, "Tests")
js, jc := base.NATS.Prepare(base.ProcessContext, &cfg.Global.JetStream)
return base, js, jc
}

View file

@ -47,7 +47,7 @@ func NewInternalAPI(
appServices []config.ApplicationService, keyAPI keyapi.UserKeyAPI, appServices []config.ApplicationService, keyAPI keyapi.UserKeyAPI,
rsAPI rsapi.UserRoomserverAPI, pgClient pushgateway.Client, rsAPI rsapi.UserRoomserverAPI, pgClient pushgateway.Client,
) api.UserInternalAPI { ) api.UserInternalAPI {
js, _ := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) js, _ := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
db, err := storage.NewUserAPIDatabase( db, err := storage.NewUserAPIDatabase(
base, base,