Start Kafka connection for each component that needs one

This commit is contained in:
Neil Alexander 2020-10-15 12:28:09 +01:00
parent 10f1beb0de
commit 8f54ab7f3b
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
15 changed files with 82 additions and 73 deletions

View file

@ -47,6 +47,8 @@ func NewInternalAPI(
userAPI userapi.UserInternalAPI, userAPI userapi.UserInternalAPI,
rsAPI roomserverAPI.RoomserverInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI,
) appserviceAPI.AppServiceQueryAPI { ) appserviceAPI.AppServiceQueryAPI {
consumer, _ := setup.SetupConsumerProducer(&base.Cfg.Global.Kafka)
// Create a connection to the appservice postgres DB // Create a connection to the appservice postgres DB
appserviceDB, err := storage.NewDatabase(&base.Cfg.AppServiceAPI.Database) appserviceDB, err := storage.NewDatabase(&base.Cfg.AppServiceAPI.Database)
if err != nil { if err != nil {
@ -86,7 +88,7 @@ func NewInternalAPI(
// We can't add ASes at runtime so this is safe to do. // We can't add ASes at runtime so this is safe to do.
if len(workerStates) > 0 { if len(workerStates) > 0 {
consumer := consumers.NewOutputRoomEventConsumer( consumer := consumers.NewOutputRoomEventConsumer(
base.Cfg, base.KafkaConsumer, appserviceDB, base.Cfg, consumer, appserviceDB,
rsAPI, workerStates, rsAPI, workerStates,
) )
if err := consumer.Start(); err != nil { if err := consumer.Start(); err != nil {

View file

@ -112,7 +112,7 @@ func (m *DendriteMonolith) Start() {
serverKeyAPI := &signing.YggdrasilKeys{} serverKeyAPI := &signing.YggdrasilKeys{}
keyRing := serverKeyAPI.KeyRing() keyRing := serverKeyAPI.KeyRing()
keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation, base.KafkaProducer) keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation)
userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI) userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI)
keyAPI.SetUserAPI(userAPI) keyAPI.SetUserAPI(userAPI)
@ -146,13 +146,11 @@ func (m *DendriteMonolith) Start() {
rsAPI.SetFederationSenderAPI(fsAPI) rsAPI.SetFederationSenderAPI(fsAPI)
monolith := setup.Monolith{ monolith := setup.Monolith{
Config: base.Cfg, Config: base.Cfg,
AccountDB: accountDB, AccountDB: accountDB,
Client: ygg.CreateClient(base), Client: ygg.CreateClient(base),
FedClient: federation, FedClient: federation,
KeyRing: keyRing, KeyRing: keyRing,
KafkaConsumer: base.KafkaConsumer,
KafkaProducer: base.KafkaProducer,
AppserviceAPI: asAPI, AppserviceAPI: asAPI,
EDUInternalAPI: eduInputAPI, EDUInternalAPI: eduInputAPI,

View file

@ -22,6 +22,7 @@ import (
func main() { func main() {
cfg := setup.ParseFlags(false) cfg := setup.ParseFlags(false)
_, producer := setup.SetupConsumerProducer(&cfg.Global.Kafka)
base := setup.NewBaseDendrite(cfg, "ClientAPI", true) base := setup.NewBaseDendrite(cfg, "ClientAPI", true)
defer base.Close() // nolint: errcheck defer base.Close() // nolint: errcheck
@ -37,7 +38,7 @@ func main() {
keyAPI := base.KeyServerHTTPClient() keyAPI := base.KeyServerHTTPClient()
clientapi.AddPublicRoutes( clientapi.AddPublicRoutes(
base.PublicClientAPIMux, &base.Cfg.ClientAPI, base.KafkaProducer, accountDB, federation, base.PublicClientAPIMux, &base.Cfg.ClientAPI, producer, accountDB, federation,
rsAPI, eduInputAPI, asQuery, transactions.New(), fsAPI, userAPI, keyAPI, nil, rsAPI, eduInputAPI, asQuery, transactions.New(), fsAPI, userAPI, keyAPI, nil,
) )

View file

@ -139,7 +139,7 @@ func main() {
accountDB := base.Base.CreateAccountsDB() accountDB := base.Base.CreateAccountsDB()
federation := createFederationClient(base) federation := createFederationClient(base)
keyAPI := keyserver.NewInternalAPI(&base.Base.Cfg.KeyServer, federation, base.Base.KafkaProducer) keyAPI := keyserver.NewInternalAPI(&base.Base.Cfg.KeyServer, federation)
userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI) userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI)
keyAPI.SetUserAPI(userAPI) keyAPI.SetUserAPI(userAPI)
@ -169,13 +169,11 @@ func main() {
} }
monolith := setup.Monolith{ monolith := setup.Monolith{
Config: base.Base.Cfg, Config: base.Base.Cfg,
AccountDB: accountDB, AccountDB: accountDB,
Client: createClient(base), Client: createClient(base),
FedClient: federation, FedClient: federation,
KeyRing: keyRing, KeyRing: keyRing,
KafkaConsumer: base.Base.KafkaConsumer,
KafkaProducer: base.Base.KafkaProducer,
AppserviceAPI: asAPI, AppserviceAPI: asAPI,
EDUInternalAPI: eduInputAPI, EDUInternalAPI: eduInputAPI,

View file

@ -96,7 +96,7 @@ func main() {
serverKeyAPI := &signing.YggdrasilKeys{} serverKeyAPI := &signing.YggdrasilKeys{}
keyRing := serverKeyAPI.KeyRing() keyRing := serverKeyAPI.KeyRing()
keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation, base.KafkaProducer) keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation)
userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI) userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI)
keyAPI.SetUserAPI(userAPI) keyAPI.SetUserAPI(userAPI)
@ -129,13 +129,11 @@ func main() {
rsComponent.SetFederationSenderAPI(fsAPI) rsComponent.SetFederationSenderAPI(fsAPI)
monolith := setup.Monolith{ monolith := setup.Monolith{
Config: base.Cfg, Config: base.Cfg,
AccountDB: accountDB, AccountDB: accountDB,
Client: ygg.CreateClient(base), Client: ygg.CreateClient(base),
FedClient: federation, FedClient: federation,
KeyRing: keyRing, KeyRing: keyRing,
KafkaConsumer: base.KafkaConsumer,
KafkaProducer: base.KafkaProducer,
AppserviceAPI: asAPI, AppserviceAPI: asAPI,
EDUInternalAPI: eduInputAPI, EDUInternalAPI: eduInputAPI,

View file

@ -24,7 +24,7 @@ func main() {
base := setup.NewBaseDendrite(cfg, "KeyServer", true) base := setup.NewBaseDendrite(cfg, "KeyServer", true)
defer base.Close() // nolint: errcheck defer base.Close() // nolint: errcheck
intAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, base.CreateFederationClient(), base.KafkaProducer) intAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, base.CreateFederationClient())
intAPI.SetUserAPI(base.UserAPIClient()) intAPI.SetUserAPI(base.UserAPIClient())
keyserver.AddInternalRoutes(base.InternalAPIMux, intAPI) keyserver.AddInternalRoutes(base.InternalAPIMux, intAPI)

View file

@ -108,7 +108,7 @@ func main() {
// This is different to rsAPI which can be the http client which doesn't need this dependency // This is different to rsAPI which can be the http client which doesn't need this dependency
rsImpl.SetFederationSenderAPI(fsAPI) rsImpl.SetFederationSenderAPI(fsAPI)
keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, fsAPI, base.KafkaProducer) keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, fsAPI)
userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI) userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI)
keyAPI.SetUserAPI(userAPI) keyAPI.SetUserAPI(userAPI)
@ -127,13 +127,11 @@ func main() {
} }
monolith := setup.Monolith{ monolith := setup.Monolith{
Config: base.Cfg, Config: base.Cfg,
AccountDB: accountDB, AccountDB: accountDB,
Client: base.CreateClient(), Client: base.CreateClient(),
FedClient: federation, FedClient: federation,
KeyRing: keyRing, KeyRing: keyRing,
KafkaConsumer: base.KafkaConsumer,
KafkaProducer: base.KafkaProducer,
AppserviceAPI: asAPI, AppserviceAPI: asAPI,
EDUInternalAPI: eduInputAPI, EDUInternalAPI: eduInputAPI,

View file

@ -21,6 +21,8 @@ import (
func main() { func main() {
cfg := setup.ParseFlags(false) cfg := setup.ParseFlags(false)
consumer, _ := setup.SetupConsumerProducer(&cfg.Global.Kafka)
base := setup.NewBaseDendrite(cfg, "SyncAPI", true) base := setup.NewBaseDendrite(cfg, "SyncAPI", true)
defer base.Close() // nolint: errcheck defer base.Close() // nolint: errcheck
@ -30,7 +32,7 @@ func main() {
rsAPI := base.RoomserverHTTPClient() rsAPI := base.RoomserverHTTPClient()
syncapi.AddPublicRoutes( syncapi.AddPublicRoutes(
base.PublicClientAPIMux, base.KafkaConsumer, userAPI, rsAPI, base.PublicClientAPIMux, consumer, userAPI, rsAPI,
base.KeyServerHTTPClient(), base.KeyServerHTTPClient(),
federation, &cfg.SyncAPI, federation, &cfg.SyncAPI,
) )

View file

@ -190,7 +190,7 @@ func main() {
accountDB := base.CreateAccountsDB() accountDB := base.CreateAccountsDB()
federation := createFederationClient(cfg, node) federation := createFederationClient(cfg, node)
keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation, base.KafkaProducer) keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation)
userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI) userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI)
keyAPI.SetUserAPI(userAPI) keyAPI.SetUserAPI(userAPI)
@ -212,13 +212,11 @@ func main() {
p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI, federation) p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI, federation)
monolith := setup.Monolith{ monolith := setup.Monolith{
Config: base.Cfg, Config: base.Cfg,
AccountDB: accountDB, AccountDB: accountDB,
Client: createClient(node), Client: createClient(node),
FedClient: federation, FedClient: federation,
KeyRing: &keyRing, KeyRing: &keyRing,
KafkaConsumer: base.KafkaConsumer,
KafkaProducer: base.KafkaProducer,
AppserviceAPI: asQuery, AppserviceAPI: asQuery,
EDUInternalAPI: eduInputAPI, EDUInternalAPI: eduInputAPI,

View file

@ -41,10 +41,13 @@ func NewInternalAPI(
userAPI userapi.UserInternalAPI, userAPI userapi.UserInternalAPI,
) api.EDUServerInputAPI { ) api.EDUServerInputAPI {
cfg := &base.Cfg.EDUServer cfg := &base.Cfg.EDUServer
_, producer := setup.SetupConsumerProducer(&cfg.Matrix.Kafka)
return &input.EDUServerInputAPI{ return &input.EDUServerInputAPI{
Cache: eduCache, Cache: eduCache,
UserAPI: userAPI, UserAPI: userAPI,
Producer: base.KafkaProducer, Producer: producer,
OutputTypingEventTopic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent)), OutputTypingEventTopic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent)),
OutputSendToDeviceEventTopic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent)), OutputSendToDeviceEventTopic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent)),
ServerName: cfg.Matrix.ServerName, ServerName: cfg.Matrix.ServerName,

View file

@ -55,6 +55,8 @@ func NewInternalAPI(
FailuresUntilBlacklist: cfg.FederationMaxRetries, FailuresUntilBlacklist: cfg.FederationMaxRetries,
} }
consumer, _ := setup.SetupConsumerProducer(&cfg.Matrix.Kafka)
queues := queue.NewOutgoingQueues( queues := queue.NewOutgoingQueues(
federationSenderDB, cfg.Matrix.ServerName, federation, federationSenderDB, cfg.Matrix.ServerName, federation,
rsAPI, stats, rsAPI, stats,
@ -66,7 +68,7 @@ func NewInternalAPI(
) )
rsConsumer := consumers.NewOutputRoomEventConsumer( rsConsumer := consumers.NewOutputRoomEventConsumer(
cfg, base.KafkaConsumer, queues, cfg, consumer, queues,
federationSenderDB, rsAPI, federationSenderDB, rsAPI,
) )
if err = rsConsumer.Start(); err != nil { if err = rsConsumer.Start(); err != nil {
@ -74,13 +76,13 @@ func NewInternalAPI(
} }
tsConsumer := consumers.NewOutputEDUConsumer( tsConsumer := consumers.NewOutputEDUConsumer(
cfg, base.KafkaConsumer, queues, federationSenderDB, cfg, consumer, queues, federationSenderDB,
) )
if err := tsConsumer.Start(); err != nil { if err := tsConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start typing server consumer") logrus.WithError(err).Panic("failed to start typing server consumer")
} }
keyConsumer := consumers.NewKeyChangeConsumer( keyConsumer := consumers.NewKeyChangeConsumer(
&base.Cfg.KeyServer, base.KafkaConsumer, queues, federationSenderDB, rsAPI, &base.Cfg.KeyServer, consumer, queues, federationSenderDB, rsAPI,
) )
if err := keyConsumer.Start(); err != nil { if err := keyConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start key server consumer") logrus.WithError(err).Panic("failed to start key server consumer")

View file

@ -73,8 +73,8 @@ type BaseDendrite struct {
httpClient *http.Client httpClient *http.Client
Cfg *config.Dendrite Cfg *config.Dendrite
Caches *caching.Caches Caches *caching.Caches
KafkaConsumer sarama.Consumer // KafkaConsumer sarama.Consumer
KafkaProducer sarama.SyncProducer // KafkaProducer sarama.SyncProducer
} }
const HTTPServerTimeout = time.Minute * 5 const HTTPServerTimeout = time.Minute * 5
@ -106,14 +106,6 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, useHTTPAPIs boo
logrus.WithError(err).Panicf("failed to start opentracing") logrus.WithError(err).Panicf("failed to start opentracing")
} }
var kafkaConsumer sarama.Consumer
var kafkaProducer sarama.SyncProducer
if cfg.Global.Kafka.UseNaffka {
kafkaConsumer, kafkaProducer = setupNaffka(cfg)
} else {
kafkaConsumer, kafkaProducer = setupKafka(cfg)
}
cache, err := caching.NewInMemoryLRUCache(true) cache, err := caching.NewInMemoryLRUCache(true)
if err != nil { if err != nil {
logrus.WithError(err).Warnf("Failed to create cache") logrus.WithError(err).Warnf("Failed to create cache")
@ -152,8 +144,6 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, useHTTPAPIs boo
InternalAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.InternalPathPrefix).Subrouter().UseEncodedPath(), InternalAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.InternalPathPrefix).Subrouter().UseEncodedPath(),
apiHttpClient: &apiClient, apiHttpClient: &apiClient,
httpClient: &client, httpClient: &client,
KafkaConsumer: kafkaConsumer,
KafkaProducer: kafkaProducer,
} }
} }
@ -335,14 +325,21 @@ func (b *BaseDendrite) SetupAndServeHTTP(
select {} select {}
} }
func SetupConsumerProducer(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
if cfg.UseNaffka {
return setupNaffka(cfg)
}
return setupKafka(cfg)
}
// setupKafka creates kafka consumer/producer pair from the config. // setupKafka creates kafka consumer/producer pair from the config.
func setupKafka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) { func setupKafka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
consumer, err := sarama.NewConsumer(cfg.Global.Kafka.Addresses, nil) consumer, err := sarama.NewConsumer(cfg.Addresses, nil)
if err != nil { if err != nil {
logrus.WithError(err).Panic("failed to start kafka consumer") logrus.WithError(err).Panic("failed to start kafka consumer")
} }
producer, err := sarama.NewSyncProducer(cfg.Global.Kafka.Addresses, nil) producer, err := sarama.NewSyncProducer(cfg.Addresses, nil)
if err != nil { if err != nil {
logrus.WithError(err).Panic("failed to setup kafka producers") logrus.WithError(err).Panic("failed to setup kafka producers")
} }
@ -350,15 +347,24 @@ func setupKafka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) {
return consumer, producer return consumer, producer
} }
// In monolith mode with Naffka, we don't have the same constraints about
// consuming the same topic from more than one place like we do with Kafka.
// Therefore, we will only open one Naffka connection in case Naffka is
// running on SQLite.
var naffkaDatabase *naffka.Naffka
// setupNaffka creates kafka consumer/producer pair from the config. // setupNaffka creates kafka consumer/producer pair from the config.
func setupNaffka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) { func setupNaffka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
naffkaDB, err := naffkaStorage.NewDatabase(string(cfg.Global.Kafka.Database.ConnectionString)) if naffkaDatabase != nil {
return naffkaDatabase, naffkaDatabase
}
naffkaDB, err := naffkaStorage.NewDatabase(string(cfg.Database.ConnectionString))
if err != nil { if err != nil {
logrus.WithError(err).Panic("Failed to setup naffka database") logrus.WithError(err).Panic("Failed to setup naffka database")
} }
naff, err := naffka.New(naffkaDB) naffkaDatabase, err = naffka.New(naffkaDB)
if err != nil { if err != nil {
logrus.WithError(err).Panic("Failed to setup naffka") logrus.WithError(err).Panic("Failed to setup naffka")
} }
return naff, naff return naffkaDatabase, naffkaDatabase
} }

