diff --git a/cmd/dendrite-upgrade-tests/main.go b/cmd/dendrite-upgrade-tests/main.go index 35852e8ac..3241234ac 100644 --- a/cmd/dendrite-upgrade-tests/main.go +++ b/cmd/dendrite-upgrade-tests/main.go @@ -189,7 +189,9 @@ func buildDendrite(httpClient *http.Client, dockerClient *client.Client, tmpDir, if err := decoder.Decode(&dl); err != nil { return "", fmt.Errorf("failed to decode build image output line: %w", err) } - log.Printf("%s: %s", branchOrTagName, dl.Stream) + if len(strings.TrimSpace(dl.Stream)) > 0 { + log.Printf("%s: %s", branchOrTagName, dl.Stream) + } if dl.Aux != nil { imgID, ok := dl.Aux["ID"] if ok { @@ -425,8 +427,10 @@ func cleanup(dockerClient *client.Client) { // ignore all errors, we are just cleaning up and don't want to fail just because we fail to cleanup containers, _ := dockerClient.ContainerList(context.Background(), types.ContainerListOptions{ Filters: label(dendriteUpgradeTestLabel), + All: true, }) for _, c := range containers { + log.Printf("Removing container: %v %v\n", c.ID, c.Names) s := time.Second _ = dockerClient.ContainerStop(context.Background(), c.ID, &s) _ = dockerClient.ContainerRemove(context.Background(), c.ID, types.ContainerRemoveOptions{ diff --git a/keyserver/api/api.go b/keyserver/api/api.go index eae14ae17..0eea2f0fa 100644 --- a/keyserver/api/api.go +++ b/keyserver/api/api.go @@ -69,7 +69,8 @@ type DeviceMessage struct { *DeviceKeys `json:"DeviceKeys,omitempty"` *eduapi.OutputCrossSigningKeyUpdate `json:"CrossSigningKeyUpdate,omitempty"` // A monotonically increasing number which represents device changes for this user. - StreamID int + StreamID int + DeviceChangeID int64 } // DeviceKeys represents a set of device keys for a single device diff --git a/keyserver/internal/internal.go b/keyserver/internal/internal.go index 0140a8f49..259249217 100644 --- a/keyserver/internal/internal.go +++ b/keyserver/internal/internal.go @@ -59,8 +59,7 @@ func (a *KeyInternalAPI) InputDeviceListUpdate( } func (a *KeyInternalAPI) QueryKeyChanges(ctx context.Context, req *api.QueryKeyChangesRequest, res *api.QueryKeyChangesResponse) { - partition := 0 - userIDs, latest, err := a.DB.KeyChanges(ctx, int32(partition), req.Offset, req.ToOffset) + userIDs, latest, err := a.DB.KeyChanges(ctx, req.Offset, req.ToOffset) if err != nil { res.Error = &api.KeyError{ Err: err.Error(), diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go index 03a221a6c..8cc50ea0d 100644 --- a/keyserver/keyserver.go +++ b/keyserver/keyserver.go @@ -40,16 +40,16 @@ func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) { func NewInternalAPI( base *base.BaseDendrite, cfg *config.KeyServer, fedClient fedsenderapi.FederationClient, ) api.KeyInternalAPI { - _, consumer, producer := jetstream.Prepare(&cfg.Matrix.JetStream) + js, consumer, _ := jetstream.Prepare(&cfg.Matrix.JetStream) db, err := storage.NewDatabase(&cfg.Database) if err != nil { logrus.WithError(err).Panicf("failed to connect to key server database") } keyChangeProducer := &producers.KeyChange{ - Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent)), - Producer: producer, - DB: db, + Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent)), + JetStream: js, + DB: db, } ap := &internal.KeyInternalAPI{ DB: db, diff --git a/keyserver/producers/keychange.go b/keyserver/producers/keychange.go index c5ddf2c19..fd143c6cf 100644 --- a/keyserver/producers/keychange.go +++ b/keyserver/producers/keychange.go @@ -18,43 +18,47 @@ import ( "context" "encoding/json" - "github.com/Shopify/sarama" eduapi "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/storage" + "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/nats-io/nats.go" "github.com/sirupsen/logrus" ) // KeyChange produces key change events for the sync API and federation sender to consume type KeyChange struct { - Topic string - Producer sarama.SyncProducer - DB storage.Database + Topic string + JetStream nats.JetStreamContext + DB storage.Database } // ProduceKeyChanges creates new change events for each key func (p *KeyChange) ProduceKeyChanges(keys []api.DeviceMessage) error { userToDeviceCount := make(map[string]int) for _, key := range keys { - var m sarama.ProducerMessage - + id, err := p.DB.StoreKeyChange(context.Background(), key.UserID) + if err != nil { + return err + } + key.DeviceChangeID = id value, err := json.Marshal(key) if err != nil { return err } - m.Topic = string(p.Topic) - m.Key = sarama.StringEncoder(key.UserID) - m.Value = sarama.ByteEncoder(value) + m := &nats.Msg{ + Subject: p.Topic, + Header: nats.Header{}, + } + m.Header.Set(jetstream.UserID, key.UserID) + m.Data = value - partition, offset, err := p.Producer.SendMessage(&m) - if err != nil { - return err - } - err = p.DB.StoreKeyChange(context.Background(), partition, offset, key.UserID) + _, err = p.JetStream.PublishMsg(m) if err != nil { return err } + userToDeviceCount[key.UserID]++ } for userID, count := range userToDeviceCount { @@ -67,7 +71,6 @@ func (p *KeyChange) ProduceKeyChanges(keys []api.DeviceMessage) error { } func (p *KeyChange) ProduceSigningKeyUpdate(key eduapi.CrossSigningKeyUpdate) error { - var m sarama.ProducerMessage output := &api.DeviceMessage{ Type: api.TypeCrossSigningUpdate, OutputCrossSigningKeyUpdate: &eduapi.OutputCrossSigningKeyUpdate{ @@ -75,20 +78,25 @@ func (p *KeyChange) ProduceSigningKeyUpdate(key eduapi.CrossSigningKeyUpdate) er }, } + id, err := p.DB.StoreKeyChange(context.Background(), key.UserID) + if err != nil { + return err + } + output.DeviceChangeID = id + value, err := json.Marshal(output) if err != nil { return err } - m.Topic = string(p.Topic) - m.Key = sarama.StringEncoder(key.UserID) - m.Value = sarama.ByteEncoder(value) - - partition, offset, err := p.Producer.SendMessage(&m) - if err != nil { - return err + m := &nats.Msg{ + Subject: p.Topic, + Header: nats.Header{}, } - err = p.DB.StoreKeyChange(context.Background(), partition, offset, key.UserID) + m.Header.Set(jetstream.UserID, key.UserID) + m.Data = value + + _, err = p.JetStream.PublishMsg(m) if err != nil { return err } diff --git a/keyserver/storage/interface.go b/keyserver/storage/interface.go index 99842bc58..87feae47d 100644 --- a/keyserver/storage/interface.go +++ b/keyserver/storage/interface.go @@ -66,14 +66,14 @@ type Database interface { // cannot be claimed or if none exist for this (user, device, algorithm), instead it is omitted from the returned slice. ClaimKeys(ctx context.Context, userToDeviceToAlgorithm map[string]map[string]string) ([]api.OneTimeKeys, error) - // StoreKeyChange stores key change metadata after the change has been sent to Kafka. `userID` is the the user who has changed - // their keys in some way. - StoreKeyChange(ctx context.Context, partition int32, offset int64, userID string) error + // StoreKeyChange stores key change metadata and returns the device change ID which represents the position in the /sync stream for this device change. + // `userID` is the the user who has changed their keys in some way. + StoreKeyChange(ctx context.Context, userID string) (int64, error) // KeyChanges returns a list of user IDs who have modified their keys from the offset given (exclusive) to the offset given (inclusive). // A to offset of sarama.OffsetNewest means no upper limit. // Returns the offset of the latest key change. - KeyChanges(ctx context.Context, partition int32, fromOffset, toOffset int64) (userIDs []string, latestOffset int64, err error) + KeyChanges(ctx context.Context, fromOffset, toOffset int64) (userIDs []string, latestOffset int64, err error) // StaleDeviceLists returns a list of user IDs ending with the domains provided who have stale device lists. // If no domains are given, all user IDs with stale device lists are returned. diff --git a/keyserver/storage/postgres/deltas/2022012016470000_key_changes.go b/keyserver/storage/postgres/deltas/2022012016470000_key_changes.go new file mode 100644 index 000000000..e5bcf08d1 --- /dev/null +++ b/keyserver/storage/postgres/deltas/2022012016470000_key_changes.go @@ -0,0 +1,79 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package deltas + +import ( + "database/sql" + "fmt" + + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/pressly/goose" +) + +func LoadFromGoose() { + goose.AddMigration(UpRefactorKeyChanges, DownRefactorKeyChanges) +} + +func LoadRefactorKeyChanges(m *sqlutil.Migrations) { + m.AddMigration(UpRefactorKeyChanges, DownRefactorKeyChanges) +} + +func UpRefactorKeyChanges(tx *sql.Tx) error { + // start counting from the last max offset, else 0. We need to do a count(*) first to see if there + // even are entries in this table to know if we can query for log_offset. Without the count then + // the query to SELECT the max log offset fails on new Dendrite instances as log_offset doesn't + // exist on that table. Even though we discard the error, the txn is tainted and gets aborted :/ + var count int + _ = tx.QueryRow(`SELECT count(*) FROM keyserver_key_changes`).Scan(&count) + if count > 0 { + var maxOffset int64 + _ = tx.QueryRow(`SELECT coalesce(MAX(log_offset), 0) AS offset FROM keyserver_key_changes`).Scan(&maxOffset) + if _, err := tx.Exec(fmt.Sprintf(`CREATE SEQUENCE IF NOT EXISTS keyserver_key_changes_seq START %d`, maxOffset)); err != nil { + return fmt.Errorf("failed to CREATE SEQUENCE for key changes, starting at %d: %s", maxOffset, err) + } + } + + _, err := tx.Exec(` + -- make the new table + DROP TABLE IF EXISTS keyserver_key_changes; + CREATE TABLE IF NOT EXISTS keyserver_key_changes ( + change_id BIGINT PRIMARY KEY DEFAULT nextval('keyserver_key_changes_seq'), + user_id TEXT NOT NULL, + CONSTRAINT keyserver_key_changes_unique_per_user UNIQUE (user_id) + ); + `) + if err != nil { + return fmt.Errorf("failed to execute upgrade: %w", err) + } + return nil +} + +func DownRefactorKeyChanges(tx *sql.Tx) error { + _, err := tx.Exec(` + -- Drop all data and revert back, we can't keep the data as Kafka offsets determine the numbers + DROP SEQUENCE IF EXISTS keyserver_key_changes_seq; + DROP TABLE IF EXISTS keyserver_key_changes; + CREATE TABLE IF NOT EXISTS keyserver_key_changes ( + partition BIGINT NOT NULL, + log_offset BIGINT NOT NULL, + user_id TEXT NOT NULL, + CONSTRAINT keyserver_key_changes_unique UNIQUE (partition, log_offset) + ); + `) + if err != nil { + return fmt.Errorf("failed to execute downgrade: %w", err) + } + return nil +} diff --git a/keyserver/storage/postgres/key_changes_table.go b/keyserver/storage/postgres/key_changes_table.go index df4b47e79..20d227c24 100644 --- a/keyserver/storage/postgres/key_changes_table.go +++ b/keyserver/storage/postgres/key_changes_table.go @@ -26,27 +26,25 @@ import ( var keyChangesSchema = ` -- Stores key change information about users. Used to determine when to send updated device lists to clients. +CREATE SEQUENCE IF NOT EXISTS keyserver_key_changes_seq; CREATE TABLE IF NOT EXISTS keyserver_key_changes ( - partition BIGINT NOT NULL, - log_offset BIGINT NOT NULL, + change_id BIGINT PRIMARY KEY DEFAULT nextval('keyserver_key_changes_seq'), user_id TEXT NOT NULL, - CONSTRAINT keyserver_key_changes_unique UNIQUE (partition, log_offset) + CONSTRAINT keyserver_key_changes_unique_per_user UNIQUE (user_id) ); ` -// Replace based on partition|offset - we should never insert duplicates unless the kafka logs are wiped. -// Rather than falling over, just overwrite (though this will mean clients with an existing sync token will -// miss out on updates). TODO: Ideally we would detect when kafka logs are purged then purge this table too. +// Replace based on user ID. We don't care how many times the user's keys have changed, only that they +// have changed, hence we can just keep bumping the change ID for this user. const upsertKeyChangeSQL = "" + - "INSERT INTO keyserver_key_changes (partition, log_offset, user_id)" + - " VALUES ($1, $2, $3)" + - " ON CONFLICT ON CONSTRAINT keyserver_key_changes_unique" + - " DO UPDATE SET user_id = $3" + "INSERT INTO keyserver_key_changes (user_id)" + + " VALUES ($1)" + + " ON CONFLICT ON CONSTRAINT keyserver_key_changes_unique_per_user" + + " DO UPDATE SET change_id = nextval('keyserver_key_changes_seq')" + + " RETURNING change_id" -// select the highest offset for each user in the range. The grouping by user gives distinct entries and then we just -// take the max offset value as the latest offset. const selectKeyChangesSQL = "" + - "SELECT user_id, MAX(log_offset) FROM keyserver_key_changes WHERE partition = $1 AND log_offset > $2 AND log_offset <= $3 GROUP BY user_id" + "SELECT user_id, change_id FROM keyserver_key_changes WHERE change_id > $1 AND change_id <= $2" type keyChangesStatements struct { db *sql.DB @@ -59,31 +57,32 @@ func NewPostgresKeyChangesTable(db *sql.DB) (tables.KeyChanges, error) { db: db, } _, err := db.Exec(keyChangesSchema) - if err != nil { - return nil, err - } - if s.upsertKeyChangeStmt, err = db.Prepare(upsertKeyChangeSQL); err != nil { - return nil, err - } - if s.selectKeyChangesStmt, err = db.Prepare(selectKeyChangesSQL); err != nil { - return nil, err - } - return s, nil + return s, err } -func (s *keyChangesStatements) InsertKeyChange(ctx context.Context, partition int32, offset int64, userID string) error { - _, err := s.upsertKeyChangeStmt.ExecContext(ctx, partition, offset, userID) - return err +func (s *keyChangesStatements) Prepare() (err error) { + if s.upsertKeyChangeStmt, err = s.db.Prepare(upsertKeyChangeSQL); err != nil { + return err + } + if s.selectKeyChangesStmt, err = s.db.Prepare(selectKeyChangesSQL); err != nil { + return err + } + return nil +} + +func (s *keyChangesStatements) InsertKeyChange(ctx context.Context, userID string) (changeID int64, err error) { + err = s.upsertKeyChangeStmt.QueryRowContext(ctx, userID).Scan(&changeID) + return } func (s *keyChangesStatements) SelectKeyChanges( - ctx context.Context, partition int32, fromOffset, toOffset int64, + ctx context.Context, fromOffset, toOffset int64, ) (userIDs []string, latestOffset int64, err error) { if toOffset == sarama.OffsetNewest { toOffset = math.MaxInt64 } latestOffset = fromOffset - rows, err := s.selectKeyChangesStmt.QueryContext(ctx, partition, fromOffset, toOffset) + rows, err := s.selectKeyChangesStmt.QueryContext(ctx, fromOffset, toOffset) if err != nil { return nil, 0, err } diff --git a/keyserver/storage/postgres/storage.go b/keyserver/storage/postgres/storage.go index 52f3a7f6b..b71cc1a7a 100644 --- a/keyserver/storage/postgres/storage.go +++ b/keyserver/storage/postgres/storage.go @@ -16,6 +16,7 @@ package postgres import ( "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/keyserver/storage/postgres/deltas" "github.com/matrix-org/dendrite/keyserver/storage/shared" "github.com/matrix-org/dendrite/setup/config" ) @@ -51,6 +52,14 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*shared.Database, error) if err != nil { return nil, err } + m := sqlutil.NewMigrations() + deltas.LoadRefactorKeyChanges(m) + if err = m.RunDeltas(db, dbProperties); err != nil { + return nil, err + } + if err = kc.Prepare(); err != nil { + return nil, err + } d := &shared.Database{ DB: db, Writer: sqlutil.NewDummyWriter(), diff --git a/keyserver/storage/shared/storage.go b/keyserver/storage/shared/storage.go index 5bd8be368..5914d28e1 100644 --- a/keyserver/storage/shared/storage.go +++ b/keyserver/storage/shared/storage.go @@ -135,14 +135,16 @@ func (d *Database) ClaimKeys(ctx context.Context, userToDeviceToAlgorithm map[st return result, err } -func (d *Database) StoreKeyChange(ctx context.Context, partition int32, offset int64, userID string) error { - return d.Writer.Do(nil, nil, func(_ *sql.Tx) error { - return d.KeyChangesTable.InsertKeyChange(ctx, partition, offset, userID) +func (d *Database) StoreKeyChange(ctx context.Context, userID string) (id int64, err error) { + err = d.Writer.Do(nil, nil, func(_ *sql.Tx) error { + id, err = d.KeyChangesTable.InsertKeyChange(ctx, userID) + return err }) + return } -func (d *Database) KeyChanges(ctx context.Context, partition int32, fromOffset, toOffset int64) (userIDs []string, latestOffset int64, err error) { - return d.KeyChangesTable.SelectKeyChanges(ctx, partition, fromOffset, toOffset) +func (d *Database) KeyChanges(ctx context.Context, fromOffset, toOffset int64) (userIDs []string, latestOffset int64, err error) { + return d.KeyChangesTable.SelectKeyChanges(ctx, fromOffset, toOffset) } // StaleDeviceLists returns a list of user IDs ending with the domains provided who have stale device lists. diff --git a/keyserver/storage/sqlite3/deltas/2022012016470000_key_changes.go b/keyserver/storage/sqlite3/deltas/2022012016470000_key_changes.go new file mode 100644 index 000000000..fbc548c38 --- /dev/null +++ b/keyserver/storage/sqlite3/deltas/2022012016470000_key_changes.go @@ -0,0 +1,76 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package deltas + +import ( + "database/sql" + "fmt" + + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/pressly/goose" +) + +func LoadFromGoose() { + goose.AddMigration(UpRefactorKeyChanges, DownRefactorKeyChanges) +} + +func LoadRefactorKeyChanges(m *sqlutil.Migrations) { + m.AddMigration(UpRefactorKeyChanges, DownRefactorKeyChanges) +} + +func UpRefactorKeyChanges(tx *sql.Tx) error { + // start counting from the last max offset, else 0. + var maxOffset int64 + var userID string + _ = tx.QueryRow(`SELECT user_id, MAX(log_offset) FROM keyserver_key_changes GROUP BY user_id`).Scan(&userID, &maxOffset) + + _, err := tx.Exec(` + -- make the new table + DROP TABLE IF EXISTS keyserver_key_changes; + CREATE TABLE IF NOT EXISTS keyserver_key_changes ( + change_id INTEGER PRIMARY KEY AUTOINCREMENT, + -- The key owner + user_id TEXT NOT NULL, + UNIQUE (user_id) + ); + `) + if err != nil { + return fmt.Errorf("failed to execute upgrade: %w", err) + } + // to start counting from maxOffset, insert a row with that value + if userID != "" { + _, err = tx.Exec(`INSERT INTO keyserver_key_changes(change_id, user_id) VALUES($1, $2)`, maxOffset, userID) + return err + } + return nil +} + +func DownRefactorKeyChanges(tx *sql.Tx) error { + _, err := tx.Exec(` + -- Drop all data and revert back, we can't keep the data as Kafka offsets determine the numbers + DROP TABLE IF EXISTS keyserver_key_changes; + CREATE TABLE IF NOT EXISTS keyserver_key_changes ( + partition BIGINT NOT NULL, + offset BIGINT NOT NULL, + -- The key owner + user_id TEXT NOT NULL, + UNIQUE (partition, offset) + ); + `) + if err != nil { + return fmt.Errorf("failed to execute downgrade: %w", err) + } + return nil +} diff --git a/keyserver/storage/sqlite3/key_changes_table.go b/keyserver/storage/sqlite3/key_changes_table.go index b4753ccc5..d43c15ca9 100644 --- a/keyserver/storage/sqlite3/key_changes_table.go +++ b/keyserver/storage/sqlite3/key_changes_table.go @@ -27,27 +27,22 @@ import ( var keyChangesSchema = ` -- Stores key change information about users. Used to determine when to send updated device lists to clients. CREATE TABLE IF NOT EXISTS keyserver_key_changes ( - partition BIGINT NOT NULL, - offset BIGINT NOT NULL, + change_id INTEGER PRIMARY KEY AUTOINCREMENT, -- The key owner user_id TEXT NOT NULL, - UNIQUE (partition, offset) + UNIQUE (user_id) ); ` -// Replace based on partition|offset - we should never insert duplicates unless the kafka logs are wiped. -// Rather than falling over, just overwrite (though this will mean clients with an existing sync token will -// miss out on updates). TODO: Ideally we would detect when kafka logs are purged then purge this table too. +// Replace based on user ID. We don't care how many times the user's keys have changed, only that they +// have changed, hence we can just keep bumping the change ID for this user. const upsertKeyChangeSQL = "" + - "INSERT INTO keyserver_key_changes (partition, offset, user_id)" + - " VALUES ($1, $2, $3)" + - " ON CONFLICT (partition, offset)" + - " DO UPDATE SET user_id = $3" + "INSERT OR REPLACE INTO keyserver_key_changes (user_id)" + + " VALUES ($1)" + + " RETURNING change_id" -// select the highest offset for each user in the range. The grouping by user gives distinct entries and then we just -// take the max offset value as the latest offset. const selectKeyChangesSQL = "" + - "SELECT user_id, MAX(offset) FROM keyserver_key_changes WHERE partition = $1 AND offset > $2 AND offset <= $3 GROUP BY user_id" + "SELECT user_id, change_id FROM keyserver_key_changes WHERE change_id > $1 AND change_id <= $2" type keyChangesStatements struct { db *sql.DB @@ -60,31 +55,32 @@ func NewSqliteKeyChangesTable(db *sql.DB) (tables.KeyChanges, error) { db: db, } _, err := db.Exec(keyChangesSchema) - if err != nil { - return nil, err - } - if s.upsertKeyChangeStmt, err = db.Prepare(upsertKeyChangeSQL); err != nil { - return nil, err - } - if s.selectKeyChangesStmt, err = db.Prepare(selectKeyChangesSQL); err != nil { - return nil, err - } - return s, nil + return s, err } -func (s *keyChangesStatements) InsertKeyChange(ctx context.Context, partition int32, offset int64, userID string) error { - _, err := s.upsertKeyChangeStmt.ExecContext(ctx, partition, offset, userID) - return err +func (s *keyChangesStatements) Prepare() (err error) { + if s.upsertKeyChangeStmt, err = s.db.Prepare(upsertKeyChangeSQL); err != nil { + return err + } + if s.selectKeyChangesStmt, err = s.db.Prepare(selectKeyChangesSQL); err != nil { + return err + } + return nil +} + +func (s *keyChangesStatements) InsertKeyChange(ctx context.Context, userID string) (changeID int64, err error) { + err = s.upsertKeyChangeStmt.QueryRowContext(ctx, userID).Scan(&changeID) + return } func (s *keyChangesStatements) SelectKeyChanges( - ctx context.Context, partition int32, fromOffset, toOffset int64, + ctx context.Context, fromOffset, toOffset int64, ) (userIDs []string, latestOffset int64, err error) { if toOffset == sarama.OffsetNewest { toOffset = math.MaxInt64 } latestOffset = fromOffset - rows, err := s.selectKeyChangesStmt.QueryContext(ctx, partition, fromOffset, toOffset) + rows, err := s.selectKeyChangesStmt.QueryContext(ctx, fromOffset, toOffset) if err != nil { return nil, 0, err } diff --git a/keyserver/storage/sqlite3/storage.go b/keyserver/storage/sqlite3/storage.go index ee1746cd6..50ce00d05 100644 --- a/keyserver/storage/sqlite3/storage.go +++ b/keyserver/storage/sqlite3/storage.go @@ -17,6 +17,7 @@ package sqlite3 import ( "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/keyserver/storage/shared" + "github.com/matrix-org/dendrite/keyserver/storage/sqlite3/deltas" "github.com/matrix-org/dendrite/setup/config" ) @@ -49,6 +50,15 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*shared.Database, error) if err != nil { return nil, err } + + m := sqlutil.NewMigrations() + deltas.LoadRefactorKeyChanges(m) + if err = m.RunDeltas(db, dbProperties); err != nil { + return nil, err + } + if err = kc.Prepare(); err != nil { + return nil, err + } d := &shared.Database{ DB: db, Writer: sqlutil.NewExclusiveWriter(), diff --git a/keyserver/storage/storage_test.go b/keyserver/storage/storage_test.go index 4e0a8af1d..2f8cf809b 100644 --- a/keyserver/storage/storage_test.go +++ b/keyserver/storage/storage_test.go @@ -44,15 +44,18 @@ func MustNotError(t *testing.T, err error) { func TestKeyChanges(t *testing.T) { db, clean := MustCreateDatabase(t) defer clean() - MustNotError(t, db.StoreKeyChange(ctx, 0, 0, "@alice:localhost")) - MustNotError(t, db.StoreKeyChange(ctx, 0, 1, "@bob:localhost")) - MustNotError(t, db.StoreKeyChange(ctx, 0, 2, "@charlie:localhost")) - userIDs, latest, err := db.KeyChanges(ctx, 0, 1, sarama.OffsetNewest) + _, err := db.StoreKeyChange(ctx, "@alice:localhost") + MustNotError(t, err) + deviceChangeIDB, err := db.StoreKeyChange(ctx, "@bob:localhost") + MustNotError(t, err) + deviceChangeIDC, err := db.StoreKeyChange(ctx, "@charlie:localhost") + MustNotError(t, err) + userIDs, latest, err := db.KeyChanges(ctx, deviceChangeIDB, sarama.OffsetNewest) if err != nil { t.Fatalf("Failed to KeyChanges: %s", err) } - if latest != 2 { - t.Fatalf("KeyChanges: got latest=%d want 2", latest) + if latest != deviceChangeIDC { + t.Fatalf("KeyChanges: got latest=%d want %d", latest, deviceChangeIDC) } if !reflect.DeepEqual(userIDs, []string{"@charlie:localhost"}) { t.Fatalf("KeyChanges: wrong user_ids: %v", userIDs) @@ -62,15 +65,21 @@ func TestKeyChanges(t *testing.T) { func TestKeyChangesNoDupes(t *testing.T) { db, clean := MustCreateDatabase(t) defer clean() - MustNotError(t, db.StoreKeyChange(ctx, 0, 0, "@alice:localhost")) - MustNotError(t, db.StoreKeyChange(ctx, 0, 1, "@alice:localhost")) - MustNotError(t, db.StoreKeyChange(ctx, 0, 2, "@alice:localhost")) - userIDs, latest, err := db.KeyChanges(ctx, 0, 0, sarama.OffsetNewest) + deviceChangeIDA, err := db.StoreKeyChange(ctx, "@alice:localhost") + MustNotError(t, err) + deviceChangeIDB, err := db.StoreKeyChange(ctx, "@alice:localhost") + MustNotError(t, err) + if deviceChangeIDA == deviceChangeIDB { + t.Fatalf("Expected change ID to be different even when inserting key change for the same user, got %d for both changes", deviceChangeIDA) + } + deviceChangeID, err := db.StoreKeyChange(ctx, "@alice:localhost") + MustNotError(t, err) + userIDs, latest, err := db.KeyChanges(ctx, 0, sarama.OffsetNewest) if err != nil { t.Fatalf("Failed to KeyChanges: %s", err) } - if latest != 2 { - t.Fatalf("KeyChanges: got latest=%d want 2", latest) + if latest != deviceChangeID { + t.Fatalf("KeyChanges: got latest=%d want %d", latest, deviceChangeID) } if !reflect.DeepEqual(userIDs, []string{"@alice:localhost"}) { t.Fatalf("KeyChanges: wrong user_ids: %v", userIDs) @@ -80,15 +89,18 @@ func TestKeyChangesNoDupes(t *testing.T) { func TestKeyChangesUpperLimit(t *testing.T) { db, clean := MustCreateDatabase(t) defer clean() - MustNotError(t, db.StoreKeyChange(ctx, 0, 0, "@alice:localhost")) - MustNotError(t, db.StoreKeyChange(ctx, 0, 1, "@bob:localhost")) - MustNotError(t, db.StoreKeyChange(ctx, 0, 2, "@charlie:localhost")) - userIDs, latest, err := db.KeyChanges(ctx, 0, 0, 1) + deviceChangeIDA, err := db.StoreKeyChange(ctx, "@alice:localhost") + MustNotError(t, err) + deviceChangeIDB, err := db.StoreKeyChange(ctx, "@bob:localhost") + MustNotError(t, err) + _, err = db.StoreKeyChange(ctx, "@charlie:localhost") + MustNotError(t, err) + userIDs, latest, err := db.KeyChanges(ctx, deviceChangeIDA, deviceChangeIDB) if err != nil { t.Fatalf("Failed to KeyChanges: %s", err) } - if latest != 1 { - t.Fatalf("KeyChanges: got latest=%d want 1", latest) + if latest != deviceChangeIDB { + t.Fatalf("KeyChanges: got latest=%d want %d", latest, deviceChangeIDB) } if !reflect.DeepEqual(userIDs, []string{"@bob:localhost"}) { t.Fatalf("KeyChanges: wrong user_ids: %v", userIDs) diff --git a/keyserver/storage/tables/interface.go b/keyserver/storage/tables/interface.go index 612eeb867..0d94c94cc 100644 --- a/keyserver/storage/tables/interface.go +++ b/keyserver/storage/tables/interface.go @@ -44,10 +44,12 @@ type DeviceKeys interface { } type KeyChanges interface { - InsertKeyChange(ctx context.Context, partition int32, offset int64, userID string) error + InsertKeyChange(ctx context.Context, userID string) (int64, error) // SelectKeyChanges returns the set (de-duplicated) of users who have changed their keys between the two offsets. // Results are exclusive of fromOffset and inclusive of toOffset. A toOffset of sarama.OffsetNewest means no upper offset. - SelectKeyChanges(ctx context.Context, partition int32, fromOffset, toOffset int64) (userIDs []string, latestOffset int64, err error) + SelectKeyChanges(ctx context.Context, fromOffset, toOffset int64) (userIDs []string, latestOffset int64, err error) + + Prepare() error } type StaleDeviceLists interface { diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go index d63e4832b..97685cc04 100644 --- a/syncapi/consumers/keychange.go +++ b/syncapi/consumers/keychange.go @@ -17,7 +17,6 @@ package consumers import ( "context" "encoding/json" - "sync" "github.com/Shopify/sarama" "github.com/getsentry/sentry-go" @@ -34,16 +33,14 @@ import ( // OutputKeyChangeEventConsumer consumes events that originated in the key server. type OutputKeyChangeEventConsumer struct { - ctx context.Context - keyChangeConsumer *internal.ContinualConsumer - db storage.Database - notifier *notifier.Notifier - stream types.StreamProvider - serverName gomatrixserverlib.ServerName // our server name - rsAPI roomserverAPI.RoomserverInternalAPI - keyAPI api.KeyInternalAPI - partitionToOffset map[int32]int64 - partitionToOffsetMu sync.Mutex + ctx context.Context + keyChangeConsumer *internal.ContinualConsumer + db storage.Database + notifier *notifier.Notifier + stream types.StreamProvider + serverName gomatrixserverlib.ServerName // our server name + rsAPI roomserverAPI.RoomserverInternalAPI + keyAPI api.KeyInternalAPI } // NewOutputKeyChangeEventConsumer creates a new OutputKeyChangeEventConsumer. @@ -69,16 +66,14 @@ func NewOutputKeyChangeEventConsumer( } s := &OutputKeyChangeEventConsumer{ - ctx: process.Context(), - keyChangeConsumer: &consumer, - db: store, - serverName: serverName, - keyAPI: keyAPI, - rsAPI: rsAPI, - partitionToOffset: make(map[int32]int64), - partitionToOffsetMu: sync.Mutex{}, - notifier: notifier, - stream: stream, + ctx: process.Context(), + keyChangeConsumer: &consumer, + db: store, + serverName: serverName, + keyAPI: keyAPI, + rsAPI: rsAPI, + notifier: notifier, + stream: stream, } consumer.ProcessMessage = s.onMessage @@ -88,24 +83,10 @@ func NewOutputKeyChangeEventConsumer( // Start consuming from the key server func (s *OutputKeyChangeEventConsumer) Start() error { - offsets, err := s.keyChangeConsumer.StartOffsets() - s.partitionToOffsetMu.Lock() - for _, o := range offsets { - s.partitionToOffset[o.Partition] = o.Offset - } - s.partitionToOffsetMu.Unlock() - return err -} - -func (s *OutputKeyChangeEventConsumer) updateOffset(msg *sarama.ConsumerMessage) { - s.partitionToOffsetMu.Lock() - defer s.partitionToOffsetMu.Unlock() - s.partitionToOffset[msg.Partition] = msg.Offset + return s.keyChangeConsumer.Start() } func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { - defer s.updateOffset(msg) - var m api.DeviceMessage if err := json.Unmarshal(msg.Value, &m); err != nil { logrus.WithError(err).Errorf("failed to read device message from key change topic") @@ -118,15 +99,15 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er } switch m.Type { case api.TypeCrossSigningUpdate: - return s.onCrossSigningMessage(m, msg.Offset) + return s.onCrossSigningMessage(m, m.DeviceChangeID) case api.TypeDeviceKeyUpdate: fallthrough default: - return s.onDeviceKeyMessage(m, msg.Offset) + return s.onDeviceKeyMessage(m, m.DeviceChangeID) } } -func (s *OutputKeyChangeEventConsumer) onDeviceKeyMessage(m api.DeviceMessage, offset int64) error { +func (s *OutputKeyChangeEventConsumer) onDeviceKeyMessage(m api.DeviceMessage, deviceChangeID int64) error { if m.DeviceKeys == nil { return nil } @@ -143,7 +124,7 @@ func (s *OutputKeyChangeEventConsumer) onDeviceKeyMessage(m api.DeviceMessage, o } // make sure we get our own key updates too! queryRes.UserIDsToCount[output.UserID] = 1 - posUpdate := types.StreamPosition(offset) + posUpdate := types.StreamPosition(deviceChangeID) s.stream.Advance(posUpdate) for userID := range queryRes.UserIDsToCount { @@ -153,7 +134,7 @@ func (s *OutputKeyChangeEventConsumer) onDeviceKeyMessage(m api.DeviceMessage, o return nil } -func (s *OutputKeyChangeEventConsumer) onCrossSigningMessage(m api.DeviceMessage, offset int64) error { +func (s *OutputKeyChangeEventConsumer) onCrossSigningMessage(m api.DeviceMessage, deviceChangeID int64) error { output := m.CrossSigningKeyUpdate // work out who we need to notify about the new key var queryRes roomserverAPI.QuerySharedUsersResponse @@ -167,7 +148,7 @@ func (s *OutputKeyChangeEventConsumer) onCrossSigningMessage(m api.DeviceMessage } // make sure we get our own key updates too! queryRes.UserIDsToCount[output.UserID] = 1 - posUpdate := types.StreamPosition(offset) + posUpdate := types.StreamPosition(deviceChangeID) s.stream.Advance(posUpdate) for userID := range queryRes.UserIDsToCount {