diff --git a/appservice/appservice.go b/appservice/appservice.go index 912b768c6..cf9a47b74 100644 --- a/appservice/appservice.go +++ b/appservice/appservice.go @@ -30,6 +30,7 @@ import ( "github.com/matrix-org/dendrite/appservice/workers" "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/setup" + "github.com/matrix-org/dendrite/internal/setup/kafka" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/sirupsen/logrus" @@ -47,7 +48,7 @@ func NewInternalAPI( userAPI userapi.UserInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI, ) appserviceAPI.AppServiceQueryAPI { - consumer, _ := setup.SetupConsumerProducer(&base.Cfg.Global.Kafka) + consumer, _ := kafka.SetupConsumerProducer(&base.Cfg.Global.Kafka) // Create a connection to the appservice postgres DB appserviceDB, err := storage.NewDatabase(&base.Cfg.AppServiceAPI.Database) diff --git a/build/docker/docker-compose.deps.yml b/build/docker/docker-compose.deps.yml index afc572d0c..74e478a8d 100644 --- a/build/docker/docker-compose.deps.yml +++ b/build/docker/docker-compose.deps.yml @@ -29,6 +29,8 @@ services: KAFKA_ADVERTISED_HOST_NAME: "kafka" KAFKA_DELETE_TOPIC_ENABLE: "true" KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" + ports: + - 9092:9092 depends_on: - zookeeper networks: diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go index 2ab92ed4e..ebe55aec9 100644 --- a/clientapi/clientapi.go +++ b/clientapi/clientapi.go @@ -15,7 +15,6 @@ package clientapi import ( - "github.com/Shopify/sarama" "github.com/gorilla/mux" appserviceAPI "github.com/matrix-org/dendrite/appservice/api" "github.com/matrix-org/dendrite/clientapi/api" @@ -24,6 +23,7 @@ import ( eduServerAPI "github.com/matrix-org/dendrite/eduserver/api" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/internal/config" + "github.com/matrix-org/dendrite/internal/setup/kafka" "github.com/matrix-org/dendrite/internal/transactions" keyserverAPI "github.com/matrix-org/dendrite/keyserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" @@ -36,7 +36,6 @@ import ( func AddPublicRoutes( router *mux.Router, cfg *config.ClientAPI, - producer sarama.SyncProducer, accountsDB accounts.Database, federation *gomatrixserverlib.FederationClient, rsAPI roomserverAPI.RoomserverInternalAPI, @@ -48,6 +47,8 @@ func AddPublicRoutes( keyAPI keyserverAPI.KeyInternalAPI, extRoomsProvider api.ExtraPublicRoomsProvider, ) { + _, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) + syncProducer := &producers.SyncAPIProducer{ Producer: producer, Topic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputClientData), diff --git a/cmd/dendrite-client-api-server/main.go b/cmd/dendrite-client-api-server/main.go index b1deb5b77..0061de74f 100644 --- a/cmd/dendrite-client-api-server/main.go +++ b/cmd/dendrite-client-api-server/main.go @@ -22,7 +22,6 @@ import ( func main() { cfg := setup.ParseFlags(false) - _, producer := setup.SetupConsumerProducer(&cfg.Global.Kafka) base := setup.NewBaseDendrite(cfg, "ClientAPI", true) defer base.Close() // nolint: errcheck @@ -38,7 +37,7 @@ func main() { keyAPI := base.KeyServerHTTPClient() clientapi.AddPublicRoutes( - base.PublicClientAPIMux, &base.Cfg.ClientAPI, producer, accountDB, federation, + base.PublicClientAPIMux, &base.Cfg.ClientAPI, accountDB, federation, rsAPI, eduInputAPI, asQuery, transactions.New(), fsAPI, userAPI, keyAPI, nil, ) diff --git a/cmd/dendrite-sync-api-server/main.go b/cmd/dendrite-sync-api-server/main.go index 3ebeb3d02..351dbc5f4 100644 --- a/cmd/dendrite-sync-api-server/main.go +++ b/cmd/dendrite-sync-api-server/main.go @@ -21,7 +21,6 @@ import ( func main() { cfg := setup.ParseFlags(false) - consumer, _ := setup.SetupConsumerProducer(&cfg.Global.Kafka) base := setup.NewBaseDendrite(cfg, "SyncAPI", true) defer base.Close() // nolint: errcheck @@ -32,7 +31,7 @@ func main() { rsAPI := base.RoomserverHTTPClient() syncapi.AddPublicRoutes( - base.PublicClientAPIMux, consumer, userAPI, rsAPI, + base.PublicClientAPIMux, userAPI, rsAPI, base.KeyServerHTTPClient(), federation, &cfg.SyncAPI, ) diff --git a/eduserver/eduserver.go b/eduserver/eduserver.go index 5661a0b8c..098ac0248 100644 --- a/eduserver/eduserver.go +++ b/eduserver/eduserver.go @@ -24,6 +24,7 @@ import ( "github.com/matrix-org/dendrite/eduserver/inthttp" "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/setup" + "github.com/matrix-org/dendrite/internal/setup/kafka" userapi "github.com/matrix-org/dendrite/userapi/api" ) @@ -42,7 +43,7 @@ func NewInternalAPI( ) api.EDUServerInputAPI { cfg := &base.Cfg.EDUServer - _, producer := setup.SetupConsumerProducer(&cfg.Matrix.Kafka) + _, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) return &input.EDUServerInputAPI{ Cache: eduCache, diff --git a/federationsender/federationsender.go b/federationsender/federationsender.go index 2fbbf5deb..78791140e 100644 --- a/federationsender/federationsender.go +++ b/federationsender/federationsender.go @@ -24,6 +24,7 @@ import ( "github.com/matrix-org/dendrite/federationsender/statistics" "github.com/matrix-org/dendrite/federationsender/storage" "github.com/matrix-org/dendrite/internal/setup" + "github.com/matrix-org/dendrite/internal/setup/kafka" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" "github.com/sirupsen/logrus" @@ -55,7 +56,7 @@ func NewInternalAPI( FailuresUntilBlacklist: cfg.FederationMaxRetries, } - consumer, _ := setup.SetupConsumerProducer(&cfg.Matrix.Kafka) + consumer, _ := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) queues := queue.NewOutgoingQueues( federationSenderDB, cfg.Matrix.ServerName, federation, diff --git a/internal/setup/base.go b/internal/setup/base.go index fba387fae..8bc4ae17a 100644 --- a/internal/setup/base.go +++ b/internal/setup/base.go @@ -26,13 +26,9 @@ import ( "github.com/matrix-org/gomatrixserverlib" "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/matrix-org/naffka" - naffkaStorage "github.com/matrix-org/naffka/storage" - "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/userapi/storage/accounts" - "github.com/Shopify/sarama" "github.com/gorilla/mux" appserviceAPI "github.com/matrix-org/dendrite/appservice/api" @@ -324,47 +320,3 @@ func (b *BaseDendrite) SetupAndServeHTTP( select {} } - -func SetupConsumerProducer(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { - if cfg.UseNaffka { - return setupNaffka(cfg) - } - return setupKafka(cfg) -} - -// setupKafka creates kafka consumer/producer pair from the config. -func setupKafka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { - consumer, err := sarama.NewConsumer(cfg.Addresses, nil) - if err != nil { - logrus.WithError(err).Panic("failed to start kafka consumer") - } - - producer, err := sarama.NewSyncProducer(cfg.Addresses, nil) - if err != nil { - logrus.WithError(err).Panic("failed to setup kafka producers") - } - - return consumer, producer -} - -// In monolith mode with Naffka, we don't have the same constraints about -// consuming the same topic from more than one place like we do with Kafka. -// Therefore, we will only open one Naffka connection in case Naffka is -// running on SQLite. -var naffkaInstance *naffka.Naffka - -// setupNaffka creates kafka consumer/producer pair from the config. -func setupNaffka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { - if naffkaInstance != nil { - return naffkaInstance, naffkaInstance - } - naffkaDB, err := naffkaStorage.NewDatabase(string(cfg.Database.ConnectionString)) - if err != nil { - logrus.WithError(err).Panic("Failed to setup naffka database") - } - naffkaInstance, err = naffka.New(naffkaDB) - if err != nil { - logrus.WithError(err).Panic("Failed to setup naffka") - } - return naffkaInstance, naffkaInstance -} diff --git a/internal/setup/kafka/kafka.go b/internal/setup/kafka/kafka.go new file mode 100644 index 000000000..9855ae156 --- /dev/null +++ b/internal/setup/kafka/kafka.go @@ -0,0 +1,53 @@ +package kafka + +import ( + "github.com/Shopify/sarama" + "github.com/matrix-org/dendrite/internal/config" + "github.com/matrix-org/naffka" + naffkaStorage "github.com/matrix-org/naffka/storage" + "github.com/sirupsen/logrus" +) + +func SetupConsumerProducer(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { + if cfg.UseNaffka { + return setupNaffka(cfg) + } + return setupKafka(cfg) +} + +// setupKafka creates kafka consumer/producer pair from the config. +func setupKafka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { + consumer, err := sarama.NewConsumer(cfg.Addresses, nil) + if err != nil { + logrus.WithError(err).Panic("failed to start kafka consumer") + } + + producer, err := sarama.NewSyncProducer(cfg.Addresses, nil) + if err != nil { + logrus.WithError(err).Panic("failed to setup kafka producers") + } + + return consumer, producer +} + +// In monolith mode with Naffka, we don't have the same constraints about +// consuming the same topic from more than one place like we do with Kafka. +// Therefore, we will only open one Naffka connection in case Naffka is +// running on SQLite. +var naffkaInstance *naffka.Naffka + +// setupNaffka creates kafka consumer/producer pair from the config. +func setupNaffka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { + if naffkaInstance != nil { + return naffkaInstance, naffkaInstance + } + naffkaDB, err := naffkaStorage.NewDatabase(string(cfg.Database.ConnectionString)) + if err != nil { + logrus.WithError(err).Panic("Failed to setup naffka database") + } + naffkaInstance, err = naffka.New(naffkaDB) + if err != nil { + logrus.WithError(err).Panic("Failed to setup naffka") + } + return naffkaInstance, naffkaInstance +} diff --git a/internal/setup/monolith.go b/internal/setup/monolith.go index a0675d61f..9d3625d2f 100644 --- a/internal/setup/monolith.go +++ b/internal/setup/monolith.go @@ -15,7 +15,6 @@ package setup import ( - "github.com/Shopify/sarama" "github.com/gorilla/mux" appserviceAPI "github.com/matrix-org/dendrite/appservice/api" "github.com/matrix-org/dendrite/clientapi" @@ -38,13 +37,11 @@ import ( // Monolith represents an instantiation of all dependencies required to build // all components of Dendrite, for use in monolith mode. type Monolith struct { - Config *config.Dendrite - AccountDB accounts.Database - KeyRing *gomatrixserverlib.KeyRing - Client *gomatrixserverlib.Client - FedClient *gomatrixserverlib.FederationClient - KafkaConsumer sarama.Consumer - KafkaProducer sarama.SyncProducer + Config *config.Dendrite + AccountDB accounts.Database + KeyRing *gomatrixserverlib.KeyRing + Client *gomatrixserverlib.Client + FedClient *gomatrixserverlib.FederationClient AppserviceAPI appserviceAPI.AppServiceQueryAPI EDUInternalAPI eduServerAPI.EDUServerInputAPI @@ -61,7 +58,7 @@ type Monolith struct { // AddAllPublicRoutes attaches all public paths to the given router func (m *Monolith) AddAllPublicRoutes(csMux, ssMux, keyMux, mediaMux *mux.Router) { clientapi.AddPublicRoutes( - csMux, &m.Config.ClientAPI, m.KafkaProducer, m.AccountDB, + csMux, &m.Config.ClientAPI, m.AccountDB, m.FedClient, m.RoomserverAPI, m.EDUInternalAPI, m.AppserviceAPI, transactions.New(), m.FederationSenderAPI, m.UserAPI, m.KeyAPI, m.ExtPublicRoomsProvider, @@ -73,7 +70,7 @@ func (m *Monolith) AddAllPublicRoutes(csMux, ssMux, keyMux, mediaMux *mux.Router ) mediaapi.AddPublicRoutes(mediaMux, &m.Config.MediaAPI, m.UserAPI, m.Client) syncapi.AddPublicRoutes( - csMux, m.KafkaConsumer, m.UserAPI, m.RoomserverAPI, + csMux, m.UserAPI, m.RoomserverAPI, m.KeyAPI, m.FedClient, &m.Config.SyncAPI, ) } diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go index 3f9fcf5d6..6c54d2a08 100644 --- a/keyserver/keyserver.go +++ b/keyserver/keyserver.go @@ -18,7 +18,7 @@ import ( "github.com/gorilla/mux" fedsenderapi "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/internal/config" - "github.com/matrix-org/dendrite/internal/setup" + "github.com/matrix-org/dendrite/internal/setup/kafka" "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/internal" "github.com/matrix-org/dendrite/keyserver/inthttp" @@ -38,7 +38,7 @@ func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) { func NewInternalAPI( cfg *config.KeyServer, fedClient fedsenderapi.FederationClient, ) api.KeyInternalAPI { - _, producer := setup.SetupConsumerProducer(&cfg.Matrix.Kafka) + _, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) db, err := storage.NewDatabase(&cfg.Database) if err != nil { diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go index f7a323ca5..b2cc0728c 100644 --- a/roomserver/roomserver.go +++ b/roomserver/roomserver.go @@ -22,6 +22,7 @@ import ( "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/setup" + "github.com/matrix-org/dendrite/internal/setup/kafka" "github.com/matrix-org/dendrite/roomserver/internal" "github.com/matrix-org/dendrite/roomserver/storage" "github.com/sirupsen/logrus" @@ -41,7 +42,7 @@ func NewInternalAPI( ) api.RoomserverInternalAPI { cfg := &base.Cfg.RoomServer - _, producer := setup.SetupConsumerProducer(&cfg.Matrix.Kafka) + _, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) var perspectiveServerNames []gomatrixserverlib.ServerName for _, kp := range base.Cfg.SigningKeyServer.KeyPerspectives { diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 43e2455b6..de0bb434b 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -17,11 +17,11 @@ package syncapi import ( "context" - "github.com/Shopify/sarama" "github.com/gorilla/mux" "github.com/sirupsen/logrus" "github.com/matrix-org/dendrite/internal/config" + "github.com/matrix-org/dendrite/internal/setup/kafka" keyapi "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/roomserver/api" userapi "github.com/matrix-org/dendrite/userapi/api" @@ -37,13 +37,14 @@ import ( // component. func AddPublicRoutes( router *mux.Router, - consumer sarama.Consumer, userAPI userapi.UserInternalAPI, rsAPI api.RoomserverInternalAPI, keyAPI keyapi.KeyInternalAPI, federation *gomatrixserverlib.FederationClient, cfg *config.SyncAPI, ) { + consumer, _ := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) + syncDB, err := storage.NewSyncServerDatasource(&cfg.Database) if err != nil { logrus.WithError(err).Panicf("failed to connect to sync db")