diff --git a/appservice/appservice.go b/appservice/appservice.go index e356f68ee..cf9a47b74 100644 --- a/appservice/appservice.go +++ b/appservice/appservice.go @@ -30,6 +30,7 @@ import ( "github.com/matrix-org/dendrite/appservice/workers" "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/setup" + "github.com/matrix-org/dendrite/internal/setup/kafka" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/sirupsen/logrus" @@ -47,6 +48,8 @@ func NewInternalAPI( userAPI userapi.UserInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI, ) appserviceAPI.AppServiceQueryAPI { + consumer, _ := kafka.SetupConsumerProducer(&base.Cfg.Global.Kafka) + // Create a connection to the appservice postgres DB appserviceDB, err := storage.NewDatabase(&base.Cfg.AppServiceAPI.Database) if err != nil { @@ -86,7 +89,7 @@ func NewInternalAPI( // We can't add ASes at runtime so this is safe to do. if len(workerStates) > 0 { consumer := consumers.NewOutputRoomEventConsumer( - base.Cfg, base.KafkaConsumer, appserviceDB, + base.Cfg, consumer, appserviceDB, rsAPI, workerStates, ) if err := consumer.Start(); err != nil { diff --git a/build/docker/docker-compose.deps.yml b/build/docker/docker-compose.deps.yml index afc572d0c..74e478a8d 100644 --- a/build/docker/docker-compose.deps.yml +++ b/build/docker/docker-compose.deps.yml @@ -29,6 +29,8 @@ services: KAFKA_ADVERTISED_HOST_NAME: "kafka" KAFKA_DELETE_TOPIC_ENABLE: "true" KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" + ports: + - 9092:9092 depends_on: - zookeeper networks: diff --git a/build/gobind/monolith.go b/build/gobind/monolith.go index 7d10b87e4..fd010809c 100644 --- a/build/gobind/monolith.go +++ b/build/gobind/monolith.go @@ -112,7 +112,7 @@ func (m *DendriteMonolith) Start() { serverKeyAPI := &signing.YggdrasilKeys{} keyRing := serverKeyAPI.KeyRing() - keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation, base.KafkaProducer) + keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation) userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI) keyAPI.SetUserAPI(userAPI) @@ -146,13 +146,11 @@ func (m *DendriteMonolith) Start() { rsAPI.SetFederationSenderAPI(fsAPI) monolith := setup.Monolith{ - Config: base.Cfg, - AccountDB: accountDB, - Client: ygg.CreateClient(base), - FedClient: federation, - KeyRing: keyRing, - KafkaConsumer: base.KafkaConsumer, - KafkaProducer: base.KafkaProducer, + Config: base.Cfg, + AccountDB: accountDB, + Client: ygg.CreateClient(base), + FedClient: federation, + KeyRing: keyRing, AppserviceAPI: asAPI, EDUInternalAPI: eduInputAPI, diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go index 2ab92ed4e..ebe55aec9 100644 --- a/clientapi/clientapi.go +++ b/clientapi/clientapi.go @@ -15,7 +15,6 @@ package clientapi import ( - "github.com/Shopify/sarama" "github.com/gorilla/mux" appserviceAPI "github.com/matrix-org/dendrite/appservice/api" "github.com/matrix-org/dendrite/clientapi/api" @@ -24,6 +23,7 @@ import ( eduServerAPI "github.com/matrix-org/dendrite/eduserver/api" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/internal/config" + "github.com/matrix-org/dendrite/internal/setup/kafka" "github.com/matrix-org/dendrite/internal/transactions" keyserverAPI "github.com/matrix-org/dendrite/keyserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" @@ -36,7 +36,6 @@ import ( func AddPublicRoutes( router *mux.Router, cfg *config.ClientAPI, - producer sarama.SyncProducer, accountsDB accounts.Database, federation *gomatrixserverlib.FederationClient, rsAPI roomserverAPI.RoomserverInternalAPI, @@ -48,6 +47,8 @@ func AddPublicRoutes( keyAPI keyserverAPI.KeyInternalAPI, extRoomsProvider api.ExtraPublicRoomsProvider, ) { + _, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) + syncProducer := &producers.SyncAPIProducer{ Producer: producer, Topic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputClientData), diff --git a/cmd/dendrite-client-api-server/main.go b/cmd/dendrite-client-api-server/main.go index 0fdc6679f..0061de74f 100644 --- a/cmd/dendrite-client-api-server/main.go +++ b/cmd/dendrite-client-api-server/main.go @@ -37,7 +37,7 @@ func main() { keyAPI := base.KeyServerHTTPClient() clientapi.AddPublicRoutes( - base.PublicClientAPIMux, &base.Cfg.ClientAPI, base.KafkaProducer, accountDB, federation, + base.PublicClientAPIMux, &base.Cfg.ClientAPI, accountDB, federation, rsAPI, eduInputAPI, asQuery, transactions.New(), fsAPI, userAPI, keyAPI, nil, ) diff --git a/cmd/dendrite-demo-libp2p/main.go b/cmd/dendrite-demo-libp2p/main.go index b5386325c..61fdd801a 100644 --- a/cmd/dendrite-demo-libp2p/main.go +++ b/cmd/dendrite-demo-libp2p/main.go @@ -139,7 +139,7 @@ func main() { accountDB := base.Base.CreateAccountsDB() federation := createFederationClient(base) - keyAPI := keyserver.NewInternalAPI(&base.Base.Cfg.KeyServer, federation, base.Base.KafkaProducer) + keyAPI := keyserver.NewInternalAPI(&base.Base.Cfg.KeyServer, federation) userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI) keyAPI.SetUserAPI(userAPI) @@ -169,13 +169,11 @@ func main() { } monolith := setup.Monolith{ - Config: base.Base.Cfg, - AccountDB: accountDB, - Client: createClient(base), - FedClient: federation, - KeyRing: keyRing, - KafkaConsumer: base.Base.KafkaConsumer, - KafkaProducer: base.Base.KafkaProducer, + Config: base.Base.Cfg, + AccountDB: accountDB, + Client: createClient(base), + FedClient: federation, + KeyRing: keyRing, AppserviceAPI: asAPI, EDUInternalAPI: eduInputAPI, diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go index 5e8b92318..a40973638 100644 --- a/cmd/dendrite-demo-yggdrasil/main.go +++ b/cmd/dendrite-demo-yggdrasil/main.go @@ -96,7 +96,7 @@ func main() { serverKeyAPI := &signing.YggdrasilKeys{} keyRing := serverKeyAPI.KeyRing() - keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation, base.KafkaProducer) + keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation) userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI) keyAPI.SetUserAPI(userAPI) @@ -129,13 +129,11 @@ func main() { rsComponent.SetFederationSenderAPI(fsAPI) monolith := setup.Monolith{ - Config: base.Cfg, - AccountDB: accountDB, - Client: ygg.CreateClient(base), - FedClient: federation, - KeyRing: keyRing, - KafkaConsumer: base.KafkaConsumer, - KafkaProducer: base.KafkaProducer, + Config: base.Cfg, + AccountDB: accountDB, + Client: ygg.CreateClient(base), + FedClient: federation, + KeyRing: keyRing, AppserviceAPI: asAPI, EDUInternalAPI: eduInputAPI, diff --git a/cmd/dendrite-key-server/main.go b/cmd/dendrite-key-server/main.go index 92d18ac38..ff5b22236 100644 --- a/cmd/dendrite-key-server/main.go +++ b/cmd/dendrite-key-server/main.go @@ -24,7 +24,7 @@ func main() { base := setup.NewBaseDendrite(cfg, "KeyServer", true) defer base.Close() // nolint: errcheck - intAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, base.CreateFederationClient(), base.KafkaProducer) + intAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, base.CreateFederationClient()) intAPI.SetUserAPI(base.UserAPIClient()) keyserver.AddInternalRoutes(base.InternalAPIMux, intAPI) diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go index 0fe70ca8c..e935805f6 100644 --- a/cmd/dendrite-monolith-server/main.go +++ b/cmd/dendrite-monolith-server/main.go @@ -108,7 +108,7 @@ func main() { // This is different to rsAPI which can be the http client which doesn't need this dependency rsImpl.SetFederationSenderAPI(fsAPI) - keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, fsAPI, base.KafkaProducer) + keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, fsAPI) userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI) keyAPI.SetUserAPI(userAPI) @@ -127,13 +127,11 @@ func main() { } monolith := setup.Monolith{ - Config: base.Cfg, - AccountDB: accountDB, - Client: base.CreateClient(), - FedClient: federation, - KeyRing: keyRing, - KafkaConsumer: base.KafkaConsumer, - KafkaProducer: base.KafkaProducer, + Config: base.Cfg, + AccountDB: accountDB, + Client: base.CreateClient(), + FedClient: federation, + KeyRing: keyRing, AppserviceAPI: asAPI, EDUInternalAPI: eduInputAPI, diff --git a/cmd/dendrite-sync-api-server/main.go b/cmd/dendrite-sync-api-server/main.go index b879f842f..351dbc5f4 100644 --- a/cmd/dendrite-sync-api-server/main.go +++ b/cmd/dendrite-sync-api-server/main.go @@ -21,6 +21,7 @@ import ( func main() { cfg := setup.ParseFlags(false) + base := setup.NewBaseDendrite(cfg, "SyncAPI", true) defer base.Close() // nolint: errcheck @@ -30,7 +31,7 @@ func main() { rsAPI := base.RoomserverHTTPClient() syncapi.AddPublicRoutes( - base.PublicClientAPIMux, base.KafkaConsumer, userAPI, rsAPI, + base.PublicClientAPIMux, userAPI, rsAPI, base.KeyServerHTTPClient(), federation, &cfg.SyncAPI, ) diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go index 2d7f8b02b..85cc8a9fb 100644 --- a/cmd/dendritejs/main.go +++ b/cmd/dendritejs/main.go @@ -190,7 +190,7 @@ func main() { accountDB := base.CreateAccountsDB() federation := createFederationClient(cfg, node) - keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation, base.KafkaProducer) + keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation) userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI) keyAPI.SetUserAPI(userAPI) @@ -212,13 +212,11 @@ func main() { p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI, federation) monolith := setup.Monolith{ - Config: base.Cfg, - AccountDB: accountDB, - Client: createClient(node), - FedClient: federation, - KeyRing: &keyRing, - KafkaConsumer: base.KafkaConsumer, - KafkaProducer: base.KafkaProducer, + Config: base.Cfg, + AccountDB: accountDB, + Client: createClient(node), + FedClient: federation, + KeyRing: &keyRing, AppserviceAPI: asQuery, EDUInternalAPI: eduInputAPI, diff --git a/eduserver/eduserver.go b/eduserver/eduserver.go index 09cbe02b5..d5ab36818 100644 --- a/eduserver/eduserver.go +++ b/eduserver/eduserver.go @@ -24,6 +24,7 @@ import ( "github.com/matrix-org/dendrite/eduserver/inthttp" "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/setup" + "github.com/matrix-org/dendrite/internal/setup/kafka" userapi "github.com/matrix-org/dendrite/userapi/api" ) @@ -41,10 +42,13 @@ func NewInternalAPI( userAPI userapi.UserInternalAPI, ) api.EDUServerInputAPI { cfg := &base.Cfg.EDUServer + + _, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) + return &input.EDUServerInputAPI{ Cache: eduCache, UserAPI: userAPI, - Producer: base.KafkaProducer, + Producer: producer, OutputTypingEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent), OutputSendToDeviceEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent), OutputReceiptEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputReceiptEvent), diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index fa2a7bbb6..783fdc3b8 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -289,6 +289,15 @@ func (t *txnReq) processEDUs(ctx context.Context) { util.GetLogger(ctx).WithError(err).Error("Failed to unmarshal typing event") continue } + _, domain, err := gomatrixserverlib.SplitID('@', typingPayload.UserID) + if err != nil { + util.GetLogger(ctx).WithError(err).Error("Failed to split domain from typing event sender") + continue + } + if domain != t.Origin { + util.GetLogger(ctx).Warnf("Dropping typing event where sender domain (%q) doesn't match origin (%q)", domain, t.Origin) + continue + } if err := eduserverAPI.SendTyping(ctx, t.eduAPI, typingPayload.UserID, typingPayload.RoomID, typingPayload.Typing, 30*1000); err != nil { util.GetLogger(ctx).WithError(err).Error("Failed to send typing event to edu server") } @@ -467,6 +476,7 @@ func checkAllowedByState(e gomatrixserverlib.Event, stateEvents []gomatrixserver return gomatrixserverlib.Allowed(e, &authUsingState) } +// nolint:gocyclo func (t *txnReq) processEventWithMissingState(ctx context.Context, e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) error { // Do this with a fresh context, so that we keep working even if the // original request times out. With any luck, by the time the remote @@ -504,35 +514,70 @@ func (t *txnReq) processEventWithMissingState(ctx context.Context, e gomatrixser backwardsExtremity := &newEvents[0] newEvents = newEvents[1:] + type respState struct { + // A snapshot is considered trustworthy if it came from our own roomserver. + // That's because the state will have been through state resolution once + // already in QueryStateAfterEvent. + trustworthy bool + *gomatrixserverlib.RespState + } + // at this point we know we're going to have a gap: we need to work out the room state at the new backwards extremity. // Therefore, we cannot just query /state_ids with this event to get the state before. Instead, we need to query // the state AFTER all the prev_events for this event, then apply state resolution to that to get the state before the event. - var states []*gomatrixserverlib.RespState + var states []*respState for _, prevEventID := range backwardsExtremity.PrevEventIDs() { // Look up what the state is after the backward extremity. This will either // come from the roomserver, if we know all the required events, or it will // come from a remote server via /state_ids if not. - var prevState *gomatrixserverlib.RespState - prevState, err = t.lookupStateAfterEvent(gmectx, roomVersion, backwardsExtremity.RoomID(), prevEventID) - if err != nil { - util.GetLogger(ctx).WithError(err).Errorf("Failed to lookup state after prev_event: %s", prevEventID) - return err + prevState, trustworthy, lerr := t.lookupStateAfterEvent(gmectx, roomVersion, backwardsExtremity.RoomID(), prevEventID) + if lerr != nil { + util.GetLogger(ctx).WithError(lerr).Errorf("Failed to lookup state after prev_event: %s", prevEventID) + return lerr } // Append the state onto the collected state. We'll run this through the // state resolution next. - states = append(states, prevState) + states = append(states, &respState{trustworthy, prevState}) } // Now that we have collected all of the state from the prev_events, we'll // run the state through the appropriate state resolution algorithm for the - // room. This does a couple of things: + // room if needed. This does a couple of things: // 1. Ensures that the state is deduplicated fully for each state-key tuple // 2. Ensures that we pick the latest events from both sets, in the case that // one of the prev_events is quite a bit older than the others - resolvedState, err := t.resolveStatesAndCheck(gmectx, roomVersion, states, backwardsExtremity) - if err != nil { - util.GetLogger(ctx).WithError(err).Errorf("Failed to resolve state conflicts for event %s", backwardsExtremity.EventID()) - return err + resolvedState := &gomatrixserverlib.RespState{} + switch len(states) { + case 0: + extremityIsCreate := backwardsExtremity.Type() == gomatrixserverlib.MRoomCreate && backwardsExtremity.StateKeyEquals("") + if !extremityIsCreate { + // There are no previous states and this isn't the beginning of the + // room - this is an error condition! + util.GetLogger(ctx).Errorf("Failed to lookup any state after prev_events") + return fmt.Errorf("expected %d states but got %d", len(backwardsExtremity.PrevEventIDs()), len(states)) + } + case 1: + // There's only one previous state - if it's trustworthy (came from a + // local state snapshot which will already have been through state res), + // use it as-is. There's no point in resolving it again. + if states[0].trustworthy { + resolvedState = states[0].RespState + break + } + // Otherwise, if it isn't trustworthy (came from federation), run it through + // state resolution anyway for safety, in case there are duplicates. + fallthrough + default: + respStates := make([]*gomatrixserverlib.RespState, len(states)) + for i := range states { + respStates[i] = states[i].RespState + } + // There's more than one previous state - run them all through state res + resolvedState, err = t.resolveStatesAndCheck(gmectx, roomVersion, respStates, backwardsExtremity) + if err != nil { + util.GetLogger(ctx).WithError(err).Errorf("Failed to resolve state conflicts for event %s", backwardsExtremity.EventID()) + return err + } } // First of all, send the backward extremity into the roomserver with the @@ -572,16 +617,16 @@ func (t *txnReq) processEventWithMissingState(ctx context.Context, e gomatrixser // lookupStateAfterEvent returns the room state after `eventID`, which is the state before eventID with the state of `eventID` (if it's a state event) // added into the mix. -func (t *txnReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) (*gomatrixserverlib.RespState, error) { +func (t *txnReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) (*gomatrixserverlib.RespState, bool, error) { // try doing all this locally before we resort to querying federation respState := t.lookupStateAfterEventLocally(ctx, roomID, eventID) if respState != nil { - return respState, nil + return respState, true, nil } respState, err := t.lookupStateBeforeEvent(ctx, roomVersion, roomID, eventID) if err != nil { - return nil, fmt.Errorf("t.lookupStateBeforeEvent: %w", err) + return nil, false, fmt.Errorf("t.lookupStateBeforeEvent: %w", err) } servers := t.getServers(ctx, roomID) @@ -593,11 +638,11 @@ func (t *txnReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrix h, err := t.lookupEvent(ctx, roomVersion, eventID, false, servers) switch err.(type) { case verifySigError: - return respState, nil + return respState, false, nil case nil: // do nothing default: - return nil, fmt.Errorf("t.lookupEvent: %w", err) + return nil, false, fmt.Errorf("t.lookupEvent: %w", err) } t.haveEvents[h.EventID()] = h if h.StateKey() != nil { @@ -615,7 +660,7 @@ func (t *txnReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrix } } - return respState, nil + return respState, false, nil } func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, eventID string) *gomatrixserverlib.RespState { diff --git a/federationsender/federationsender.go b/federationsender/federationsender.go index 2f1223284..78791140e 100644 --- a/federationsender/federationsender.go +++ b/federationsender/federationsender.go @@ -24,6 +24,7 @@ import ( "github.com/matrix-org/dendrite/federationsender/statistics" "github.com/matrix-org/dendrite/federationsender/storage" "github.com/matrix-org/dendrite/internal/setup" + "github.com/matrix-org/dendrite/internal/setup/kafka" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" "github.com/sirupsen/logrus" @@ -55,6 +56,8 @@ func NewInternalAPI( FailuresUntilBlacklist: cfg.FederationMaxRetries, } + consumer, _ := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) + queues := queue.NewOutgoingQueues( federationSenderDB, cfg.Matrix.ServerName, federation, rsAPI, stats, @@ -66,7 +69,7 @@ func NewInternalAPI( ) rsConsumer := consumers.NewOutputRoomEventConsumer( - cfg, base.KafkaConsumer, queues, + cfg, consumer, queues, federationSenderDB, rsAPI, ) if err = rsConsumer.Start(); err != nil { @@ -74,13 +77,13 @@ func NewInternalAPI( } tsConsumer := consumers.NewOutputEDUConsumer( - cfg, base.KafkaConsumer, queues, federationSenderDB, + cfg, consumer, queues, federationSenderDB, ) if err := tsConsumer.Start(); err != nil { logrus.WithError(err).Panic("failed to start typing server consumer") } keyConsumer := consumers.NewKeyChangeConsumer( - &base.Cfg.KeyServer, base.KafkaConsumer, queues, federationSenderDB, rsAPI, + &base.Cfg.KeyServer, consumer, queues, federationSenderDB, rsAPI, ) if err := keyConsumer.Start(); err != nil { logrus.WithError(err).Panic("failed to start key server consumer") diff --git a/internal/setup/base.go b/internal/setup/base.go index 24a0d6aa6..8bc4ae17a 100644 --- a/internal/setup/base.go +++ b/internal/setup/base.go @@ -26,13 +26,9 @@ import ( "github.com/matrix-org/gomatrixserverlib" "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/matrix-org/naffka" - naffkaStorage "github.com/matrix-org/naffka/storage" - "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/userapi/storage/accounts" - "github.com/Shopify/sarama" "github.com/gorilla/mux" appserviceAPI "github.com/matrix-org/dendrite/appservice/api" @@ -73,8 +69,8 @@ type BaseDendrite struct { httpClient *http.Client Cfg *config.Dendrite Caches *caching.Caches - KafkaConsumer sarama.Consumer - KafkaProducer sarama.SyncProducer + // KafkaConsumer sarama.Consumer + // KafkaProducer sarama.SyncProducer } const HTTPServerTimeout = time.Minute * 5 @@ -106,14 +102,6 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, useHTTPAPIs boo logrus.WithError(err).Panicf("failed to start opentracing") } - var kafkaConsumer sarama.Consumer - var kafkaProducer sarama.SyncProducer - if cfg.Global.Kafka.UseNaffka { - kafkaConsumer, kafkaProducer = setupNaffka(cfg) - } else { - kafkaConsumer, kafkaProducer = setupKafka(cfg) - } - cache, err := caching.NewInMemoryLRUCache(true) if err != nil { logrus.WithError(err).Warnf("Failed to create cache") @@ -152,8 +140,6 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, useHTTPAPIs boo InternalAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.InternalPathPrefix).Subrouter().UseEncodedPath(), apiHttpClient: &apiClient, httpClient: &client, - KafkaConsumer: kafkaConsumer, - KafkaProducer: kafkaProducer, } } @@ -334,31 +320,3 @@ func (b *BaseDendrite) SetupAndServeHTTP( select {} } - -// setupKafka creates kafka consumer/producer pair from the config. -func setupKafka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) { - consumer, err := sarama.NewConsumer(cfg.Global.Kafka.Addresses, nil) - if err != nil { - logrus.WithError(err).Panic("failed to start kafka consumer") - } - - producer, err := sarama.NewSyncProducer(cfg.Global.Kafka.Addresses, nil) - if err != nil { - logrus.WithError(err).Panic("failed to setup kafka producers") - } - - return consumer, producer -} - -// setupNaffka creates kafka consumer/producer pair from the config. -func setupNaffka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) { - naffkaDB, err := naffkaStorage.NewDatabase(string(cfg.Global.Kafka.Database.ConnectionString)) - if err != nil { - logrus.WithError(err).Panic("Failed to setup naffka database") - } - naff, err := naffka.New(naffkaDB) - if err != nil { - logrus.WithError(err).Panic("Failed to setup naffka") - } - return naff, naff -} diff --git a/internal/setup/kafka/kafka.go b/internal/setup/kafka/kafka.go new file mode 100644 index 000000000..9855ae156 --- /dev/null +++ b/internal/setup/kafka/kafka.go @@ -0,0 +1,53 @@ +package kafka + +import ( + "github.com/Shopify/sarama" + "github.com/matrix-org/dendrite/internal/config" + "github.com/matrix-org/naffka" + naffkaStorage "github.com/matrix-org/naffka/storage" + "github.com/sirupsen/logrus" +) + +func SetupConsumerProducer(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { + if cfg.UseNaffka { + return setupNaffka(cfg) + } + return setupKafka(cfg) +} + +// setupKafka creates kafka consumer/producer pair from the config. +func setupKafka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { + consumer, err := sarama.NewConsumer(cfg.Addresses, nil) + if err != nil { + logrus.WithError(err).Panic("failed to start kafka consumer") + } + + producer, err := sarama.NewSyncProducer(cfg.Addresses, nil) + if err != nil { + logrus.WithError(err).Panic("failed to setup kafka producers") + } + + return consumer, producer +} + +// In monolith mode with Naffka, we don't have the same constraints about +// consuming the same topic from more than one place like we do with Kafka. +// Therefore, we will only open one Naffka connection in case Naffka is +// running on SQLite. +var naffkaInstance *naffka.Naffka + +// setupNaffka creates kafka consumer/producer pair from the config. +func setupNaffka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { + if naffkaInstance != nil { + return naffkaInstance, naffkaInstance + } + naffkaDB, err := naffkaStorage.NewDatabase(string(cfg.Database.ConnectionString)) + if err != nil { + logrus.WithError(err).Panic("Failed to setup naffka database") + } + naffkaInstance, err = naffka.New(naffkaDB) + if err != nil { + logrus.WithError(err).Panic("Failed to setup naffka") + } + return naffkaInstance, naffkaInstance +} diff --git a/internal/setup/monolith.go b/internal/setup/monolith.go index a0675d61f..9d3625d2f 100644 --- a/internal/setup/monolith.go +++ b/internal/setup/monolith.go @@ -15,7 +15,6 @@ package setup import ( - "github.com/Shopify/sarama" "github.com/gorilla/mux" appserviceAPI "github.com/matrix-org/dendrite/appservice/api" "github.com/matrix-org/dendrite/clientapi" @@ -38,13 +37,11 @@ import ( // Monolith represents an instantiation of all dependencies required to build // all components of Dendrite, for use in monolith mode. type Monolith struct { - Config *config.Dendrite - AccountDB accounts.Database - KeyRing *gomatrixserverlib.KeyRing - Client *gomatrixserverlib.Client - FedClient *gomatrixserverlib.FederationClient - KafkaConsumer sarama.Consumer - KafkaProducer sarama.SyncProducer + Config *config.Dendrite + AccountDB accounts.Database + KeyRing *gomatrixserverlib.KeyRing + Client *gomatrixserverlib.Client + FedClient *gomatrixserverlib.FederationClient AppserviceAPI appserviceAPI.AppServiceQueryAPI EDUInternalAPI eduServerAPI.EDUServerInputAPI @@ -61,7 +58,7 @@ type Monolith struct { // AddAllPublicRoutes attaches all public paths to the given router func (m *Monolith) AddAllPublicRoutes(csMux, ssMux, keyMux, mediaMux *mux.Router) { clientapi.AddPublicRoutes( - csMux, &m.Config.ClientAPI, m.KafkaProducer, m.AccountDB, + csMux, &m.Config.ClientAPI, m.AccountDB, m.FedClient, m.RoomserverAPI, m.EDUInternalAPI, m.AppserviceAPI, transactions.New(), m.FederationSenderAPI, m.UserAPI, m.KeyAPI, m.ExtPublicRoomsProvider, @@ -73,7 +70,7 @@ func (m *Monolith) AddAllPublicRoutes(csMux, ssMux, keyMux, mediaMux *mux.Router ) mediaapi.AddPublicRoutes(mediaMux, &m.Config.MediaAPI, m.UserAPI, m.Client) syncapi.AddPublicRoutes( - csMux, m.KafkaConsumer, m.UserAPI, m.RoomserverAPI, + csMux, m.UserAPI, m.RoomserverAPI, m.KeyAPI, m.FedClient, &m.Config.SyncAPI, ) } diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go index 78420db1f..6c54d2a08 100644 --- a/keyserver/keyserver.go +++ b/keyserver/keyserver.go @@ -15,10 +15,10 @@ package keyserver import ( - "github.com/Shopify/sarama" "github.com/gorilla/mux" fedsenderapi "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/internal/config" + "github.com/matrix-org/dendrite/internal/setup/kafka" "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/internal" "github.com/matrix-org/dendrite/keyserver/inthttp" @@ -36,8 +36,10 @@ func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) { // NewInternalAPI returns a concerete implementation of the internal API. Callers // can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes. func NewInternalAPI( - cfg *config.KeyServer, fedClient fedsenderapi.FederationClient, producer sarama.SyncProducer, + cfg *config.KeyServer, fedClient fedsenderapi.FederationClient, ) api.KeyInternalAPI { + _, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) + db, err := storage.NewDatabase(&cfg.Database) if err != nil { logrus.WithError(err).Panicf("failed to connect to key server database") diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go index 4c138116f..b2cc0728c 100644 --- a/roomserver/roomserver.go +++ b/roomserver/roomserver.go @@ -22,6 +22,7 @@ import ( "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/setup" + "github.com/matrix-org/dendrite/internal/setup/kafka" "github.com/matrix-org/dendrite/roomserver/internal" "github.com/matrix-org/dendrite/roomserver/storage" "github.com/sirupsen/logrus" @@ -41,6 +42,8 @@ func NewInternalAPI( ) api.RoomserverInternalAPI { cfg := &base.Cfg.RoomServer + _, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) + var perspectiveServerNames []gomatrixserverlib.ServerName for _, kp := range base.Cfg.SigningKeyServer.KeyPerspectives { perspectiveServerNames = append(perspectiveServerNames, kp.ServerName) @@ -52,7 +55,7 @@ func NewInternalAPI( } return internal.NewRoomserverAPI( - cfg, roomserverDB, base.KafkaProducer, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)), + cfg, roomserverDB, producer, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)), base.Caches, keyRing, perspectiveServerNames, ) } diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go index 2a03195c9..1b692a098 100644 --- a/roomserver/roomserver_test.go +++ b/roomserver/roomserver_test.go @@ -17,7 +17,10 @@ import ( "github.com/matrix-org/dendrite/internal/setup" "github.com/matrix-org/dendrite/internal/test" "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/roomserver/internal" + "github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" ) const ( @@ -160,7 +163,9 @@ func mustCreateRoomserverAPI(t *testing.T) (api.RoomserverInternalAPI, *dummyPro cfg.Defaults() cfg.Global.ServerName = testOrigin cfg.Global.Kafka.UseNaffka = true - cfg.RoomServer.Database.ConnectionString = config.DataSource(roomserverDBFileURI) + cfg.RoomServer.Database = config.DatabaseOptions{ + ConnectionString: roomserverDBFileURI, + } dp := &dummyProducer{ topic: cfg.Global.Kafka.TopicFor(config.TopicOutputRoomEvent), } @@ -169,12 +174,17 @@ func mustCreateRoomserverAPI(t *testing.T) (api.RoomserverInternalAPI, *dummyPro t.Fatalf("failed to make caches: %s", err) } base := &setup.BaseDendrite{ - KafkaProducer: dp, - Caches: cache, - Cfg: cfg, + Caches: cache, + Cfg: cfg, } - - return NewInternalAPI(base, &test.NopJSONVerifier{}), dp + roomserverDB, err := storage.Open(&cfg.RoomServer.Database, base.Caches) + if err != nil { + logrus.WithError(err).Panicf("failed to connect to room server db") + } + return internal.NewRoomserverAPI( + &cfg.RoomServer, roomserverDB, dp, string(cfg.Global.Kafka.TopicFor(config.TopicOutputRoomEvent)), + base.Caches, &test.NopJSONVerifier{}, nil, + ), dp } func mustSendEvents(t *testing.T, ver gomatrixserverlib.RoomVersion, events []json.RawMessage) (api.RoomserverInternalAPI, *dummyProducer, []gomatrixserverlib.HeaderedEvent) { diff --git a/roomserver/storage/postgres/redactions_table.go b/roomserver/storage/postgres/redactions_table.go index 289e1320f..42aba5985 100644 --- a/roomserver/storage/postgres/redactions_table.go +++ b/roomserver/storage/postgres/redactions_table.go @@ -39,7 +39,8 @@ CREATE INDEX IF NOT EXISTS roomserver_redactions_redacts_event_id ON roomserver_ const insertRedactionSQL = "" + "INSERT INTO roomserver_redactions (redaction_event_id, redacts_event_id, validated)" + - " VALUES ($1, $2, $3)" + " VALUES ($1, $2, $3)" + + " ON CONFLICT DO NOTHING" const selectRedactionInfoByRedactionEventIDSQL = "" + "SELECT redaction_event_id, redacts_event_id, validated FROM roomserver_redactions" + diff --git a/roomserver/storage/shared/latest_events_updater.go b/roomserver/storage/shared/latest_events_updater.go index 29eab0c98..b316f639d 100644 --- a/roomserver/storage/shared/latest_events_updater.go +++ b/roomserver/storage/shared/latest_events_updater.go @@ -18,23 +18,30 @@ type LatestEventsUpdater struct { currentStateSnapshotNID types.StateSnapshotNID } +func rollback(txn *sql.Tx) { + if txn == nil { + return + } + txn.Rollback() // nolint: errcheck +} + func NewLatestEventsUpdater(ctx context.Context, d *Database, txn *sql.Tx, roomInfo types.RoomInfo) (*LatestEventsUpdater, error) { eventNIDs, lastEventNIDSent, currentStateSnapshotNID, err := d.RoomsTable.SelectLatestEventsNIDsForUpdate(ctx, txn, roomInfo.RoomNID) if err != nil { - txn.Rollback() // nolint: errcheck + rollback(txn) return nil, err } stateAndRefs, err := d.EventsTable.BulkSelectStateAtEventAndReference(ctx, txn, eventNIDs) if err != nil { - txn.Rollback() // nolint: errcheck + rollback(txn) return nil, err } var lastEventIDSent string if lastEventNIDSent != 0 { lastEventIDSent, err = d.EventsTable.SelectEventID(ctx, txn, lastEventNIDSent) if err != nil { - txn.Rollback() // nolint: errcheck + rollback(txn) return nil, err } } diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go index e96eab71b..f2be8b3cf 100644 --- a/roomserver/storage/shared/storage.go +++ b/roomserver/storage/shared/storage.go @@ -458,7 +458,7 @@ func (d *Database) StoreEvent( eventNID, stateNID, err = d.EventsTable.SelectEvent(ctx, txn, event.EventID()) } if err != nil { - return err + return fmt.Errorf("d.EventsTable.SelectEvent: %w", err) } } @@ -467,6 +467,9 @@ func (d *Database) StoreEvent( } if !isRejected { // ignore rejected redaction events redactionEvent, redactedEventID, err = d.handleRedactions(ctx, txn, eventNID, event) + if err != nil { + return fmt.Errorf("d.handleRedactions: %w", err) + } } return nil }) @@ -627,6 +630,7 @@ func extractRoomVersionFromCreateEvent(event gomatrixserverlib.Event) ( // to cross-reference with other tables when loading. // // Returns the redaction event and the event ID of the redacted event if this call resulted in a redaction. +// nolint:gocyclo func (d *Database) handleRedactions( ctx context.Context, txn *sql.Tx, eventNID types.EventNID, event gomatrixserverlib.Event, ) (*gomatrixserverlib.Event, string, error) { @@ -644,13 +648,13 @@ func (d *Database) handleRedactions( RedactsEventID: event.Redacts(), }) if err != nil { - return nil, "", err + return nil, "", fmt.Errorf("d.RedactionsTable.InsertRedaction: %w", err) } } redactionEvent, redactedEvent, validated, err := d.loadRedactionPair(ctx, txn, eventNID, event) if err != nil { - return nil, "", err + return nil, "", fmt.Errorf("d.loadRedactionPair: %w", err) } if validated || redactedEvent == nil || redactionEvent == nil { // we've seen this redaction before or there is nothing to redact @@ -664,7 +668,7 @@ func (d *Database) handleRedactions( // mark the event as redacted err = redactedEvent.SetUnsignedField("redacted_because", redactionEvent) if err != nil { - return nil, "", err + return nil, "", fmt.Errorf("redactedEvent.SetUnsignedField: %w", err) } if redactionsArePermanent { redactedEvent.Event = redactedEvent.Redact() @@ -672,10 +676,15 @@ func (d *Database) handleRedactions( // overwrite the eventJSON table err = d.EventJSONTable.InsertEventJSON(ctx, txn, redactedEvent.EventNID, redactedEvent.JSON()) if err != nil { - return nil, "", err + return nil, "", fmt.Errorf("d.EventJSONTable.InsertEventJSON: %w", err) } - return &redactionEvent.Event, redactedEvent.EventID(), d.RedactionsTable.MarkRedactionValidated(ctx, txn, redactionEvent.EventID(), true) + err = d.RedactionsTable.MarkRedactionValidated(ctx, txn, redactionEvent.EventID(), true) + if err != nil { + err = fmt.Errorf("d.RedactionsTable.MarkRedactionValidated: %w", err) + } + + return &redactionEvent.Event, redactedEvent.EventID(), err } // loadRedactionPair returns both the redaction event and the redacted event, else nil. diff --git a/roomserver/storage/sqlite3/redactions_table.go b/roomserver/storage/sqlite3/redactions_table.go index a2179357c..e64714862 100644 --- a/roomserver/storage/sqlite3/redactions_table.go +++ b/roomserver/storage/sqlite3/redactions_table.go @@ -37,7 +37,7 @@ CREATE TABLE IF NOT EXISTS roomserver_redactions ( ` const insertRedactionSQL = "" + - "INSERT INTO roomserver_redactions (redaction_event_id, redacts_event_id, validated)" + + "INSERT OR IGNORE INTO roomserver_redactions (redaction_event_id, redacts_event_id, validated)" + " VALUES ($1, $2, $3)" const selectRedactionInfoByRedactionEventIDSQL = "" + diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 6c9d0ebd4..393a7aa55 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -17,11 +17,11 @@ package syncapi import ( "context" - "github.com/Shopify/sarama" "github.com/gorilla/mux" "github.com/sirupsen/logrus" "github.com/matrix-org/dendrite/internal/config" + "github.com/matrix-org/dendrite/internal/setup/kafka" keyapi "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/roomserver/api" userapi "github.com/matrix-org/dendrite/userapi/api" @@ -37,13 +37,14 @@ import ( // component. func AddPublicRoutes( router *mux.Router, - consumer sarama.Consumer, userAPI userapi.UserInternalAPI, rsAPI api.RoomserverInternalAPI, keyAPI keyapi.KeyInternalAPI, federation *gomatrixserverlib.FederationClient, cfg *config.SyncAPI, ) { + consumer, _ := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) + syncDB, err := storage.NewSyncServerDatasource(&cfg.Database) if err != nil { logrus.WithError(err).Panicf("failed to connect to sync db") diff --git a/sytest-whitelist b/sytest-whitelist index 805f0e4dd..2ba0a88b2 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -482,3 +482,5 @@ m.room.history_visibility == "joined" allows/forbids appropriately for Real user POST rejects invalid utf-8 in JSON Users cannot kick users who have already left a room A prev_batch token from incremental sync can be used in the v1 messages API +Event with an invalid signature in the send_join response should not cause room join to fail +Inbound federation rejects typing notifications from wrong remote