View file

@ -15,10 +15,10 @@
package keyserver package keyserver
import ( import (
"github.com/Shopify/sarama"
"github.com/gorilla/mux" "github.com/gorilla/mux"
fedsenderapi "github.com/matrix-org/dendrite/federationsender/api" fedsenderapi "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/setup"
"github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/keyserver/internal" "github.com/matrix-org/dendrite/keyserver/internal"
"github.com/matrix-org/dendrite/keyserver/inthttp" "github.com/matrix-org/dendrite/keyserver/inthttp"
@ -36,8 +36,10 @@ func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) {
// NewInternalAPI returns a concerete implementation of the internal API. Callers // NewInternalAPI returns a concerete implementation of the internal API. Callers
// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes. // can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes.
func NewInternalAPI( func NewInternalAPI(
cfg *config.KeyServer, fedClient fedsenderapi.FederationClient, producer sarama.SyncProducer, cfg *config.KeyServer, fedClient fedsenderapi.FederationClient,
) api.KeyInternalAPI { ) api.KeyInternalAPI {
_, producer := setup.SetupConsumerProducer(&cfg.Matrix.Kafka)
db, err := storage.NewDatabase(&cfg.Database) db, err := storage.NewDatabase(&cfg.Database)
if err != nil { if err != nil {
logrus.WithError(err).Panicf("failed to connect to key server database") logrus.WithError(err).Panicf("failed to connect to key server database")

View file

@ -41,6 +41,8 @@ func NewInternalAPI(
) api.RoomserverInternalAPI { ) api.RoomserverInternalAPI {
cfg := &base.Cfg.RoomServer cfg := &base.Cfg.RoomServer
_, producer := setup.SetupConsumerProducer(&cfg.Matrix.Kafka)
var perspectiveServerNames []gomatrixserverlib.ServerName var perspectiveServerNames []gomatrixserverlib.ServerName
for _, kp := range base.Cfg.SigningKeyServer.KeyPerspectives { for _, kp := range base.Cfg.SigningKeyServer.KeyPerspectives {
perspectiveServerNames = append(perspectiveServerNames, kp.ServerName) perspectiveServerNames = append(perspectiveServerNames, kp.ServerName)
@ -52,7 +54,7 @@ func NewInternalAPI(
} }
return internal.NewRoomserverAPI( return internal.NewRoomserverAPI(
cfg, roomserverDB, base.KafkaProducer, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)), cfg, roomserverDB, producer, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)),
base.Caches, keyRing, perspectiveServerNames, base.Caches, keyRing, perspectiveServerNames,
) )
} }

View file

@ -169,9 +169,8 @@ func mustCreateRoomserverAPI(t *testing.T) (api.RoomserverInternalAPI, *dummyPro
t.Fatalf("failed to make caches: %s", err) t.Fatalf("failed to make caches: %s", err)
} }
base := &setup.BaseDendrite{ base := &setup.BaseDendrite{
KafkaProducer: dp, Caches: cache,
Caches: cache, Cfg: cfg,
Cfg: cfg,
} }
return NewInternalAPI(base, &test.NopJSONVerifier{}), dp return NewInternalAPI(base, &test.NopJSONVerifier{}), dp