diff --git a/appservice/appservice.go b/appservice/appservice.go index bd292767b..c5ae9ceb2 100644 --- a/appservice/appservice.go +++ b/appservice/appservice.go @@ -32,7 +32,6 @@ import ( roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/setup/jetstream" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" ) @@ -55,7 +54,7 @@ func NewInternalAPI( gomatrixserverlib.WithSkipVerify(base.Cfg.AppServiceAPI.DisableTLSValidation), ) - js, _ := jetstream.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) + js, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) // Create a connection to the appservice postgres DB appserviceDB, err := storage.NewDatabase(base, &base.Cfg.AppServiceAPI.Database) diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go index c1e86114b..f550c29bb 100644 --- a/clientapi/clientapi.go +++ b/clientapi/clientapi.go @@ -44,7 +44,7 @@ func AddPublicRoutes( ) { cfg := &base.Cfg.ClientAPI mscCfg := &base.Cfg.MSCs - js, natsClient := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) + js, natsClient := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) syncProducer := &producers.SyncAPIProducer{ JetStream: js, diff --git a/federationapi/federationapi.go b/federationapi/federationapi.go index e52377c94..bec9ac777 100644 --- a/federationapi/federationapi.go +++ b/federationapi/federationapi.go @@ -56,7 +56,7 @@ func AddPublicRoutes( ) { cfg := &base.Cfg.FederationAPI mscCfg := &base.Cfg.MSCs - js, _ := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) + js, _ := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) producer := &producers.SyncAPIProducer{ JetStream: js, TopicReceiptEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent), @@ -115,7 +115,7 @@ func NewInternalAPI( FailuresUntilBlacklist: cfg.FederationMaxRetries, } - js, _ := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) + js, _ := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) queues := queue.NewOutgoingQueues( federationDB, base.ProcessContext, diff --git a/go.mod b/go.mod index 58cfbba11..803272c56 100644 --- a/go.mod +++ b/go.mod @@ -30,7 +30,7 @@ require ( github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91 github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 - github.com/matrix-org/gomatrixserverlib v0.0.0-20220506144035-8958f9dfdffd + github.com/matrix-org/gomatrixserverlib v0.0.0-20220509120958-8d818048c34c github.com/matrix-org/pinecone v0.0.0-20220408153826-2999ea29ed48 github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 github.com/mattn/go-sqlite3 v1.14.10 diff --git a/go.sum b/go.sum index 047bbf5c3..5fde89e06 100644 --- a/go.sum +++ b/go.sum @@ -795,8 +795,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= -github.com/matrix-org/gomatrixserverlib v0.0.0-20220506144035-8958f9dfdffd h1:11Wh+NMPDE5UDEx50RJnxeYj7zH5HEClzGfVMAjUy9U= -github.com/matrix-org/gomatrixserverlib v0.0.0-20220506144035-8958f9dfdffd/go.mod h1:V5eO8rn/C3rcxig37A/BCeKerLFS+9Avg/77FIeTZ48= +github.com/matrix-org/gomatrixserverlib v0.0.0-20220509120958-8d818048c34c h1:KqzqFWxvs90pcDaW9QEveW+Q5JcEYuNnKyaqXc+ohno= +github.com/matrix-org/gomatrixserverlib v0.0.0-20220509120958-8d818048c34c/go.mod h1:V5eO8rn/C3rcxig37A/BCeKerLFS+9Avg/77FIeTZ48= github.com/matrix-org/pinecone v0.0.0-20220408153826-2999ea29ed48 h1:W0sjjC6yjskHX4mb0nk3p0fXAlbU5bAFUFeEtlrPASE= github.com/matrix-org/pinecone v0.0.0-20220408153826-2999ea29ed48/go.mod h1:ulJzsVOTssIVp1j/m5eI//4VpAGDkMt5NrRuAVX7wpc= github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U= diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go index 007a48a55..47d7f57f9 100644 --- a/keyserver/keyserver.go +++ b/keyserver/keyserver.go @@ -39,7 +39,7 @@ func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) { func NewInternalAPI( base *base.BaseDendrite, cfg *config.KeyServer, fedClient fedsenderapi.FederationClient, ) api.KeyInternalAPI { - js, _ := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) + js, _ := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) db, err := storage.NewDatabase(base, &cfg.Database) if err != nil { diff --git a/keyserver/storage/postgres/one_time_keys_table.go b/keyserver/storage/postgres/one_time_keys_table.go index 0b143a1aa..d8c76b49b 100644 --- a/keyserver/storage/postgres/one_time_keys_table.go +++ b/keyserver/storage/postgres/one_time_keys_table.go @@ -39,6 +39,8 @@ CREATE TABLE IF NOT EXISTS keyserver_one_time_keys ( -- Clobber based on 4-uple of user/device/key/algorithm. CONSTRAINT keyserver_one_time_keys_unique UNIQUE (user_id, device_id, key_id, algorithm) ); + +CREATE INDEX IF NOT EXISTS keyserver_one_time_keys_idx ON keyserver_one_time_keys (user_id, device_id); ` const upsertKeysSQL = "" + diff --git a/keyserver/storage/sqlite3/one_time_keys_table.go b/keyserver/storage/sqlite3/one_time_keys_table.go index 897839aca..d2c0b7b20 100644 --- a/keyserver/storage/sqlite3/one_time_keys_table.go +++ b/keyserver/storage/sqlite3/one_time_keys_table.go @@ -38,6 +38,8 @@ CREATE TABLE IF NOT EXISTS keyserver_one_time_keys ( -- Clobber based on 4-uple of user/device/key/algorithm. UNIQUE (user_id, device_id, key_id, algorithm) ); + +CREATE INDEX IF NOT EXISTS keyserver_one_time_keys_idx ON keyserver_one_time_keys (user_id, device_id); ` const upsertKeysSQL = "" + diff --git a/roomserver/internal/input/input_test.go b/roomserver/internal/input/input_test.go index 5d34842bf..a95c13550 100644 --- a/roomserver/internal/input/input_test.go +++ b/roomserver/internal/input/input_test.go @@ -10,9 +10,9 @@ import ( "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/internal/input" "github.com/matrix-org/dendrite/roomserver/storage" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/setup/jetstream" - "github.com/matrix-org/dendrite/setup/process" + "github.com/matrix-org/dendrite/test" "github.com/matrix-org/gomatrixserverlib" "github.com/nats-io/nats.go" ) @@ -21,11 +21,11 @@ var js nats.JetStreamContext var jc *nats.Conn func TestMain(m *testing.M) { - var pc *process.ProcessContext - pc, js, jc = jetstream.PrepareForTests() + var b *base.BaseDendrite + b, js, jc = test.Base(nil) code := m.Run() - pc.ShutdownDendrite() - pc.WaitForComponentsToFinish() + b.ShutdownDendrite() + b.WaitForComponentsToFinish() os.Exit(code) } diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go index 46261eb3e..1480e8942 100644 --- a/roomserver/roomserver.go +++ b/roomserver/roomserver.go @@ -50,7 +50,7 @@ func NewInternalAPI( logrus.WithError(err).Panicf("failed to connect to room server db") } - js, nc := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) + js, nc := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) return internal.NewRoomserverAPI( base.ProcessContext, cfg, roomserverDB, js, nc, diff --git a/setup/base/base.go b/setup/base/base.go index ef449cc35..0e7528a03 100644 --- a/setup/base/base.go +++ b/setup/base/base.go @@ -41,6 +41,7 @@ import ( "golang.org/x/net/http2/h2c" "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/gorilla/mux" @@ -77,6 +78,7 @@ type BaseDendrite struct { InternalAPIMux *mux.Router DendriteAdminMux *mux.Router SynapseAdminMux *mux.Router + NATS *jetstream.NATSInstance UseHTTPAPIs bool apiHttpClient *http.Client Cfg *config.Dendrite @@ -240,6 +242,7 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, options ...Base InternalAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.InternalPathPrefix).Subrouter().UseEncodedPath(), DendriteAdminMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.DendriteAdminPathPrefix).Subrouter().UseEncodedPath(), SynapseAdminMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.SynapseAdminPathPrefix).Subrouter().UseEncodedPath(), + NATS: &jetstream.NATSInstance{}, apiHttpClient: &apiClient, Database: db, // set if monolith with global connection pool only DatabaseWriter: writer, // set if monolith with global connection pool only diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go index 8d5289697..426f02bb6 100644 --- a/setup/jetstream/nats.go +++ b/setup/jetstream/nats.go @@ -13,31 +13,23 @@ import ( "github.com/sirupsen/logrus" natsserver "github.com/nats-io/nats-server/v2/server" - "github.com/nats-io/nats.go" natsclient "github.com/nats-io/nats.go" ) -var natsServer *natsserver.Server -var natsServerMutex sync.Mutex - -func PrepareForTests() (*process.ProcessContext, nats.JetStreamContext, *nats.Conn) { - cfg := &config.Dendrite{} - cfg.Defaults(true) - cfg.Global.JetStream.InMemory = true - pc := process.NewProcessContext() - js, jc := Prepare(pc, &cfg.Global.JetStream) - return pc, js, jc +type NATSInstance struct { + *natsserver.Server + sync.Mutex } -func Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) { +func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) { // check if we need an in-process NATS Server if len(cfg.Addresses) != 0 { return setupNATS(process, cfg, nil) } - natsServerMutex.Lock() - if natsServer == nil { + s.Lock() + if s.Server == nil { var err error - natsServer, err = natsserver.NewServer(&natsserver.Options{ + s.Server, err = natsserver.NewServer(&natsserver.Options{ ServerName: "monolith", DontListen: true, JetStream: true, @@ -49,23 +41,23 @@ func Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient if err != nil { panic(err) } - natsServer.ConfigureLogger() + s.ConfigureLogger() go func() { process.ComponentStarted() - natsServer.Start() + s.Start() }() go func() { <-process.WaitForShutdown() - natsServer.Shutdown() - natsServer.WaitForShutdown() + s.Shutdown() + s.WaitForShutdown() process.ComponentFinished() }() } - natsServerMutex.Unlock() - if !natsServer.ReadyForConnections(time.Second * 10) { + s.Unlock() + if !s.ReadyForConnections(time.Second * 10) { logrus.Fatalln("NATS did not start in time") } - nc, err := natsclient.Connect("", natsclient.InProcessServer(natsServer)) + nc, err := natsclient.Connect("", natsclient.InProcessServer(s)) if err != nil { logrus.Fatalln("Failed to create NATS client") } diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 6da8ce6d1..dbc6e240c 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -45,7 +45,7 @@ func AddPublicRoutes( ) { cfg := &base.Cfg.SyncAPI - js, natsClient := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) + js, natsClient := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) syncDB, err := storage.NewSyncServerDatasource(base, &cfg.Database) if err != nil { diff --git a/test/base.go b/test/base.go new file mode 100644 index 000000000..32fc8dc53 --- /dev/null +++ b/test/base.go @@ -0,0 +1,18 @@ +package test + +import ( + "github.com/matrix-org/dendrite/setup/base" + "github.com/matrix-org/dendrite/setup/config" + "github.com/nats-io/nats.go" +) + +func Base(cfg *config.Dendrite) (*base.BaseDendrite, nats.JetStreamContext, *nats.Conn) { + if cfg == nil { + cfg = &config.Dendrite{} + cfg.Defaults(true) + } + cfg.Global.JetStream.InMemory = true + base := base.NewBaseDendrite(cfg, "Tests") + js, jc := base.NATS.Prepare(base.ProcessContext, &cfg.Global.JetStream) + return base, js, jc +} diff --git a/userapi/userapi.go b/userapi/userapi.go index 03a46807f..603b416bf 100644 --- a/userapi/userapi.go +++ b/userapi/userapi.go @@ -47,7 +47,7 @@ func NewInternalAPI( appServices []config.ApplicationService, keyAPI keyapi.UserKeyAPI, rsAPI rsapi.UserRoomserverAPI, pgClient pushgateway.Client, ) api.UserInternalAPI { - js, _ := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) + js, _ := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) db, err := storage.NewUserAPIDatabase( base,