diff --git a/keyserver/internal/cross_signing.go b/keyserver/internal/cross_signing.go index 2281f4bbf..08bbfedb8 100644 --- a/keyserver/internal/cross_signing.go +++ b/keyserver/internal/cross_signing.go @@ -362,6 +362,13 @@ func (a *KeyInternalAPI) processSelfSignatures( for targetKeyID, signature := range forTargetUserID { switch sig := signature.CrossSigningBody.(type) { case *gomatrixserverlib.CrossSigningKey: + for keyID := range sig.Keys { + split := strings.SplitN(string(keyID), ":", 2) + if len(split) > 1 && gomatrixserverlib.KeyID(split[1]) == targetKeyID { + targetKeyID = keyID // contains the ed25519: or other scheme + break + } + } for originUserID, forOriginUserID := range sig.Signatures { for originKeyID, originSig := range forOriginUserID { if err := a.DB.StoreCrossSigningSigsForTarget( diff --git a/keyserver/storage/postgres/cross_signing_sigs_table.go b/keyserver/storage/postgres/cross_signing_sigs_table.go index 40633c05c..b101e7ce5 100644 --- a/keyserver/storage/postgres/cross_signing_sigs_table.go +++ b/keyserver/storage/postgres/cross_signing_sigs_table.go @@ -33,8 +33,10 @@ CREATE TABLE IF NOT EXISTS keyserver_cross_signing_sigs ( target_user_id TEXT NOT NULL, target_key_id TEXT NOT NULL, signature TEXT NOT NULL, - PRIMARY KEY (origin_user_id, target_user_id, target_key_id) + PRIMARY KEY (origin_user_id, origin_key_id, target_user_id, target_key_id) ); + +CREATE INDEX IF NOT EXISTS keyserver_cross_signing_sigs_idx ON keyserver_cross_signing_sigs (origin_user_id, target_user_id, target_key_id); ` const selectCrossSigningSigsForTargetSQL = "" + @@ -44,7 +46,7 @@ const selectCrossSigningSigsForTargetSQL = "" + const upsertCrossSigningSigsForTargetSQL = "" + "INSERT INTO keyserver_cross_signing_sigs (origin_user_id, origin_key_id, target_user_id, target_key_id, signature)" + " VALUES($1, $2, $3, $4, $5)" + - " ON CONFLICT (origin_user_id, target_user_id, target_key_id) DO UPDATE SET (origin_key_id, signature) = ($2, $5)" + " ON CONFLICT (origin_user_id, origin_key_id, target_user_id, target_key_id) DO UPDATE SET signature = $5" const deleteCrossSigningSigsForTargetSQL = "" + "DELETE FROM keyserver_cross_signing_sigs WHERE target_user_id=$1 AND target_key_id=$2" diff --git a/keyserver/storage/postgres/deltas/2022042612000000_xsigning_idx.go b/keyserver/storage/postgres/deltas/2022042612000000_xsigning_idx.go new file mode 100644 index 000000000..12956e3b4 --- /dev/null +++ b/keyserver/storage/postgres/deltas/2022042612000000_xsigning_idx.go @@ -0,0 +1,52 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package deltas + +import ( + "database/sql" + "fmt" + + "github.com/matrix-org/dendrite/internal/sqlutil" +) + +func LoadFixCrossSigningSignatureIndexes(m *sqlutil.Migrations) { + m.AddMigration(UpFixCrossSigningSignatureIndexes, DownFixCrossSigningSignatureIndexes) +} + +func UpFixCrossSigningSignatureIndexes(tx *sql.Tx) error { + _, err := tx.Exec(` + ALTER TABLE keyserver_cross_signing_sigs DROP CONSTRAINT keyserver_cross_signing_sigs_pkey; + ALTER TABLE keyserver_cross_signing_sigs ADD PRIMARY KEY (origin_user_id, origin_key_id, target_user_id, target_key_id); + + CREATE INDEX IF NOT EXISTS keyserver_cross_signing_sigs_idx ON keyserver_cross_signing_sigs (origin_user_id, target_user_id, target_key_id); + `) + if err != nil { + return fmt.Errorf("failed to execute upgrade: %w", err) + } + return nil +} + +func DownFixCrossSigningSignatureIndexes(tx *sql.Tx) error { + _, err := tx.Exec(` + ALTER TABLE keyserver_cross_signing_sigs DROP CONSTRAINT keyserver_cross_signing_sigs_pkey; + ALTER TABLE keyserver_cross_signing_sigs ADD PRIMARY KEY (origin_user_id, target_user_id, target_key_id); + + DROP INDEX IF EXISTS keyserver_cross_signing_sigs_idx; + `) + if err != nil { + return fmt.Errorf("failed to execute downgrade: %w", err) + } + return nil +} diff --git a/keyserver/storage/postgres/storage.go b/keyserver/storage/postgres/storage.go index 136986885..d4c7e2cc7 100644 --- a/keyserver/storage/postgres/storage.go +++ b/keyserver/storage/postgres/storage.go @@ -54,6 +54,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*shared.Database, error) } m := sqlutil.NewMigrations() deltas.LoadRefactorKeyChanges(m) + deltas.LoadFixCrossSigningSignatureIndexes(m) if err = m.RunDeltas(db, dbProperties); err != nil { return nil, err } diff --git a/keyserver/storage/sqlite3/cross_signing_sigs_table.go b/keyserver/storage/sqlite3/cross_signing_sigs_table.go index 29ee889fd..36d562b8a 100644 --- a/keyserver/storage/sqlite3/cross_signing_sigs_table.go +++ b/keyserver/storage/sqlite3/cross_signing_sigs_table.go @@ -33,8 +33,10 @@ CREATE TABLE IF NOT EXISTS keyserver_cross_signing_sigs ( target_user_id TEXT NOT NULL, target_key_id TEXT NOT NULL, signature TEXT NOT NULL, - PRIMARY KEY (origin_user_id, target_user_id, target_key_id) + PRIMARY KEY (origin_user_id, origin_key_id, target_user_id, target_key_id) ); + +CREATE INDEX IF NOT EXISTS keyserver_cross_signing_sigs_idx ON keyserver_cross_signing_sigs (origin_user_id, target_user_id, target_key_id); ` const selectCrossSigningSigsForTargetSQL = "" + diff --git a/keyserver/storage/sqlite3/deltas/2022042612000000_xsigning_idx.go b/keyserver/storage/sqlite3/deltas/2022042612000000_xsigning_idx.go new file mode 100644 index 000000000..230e39fef --- /dev/null +++ b/keyserver/storage/sqlite3/deltas/2022042612000000_xsigning_idx.go @@ -0,0 +1,76 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package deltas + +import ( + "database/sql" + "fmt" + + "github.com/matrix-org/dendrite/internal/sqlutil" +) + +func LoadFixCrossSigningSignatureIndexes(m *sqlutil.Migrations) { + m.AddMigration(UpFixCrossSigningSignatureIndexes, DownFixCrossSigningSignatureIndexes) +} + +func UpFixCrossSigningSignatureIndexes(tx *sql.Tx) error { + _, err := tx.Exec(` + CREATE TABLE IF NOT EXISTS keyserver_cross_signing_sigs_tmp ( + origin_user_id TEXT NOT NULL, + origin_key_id TEXT NOT NULL, + target_user_id TEXT NOT NULL, + target_key_id TEXT NOT NULL, + signature TEXT NOT NULL, + PRIMARY KEY (origin_user_id, origin_key_id, target_user_id, target_key_id) + ); + + INSERT INTO keyserver_cross_signing_sigs_tmp (origin_user_id, origin_key_id, target_user_id, target_key_id, signature) + SELECT origin_user_id, origin_key_id, target_user_id, target_key_id, signature FROM keyserver_cross_signing_sigs; + + DROP TABLE keyserver_cross_signing_sigs; + ALTER TABLE keyserver_cross_signing_sigs_tmp RENAME TO keyserver_cross_signing_sigs; + + CREATE INDEX IF NOT EXISTS keyserver_cross_signing_sigs_idx ON keyserver_cross_signing_sigs (origin_user_id, target_user_id, target_key_id); + `) + if err != nil { + return fmt.Errorf("failed to execute upgrade: %w", err) + } + return nil +} + +func DownFixCrossSigningSignatureIndexes(tx *sql.Tx) error { + _, err := tx.Exec(` + CREATE TABLE IF NOT EXISTS keyserver_cross_signing_sigs_tmp ( + origin_user_id TEXT NOT NULL, + origin_key_id TEXT NOT NULL, + target_user_id TEXT NOT NULL, + target_key_id TEXT NOT NULL, + signature TEXT NOT NULL, + PRIMARY KEY (origin_user_id, target_user_id, target_key_id) + ); + + INSERT INTO keyserver_cross_signing_sigs_tmp (origin_user_id, origin_key_id, target_user_id, target_key_id, signature) + SELECT origin_user_id, origin_key_id, target_user_id, target_key_id, signature FROM keyserver_cross_signing_sigs; + + DROP TABLE keyserver_cross_signing_sigs; + ALTER TABLE keyserver_cross_signing_sigs_tmp RENAME TO keyserver_cross_signing_sigs; + + DELETE INDEX IF EXISTS keyserver_cross_signing_sigs_idx; + `) + if err != nil { + return fmt.Errorf("failed to execute downgrade: %w", err) + } + return nil +} diff --git a/keyserver/storage/sqlite3/storage.go b/keyserver/storage/sqlite3/storage.go index 0e0adceef..84d4cdf55 100644 --- a/keyserver/storage/sqlite3/storage.go +++ b/keyserver/storage/sqlite3/storage.go @@ -53,6 +53,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*shared.Database, error) m := sqlutil.NewMigrations() deltas.LoadRefactorKeyChanges(m) + deltas.LoadFixCrossSigningSignatureIndexes(m) if err = m.RunDeltas(db, dbProperties); err != nil { return nil, err } diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 0a6711e5d..dbefbf3b0 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -81,7 +81,7 @@ type Database interface { // Returns a map following the format data[roomID] = []dataTypes // If no data is retrieved, returns an empty map // If there was an issue with the retrieval, returns an error - GetAccountDataInRange(ctx context.Context, userID string, r types.Range, accountDataFilterPart *gomatrixserverlib.EventFilter) (map[string][]string, error) + GetAccountDataInRange(ctx context.Context, userID string, r types.Range, accountDataFilterPart *gomatrixserverlib.EventFilter) (map[string][]string, types.StreamPosition, error) // UpsertAccountData keeps track of new or updated account data, by saving the type // of the new/updated data, and the user ID and room ID the data is related to (empty) // room ID means the data isn't specific to any room) diff --git a/syncapi/storage/postgres/account_data_table.go b/syncapi/storage/postgres/account_data_table.go index 25bdb1da3..22bb4d7fa 100644 --- a/syncapi/storage/postgres/account_data_table.go +++ b/syncapi/storage/postgres/account_data_table.go @@ -57,7 +57,7 @@ const insertAccountDataSQL = "" + " RETURNING id" const selectAccountDataInRangeSQL = "" + - "SELECT room_id, type FROM syncapi_account_data_type" + + "SELECT id, room_id, type FROM syncapi_account_data_type" + " WHERE user_id = $1 AND id > $2 AND id <= $3" + " AND ( $4::text[] IS NULL OR type LIKE ANY($4) )" + " AND ( $5::text[] IS NULL OR NOT(type LIKE ANY($5)) )" + @@ -103,7 +103,7 @@ func (s *accountDataStatements) SelectAccountDataInRange( userID string, r types.Range, accountDataEventFilter *gomatrixserverlib.EventFilter, -) (data map[string][]string, err error) { +) (data map[string][]string, pos types.StreamPosition, err error) { data = make(map[string][]string) rows, err := s.selectAccountDataInRangeStmt.QueryContext(ctx, userID, r.Low(), r.High(), @@ -116,11 +116,12 @@ func (s *accountDataStatements) SelectAccountDataInRange( } defer internal.CloseAndLogIfError(ctx, rows, "selectAccountDataInRange: rows.close() failed") - for rows.Next() { - var dataType string - var roomID string + var dataType string + var roomID string + var id types.StreamPosition - if err = rows.Scan(&roomID, &dataType); err != nil { + for rows.Next() { + if err = rows.Scan(&id, &roomID, &dataType); err != nil { return } @@ -129,8 +130,11 @@ func (s *accountDataStatements) SelectAccountDataInRange( } else { data[roomID] = []string{dataType} } + if id > pos { + pos = id + } } - return data, rows.Err() + return data, pos, rows.Err() } func (s *accountDataStatements) SelectMaxAccountDataID( diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 97d8236b5..5a0a34105 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -265,7 +265,7 @@ func (d *Database) DeletePeeks( func (d *Database) GetAccountDataInRange( ctx context.Context, userID string, r types.Range, accountDataFilterPart *gomatrixserverlib.EventFilter, -) (map[string][]string, error) { +) (map[string][]string, types.StreamPosition, error) { return d.AccountData.SelectAccountDataInRange(ctx, userID, r, accountDataFilterPart) } diff --git a/syncapi/storage/sqlite3/account_data_table.go b/syncapi/storage/sqlite3/account_data_table.go index 71a098177..e0d97ec32 100644 --- a/syncapi/storage/sqlite3/account_data_table.go +++ b/syncapi/storage/sqlite3/account_data_table.go @@ -43,7 +43,7 @@ const insertAccountDataSQL = "" + // further parameters are added by prepareWithFilters const selectAccountDataInRangeSQL = "" + - "SELECT room_id, type FROM syncapi_account_data_type" + + "SELECT id, room_id, type FROM syncapi_account_data_type" + " WHERE user_id = $1 AND id > $2 AND id <= $3" const selectMaxAccountDataIDSQL = "" + @@ -95,7 +95,7 @@ func (s *accountDataStatements) SelectAccountDataInRange( userID string, r types.Range, filter *gomatrixserverlib.EventFilter, -) (data map[string][]string, err error) { +) (data map[string][]string, pos types.StreamPosition, err error) { data = make(map[string][]string) stmt, params, err := prepareWithFilters( s.db, nil, selectAccountDataInRangeSQL, @@ -112,11 +112,12 @@ func (s *accountDataStatements) SelectAccountDataInRange( } defer internal.CloseAndLogIfError(ctx, rows, "selectAccountDataInRange: rows.close() failed") - for rows.Next() { - var dataType string - var roomID string + var dataType string + var roomID string + var id types.StreamPosition - if err = rows.Scan(&roomID, &dataType); err != nil { + for rows.Next() { + if err = rows.Scan(&id, &roomID, &dataType); err != nil { return } @@ -125,9 +126,12 @@ func (s *accountDataStatements) SelectAccountDataInRange( } else { data[roomID] = []string{dataType} } + if id > pos { + pos = id + } } - return data, nil + return data, pos, nil } func (s *accountDataStatements) SelectMaxAccountDataID( diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index f4ab33ce7..bc6e2da5c 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -27,7 +27,7 @@ import ( type AccountData interface { InsertAccountData(ctx context.Context, txn *sql.Tx, userID, roomID, dataType string) (pos types.StreamPosition, err error) // SelectAccountDataInRange returns a map of room ID to a list of `dataType`. - SelectAccountDataInRange(ctx context.Context, userID string, r types.Range, accountDataEventFilter *gomatrixserverlib.EventFilter) (data map[string][]string, err error) + SelectAccountDataInRange(ctx context.Context, userID string, r types.Range, accountDataEventFilter *gomatrixserverlib.EventFilter) (data map[string][]string, pos types.StreamPosition, err error) SelectMaxAccountDataID(ctx context.Context, txn *sql.Tx) (id int64, err error) } diff --git a/syncapi/streams/stream_accountdata.go b/syncapi/streams/stream_accountdata.go index 094c51485..99cd4a92a 100644 --- a/syncapi/streams/stream_accountdata.go +++ b/syncapi/streams/stream_accountdata.go @@ -43,7 +43,7 @@ func (p *AccountDataStreamProvider) IncrementalSync( To: to, } - dataTypes, err := p.DB.GetAccountDataInRange( + dataTypes, pos, err := p.DB.GetAccountDataInRange( ctx, req.Device.UserID, r, &req.Filter.AccountData, ) if err != nil { @@ -95,5 +95,5 @@ func (p *AccountDataStreamProvider) IncrementalSync( } } - return to + return pos }