From 8f54ab7f3b24e3d37110cdc8cb42974e918caa36 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Thu, 15 Oct 2020 12:28:09 +0100 Subject: [PATCH] Start Kafka connection for each component that needs one --- appservice/appservice.go | 4 ++- build/gobind/monolith.go | 14 ++++---- cmd/dendrite-client-api-server/main.go | 3 +- cmd/dendrite-demo-libp2p/main.go | 14 ++++---- cmd/dendrite-demo-yggdrasil/main.go | 14 ++++---- cmd/dendrite-key-server/main.go | 2 +- cmd/dendrite-monolith-server/main.go | 14 ++++---- cmd/dendrite-sync-api-server/main.go | 4 ++- cmd/dendritejs/main.go | 14 ++++---- eduserver/eduserver.go | 5 ++- federationsender/federationsender.go | 8 +++-- internal/setup/base.go | 44 +++++++++++++++----------- keyserver/keyserver.go | 6 ++-- roomserver/roomserver.go | 4 ++- roomserver/roomserver_test.go | 5 ++- 15 files changed, 82 insertions(+), 73 deletions(-) diff --git a/appservice/appservice.go b/appservice/appservice.go index e356f68ee..912b768c6 100644 --- a/appservice/appservice.go +++ b/appservice/appservice.go @@ -47,6 +47,8 @@ func NewInternalAPI( userAPI userapi.UserInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI, ) appserviceAPI.AppServiceQueryAPI { + consumer, _ := setup.SetupConsumerProducer(&base.Cfg.Global.Kafka) + // Create a connection to the appservice postgres DB appserviceDB, err := storage.NewDatabase(&base.Cfg.AppServiceAPI.Database) if err != nil { @@ -86,7 +88,7 @@ func NewInternalAPI( // We can't add ASes at runtime so this is safe to do. if len(workerStates) > 0 { consumer := consumers.NewOutputRoomEventConsumer( - base.Cfg, base.KafkaConsumer, appserviceDB, + base.Cfg, consumer, appserviceDB, rsAPI, workerStates, ) if err := consumer.Start(); err != nil { diff --git a/build/gobind/monolith.go b/build/gobind/monolith.go index 7d10b87e4..fd010809c 100644 --- a/build/gobind/monolith.go +++ b/build/gobind/monolith.go @@ -112,7 +112,7 @@ func (m *DendriteMonolith) Start() { serverKeyAPI := &signing.YggdrasilKeys{} keyRing := serverKeyAPI.KeyRing() - keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation, base.KafkaProducer) + keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation) userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI) keyAPI.SetUserAPI(userAPI) @@ -146,13 +146,11 @@ func (m *DendriteMonolith) Start() { rsAPI.SetFederationSenderAPI(fsAPI) monolith := setup.Monolith{ - Config: base.Cfg, - AccountDB: accountDB, - Client: ygg.CreateClient(base), - FedClient: federation, - KeyRing: keyRing, - KafkaConsumer: base.KafkaConsumer, - KafkaProducer: base.KafkaProducer, + Config: base.Cfg, + AccountDB: accountDB, + Client: ygg.CreateClient(base), + FedClient: federation, + KeyRing: keyRing, AppserviceAPI: asAPI, EDUInternalAPI: eduInputAPI, diff --git a/cmd/dendrite-client-api-server/main.go b/cmd/dendrite-client-api-server/main.go index 0fdc6679f..b1deb5b77 100644 --- a/cmd/dendrite-client-api-server/main.go +++ b/cmd/dendrite-client-api-server/main.go @@ -22,6 +22,7 @@ import ( func main() { cfg := setup.ParseFlags(false) + _, producer := setup.SetupConsumerProducer(&cfg.Global.Kafka) base := setup.NewBaseDendrite(cfg, "ClientAPI", true) defer base.Close() // nolint: errcheck @@ -37,7 +38,7 @@ func main() { keyAPI := base.KeyServerHTTPClient() clientapi.AddPublicRoutes( - base.PublicClientAPIMux, &base.Cfg.ClientAPI, base.KafkaProducer, accountDB, federation, + base.PublicClientAPIMux, &base.Cfg.ClientAPI, producer, accountDB, federation, rsAPI, eduInputAPI, asQuery, transactions.New(), fsAPI, userAPI, keyAPI, nil, ) diff --git a/cmd/dendrite-demo-libp2p/main.go b/cmd/dendrite-demo-libp2p/main.go index b5386325c..61fdd801a 100644 --- a/cmd/dendrite-demo-libp2p/main.go +++ b/cmd/dendrite-demo-libp2p/main.go @@ -139,7 +139,7 @@ func main() { accountDB := base.Base.CreateAccountsDB() federation := createFederationClient(base) - keyAPI := keyserver.NewInternalAPI(&base.Base.Cfg.KeyServer, federation, base.Base.KafkaProducer) + keyAPI := keyserver.NewInternalAPI(&base.Base.Cfg.KeyServer, federation) userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI) keyAPI.SetUserAPI(userAPI) @@ -169,13 +169,11 @@ func main() { } monolith := setup.Monolith{ - Config: base.Base.Cfg, - AccountDB: accountDB, - Client: createClient(base), - FedClient: federation, - KeyRing: keyRing, - KafkaConsumer: base.Base.KafkaConsumer, - KafkaProducer: base.Base.KafkaProducer, + Config: base.Base.Cfg, + AccountDB: accountDB, + Client: createClient(base), + FedClient: federation, + KeyRing: keyRing, AppserviceAPI: asAPI, EDUInternalAPI: eduInputAPI, diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go index 5e8b92318..a40973638 100644 --- a/cmd/dendrite-demo-yggdrasil/main.go +++ b/cmd/dendrite-demo-yggdrasil/main.go @@ -96,7 +96,7 @@ func main() { serverKeyAPI := &signing.YggdrasilKeys{} keyRing := serverKeyAPI.KeyRing() - keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation, base.KafkaProducer) + keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation) userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI) keyAPI.SetUserAPI(userAPI) @@ -129,13 +129,11 @@ func main() { rsComponent.SetFederationSenderAPI(fsAPI) monolith := setup.Monolith{ - Config: base.Cfg, - AccountDB: accountDB, - Client: ygg.CreateClient(base), - FedClient: federation, - KeyRing: keyRing, - KafkaConsumer: base.KafkaConsumer, - KafkaProducer: base.KafkaProducer, + Config: base.Cfg, + AccountDB: accountDB, + Client: ygg.CreateClient(base), + FedClient: federation, + KeyRing: keyRing, AppserviceAPI: asAPI, EDUInternalAPI: eduInputAPI, diff --git a/cmd/dendrite-key-server/main.go b/cmd/dendrite-key-server/main.go index 92d18ac38..ff5b22236 100644 --- a/cmd/dendrite-key-server/main.go +++ b/cmd/dendrite-key-server/main.go @@ -24,7 +24,7 @@ func main() { base := setup.NewBaseDendrite(cfg, "KeyServer", true) defer base.Close() // nolint: errcheck - intAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, base.CreateFederationClient(), base.KafkaProducer) + intAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, base.CreateFederationClient()) intAPI.SetUserAPI(base.UserAPIClient()) keyserver.AddInternalRoutes(base.InternalAPIMux, intAPI) diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go index 0fe70ca8c..e935805f6 100644 --- a/cmd/dendrite-monolith-server/main.go +++ b/cmd/dendrite-monolith-server/main.go @@ -108,7 +108,7 @@ func main() { // This is different to rsAPI which can be the http client which doesn't need this dependency rsImpl.SetFederationSenderAPI(fsAPI) - keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, fsAPI, base.KafkaProducer) + keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, fsAPI) userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI) keyAPI.SetUserAPI(userAPI) @@ -127,13 +127,11 @@ func main() { } monolith := setup.Monolith{ - Config: base.Cfg, - AccountDB: accountDB, - Client: base.CreateClient(), - FedClient: federation, - KeyRing: keyRing, - KafkaConsumer: base.KafkaConsumer, - KafkaProducer: base.KafkaProducer, + Config: base.Cfg, + AccountDB: accountDB, + Client: base.CreateClient(), + FedClient: federation, + KeyRing: keyRing, AppserviceAPI: asAPI, EDUInternalAPI: eduInputAPI, diff --git a/cmd/dendrite-sync-api-server/main.go b/cmd/dendrite-sync-api-server/main.go index b879f842f..3ebeb3d02 100644 --- a/cmd/dendrite-sync-api-server/main.go +++ b/cmd/dendrite-sync-api-server/main.go @@ -21,6 +21,8 @@ import ( func main() { cfg := setup.ParseFlags(false) + consumer, _ := setup.SetupConsumerProducer(&cfg.Global.Kafka) + base := setup.NewBaseDendrite(cfg, "SyncAPI", true) defer base.Close() // nolint: errcheck @@ -30,7 +32,7 @@ func main() { rsAPI := base.RoomserverHTTPClient() syncapi.AddPublicRoutes( - base.PublicClientAPIMux, base.KafkaConsumer, userAPI, rsAPI, + base.PublicClientAPIMux, consumer, userAPI, rsAPI, base.KeyServerHTTPClient(), federation, &cfg.SyncAPI, ) diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go index 2d7f8b02b..85cc8a9fb 100644 --- a/cmd/dendritejs/main.go +++ b/cmd/dendritejs/main.go @@ -190,7 +190,7 @@ func main() { accountDB := base.CreateAccountsDB() federation := createFederationClient(cfg, node) - keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation, base.KafkaProducer) + keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation) userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI) keyAPI.SetUserAPI(userAPI) @@ -212,13 +212,11 @@ func main() { p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI, federation) monolith := setup.Monolith{ - Config: base.Cfg, - AccountDB: accountDB, - Client: createClient(node), - FedClient: federation, - KeyRing: &keyRing, - KafkaConsumer: base.KafkaConsumer, - KafkaProducer: base.KafkaProducer, + Config: base.Cfg, + AccountDB: accountDB, + Client: createClient(node), + FedClient: federation, + KeyRing: &keyRing, AppserviceAPI: asQuery, EDUInternalAPI: eduInputAPI, diff --git a/eduserver/eduserver.go b/eduserver/eduserver.go index b6196c269..5661a0b8c 100644 --- a/eduserver/eduserver.go +++ b/eduserver/eduserver.go @@ -41,10 +41,13 @@ func NewInternalAPI( userAPI userapi.UserInternalAPI, ) api.EDUServerInputAPI { cfg := &base.Cfg.EDUServer + + _, producer := setup.SetupConsumerProducer(&cfg.Matrix.Kafka) + return &input.EDUServerInputAPI{ Cache: eduCache, UserAPI: userAPI, - Producer: base.KafkaProducer, + Producer: producer, OutputTypingEventTopic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent)), OutputSendToDeviceEventTopic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent)), ServerName: cfg.Matrix.ServerName, diff --git a/federationsender/federationsender.go b/federationsender/federationsender.go index 2f1223284..2fbbf5deb 100644 --- a/federationsender/federationsender.go +++ b/federationsender/federationsender.go @@ -55,6 +55,8 @@ func NewInternalAPI( FailuresUntilBlacklist: cfg.FederationMaxRetries, } + consumer, _ := setup.SetupConsumerProducer(&cfg.Matrix.Kafka) + queues := queue.NewOutgoingQueues( federationSenderDB, cfg.Matrix.ServerName, federation, rsAPI, stats, @@ -66,7 +68,7 @@ func NewInternalAPI( ) rsConsumer := consumers.NewOutputRoomEventConsumer( - cfg, base.KafkaConsumer, queues, + cfg, consumer, queues, federationSenderDB, rsAPI, ) if err = rsConsumer.Start(); err != nil { @@ -74,13 +76,13 @@ func NewInternalAPI( } tsConsumer := consumers.NewOutputEDUConsumer( - cfg, base.KafkaConsumer, queues, federationSenderDB, + cfg, consumer, queues, federationSenderDB, ) if err := tsConsumer.Start(); err != nil { logrus.WithError(err).Panic("failed to start typing server consumer") } keyConsumer := consumers.NewKeyChangeConsumer( - &base.Cfg.KeyServer, base.KafkaConsumer, queues, federationSenderDB, rsAPI, + &base.Cfg.KeyServer, consumer, queues, federationSenderDB, rsAPI, ) if err := keyConsumer.Start(); err != nil { logrus.WithError(err).Panic("failed to start key server consumer") diff --git a/internal/setup/base.go b/internal/setup/base.go index 24a0d6aa6..d3ce0047a 100644 --- a/internal/setup/base.go +++ b/internal/setup/base.go @@ -73,8 +73,8 @@ type BaseDendrite struct { httpClient *http.Client Cfg *config.Dendrite Caches *caching.Caches - KafkaConsumer sarama.Consumer - KafkaProducer sarama.SyncProducer + // KafkaConsumer sarama.Consumer + // KafkaProducer sarama.SyncProducer } const HTTPServerTimeout = time.Minute * 5 @@ -106,14 +106,6 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, useHTTPAPIs boo logrus.WithError(err).Panicf("failed to start opentracing") } - var kafkaConsumer sarama.Consumer - var kafkaProducer sarama.SyncProducer - if cfg.Global.Kafka.UseNaffka { - kafkaConsumer, kafkaProducer = setupNaffka(cfg) - } else { - kafkaConsumer, kafkaProducer = setupKafka(cfg) - } - cache, err := caching.NewInMemoryLRUCache(true) if err != nil { logrus.WithError(err).Warnf("Failed to create cache") @@ -152,8 +144,6 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, useHTTPAPIs boo InternalAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.InternalPathPrefix).Subrouter().UseEncodedPath(), apiHttpClient: &apiClient, httpClient: &client, - KafkaConsumer: kafkaConsumer, - KafkaProducer: kafkaProducer, } } @@ -335,14 +325,21 @@ func (b *BaseDendrite) SetupAndServeHTTP( select {} } +func SetupConsumerProducer(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { + if cfg.UseNaffka { + return setupNaffka(cfg) + } + return setupKafka(cfg) +} + // setupKafka creates kafka consumer/producer pair from the config. -func setupKafka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) { - consumer, err := sarama.NewConsumer(cfg.Global.Kafka.Addresses, nil) +func setupKafka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { + consumer, err := sarama.NewConsumer(cfg.Addresses, nil) if err != nil { logrus.WithError(err).Panic("failed to start kafka consumer") } - producer, err := sarama.NewSyncProducer(cfg.Global.Kafka.Addresses, nil) + producer, err := sarama.NewSyncProducer(cfg.Addresses, nil) if err != nil { logrus.WithError(err).Panic("failed to setup kafka producers") } @@ -350,15 +347,24 @@ func setupKafka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) { return consumer, producer } +// In monolith mode with Naffka, we don't have the same constraints about +// consuming the same topic from more than one place like we do with Kafka. +// Therefore, we will only open one Naffka connection in case Naffka is +// running on SQLite. +var naffkaDatabase *naffka.Naffka + // setupNaffka creates kafka consumer/producer pair from the config. -func setupNaffka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) { - naffkaDB, err := naffkaStorage.NewDatabase(string(cfg.Global.Kafka.Database.ConnectionString)) +func setupNaffka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { + if naffkaDatabase != nil { + return naffkaDatabase, naffkaDatabase + } + naffkaDB, err := naffkaStorage.NewDatabase(string(cfg.Database.ConnectionString)) if err != nil { logrus.WithError(err).Panic("Failed to setup naffka database") } - naff, err := naffka.New(naffkaDB) + naffkaDatabase, err = naffka.New(naffkaDB) if err != nil { logrus.WithError(err).Panic("Failed to setup naffka") } - return naff, naff + return naffkaDatabase, naffkaDatabase } diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go index 78420db1f..3f9fcf5d6 100644 --- a/keyserver/keyserver.go +++ b/keyserver/keyserver.go @@ -15,10 +15,10 @@ package keyserver import ( - "github.com/Shopify/sarama" "github.com/gorilla/mux" fedsenderapi "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/internal/config" + "github.com/matrix-org/dendrite/internal/setup" "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/internal" "github.com/matrix-org/dendrite/keyserver/inthttp" @@ -36,8 +36,10 @@ func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) { // NewInternalAPI returns a concerete implementation of the internal API. Callers // can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes. func NewInternalAPI( - cfg *config.KeyServer, fedClient fedsenderapi.FederationClient, producer sarama.SyncProducer, + cfg *config.KeyServer, fedClient fedsenderapi.FederationClient, ) api.KeyInternalAPI { + _, producer := setup.SetupConsumerProducer(&cfg.Matrix.Kafka) + db, err := storage.NewDatabase(&cfg.Database) if err != nil { logrus.WithError(err).Panicf("failed to connect to key server database") diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go index 4c138116f..f7a323ca5 100644 --- a/roomserver/roomserver.go +++ b/roomserver/roomserver.go @@ -41,6 +41,8 @@ func NewInternalAPI( ) api.RoomserverInternalAPI { cfg := &base.Cfg.RoomServer + _, producer := setup.SetupConsumerProducer(&cfg.Matrix.Kafka) + var perspectiveServerNames []gomatrixserverlib.ServerName for _, kp := range base.Cfg.SigningKeyServer.KeyPerspectives { perspectiveServerNames = append(perspectiveServerNames, kp.ServerName) @@ -52,7 +54,7 @@ func NewInternalAPI( } return internal.NewRoomserverAPI( - cfg, roomserverDB, base.KafkaProducer, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)), + cfg, roomserverDB, producer, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)), base.Caches, keyRing, perspectiveServerNames, ) } diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go index 2a03195c9..888b256fe 100644 --- a/roomserver/roomserver_test.go +++ b/roomserver/roomserver_test.go @@ -169,9 +169,8 @@ func mustCreateRoomserverAPI(t *testing.T) (api.RoomserverInternalAPI, *dummyPro t.Fatalf("failed to make caches: %s", err) } base := &setup.BaseDendrite{ - KafkaProducer: dp, - Caches: cache, - Cfg: cfg, + Caches: cache, + Cfg: cfg, } return NewInternalAPI(base, &test.NopJSONVerifier{}), dp