diff --git a/appservice/appservice.go b/appservice/appservice.go index e356f68ee..cf9a47b74 100644 --- a/appservice/appservice.go +++ b/appservice/appservice.go @@ -30,6 +30,7 @@ import ( "github.com/matrix-org/dendrite/appservice/workers" "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/setup" + "github.com/matrix-org/dendrite/internal/setup/kafka" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/sirupsen/logrus" @@ -47,6 +48,8 @@ func NewInternalAPI( userAPI userapi.UserInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI, ) appserviceAPI.AppServiceQueryAPI { + consumer, _ := kafka.SetupConsumerProducer(&base.Cfg.Global.Kafka) + // Create a connection to the appservice postgres DB appserviceDB, err := storage.NewDatabase(&base.Cfg.AppServiceAPI.Database) if err != nil { @@ -86,7 +89,7 @@ func NewInternalAPI( // We can't add ASes at runtime so this is safe to do. if len(workerStates) > 0 { consumer := consumers.NewOutputRoomEventConsumer( - base.Cfg, base.KafkaConsumer, appserviceDB, + base.Cfg, consumer, appserviceDB, rsAPI, workerStates, ) if err := consumer.Start(); err != nil { diff --git a/build/docker/docker-compose.deps.yml b/build/docker/docker-compose.deps.yml index afc572d0c..74e478a8d 100644 --- a/build/docker/docker-compose.deps.yml +++ b/build/docker/docker-compose.deps.yml @@ -29,6 +29,8 @@ services: KAFKA_ADVERTISED_HOST_NAME: "kafka" KAFKA_DELETE_TOPIC_ENABLE: "true" KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" + ports: + - 9092:9092 depends_on: - zookeeper networks: diff --git a/build/gobind/monolith.go b/build/gobind/monolith.go index 7d10b87e4..fd010809c 100644 --- a/build/gobind/monolith.go +++ b/build/gobind/monolith.go @@ -112,7 +112,7 @@ func (m *DendriteMonolith) Start() { serverKeyAPI := &signing.YggdrasilKeys{} keyRing := serverKeyAPI.KeyRing() - keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation, base.KafkaProducer) + keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation) userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI) keyAPI.SetUserAPI(userAPI) @@ -146,13 +146,11 @@ func (m *DendriteMonolith) Start() { rsAPI.SetFederationSenderAPI(fsAPI) monolith := setup.Monolith{ - Config: base.Cfg, - AccountDB: accountDB, - Client: ygg.CreateClient(base), - FedClient: federation, - KeyRing: keyRing, - KafkaConsumer: base.KafkaConsumer, - KafkaProducer: base.KafkaProducer, + Config: base.Cfg, + AccountDB: accountDB, + Client: ygg.CreateClient(base), + FedClient: federation, + KeyRing: keyRing, AppserviceAPI: asAPI, EDUInternalAPI: eduInputAPI, diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go index 2ab92ed4e..ebe55aec9 100644 --- a/clientapi/clientapi.go +++ b/clientapi/clientapi.go @@ -15,7 +15,6 @@ package clientapi import ( - "github.com/Shopify/sarama" "github.com/gorilla/mux" appserviceAPI "github.com/matrix-org/dendrite/appservice/api" "github.com/matrix-org/dendrite/clientapi/api" @@ -24,6 +23,7 @@ import ( eduServerAPI "github.com/matrix-org/dendrite/eduserver/api" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/internal/config" + "github.com/matrix-org/dendrite/internal/setup/kafka" "github.com/matrix-org/dendrite/internal/transactions" keyserverAPI "github.com/matrix-org/dendrite/keyserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" @@ -36,7 +36,6 @@ import ( func AddPublicRoutes( router *mux.Router, cfg *config.ClientAPI, - producer sarama.SyncProducer, accountsDB accounts.Database, federation *gomatrixserverlib.FederationClient, rsAPI roomserverAPI.RoomserverInternalAPI, @@ -48,6 +47,8 @@ func AddPublicRoutes( keyAPI keyserverAPI.KeyInternalAPI, extRoomsProvider api.ExtraPublicRoomsProvider, ) { + _, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) + syncProducer := &producers.SyncAPIProducer{ Producer: producer, Topic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputClientData), diff --git a/clientapi/routing/createroom.go b/clientapi/routing/createroom.go index af43064fe..9655339cd 100644 --- a/clientapi/routing/createroom.go +++ b/clientapi/routing/createroom.go @@ -339,12 +339,21 @@ func createRoom( util.GetLogger(req.Context()).WithError(err).Error("authEvents.AddEvent failed") return jsonerror.InternalServerError() } - } - // send events to the room server - if err = roomserverAPI.SendEvents(req.Context(), rsAPI, builtEvents, cfg.Matrix.ServerName, nil); err != nil { - util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed") - return jsonerror.InternalServerError() + accumulated := gomatrixserverlib.UnwrapEventHeaders(builtEvents) + if err = roomserverAPI.SendEventWithState( + req.Context(), + rsAPI, + &gomatrixserverlib.RespState{ + StateEvents: accumulated, + AuthEvents: accumulated, + }, + ev.Headered(roomVersion), + nil, + ); err != nil { + util.GetLogger(req.Context()).WithError(err).Error("SendEventWithState failed") + return jsonerror.InternalServerError() + } } // TODO(#269): Reserve room alias while we create the room. This stops us diff --git a/cmd/dendrite-client-api-server/main.go b/cmd/dendrite-client-api-server/main.go index 0fdc6679f..0061de74f 100644 --- a/cmd/dendrite-client-api-server/main.go +++ b/cmd/dendrite-client-api-server/main.go @@ -37,7 +37,7 @@ func main() { keyAPI := base.KeyServerHTTPClient() clientapi.AddPublicRoutes( - base.PublicClientAPIMux, &base.Cfg.ClientAPI, base.KafkaProducer, accountDB, federation, + base.PublicClientAPIMux, &base.Cfg.ClientAPI, accountDB, federation, rsAPI, eduInputAPI, asQuery, transactions.New(), fsAPI, userAPI, keyAPI, nil, ) diff --git a/cmd/dendrite-demo-libp2p/main.go b/cmd/dendrite-demo-libp2p/main.go index b5386325c..61fdd801a 100644 --- a/cmd/dendrite-demo-libp2p/main.go +++ b/cmd/dendrite-demo-libp2p/main.go @@ -139,7 +139,7 @@ func main() { accountDB := base.Base.CreateAccountsDB() federation := createFederationClient(base) - keyAPI := keyserver.NewInternalAPI(&base.Base.Cfg.KeyServer, federation, base.Base.KafkaProducer) + keyAPI := keyserver.NewInternalAPI(&base.Base.Cfg.KeyServer, federation) userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI) keyAPI.SetUserAPI(userAPI) @@ -169,13 +169,11 @@ func main() { } monolith := setup.Monolith{ - Config: base.Base.Cfg, - AccountDB: accountDB, - Client: createClient(base), - FedClient: federation, - KeyRing: keyRing, - KafkaConsumer: base.Base.KafkaConsumer, - KafkaProducer: base.Base.KafkaProducer, + Config: base.Base.Cfg, + AccountDB: accountDB, + Client: createClient(base), + FedClient: federation, + KeyRing: keyRing, AppserviceAPI: asAPI, EDUInternalAPI: eduInputAPI, diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go index 5e8b92318..a40973638 100644 --- a/cmd/dendrite-demo-yggdrasil/main.go +++ b/cmd/dendrite-demo-yggdrasil/main.go @@ -96,7 +96,7 @@ func main() { serverKeyAPI := &signing.YggdrasilKeys{} keyRing := serverKeyAPI.KeyRing() - keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation, base.KafkaProducer) + keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation) userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI) keyAPI.SetUserAPI(userAPI) @@ -129,13 +129,11 @@ func main() { rsComponent.SetFederationSenderAPI(fsAPI) monolith := setup.Monolith{ - Config: base.Cfg, - AccountDB: accountDB, - Client: ygg.CreateClient(base), - FedClient: federation, - KeyRing: keyRing, - KafkaConsumer: base.KafkaConsumer, - KafkaProducer: base.KafkaProducer, + Config: base.Cfg, + AccountDB: accountDB, + Client: ygg.CreateClient(base), + FedClient: federation, + KeyRing: keyRing, AppserviceAPI: asAPI, EDUInternalAPI: eduInputAPI, diff --git a/cmd/dendrite-key-server/main.go b/cmd/dendrite-key-server/main.go index 92d18ac38..ff5b22236 100644 --- a/cmd/dendrite-key-server/main.go +++ b/cmd/dendrite-key-server/main.go @@ -24,7 +24,7 @@ func main() { base := setup.NewBaseDendrite(cfg, "KeyServer", true) defer base.Close() // nolint: errcheck - intAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, base.CreateFederationClient(), base.KafkaProducer) + intAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, base.CreateFederationClient()) intAPI.SetUserAPI(base.UserAPIClient()) keyserver.AddInternalRoutes(base.InternalAPIMux, intAPI) diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go index 0fe70ca8c..e935805f6 100644 --- a/cmd/dendrite-monolith-server/main.go +++ b/cmd/dendrite-monolith-server/main.go @@ -108,7 +108,7 @@ func main() { // This is different to rsAPI which can be the http client which doesn't need this dependency rsImpl.SetFederationSenderAPI(fsAPI) - keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, fsAPI, base.KafkaProducer) + keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, fsAPI) userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI) keyAPI.SetUserAPI(userAPI) @@ -127,13 +127,11 @@ func main() { } monolith := setup.Monolith{ - Config: base.Cfg, - AccountDB: accountDB, - Client: base.CreateClient(), - FedClient: federation, - KeyRing: keyRing, - KafkaConsumer: base.KafkaConsumer, - KafkaProducer: base.KafkaProducer, + Config: base.Cfg, + AccountDB: accountDB, + Client: base.CreateClient(), + FedClient: federation, + KeyRing: keyRing, AppserviceAPI: asAPI, EDUInternalAPI: eduInputAPI, diff --git a/cmd/dendrite-sync-api-server/main.go b/cmd/dendrite-sync-api-server/main.go index b879f842f..351dbc5f4 100644 --- a/cmd/dendrite-sync-api-server/main.go +++ b/cmd/dendrite-sync-api-server/main.go @@ -21,6 +21,7 @@ import ( func main() { cfg := setup.ParseFlags(false) + base := setup.NewBaseDendrite(cfg, "SyncAPI", true) defer base.Close() // nolint: errcheck @@ -30,7 +31,7 @@ func main() { rsAPI := base.RoomserverHTTPClient() syncapi.AddPublicRoutes( - base.PublicClientAPIMux, base.KafkaConsumer, userAPI, rsAPI, + base.PublicClientAPIMux, userAPI, rsAPI, base.KeyServerHTTPClient(), federation, &cfg.SyncAPI, ) diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go index 2d7f8b02b..85cc8a9fb 100644 --- a/cmd/dendritejs/main.go +++ b/cmd/dendritejs/main.go @@ -190,7 +190,7 @@ func main() { accountDB := base.CreateAccountsDB() federation := createFederationClient(cfg, node) - keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation, base.KafkaProducer) + keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation) userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI) keyAPI.SetUserAPI(userAPI) @@ -212,13 +212,11 @@ func main() { p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI, federation) monolith := setup.Monolith{ - Config: base.Cfg, - AccountDB: accountDB, - Client: createClient(node), - FedClient: federation, - KeyRing: &keyRing, - KafkaConsumer: base.KafkaConsumer, - KafkaProducer: base.KafkaProducer, + Config: base.Cfg, + AccountDB: accountDB, + Client: createClient(node), + FedClient: federation, + KeyRing: &keyRing, AppserviceAPI: asQuery, EDUInternalAPI: eduInputAPI, diff --git a/cmd/goose/main.go b/cmd/goose/main.go index ef3942d90..83c97a729 100644 --- a/cmd/goose/main.go +++ b/cmd/goose/main.go @@ -8,19 +8,38 @@ import ( "log" "os" - // Example complex Go migration import: - // _ "github.com/matrix-org/dendrite/serverkeyapi/storage/postgres/deltas" + pgaccounts "github.com/matrix-org/dendrite/userapi/storage/accounts/postgres/deltas" + slaccounts "github.com/matrix-org/dendrite/userapi/storage/accounts/sqlite3/deltas" + pgdevices "github.com/matrix-org/dendrite/userapi/storage/devices/postgres/deltas" + sldevices "github.com/matrix-org/dendrite/userapi/storage/devices/sqlite3/deltas" "github.com/pressly/goose" _ "github.com/lib/pq" _ "github.com/mattn/go-sqlite3" ) -var ( - flags = flag.NewFlagSet("goose", flag.ExitOnError) - dir = flags.String("dir", ".", "directory with migration files") +const ( + AppService = "appservice" + FederationSender = "federationsender" + KeyServer = "keyserver" + MediaAPI = "mediaapi" + RoomServer = "roomserver" + SigningKeyServer = "signingkeyserver" + SyncAPI = "syncapi" + UserAPIAccounts = "userapi_accounts" + UserAPIDevices = "userapi_devices" ) +var ( + dir = flags.String("dir", "", "directory with migration files") + flags = flag.NewFlagSet("goose", flag.ExitOnError) + component = flags.String("component", "", "dendrite component name") + knownDBs = []string{ + AppService, FederationSender, KeyServer, MediaAPI, RoomServer, SigningKeyServer, SyncAPI, UserAPIAccounts, UserAPIDevices, + } +) + +// nolint: gocyclo func main() { err := flags.Parse(os.Args[1:]) if err != nil { @@ -37,19 +56,20 @@ Drivers: sqlite3 Examples: - goose -d roomserver/storage/sqlite3/deltas sqlite3 ./roomserver.db status - goose -d roomserver/storage/sqlite3/deltas sqlite3 ./roomserver.db up + goose -component roomserver sqlite3 ./roomserver.db status + goose -component roomserver sqlite3 ./roomserver.db up - goose -d roomserver/storage/postgres/deltas postgres "user=dendrite dbname=dendrite sslmode=disable" status + goose -component roomserver postgres "user=dendrite dbname=dendrite sslmode=disable" status Options: - - -dir string - directory with migration files (default ".") + -component string + Dendrite component name e.g roomserver, signingkeyserver, clientapi, syncapi -table string migrations table name (default "goose_db_version") -h print help -v enable verbose mode + -dir string + directory with migration files, only relevant when creating new migrations. -version print version @@ -74,6 +94,25 @@ Commands: fmt.Println("engine must be one of 'sqlite3' or 'postgres'") return } + + knownComponent := false + for _, c := range knownDBs { + if c == *component { + knownComponent = true + break + } + } + if !knownComponent { + fmt.Printf("component must be one of %v\n", knownDBs) + return + } + + if engine == "sqlite3" { + loadSQLiteDeltas(*component) + } else { + loadPostgresDeltas(*component) + } + dbstring, command := args[1], args[2] db, err := goose.OpenDBWithDriver(engine, dbstring) @@ -92,7 +131,30 @@ Commands: arguments = append(arguments, args[3:]...) } - if err := goose.Run(command, db, *dir, arguments...); err != nil { + // goose demands a directory even though we don't use it for upgrades + d := *dir + if d == "" { + d = os.TempDir() + } + if err := goose.Run(command, db, d, arguments...); err != nil { log.Fatalf("goose %v: %v", command, err) } } + +func loadSQLiteDeltas(component string) { + switch component { + case UserAPIAccounts: + slaccounts.LoadFromGoose() + case UserAPIDevices: + sldevices.LoadFromGoose() + } +} + +func loadPostgresDeltas(component string) { + switch component { + case UserAPIAccounts: + pgaccounts.LoadFromGoose() + case UserAPIDevices: + pgdevices.LoadFromGoose() + } +} diff --git a/eduserver/eduserver.go b/eduserver/eduserver.go index b6196c269..098ac0248 100644 --- a/eduserver/eduserver.go +++ b/eduserver/eduserver.go @@ -24,6 +24,7 @@ import ( "github.com/matrix-org/dendrite/eduserver/inthttp" "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/setup" + "github.com/matrix-org/dendrite/internal/setup/kafka" userapi "github.com/matrix-org/dendrite/userapi/api" ) @@ -41,10 +42,13 @@ func NewInternalAPI( userAPI userapi.UserInternalAPI, ) api.EDUServerInputAPI { cfg := &base.Cfg.EDUServer + + _, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) + return &input.EDUServerInputAPI{ Cache: eduCache, UserAPI: userAPI, - Producer: base.KafkaProducer, + Producer: producer, OutputTypingEventTopic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent)), OutputSendToDeviceEventTopic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent)), ServerName: cfg.Matrix.ServerName, diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 24e29a18d..783fdc3b8 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -289,6 +289,15 @@ func (t *txnReq) processEDUs(ctx context.Context) { util.GetLogger(ctx).WithError(err).Error("Failed to unmarshal typing event") continue } + _, domain, err := gomatrixserverlib.SplitID('@', typingPayload.UserID) + if err != nil { + util.GetLogger(ctx).WithError(err).Error("Failed to split domain from typing event sender") + continue + } + if domain != t.Origin { + util.GetLogger(ctx).Warnf("Dropping typing event where sender domain (%q) doesn't match origin (%q)", domain, t.Origin) + continue + } if err := eduserverAPI.SendTyping(ctx, t.eduAPI, typingPayload.UserID, typingPayload.RoomID, typingPayload.Typing, 30*1000); err != nil { util.GetLogger(ctx).WithError(err).Error("Failed to send typing event to edu server") } @@ -467,6 +476,7 @@ func checkAllowedByState(e gomatrixserverlib.Event, stateEvents []gomatrixserver return gomatrixserverlib.Allowed(e, &authUsingState) } +// nolint:gocyclo func (t *txnReq) processEventWithMissingState(ctx context.Context, e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) error { // Do this with a fresh context, so that we keep working even if the // original request times out. With any luck, by the time the remote @@ -504,36 +514,70 @@ func (t *txnReq) processEventWithMissingState(ctx context.Context, e gomatrixser backwardsExtremity := &newEvents[0] newEvents = newEvents[1:] + type respState struct { + // A snapshot is considered trustworthy if it came from our own roomserver. + // That's because the state will have been through state resolution once + // already in QueryStateAfterEvent. + trustworthy bool + *gomatrixserverlib.RespState + } + // at this point we know we're going to have a gap: we need to work out the room state at the new backwards extremity. // Therefore, we cannot just query /state_ids with this event to get the state before. Instead, we need to query // the state AFTER all the prev_events for this event, then apply state resolution to that to get the state before the event. - var states []*gomatrixserverlib.RespState - needed := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{*backwardsExtremity}).Tuples() + var states []*respState for _, prevEventID := range backwardsExtremity.PrevEventIDs() { // Look up what the state is after the backward extremity. This will either // come from the roomserver, if we know all the required events, or it will // come from a remote server via /state_ids if not. - var prevState *gomatrixserverlib.RespState - prevState, err = t.lookupStateAfterEvent(gmectx, roomVersion, backwardsExtremity.RoomID(), prevEventID, needed) - if err != nil { - util.GetLogger(ctx).WithError(err).Errorf("Failed to lookup state after prev_event: %s", prevEventID) - return err + prevState, trustworthy, lerr := t.lookupStateAfterEvent(gmectx, roomVersion, backwardsExtremity.RoomID(), prevEventID) + if lerr != nil { + util.GetLogger(ctx).WithError(lerr).Errorf("Failed to lookup state after prev_event: %s", prevEventID) + return lerr } // Append the state onto the collected state. We'll run this through the // state resolution next. - states = append(states, prevState) + states = append(states, &respState{trustworthy, prevState}) } // Now that we have collected all of the state from the prev_events, we'll // run the state through the appropriate state resolution algorithm for the - // room. This does a couple of things: + // room if needed. This does a couple of things: // 1. Ensures that the state is deduplicated fully for each state-key tuple // 2. Ensures that we pick the latest events from both sets, in the case that // one of the prev_events is quite a bit older than the others - resolvedState, err := t.resolveStatesAndCheck(gmectx, roomVersion, states, backwardsExtremity) - if err != nil { - util.GetLogger(ctx).WithError(err).Errorf("Failed to resolve state conflicts for event %s", backwardsExtremity.EventID()) - return err + resolvedState := &gomatrixserverlib.RespState{} + switch len(states) { + case 0: + extremityIsCreate := backwardsExtremity.Type() == gomatrixserverlib.MRoomCreate && backwardsExtremity.StateKeyEquals("") + if !extremityIsCreate { + // There are no previous states and this isn't the beginning of the + // room - this is an error condition! + util.GetLogger(ctx).Errorf("Failed to lookup any state after prev_events") + return fmt.Errorf("expected %d states but got %d", len(backwardsExtremity.PrevEventIDs()), len(states)) + } + case 1: + // There's only one previous state - if it's trustworthy (came from a + // local state snapshot which will already have been through state res), + // use it as-is. There's no point in resolving it again. + if states[0].trustworthy { + resolvedState = states[0].RespState + break + } + // Otherwise, if it isn't trustworthy (came from federation), run it through + // state resolution anyway for safety, in case there are duplicates. + fallthrough + default: + respStates := make([]*gomatrixserverlib.RespState, len(states)) + for i := range states { + respStates[i] = states[i].RespState + } + // There's more than one previous state - run them all through state res + resolvedState, err = t.resolveStatesAndCheck(gmectx, roomVersion, respStates, backwardsExtremity) + if err != nil { + util.GetLogger(ctx).WithError(err).Errorf("Failed to resolve state conflicts for event %s", backwardsExtremity.EventID()) + return err + } } // First of all, send the backward extremity into the roomserver with the @@ -573,16 +617,16 @@ func (t *txnReq) processEventWithMissingState(ctx context.Context, e gomatrixser // lookupStateAfterEvent returns the room state after `eventID`, which is the state before eventID with the state of `eventID` (if it's a state event) // added into the mix. -func (t *txnReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string, needed []gomatrixserverlib.StateKeyTuple) (*gomatrixserverlib.RespState, error) { +func (t *txnReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) (*gomatrixserverlib.RespState, bool, error) { // try doing all this locally before we resort to querying federation - respState := t.lookupStateAfterEventLocally(ctx, roomID, eventID, needed) + respState := t.lookupStateAfterEventLocally(ctx, roomID, eventID) if respState != nil { - return respState, nil + return respState, true, nil } respState, err := t.lookupStateBeforeEvent(ctx, roomVersion, roomID, eventID) if err != nil { - return nil, fmt.Errorf("t.lookupStateBeforeEvent: %w", err) + return nil, false, fmt.Errorf("t.lookupStateBeforeEvent: %w", err) } servers := t.getServers(ctx, roomID) @@ -594,11 +638,11 @@ func (t *txnReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrix h, err := t.lookupEvent(ctx, roomVersion, eventID, false, servers) switch err.(type) { case verifySigError: - return respState, nil + return respState, false, nil case nil: // do nothing default: - return nil, fmt.Errorf("t.lookupEvent: %w", err) + return nil, false, fmt.Errorf("t.lookupEvent: %w", err) } t.haveEvents[h.EventID()] = h if h.StateKey() != nil { @@ -616,15 +660,14 @@ func (t *txnReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrix } } - return respState, nil + return respState, false, nil } -func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, eventID string, needed []gomatrixserverlib.StateKeyTuple) *gomatrixserverlib.RespState { +func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, eventID string) *gomatrixserverlib.RespState { var res api.QueryStateAfterEventsResponse err := t.rsAPI.QueryStateAfterEvents(ctx, &api.QueryStateAfterEventsRequest{ RoomID: roomID, PrevEventIDs: []string{eventID}, - StateToFetch: needed, }, &res) if err != nil || !res.PrevEventsExist { util.GetLogger(ctx).WithError(err).Warnf("failed to query state after %s locally", eventID) diff --git a/federationsender/federationsender.go b/federationsender/federationsender.go index 2f1223284..78791140e 100644 --- a/federationsender/federationsender.go +++ b/federationsender/federationsender.go @@ -24,6 +24,7 @@ import ( "github.com/matrix-org/dendrite/federationsender/statistics" "github.com/matrix-org/dendrite/federationsender/storage" "github.com/matrix-org/dendrite/internal/setup" + "github.com/matrix-org/dendrite/internal/setup/kafka" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" "github.com/sirupsen/logrus" @@ -55,6 +56,8 @@ func NewInternalAPI( FailuresUntilBlacklist: cfg.FederationMaxRetries, } + consumer, _ := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) + queues := queue.NewOutgoingQueues( federationSenderDB, cfg.Matrix.ServerName, federation, rsAPI, stats, @@ -66,7 +69,7 @@ func NewInternalAPI( ) rsConsumer := consumers.NewOutputRoomEventConsumer( - cfg, base.KafkaConsumer, queues, + cfg, consumer, queues, federationSenderDB, rsAPI, ) if err = rsConsumer.Start(); err != nil { @@ -74,13 +77,13 @@ func NewInternalAPI( } tsConsumer := consumers.NewOutputEDUConsumer( - cfg, base.KafkaConsumer, queues, federationSenderDB, + cfg, consumer, queues, federationSenderDB, ) if err := tsConsumer.Start(); err != nil { logrus.WithError(err).Panic("failed to start typing server consumer") } keyConsumer := consumers.NewKeyChangeConsumer( - &base.Cfg.KeyServer, base.KafkaConsumer, queues, federationSenderDB, rsAPI, + &base.Cfg.KeyServer, consumer, queues, federationSenderDB, rsAPI, ) if err := keyConsumer.Start(); err != nil { logrus.WithError(err).Panic("failed to start key server consumer") diff --git a/go.mod b/go.mod index c02463832..d3060fa0f 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4 github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3 github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd - github.com/matrix-org/gomatrixserverlib v0.0.0-20201009153043-8d27a9f0e350 + github.com/matrix-org/gomatrixserverlib v0.0.0-20201015151920-aa4f62b827b8 github.com/matrix-org/naffka v0.0.0-20200901083833-bcdd62999a91 github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 github.com/mattn/go-sqlite3 v1.14.2 diff --git a/go.sum b/go.sum index 101b8e18f..377a2e093 100644 --- a/go.sum +++ b/go.sum @@ -569,8 +569,8 @@ github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 h1:Hr3zjRsq2bh github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd h1:xVrqJK3xHREMNjwjljkAUaadalWc0rRbmVuQatzmgwg= github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= -github.com/matrix-org/gomatrixserverlib v0.0.0-20201009153043-8d27a9f0e350 h1:G9K8k5KIzbeBdd0bMk+4itdZU3JGHgV+z0FNUsTEhkE= -github.com/matrix-org/gomatrixserverlib v0.0.0-20201009153043-8d27a9f0e350/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= +github.com/matrix-org/gomatrixserverlib v0.0.0-20201015151920-aa4f62b827b8 h1:GF1PxbvImWDoz1DQZNMoaYtIqQXtyLAtmQOzwwmw1OI= +github.com/matrix-org/gomatrixserverlib v0.0.0-20201015151920-aa4f62b827b8/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= github.com/matrix-org/naffka v0.0.0-20200901083833-bcdd62999a91 h1:HJ6U3S3ljJqNffYMcIeAncp5qT/i+ZMiJ2JC2F0aXP4= github.com/matrix-org/naffka v0.0.0-20200901083833-bcdd62999a91/go.mod h1:sjyPyRxKM5uw1nD2cJ6O2OxI6GOqyVBfNXqKjBZTBZE= github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 h1:ntrLa/8xVzeSs8vHFHK25k0C+NV74sYMJnNSg5NoSRo= diff --git a/internal/setup/base.go b/internal/setup/base.go index 24a0d6aa6..8bc4ae17a 100644 --- a/internal/setup/base.go +++ b/internal/setup/base.go @@ -26,13 +26,9 @@ import ( "github.com/matrix-org/gomatrixserverlib" "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/matrix-org/naffka" - naffkaStorage "github.com/matrix-org/naffka/storage" - "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/userapi/storage/accounts" - "github.com/Shopify/sarama" "github.com/gorilla/mux" appserviceAPI "github.com/matrix-org/dendrite/appservice/api" @@ -73,8 +69,8 @@ type BaseDendrite struct { httpClient *http.Client Cfg *config.Dendrite Caches *caching.Caches - KafkaConsumer sarama.Consumer - KafkaProducer sarama.SyncProducer + // KafkaConsumer sarama.Consumer + // KafkaProducer sarama.SyncProducer } const HTTPServerTimeout = time.Minute * 5 @@ -106,14 +102,6 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, useHTTPAPIs boo logrus.WithError(err).Panicf("failed to start opentracing") } - var kafkaConsumer sarama.Consumer - var kafkaProducer sarama.SyncProducer - if cfg.Global.Kafka.UseNaffka { - kafkaConsumer, kafkaProducer = setupNaffka(cfg) - } else { - kafkaConsumer, kafkaProducer = setupKafka(cfg) - } - cache, err := caching.NewInMemoryLRUCache(true) if err != nil { logrus.WithError(err).Warnf("Failed to create cache") @@ -152,8 +140,6 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, useHTTPAPIs boo InternalAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.InternalPathPrefix).Subrouter().UseEncodedPath(), apiHttpClient: &apiClient, httpClient: &client, - KafkaConsumer: kafkaConsumer, - KafkaProducer: kafkaProducer, } } @@ -334,31 +320,3 @@ func (b *BaseDendrite) SetupAndServeHTTP( select {} } - -// setupKafka creates kafka consumer/producer pair from the config. -func setupKafka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) { - consumer, err := sarama.NewConsumer(cfg.Global.Kafka.Addresses, nil) - if err != nil { - logrus.WithError(err).Panic("failed to start kafka consumer") - } - - producer, err := sarama.NewSyncProducer(cfg.Global.Kafka.Addresses, nil) - if err != nil { - logrus.WithError(err).Panic("failed to setup kafka producers") - } - - return consumer, producer -} - -// setupNaffka creates kafka consumer/producer pair from the config. -func setupNaffka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) { - naffkaDB, err := naffkaStorage.NewDatabase(string(cfg.Global.Kafka.Database.ConnectionString)) - if err != nil { - logrus.WithError(err).Panic("Failed to setup naffka database") - } - naff, err := naffka.New(naffkaDB) - if err != nil { - logrus.WithError(err).Panic("Failed to setup naffka") - } - return naff, naff -} diff --git a/internal/setup/kafka/kafka.go b/internal/setup/kafka/kafka.go new file mode 100644 index 000000000..9855ae156 --- /dev/null +++ b/internal/setup/kafka/kafka.go @@ -0,0 +1,53 @@ +package kafka + +import ( + "github.com/Shopify/sarama" + "github.com/matrix-org/dendrite/internal/config" + "github.com/matrix-org/naffka" + naffkaStorage "github.com/matrix-org/naffka/storage" + "github.com/sirupsen/logrus" +) + +func SetupConsumerProducer(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { + if cfg.UseNaffka { + return setupNaffka(cfg) + } + return setupKafka(cfg) +} + +// setupKafka creates kafka consumer/producer pair from the config. +func setupKafka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { + consumer, err := sarama.NewConsumer(cfg.Addresses, nil) + if err != nil { + logrus.WithError(err).Panic("failed to start kafka consumer") + } + + producer, err := sarama.NewSyncProducer(cfg.Addresses, nil) + if err != nil { + logrus.WithError(err).Panic("failed to setup kafka producers") + } + + return consumer, producer +} + +// In monolith mode with Naffka, we don't have the same constraints about +// consuming the same topic from more than one place like we do with Kafka. +// Therefore, we will only open one Naffka connection in case Naffka is +// running on SQLite. +var naffkaInstance *naffka.Naffka + +// setupNaffka creates kafka consumer/producer pair from the config. +func setupNaffka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { + if naffkaInstance != nil { + return naffkaInstance, naffkaInstance + } + naffkaDB, err := naffkaStorage.NewDatabase(string(cfg.Database.ConnectionString)) + if err != nil { + logrus.WithError(err).Panic("Failed to setup naffka database") + } + naffkaInstance, err = naffka.New(naffkaDB) + if err != nil { + logrus.WithError(err).Panic("Failed to setup naffka") + } + return naffkaInstance, naffkaInstance +} diff --git a/internal/setup/monolith.go b/internal/setup/monolith.go index a0675d61f..9d3625d2f 100644 --- a/internal/setup/monolith.go +++ b/internal/setup/monolith.go @@ -15,7 +15,6 @@ package setup import ( - "github.com/Shopify/sarama" "github.com/gorilla/mux" appserviceAPI "github.com/matrix-org/dendrite/appservice/api" "github.com/matrix-org/dendrite/clientapi" @@ -38,13 +37,11 @@ import ( // Monolith represents an instantiation of all dependencies required to build // all components of Dendrite, for use in monolith mode. type Monolith struct { - Config *config.Dendrite - AccountDB accounts.Database - KeyRing *gomatrixserverlib.KeyRing - Client *gomatrixserverlib.Client - FedClient *gomatrixserverlib.FederationClient - KafkaConsumer sarama.Consumer - KafkaProducer sarama.SyncProducer + Config *config.Dendrite + AccountDB accounts.Database + KeyRing *gomatrixserverlib.KeyRing + Client *gomatrixserverlib.Client + FedClient *gomatrixserverlib.FederationClient AppserviceAPI appserviceAPI.AppServiceQueryAPI EDUInternalAPI eduServerAPI.EDUServerInputAPI @@ -61,7 +58,7 @@ type Monolith struct { // AddAllPublicRoutes attaches all public paths to the given router func (m *Monolith) AddAllPublicRoutes(csMux, ssMux, keyMux, mediaMux *mux.Router) { clientapi.AddPublicRoutes( - csMux, &m.Config.ClientAPI, m.KafkaProducer, m.AccountDB, + csMux, &m.Config.ClientAPI, m.AccountDB, m.FedClient, m.RoomserverAPI, m.EDUInternalAPI, m.AppserviceAPI, transactions.New(), m.FederationSenderAPI, m.UserAPI, m.KeyAPI, m.ExtPublicRoomsProvider, @@ -73,7 +70,7 @@ func (m *Monolith) AddAllPublicRoutes(csMux, ssMux, keyMux, mediaMux *mux.Router ) mediaapi.AddPublicRoutes(mediaMux, &m.Config.MediaAPI, m.UserAPI, m.Client) syncapi.AddPublicRoutes( - csMux, m.KafkaConsumer, m.UserAPI, m.RoomserverAPI, + csMux, m.UserAPI, m.RoomserverAPI, m.KeyAPI, m.FedClient, &m.Config.SyncAPI, ) } diff --git a/internal/sqlutil/migrate.go b/internal/sqlutil/migrate.go new file mode 100644 index 000000000..833977ba4 --- /dev/null +++ b/internal/sqlutil/migrate.go @@ -0,0 +1,130 @@ +package sqlutil + +import ( + "database/sql" + "fmt" + "runtime" + "sort" + + "github.com/matrix-org/dendrite/internal/config" + "github.com/pressly/goose" +) + +type Migrations struct { + registeredGoMigrations map[int64]*goose.Migration +} + +func NewMigrations() *Migrations { + return &Migrations{ + registeredGoMigrations: make(map[int64]*goose.Migration), + } +} + +// Copy-pasted from goose directly to store migrations into a map we control + +// AddMigration adds a migration. +func (m *Migrations) AddMigration(up func(*sql.Tx) error, down func(*sql.Tx) error) { + _, filename, _, _ := runtime.Caller(1) + m.AddNamedMigration(filename, up, down) +} + +// AddNamedMigration : Add a named migration. +func (m *Migrations) AddNamedMigration(filename string, up func(*sql.Tx) error, down func(*sql.Tx) error) { + v, _ := goose.NumericComponent(filename) + migration := &goose.Migration{Version: v, Next: -1, Previous: -1, Registered: true, UpFn: up, DownFn: down, Source: filename} + + if existing, ok := m.registeredGoMigrations[v]; ok { + panic(fmt.Sprintf("failed to add migration %q: version conflicts with %q", filename, existing.Source)) + } + + m.registeredGoMigrations[v] = migration +} + +// RunDeltas up to the latest version. +func (m *Migrations) RunDeltas(db *sql.DB, props *config.DatabaseOptions) error { + maxVer := goose.MaxVersion + minVer := int64(0) + migrations, err := m.collect(minVer, maxVer) + if err != nil { + return fmt.Errorf("RunDeltas: Failed to collect migrations: %w", err) + } + if props.ConnectionString.IsPostgres() { + if err = goose.SetDialect("postgres"); err != nil { + return err + } + } else if props.ConnectionString.IsSQLite() { + if err = goose.SetDialect("sqlite3"); err != nil { + return err + } + } else { + return fmt.Errorf("Unknown connection string: %s", props.ConnectionString) + } + for { + current, err := goose.EnsureDBVersion(db) + if err != nil { + return fmt.Errorf("RunDeltas: Failed to EnsureDBVersion: %w", err) + } + + next, err := migrations.Next(current) + if err != nil { + if err == goose.ErrNoNextVersion { + return nil + } + + return fmt.Errorf("RunDeltas: Failed to load next migration to %+v : %w", next, err) + } + + if err = next.Up(db); err != nil { + return fmt.Errorf("RunDeltas: Failed run migration: %w", err) + } + } +} + +func (m *Migrations) collect(current, target int64) (goose.Migrations, error) { + var migrations goose.Migrations + + // Go migrations registered via goose.AddMigration(). + for _, migration := range m.registeredGoMigrations { + v, err := goose.NumericComponent(migration.Source) + if err != nil { + return nil, err + } + if versionFilter(v, current, target) { + migrations = append(migrations, migration) + } + } + + migrations = sortAndConnectMigrations(migrations) + + return migrations, nil +} + +func sortAndConnectMigrations(migrations goose.Migrations) goose.Migrations { + sort.Sort(migrations) + + // now that we're sorted in the appropriate direction, + // populate next and previous for each migration + for i, m := range migrations { + prev := int64(-1) + if i > 0 { + prev = migrations[i-1].Version + migrations[i-1].Next = m.Version + } + migrations[i].Previous = prev + } + + return migrations +} + +func versionFilter(v, current, target int64) bool { + + if target > current { + return v > current && v <= target + } + + if target < current { + return v <= current && v > target + } + + return false +} diff --git a/internal/version.go b/internal/version.go index 2ffd7c90e..a9e245d44 100644 --- a/internal/version.go +++ b/internal/version.go @@ -1,6 +1,12 @@ package internal -import "fmt" +import ( + "fmt" + "strings" +) + +// the final version string +var version string // -ldflags "-X github.com/matrix-org/dendrite/internal.branch=master" var branch string @@ -16,12 +22,22 @@ const ( ) func VersionString() string { - version := fmt.Sprintf("%d.%d.%d%s", VersionMajor, VersionMinor, VersionPatch, VersionTag) - if branch != "" { - version += fmt.Sprintf("-%s", branch) - } - if build != "" { - version += fmt.Sprintf("+%s", build) - } return version } + +func init() { + version = fmt.Sprintf("%d.%d.%d", VersionMajor, VersionMinor, VersionPatch) + if VersionTag != "" { + version += "-" + VersionTag + } + parts := []string{} + if build != "" { + parts = append(parts, build) + } + if branch != "" { + parts = append(parts, branch) + } + if len(parts) > 0 { + version += "+" + strings.Join(parts, ".") + } +} diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go index 78420db1f..6c54d2a08 100644 --- a/keyserver/keyserver.go +++ b/keyserver/keyserver.go @@ -15,10 +15,10 @@ package keyserver import ( - "github.com/Shopify/sarama" "github.com/gorilla/mux" fedsenderapi "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/internal/config" + "github.com/matrix-org/dendrite/internal/setup/kafka" "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/internal" "github.com/matrix-org/dendrite/keyserver/inthttp" @@ -36,8 +36,10 @@ func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) { // NewInternalAPI returns a concerete implementation of the internal API. Callers // can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes. func NewInternalAPI( - cfg *config.KeyServer, fedClient fedsenderapi.FederationClient, producer sarama.SyncProducer, + cfg *config.KeyServer, fedClient fedsenderapi.FederationClient, ) api.KeyInternalAPI { + _, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) + db, err := storage.NewDatabase(&cfg.Database) if err != nil { logrus.WithError(err).Panicf("failed to connect to key server database") diff --git a/roomserver/api/query.go b/roomserver/api/query.go index aff6ee07a..3afca7e81 100644 --- a/roomserver/api/query.go +++ b/roomserver/api/query.go @@ -63,7 +63,8 @@ type QueryStateAfterEventsRequest struct { RoomID string `json:"room_id"` // The list of previous events to return the events after. PrevEventIDs []string `json:"prev_event_ids"` - // The state key tuples to fetch from the state + // The state key tuples to fetch from the state. If none are specified then + // the entire resolved room state will be returned. StateToFetch []gomatrixserverlib.StateKeyTuple `json:"state_to_fetch"` } diff --git a/roomserver/internal/input/input_latest_events.go b/roomserver/internal/input/input_latest_events.go index 229665a0b..ca5d214d7 100644 --- a/roomserver/internal/input/input_latest_events.go +++ b/roomserver/internal/input/input_latest_events.go @@ -133,8 +133,7 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error { // If the event has already been written to the output log then we // don't need to do anything, as we've handled it already. - hasBeenSent, err := u.updater.HasEventBeenSent(u.stateAtEvent.EventNID) - if err != nil { + if hasBeenSent, err := u.updater.HasEventBeenSent(u.stateAtEvent.EventNID); err != nil { return fmt.Errorf("u.updater.HasEventBeenSent: %w", err) } else if hasBeenSent { return nil @@ -142,17 +141,19 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error { // Work out what the latest events are. This will include the new // event if it is not already referenced. - u.calculateLatest( + if err := u.calculateLatest( oldLatest, types.StateAtEventAndReference{ EventReference: u.event.EventReference(), StateAtEvent: u.stateAtEvent, }, - ) + ); err != nil { + return fmt.Errorf("u.calculateLatest: %w", err) + } // Now that we know what the latest events are, it's time to get the // latest state. - if err = u.latestState(); err != nil { + if err := u.latestState(); err != nil { return fmt.Errorf("u.latestState: %w", err) } @@ -261,7 +262,7 @@ func (u *latestEventsUpdater) latestState() error { func (u *latestEventsUpdater) calculateLatest( oldLatest []types.StateAtEventAndReference, newEvent types.StateAtEventAndReference, -) { +) error { var newLatest []types.StateAtEventAndReference // First of all, let's see if any of the existing forward extremities @@ -271,6 +272,7 @@ func (u *latestEventsUpdater) calculateLatest( referenced, err := u.updater.IsReferenced(l.EventReference) if err != nil { logrus.WithError(err).Errorf("Failed to retrieve event reference for %q", l.EventID) + return fmt.Errorf("u.updater.IsReferenced (old): %w", err) } else if !referenced { newLatest = append(newLatest, l) } @@ -285,7 +287,7 @@ func (u *latestEventsUpdater) calculateLatest( // We've already referenced this new event so we can just return // the newly completed extremities at this point. u.latest = newLatest - return + return nil } } @@ -296,11 +298,13 @@ func (u *latestEventsUpdater) calculateLatest( referenced, err := u.updater.IsReferenced(newEvent.EventReference) if err != nil { logrus.WithError(err).Errorf("Failed to retrieve event reference for %q", newEvent.EventReference.EventID) + return fmt.Errorf("u.updater.IsReferenced (new): %w", err) } else if !referenced || len(newLatest) == 0 { newLatest = append(newLatest, newEvent) } u.latest = newLatest + return nil } func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) { diff --git a/roomserver/internal/query/query.go b/roomserver/internal/query/query.go index 810511505..ecfb580f2 100644 --- a/roomserver/internal/query/query.go +++ b/roomserver/internal/query/query.go @@ -49,6 +49,7 @@ func (r *Queryer) QueryLatestEventsAndState( } // QueryStateAfterEvents implements api.RoomserverInternalAPI +// nolint:gocyclo func (r *Queryer) QueryStateAfterEvents( ctx context.Context, request *api.QueryStateAfterEventsRequest, @@ -78,10 +79,18 @@ func (r *Queryer) QueryStateAfterEvents( } response.PrevEventsExist = true - // Look up the currrent state for the requested tuples. - stateEntries, err := roomState.LoadStateAfterEventsForStringTuples( - ctx, prevStates, request.StateToFetch, - ) + var stateEntries []types.StateEntry + if len(request.StateToFetch) == 0 { + // Look up all of the current room state. + stateEntries, err = roomState.LoadCombinedStateAfterEvents( + ctx, prevStates, + ) + } else { + // Look up the current state for the requested tuples. + stateEntries, err = roomState.LoadStateAfterEventsForStringTuples( + ctx, prevStates, request.StateToFetch, + ) + } if err != nil { return err } @@ -91,6 +100,24 @@ func (r *Queryer) QueryStateAfterEvents( return err } + if len(request.PrevEventIDs) > 1 && len(request.StateToFetch) == 0 { + var authEventIDs []string + for _, e := range stateEvents { + authEventIDs = append(authEventIDs, e.AuthEventIDs()...) + } + authEventIDs = util.UniqueStrings(authEventIDs) + + authEvents, err := getAuthChain(ctx, r.DB.EventsFromIDs, authEventIDs) + if err != nil { + return fmt.Errorf("getAuthChain: %w", err) + } + + stateEvents, err = state.ResolveConflictsAdhoc(info.RoomVersion, stateEvents, authEvents) + if err != nil { + return fmt.Errorf("state.ResolveConflictsAdhoc: %w", err) + } + } + for _, event := range stateEvents { response.StateEvents = append(response.StateEvents, event.Headered(info.RoomVersion)) } diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go index 4c138116f..b2cc0728c 100644 --- a/roomserver/roomserver.go +++ b/roomserver/roomserver.go @@ -22,6 +22,7 @@ import ( "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/setup" + "github.com/matrix-org/dendrite/internal/setup/kafka" "github.com/matrix-org/dendrite/roomserver/internal" "github.com/matrix-org/dendrite/roomserver/storage" "github.com/sirupsen/logrus" @@ -41,6 +42,8 @@ func NewInternalAPI( ) api.RoomserverInternalAPI { cfg := &base.Cfg.RoomServer + _, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) + var perspectiveServerNames []gomatrixserverlib.ServerName for _, kp := range base.Cfg.SigningKeyServer.KeyPerspectives { perspectiveServerNames = append(perspectiveServerNames, kp.ServerName) @@ -52,7 +55,7 @@ func NewInternalAPI( } return internal.NewRoomserverAPI( - cfg, roomserverDB, base.KafkaProducer, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)), + cfg, roomserverDB, producer, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)), base.Caches, keyRing, perspectiveServerNames, ) } diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go index 2a03195c9..1b692a098 100644 --- a/roomserver/roomserver_test.go +++ b/roomserver/roomserver_test.go @@ -17,7 +17,10 @@ import ( "github.com/matrix-org/dendrite/internal/setup" "github.com/matrix-org/dendrite/internal/test" "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/roomserver/internal" + "github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" ) const ( @@ -160,7 +163,9 @@ func mustCreateRoomserverAPI(t *testing.T) (api.RoomserverInternalAPI, *dummyPro cfg.Defaults() cfg.Global.ServerName = testOrigin cfg.Global.Kafka.UseNaffka = true - cfg.RoomServer.Database.ConnectionString = config.DataSource(roomserverDBFileURI) + cfg.RoomServer.Database = config.DatabaseOptions{ + ConnectionString: roomserverDBFileURI, + } dp := &dummyProducer{ topic: cfg.Global.Kafka.TopicFor(config.TopicOutputRoomEvent), } @@ -169,12 +174,17 @@ func mustCreateRoomserverAPI(t *testing.T) (api.RoomserverInternalAPI, *dummyPro t.Fatalf("failed to make caches: %s", err) } base := &setup.BaseDendrite{ - KafkaProducer: dp, - Caches: cache, - Cfg: cfg, + Caches: cache, + Cfg: cfg, } - - return NewInternalAPI(base, &test.NopJSONVerifier{}), dp + roomserverDB, err := storage.Open(&cfg.RoomServer.Database, base.Caches) + if err != nil { + logrus.WithError(err).Panicf("failed to connect to room server db") + } + return internal.NewRoomserverAPI( + &cfg.RoomServer, roomserverDB, dp, string(cfg.Global.Kafka.TopicFor(config.TopicOutputRoomEvent)), + base.Caches, &test.NopJSONVerifier{}, nil, + ), dp } func mustSendEvents(t *testing.T, ver gomatrixserverlib.RoomVersion, events []json.RawMessage) (api.RoomserverInternalAPI, *dummyProducer, []gomatrixserverlib.HeaderedEvent) { diff --git a/roomserver/storage/postgres/redactions_table.go b/roomserver/storage/postgres/redactions_table.go index 289e1320f..42aba5985 100644 --- a/roomserver/storage/postgres/redactions_table.go +++ b/roomserver/storage/postgres/redactions_table.go @@ -39,7 +39,8 @@ CREATE INDEX IF NOT EXISTS roomserver_redactions_redacts_event_id ON roomserver_ const insertRedactionSQL = "" + "INSERT INTO roomserver_redactions (redaction_event_id, redacts_event_id, validated)" + - " VALUES ($1, $2, $3)" + " VALUES ($1, $2, $3)" + + " ON CONFLICT DO NOTHING" const selectRedactionInfoByRedactionEventIDSQL = "" + "SELECT redaction_event_id, redacts_event_id, validated FROM roomserver_redactions" + diff --git a/roomserver/storage/shared/latest_events_updater.go b/roomserver/storage/shared/latest_events_updater.go index 29eab0c98..b316f639d 100644 --- a/roomserver/storage/shared/latest_events_updater.go +++ b/roomserver/storage/shared/latest_events_updater.go @@ -18,23 +18,30 @@ type LatestEventsUpdater struct { currentStateSnapshotNID types.StateSnapshotNID } +func rollback(txn *sql.Tx) { + if txn == nil { + return + } + txn.Rollback() // nolint: errcheck +} + func NewLatestEventsUpdater(ctx context.Context, d *Database, txn *sql.Tx, roomInfo types.RoomInfo) (*LatestEventsUpdater, error) { eventNIDs, lastEventNIDSent, currentStateSnapshotNID, err := d.RoomsTable.SelectLatestEventsNIDsForUpdate(ctx, txn, roomInfo.RoomNID) if err != nil { - txn.Rollback() // nolint: errcheck + rollback(txn) return nil, err } stateAndRefs, err := d.EventsTable.BulkSelectStateAtEventAndReference(ctx, txn, eventNIDs) if err != nil { - txn.Rollback() // nolint: errcheck + rollback(txn) return nil, err } var lastEventIDSent string if lastEventNIDSent != 0 { lastEventIDSent, err = d.EventsTable.SelectEventID(ctx, txn, lastEventNIDSent) if err != nil { - txn.Rollback() // nolint: errcheck + rollback(txn) return nil, err } } diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go index e96eab71b..f2be8b3cf 100644 --- a/roomserver/storage/shared/storage.go +++ b/roomserver/storage/shared/storage.go @@ -458,7 +458,7 @@ func (d *Database) StoreEvent( eventNID, stateNID, err = d.EventsTable.SelectEvent(ctx, txn, event.EventID()) } if err != nil { - return err + return fmt.Errorf("d.EventsTable.SelectEvent: %w", err) } } @@ -467,6 +467,9 @@ func (d *Database) StoreEvent( } if !isRejected { // ignore rejected redaction events redactionEvent, redactedEventID, err = d.handleRedactions(ctx, txn, eventNID, event) + if err != nil { + return fmt.Errorf("d.handleRedactions: %w", err) + } } return nil }) @@ -627,6 +630,7 @@ func extractRoomVersionFromCreateEvent(event gomatrixserverlib.Event) ( // to cross-reference with other tables when loading. // // Returns the redaction event and the event ID of the redacted event if this call resulted in a redaction. +// nolint:gocyclo func (d *Database) handleRedactions( ctx context.Context, txn *sql.Tx, eventNID types.EventNID, event gomatrixserverlib.Event, ) (*gomatrixserverlib.Event, string, error) { @@ -644,13 +648,13 @@ func (d *Database) handleRedactions( RedactsEventID: event.Redacts(), }) if err != nil { - return nil, "", err + return nil, "", fmt.Errorf("d.RedactionsTable.InsertRedaction: %w", err) } } redactionEvent, redactedEvent, validated, err := d.loadRedactionPair(ctx, txn, eventNID, event) if err != nil { - return nil, "", err + return nil, "", fmt.Errorf("d.loadRedactionPair: %w", err) } if validated || redactedEvent == nil || redactionEvent == nil { // we've seen this redaction before or there is nothing to redact @@ -664,7 +668,7 @@ func (d *Database) handleRedactions( // mark the event as redacted err = redactedEvent.SetUnsignedField("redacted_because", redactionEvent) if err != nil { - return nil, "", err + return nil, "", fmt.Errorf("redactedEvent.SetUnsignedField: %w", err) } if redactionsArePermanent { redactedEvent.Event = redactedEvent.Redact() @@ -672,10 +676,15 @@ func (d *Database) handleRedactions( // overwrite the eventJSON table err = d.EventJSONTable.InsertEventJSON(ctx, txn, redactedEvent.EventNID, redactedEvent.JSON()) if err != nil { - return nil, "", err + return nil, "", fmt.Errorf("d.EventJSONTable.InsertEventJSON: %w", err) } - return &redactionEvent.Event, redactedEvent.EventID(), d.RedactionsTable.MarkRedactionValidated(ctx, txn, redactionEvent.EventID(), true) + err = d.RedactionsTable.MarkRedactionValidated(ctx, txn, redactionEvent.EventID(), true) + if err != nil { + err = fmt.Errorf("d.RedactionsTable.MarkRedactionValidated: %w", err) + } + + return &redactionEvent.Event, redactedEvent.EventID(), err } // loadRedactionPair returns both the redaction event and the redacted event, else nil. diff --git a/roomserver/storage/sqlite3/redactions_table.go b/roomserver/storage/sqlite3/redactions_table.go index a2179357c..e64714862 100644 --- a/roomserver/storage/sqlite3/redactions_table.go +++ b/roomserver/storage/sqlite3/redactions_table.go @@ -37,7 +37,7 @@ CREATE TABLE IF NOT EXISTS roomserver_redactions ( ` const insertRedactionSQL = "" + - "INSERT INTO roomserver_redactions (redaction_event_id, redacts_event_id, validated)" + + "INSERT OR IGNORE INTO roomserver_redactions (redaction_event_id, redacts_event_id, validated)" + " VALUES ($1, $2, $3)" const selectRedactionInfoByRedactionEventIDSQL = "" + diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 43e2455b6..de0bb434b 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -17,11 +17,11 @@ package syncapi import ( "context" - "github.com/Shopify/sarama" "github.com/gorilla/mux" "github.com/sirupsen/logrus" "github.com/matrix-org/dendrite/internal/config" + "github.com/matrix-org/dendrite/internal/setup/kafka" keyapi "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/roomserver/api" userapi "github.com/matrix-org/dendrite/userapi/api" @@ -37,13 +37,14 @@ import ( // component. func AddPublicRoutes( router *mux.Router, - consumer sarama.Consumer, userAPI userapi.UserInternalAPI, rsAPI api.RoomserverInternalAPI, keyAPI keyapi.KeyInternalAPI, federation *gomatrixserverlib.FederationClient, cfg *config.SyncAPI, ) { + consumer, _ := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) + syncDB, err := storage.NewSyncServerDatasource(&cfg.Database) if err != nil { logrus.WithError(err).Panicf("failed to connect to sync db") diff --git a/sytest-whitelist b/sytest-whitelist index 805f0e4dd..2ba0a88b2 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -482,3 +482,5 @@ m.room.history_visibility == "joined" allows/forbids appropriately for Real user POST rejects invalid utf-8 in JSON Users cannot kick users who have already left a room A prev_batch token from incremental sync can be used in the v1 messages API +Event with an invalid signature in the send_join response should not cause room join to fail +Inbound federation rejects typing notifications from wrong remote diff --git a/userapi/storage/accounts/postgres/accounts_table.go b/userapi/storage/accounts/postgres/accounts_table.go index 254da84c3..4eaa5b581 100644 --- a/userapi/storage/accounts/postgres/accounts_table.go +++ b/userapi/storage/accounts/postgres/accounts_table.go @@ -75,11 +75,12 @@ type accountsStatements struct { serverName gomatrixserverlib.ServerName } +func (s *accountsStatements) execSchema(db *sql.DB) error { + _, err := db.Exec(accountsSchema) + return err +} + func (s *accountsStatements) prepare(db *sql.DB, server gomatrixserverlib.ServerName) (err error) { - _, err = db.Exec(accountsSchema) - if err != nil { - return - } if s.insertAccountStmt, err = db.Prepare(insertAccountSQL); err != nil { return } diff --git a/userapi/storage/accounts/postgres/deltas/20200929203058_is_active.go b/userapi/storage/accounts/postgres/deltas/20200929203058_is_active.go new file mode 100644 index 000000000..9e14286e0 --- /dev/null +++ b/userapi/storage/accounts/postgres/deltas/20200929203058_is_active.go @@ -0,0 +1,33 @@ +package deltas + +import ( + "database/sql" + "fmt" + + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/pressly/goose" +) + +func LoadFromGoose() { + goose.AddMigration(UpIsActive, DownIsActive) +} + +func LoadIsActive(m *sqlutil.Migrations) { + m.AddMigration(UpIsActive, DownIsActive) +} + +func UpIsActive(tx *sql.Tx) error { + _, err := tx.Exec("ALTER TABLE account_accounts ADD COLUMN IF NOT EXISTS is_deactivated BOOLEAN DEFAULT FALSE;") + if err != nil { + return fmt.Errorf("failed to execute upgrade: %w", err) + } + return nil +} + +func DownIsActive(tx *sql.Tx) error { + _, err := tx.Exec("ALTER TABLE account_accounts DROP COLUMN is_deactivated;") + if err != nil { + return fmt.Errorf("failed to execute downgrade: %w", err) + } + return nil +} diff --git a/userapi/storage/accounts/postgres/deltas/20200929203058_is_active.sql b/userapi/storage/accounts/postgres/deltas/20200929203058_is_active.sql deleted file mode 100644 index 32e6e1664..000000000 --- a/userapi/storage/accounts/postgres/deltas/20200929203058_is_active.sql +++ /dev/null @@ -1,9 +0,0 @@ --- +goose Up --- +goose StatementBegin -ALTER TABLE account_accounts ADD COLUMN IF NOT EXISTS is_deactivated BOOLEAN DEFAULT FALSE; --- +goose StatementEnd - --- +goose Down --- +goose StatementBegin -ALTER TABLE account_accounts DROP COLUMN is_deactivated; --- +goose StatementEnd diff --git a/userapi/storage/accounts/postgres/storage.go b/userapi/storage/accounts/postgres/storage.go index 2230f7e79..40c4b8ff5 100644 --- a/userapi/storage/accounts/postgres/storage.go +++ b/userapi/storage/accounts/postgres/storage.go @@ -25,6 +25,8 @@ import ( "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/dendrite/userapi/storage/accounts/postgres/deltas" + _ "github.com/matrix-org/dendrite/userapi/storage/accounts/postgres/deltas" "github.com/matrix-org/gomatrixserverlib" "golang.org/x/crypto/bcrypt" @@ -55,6 +57,18 @@ func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserver db: db, writer: sqlutil.NewDummyWriter(), } + + // Create tables before executing migrations so we don't fail if the table is missing, + // and THEN prepare statements so we don't fail due to referencing new columns + if err = d.accounts.execSchema(db); err != nil { + return nil, err + } + m := sqlutil.NewMigrations() + deltas.LoadIsActive(m) + if err = m.RunDeltas(db, dbProperties); err != nil { + return nil, err + } + if err = d.PartitionOffsetStatements.Prepare(db, d.writer, "account"); err != nil { return nil, err } @@ -70,6 +84,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserver if err = d.threepids.prepare(db); err != nil { return nil, err } + return d, nil } diff --git a/userapi/storage/accounts/sqlite3/accounts_table.go b/userapi/storage/accounts/sqlite3/accounts_table.go index d0ea8a8bc..50f07237e 100644 --- a/userapi/storage/accounts/sqlite3/accounts_table.go +++ b/userapi/storage/accounts/sqlite3/accounts_table.go @@ -74,13 +74,13 @@ type accountsStatements struct { serverName gomatrixserverlib.ServerName } +func (s *accountsStatements) execSchema(db *sql.DB) error { + _, err := db.Exec(accountsSchema) + return err +} + func (s *accountsStatements) prepare(db *sql.DB, server gomatrixserverlib.ServerName) (err error) { s.db = db - - _, err = db.Exec(accountsSchema) - if err != nil { - return - } if s.insertAccountStmt, err = db.Prepare(insertAccountSQL); err != nil { return } diff --git a/userapi/storage/accounts/sqlite3/deltas/20200929203058_is_active.go b/userapi/storage/accounts/sqlite3/deltas/20200929203058_is_active.go new file mode 100644 index 000000000..9fddb05a1 --- /dev/null +++ b/userapi/storage/accounts/sqlite3/deltas/20200929203058_is_active.go @@ -0,0 +1,64 @@ +package deltas + +import ( + "database/sql" + "fmt" + + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/pressly/goose" +) + +func LoadFromGoose() { + goose.AddMigration(UpIsActive, DownIsActive) +} + +func LoadIsActive(m *sqlutil.Migrations) { + m.AddMigration(UpIsActive, DownIsActive) +} + +func UpIsActive(tx *sql.Tx) error { + _, err := tx.Exec(` + ALTER TABLE account_accounts RENAME TO account_accounts_tmp; +CREATE TABLE account_accounts ( + localpart TEXT NOT NULL PRIMARY KEY, + created_ts BIGINT NOT NULL, + password_hash TEXT, + appservice_id TEXT, + is_deactivated BOOLEAN DEFAULT 0 +); +INSERT + INTO account_accounts ( + localpart, created_ts, password_hash, appservice_id + ) SELECT + localpart, created_ts, password_hash, appservice_id + FROM account_accounts_tmp +; +DROP TABLE account_accounts_tmp;`) + if err != nil { + return fmt.Errorf("failed to execute upgrade: %w", err) + } + return nil +} + +func DownIsActive(tx *sql.Tx) error { + _, err := tx.Exec(` + ALTER TABLE account_accounts RENAME TO account_accounts_tmp; +CREATE TABLE account_accounts ( + localpart TEXT NOT NULL PRIMARY KEY, + created_ts BIGINT NOT NULL, + password_hash TEXT, + appservice_id TEXT +); +INSERT + INTO account_accounts ( + localpart, created_ts, password_hash, appservice_id + ) SELECT + localpart, created_ts, password_hash, appservice_id + FROM account_accounts_tmp +; +DROP TABLE account_accounts_tmp;`) + if err != nil { + return fmt.Errorf("failed to execute downgrade: %w", err) + } + return nil +} diff --git a/userapi/storage/accounts/sqlite3/deltas/20200929203058_is_active.sql b/userapi/storage/accounts/sqlite3/deltas/20200929203058_is_active.sql deleted file mode 100644 index 51e9bae3c..000000000 --- a/userapi/storage/accounts/sqlite3/deltas/20200929203058_is_active.sql +++ /dev/null @@ -1,38 +0,0 @@ --- +goose Up --- +goose StatementBegin -ALTER TABLE account_accounts RENAME TO account_accounts_tmp; -CREATE TABLE account_accounts ( - localpart TEXT NOT NULL PRIMARY KEY, - created_ts BIGINT NOT NULL, - password_hash TEXT, - appservice_id TEXT, - is_deactivated BOOLEAN DEFAULT 0 -); -INSERT - INTO account_accounts ( - localpart, created_ts, password_hash, appservice_id - ) SELECT - localpart, created_ts, password_hash, appservice_id - FROM account_accounts_tmp -; -DROP TABLE account_accounts_tmp; --- +goose StatementEnd - --- +goose Down --- +goose StatementBegin -ALTER TABLE account_accounts RENAME TO account_accounts_tmp; -CREATE TABLE account_accounts ( - localpart TEXT NOT NULL PRIMARY KEY, - created_ts BIGINT NOT NULL, - password_hash TEXT, - appservice_id TEXT -); -INSERT - INTO account_accounts ( - localpart, created_ts, password_hash, appservice_id - ) SELECT - localpart, created_ts, password_hash, appservice_id - FROM account_accounts_tmp -; -DROP TABLE account_accounts_tmp; --- +goose StatementEnd diff --git a/userapi/storage/accounts/sqlite3/storage.go b/userapi/storage/accounts/sqlite3/storage.go index 7a2830a93..0be7bcbe7 100644 --- a/userapi/storage/accounts/sqlite3/storage.go +++ b/userapi/storage/accounts/sqlite3/storage.go @@ -26,6 +26,7 @@ import ( "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/dendrite/userapi/storage/accounts/sqlite3/deltas" "github.com/matrix-org/gomatrixserverlib" "golang.org/x/crypto/bcrypt" // Import the sqlite3 database driver. @@ -60,6 +61,18 @@ func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserver db: db, writer: sqlutil.NewExclusiveWriter(), } + + // Create tables before executing migrations so we don't fail if the table is missing, + // and THEN prepare statements so we don't fail due to referencing new columns + if err = d.accounts.execSchema(db); err != nil { + return nil, err + } + m := sqlutil.NewMigrations() + deltas.LoadIsActive(m) + if err = m.RunDeltas(db, dbProperties); err != nil { + return nil, err + } + partitions := sqlutil.PartitionOffsetStatements{} if err = partitions.Prepare(db, d.writer, "account"); err != nil { return nil, err @@ -76,6 +89,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserver if err = d.threepids.prepare(db); err != nil { return nil, err } + return d, nil } diff --git a/userapi/storage/devices/postgres/deltas/20201001204705_last_seen_ts_ip.go b/userapi/storage/devices/postgres/deltas/20201001204705_last_seen_ts_ip.go new file mode 100644 index 000000000..290f854c8 --- /dev/null +++ b/userapi/storage/devices/postgres/deltas/20201001204705_last_seen_ts_ip.go @@ -0,0 +1,39 @@ +package deltas + +import ( + "database/sql" + "fmt" + + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/pressly/goose" +) + +func LoadFromGoose() { + goose.AddMigration(UpLastSeenTSIP, DownLastSeenTSIP) +} + +func LoadLastSeenTSIP(m *sqlutil.Migrations) { + m.AddMigration(UpLastSeenTSIP, DownLastSeenTSIP) +} + +func UpLastSeenTSIP(tx *sql.Tx) error { + _, err := tx.Exec(` +ALTER TABLE device_devices ADD COLUMN IF NOT EXISTS last_seen_ts BIGINT NOT NULL DEFAULT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP)*1000; +ALTER TABLE device_devices ADD COLUMN IF NOT EXISTS ip TEXT; +ALTER TABLE device_devices ADD COLUMN IF NOT EXISTS user_agent TEXT;`) + if err != nil { + return fmt.Errorf("failed to execute upgrade: %w", err) + } + return nil +} + +func DownLastSeenTSIP(tx *sql.Tx) error { + _, err := tx.Exec(` + ALTER TABLE device_devices DROP COLUMN last_seen_ts; + ALTER TABLE device_devices DROP COLUMN ip; + ALTER TABLE device_devices DROP COLUMN user_agent;`) + if err != nil { + return fmt.Errorf("failed to execute downgrade: %w", err) + } + return nil +} diff --git a/userapi/storage/devices/postgres/deltas/20201001204705_last_seen_ts_ip.sql b/userapi/storage/devices/postgres/deltas/20201001204705_last_seen_ts_ip.sql deleted file mode 100644 index e7900b0b3..000000000 --- a/userapi/storage/devices/postgres/deltas/20201001204705_last_seen_ts_ip.sql +++ /dev/null @@ -1,13 +0,0 @@ --- +goose Up --- +goose StatementBegin -ALTER TABLE device_devices ADD COLUMN IF NOT EXISTS last_seen_ts BIGINT NOT NULL DEFAULT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP)*1000; -ALTER TABLE device_devices ADD COLUMN IF NOT EXISTS ip TEXT; -ALTER TABLE device_devices ADD COLUMN IF NOT EXISTS user_agent TEXT; --- +goose StatementEnd - --- +goose Down --- +goose StatementBegin -ALTER TABLE device_devices DROP COLUMN last_seen_ts; -ALTER TABLE device_devices DROP COLUMN ip; -ALTER TABLE device_devices DROP COLUMN user_agent; --- +goose StatementEnd diff --git a/userapi/storage/devices/postgres/devices_table.go b/userapi/storage/devices/postgres/devices_table.go index 2a4d337c7..379fed794 100644 --- a/userapi/storage/devices/postgres/devices_table.go +++ b/userapi/storage/devices/postgres/devices_table.go @@ -111,11 +111,12 @@ type devicesStatements struct { serverName gomatrixserverlib.ServerName } +func (s *devicesStatements) execSchema(db *sql.DB) error { + _, err := db.Exec(devicesSchema) + return err +} + func (s *devicesStatements) prepare(db *sql.DB, server gomatrixserverlib.ServerName) (err error) { - _, err = db.Exec(devicesSchema) - if err != nil { - return - } if s.insertDeviceStmt, err = db.Prepare(insertDeviceSQL); err != nil { return } diff --git a/userapi/storage/devices/postgres/storage.go b/userapi/storage/devices/postgres/storage.go index faa5796b0..e318b260b 100644 --- a/userapi/storage/devices/postgres/storage.go +++ b/userapi/storage/devices/postgres/storage.go @@ -23,6 +23,7 @@ import ( "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/dendrite/userapi/storage/devices/postgres/deltas" "github.com/matrix-org/gomatrixserverlib" ) @@ -42,9 +43,22 @@ func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserver return nil, err } d := devicesStatements{} + + // Create tables before executing migrations so we don't fail if the table is missing, + // and THEN prepare statements so we don't fail due to referencing new columns + if err = d.execSchema(db); err != nil { + return nil, err + } + m := sqlutil.NewMigrations() + deltas.LoadLastSeenTSIP(m) + if err = m.RunDeltas(db, dbProperties); err != nil { + return nil, err + } + if err = d.prepare(db, serverName); err != nil { return nil, err } + return &Database{db, d}, nil } diff --git a/userapi/storage/devices/sqlite3/deltas/20201001204705_last_seen_ts_ip.go b/userapi/storage/devices/sqlite3/deltas/20201001204705_last_seen_ts_ip.go new file mode 100644 index 000000000..262098265 --- /dev/null +++ b/userapi/storage/devices/sqlite3/deltas/20201001204705_last_seen_ts_ip.go @@ -0,0 +1,70 @@ +package deltas + +import ( + "database/sql" + "fmt" + + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/pressly/goose" +) + +func LoadFromGoose() { + goose.AddMigration(UpLastSeenTSIP, DownLastSeenTSIP) +} + +func LoadLastSeenTSIP(m *sqlutil.Migrations) { + m.AddMigration(UpLastSeenTSIP, DownLastSeenTSIP) +} + +func UpLastSeenTSIP(tx *sql.Tx) error { + _, err := tx.Exec(` + ALTER TABLE device_devices RENAME TO device_devices_tmp; + CREATE TABLE device_devices ( + access_token TEXT PRIMARY KEY, + session_id INTEGER, + device_id TEXT , + localpart TEXT , + created_ts BIGINT, + display_name TEXT, + last_seen_ts BIGINT, + ip TEXT, + user_agent TEXT, + UNIQUE (localpart, device_id) + ); + INSERT + INTO device_devices ( + access_token, session_id, device_id, localpart, created_ts, display_name, last_seen_ts, ip, user_agent + ) SELECT + access_token, session_id, device_id, localpart, created_ts, display_name, created_ts, '', '' + FROM device_devices_tmp; + DROP TABLE device_devices_tmp;`) + if err != nil { + return fmt.Errorf("failed to execute upgrade: %w", err) + } + return nil +} + +func DownLastSeenTSIP(tx *sql.Tx) error { + _, err := tx.Exec(` +ALTER TABLE device_devices RENAME TO device_devices_tmp; +CREATE TABLE IF NOT EXISTS device_devices ( + access_token TEXT PRIMARY KEY, + session_id INTEGER, + device_id TEXT , + localpart TEXT , + created_ts BIGINT, + display_name TEXT, + UNIQUE (localpart, device_id) +); +INSERT +INTO device_devices ( + access_token, session_id, device_id, localpart, created_ts, display_name +) SELECT + access_token, session_id, device_id, localpart, created_ts, display_name +FROM device_devices_tmp; +DROP TABLE device_devices_tmp;`) + if err != nil { + return fmt.Errorf("failed to execute downgrade: %w", err) + } + return nil +} diff --git a/userapi/storage/devices/sqlite3/deltas/20201001204705_last_seen_ts_ip.sql b/userapi/storage/devices/sqlite3/deltas/20201001204705_last_seen_ts_ip.sql deleted file mode 100644 index 887f90e0d..000000000 --- a/userapi/storage/devices/sqlite3/deltas/20201001204705_last_seen_ts_ip.sql +++ /dev/null @@ -1,44 +0,0 @@ --- +goose Up --- +goose StatementBegin -ALTER TABLE device_devices RENAME TO device_devices_tmp; -CREATE TABLE device_devices ( - access_token TEXT PRIMARY KEY, - session_id INTEGER, - device_id TEXT , - localpart TEXT , - created_ts BIGINT, - display_name TEXT, - last_seen_ts BIGINT, - ip TEXT, - user_agent TEXT, - UNIQUE (localpart, device_id) -); -INSERT -INTO device_devices ( - access_token, session_id, device_id, localpart, created_ts, display_name, last_seen_ts, ip, user_agent -) SELECT - access_token, session_id, device_id, localpart, created_ts, display_name, created_ts, '', '' -FROM device_devices_tmp; -DROP TABLE device_devices_tmp; --- +goose StatementEnd - --- +goose Down --- +goose StatementBegin -ALTER TABLE device_devices RENAME TO device_devices_tmp; -CREATE TABLE IF NOT EXISTS device_devices ( - access_token TEXT PRIMARY KEY, - session_id INTEGER, - device_id TEXT , - localpart TEXT , - created_ts BIGINT, - display_name TEXT, - UNIQUE (localpart, device_id) -); -INSERT -INTO device_devices ( - access_token, session_id, device_id, localpart, created_ts, display_name -) SELECT - access_token, session_id, device_id, localpart, created_ts, display_name -FROM device_devices_tmp; -DROP TABLE device_devices_tmp; --- +goose StatementEnd \ No newline at end of file diff --git a/userapi/storage/devices/sqlite3/devices_table.go b/userapi/storage/devices/sqlite3/devices_table.go index 6b0de10ee..26c03222a 100644 --- a/userapi/storage/devices/sqlite3/devices_table.go +++ b/userapi/storage/devices/sqlite3/devices_table.go @@ -98,13 +98,14 @@ type devicesStatements struct { serverName gomatrixserverlib.ServerName } +func (s *devicesStatements) execSchema(db *sql.DB) error { + _, err := db.Exec(devicesSchema) + return err +} + func (s *devicesStatements) prepare(db *sql.DB, writer sqlutil.Writer, server gomatrixserverlib.ServerName) (err error) { s.db = db s.writer = writer - _, err = db.Exec(devicesSchema) - if err != nil { - return - } if s.insertDeviceStmt, err = db.Prepare(insertDeviceSQL); err != nil { return } diff --git a/userapi/storage/devices/sqlite3/storage.go b/userapi/storage/devices/sqlite3/storage.go index cfaf4fd99..25888eae4 100644 --- a/userapi/storage/devices/sqlite3/storage.go +++ b/userapi/storage/devices/sqlite3/storage.go @@ -23,6 +23,7 @@ import ( "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/dendrite/userapi/storage/devices/sqlite3/deltas" "github.com/matrix-org/gomatrixserverlib" _ "github.com/mattn/go-sqlite3" @@ -46,6 +47,17 @@ func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserver } writer := sqlutil.NewExclusiveWriter() d := devicesStatements{} + + // Create tables before executing migrations so we don't fail if the table is missing, + // and THEN prepare statements so we don't fail due to referencing new columns + if err = d.execSchema(db); err != nil { + return nil, err + } + m := sqlutil.NewMigrations() + deltas.LoadLastSeenTSIP(m) + if err = m.RunDeltas(db, dbProperties); err != nil { + return nil, err + } if err = d.prepare(db, writer, serverName); err != nil { return nil, err }