From 7511cf0c789646f7b198fa1d8ce294093fba21b7 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Thu, 15 Jul 2021 15:53:37 +0100 Subject: [PATCH] Add notary server tables for postgres --- federationsender/storage/interface.go | 6 + .../postgres/notary_server_keys_json_table.go | 64 +++++++ .../notary_server_keys_metadata_table.go | 162 ++++++++++++++++++ federationsender/storage/postgres/storage.go | 11 ++ federationsender/storage/shared/storage.go | 45 +++++ federationsender/storage/tables/interface.go | 24 +++ 6 files changed, 312 insertions(+) create mode 100644 federationsender/storage/postgres/notary_server_keys_json_table.go create mode 100644 federationsender/storage/postgres/notary_server_keys_metadata_table.go diff --git a/federationsender/storage/interface.go b/federationsender/storage/interface.go index 9c5ac0042..58c8a7cfa 100644 --- a/federationsender/storage/interface.go +++ b/federationsender/storage/interface.go @@ -66,4 +66,10 @@ type Database interface { RenewInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error GetInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.InboundPeek, error) GetInboundPeeks(ctx context.Context, roomID string) ([]types.InboundPeek, error) + + // Update the notary with the given server keys from the given server name. + UpdateNotaryKeys(ctx context.Context, serverName gomatrixserverlib.ServerName, serverKeys gomatrixserverlib.ServerKeys) error + // Query the notary for the server keys for the given server. If `optKeyIDs` is not empty, multiple server keys may be returned (between 1 - len(optKeyIDs)) + // such that the combination of all server keys will include all the `optKeyIDs`. + GetNotaryKeys(ctx context.Context, serverName gomatrixserverlib.ServerName, optKeyIDs []gomatrixserverlib.KeyID) ([]gomatrixserverlib.ServerKeys, error) } diff --git a/federationsender/storage/postgres/notary_server_keys_json_table.go b/federationsender/storage/postgres/notary_server_keys_json_table.go new file mode 100644 index 000000000..42e58ba79 --- /dev/null +++ b/federationsender/storage/postgres/notary_server_keys_json_table.go @@ -0,0 +1,64 @@ +// Copyright 2021 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgres + +import ( + "context" + "database/sql" + + "github.com/matrix-org/dendrite/federationsender/storage/tables" + "github.com/matrix-org/gomatrixserverlib" +) + +const notaryServerKeysJSONSchema = ` +CREATE SEQUENCE IF NOT EXISTS federationsender_notary_server_keys_json_pkey; +CREATE TABLE IF NOT EXISTS federationsender_notary_server_keys_json ( + notary_id BIGINT PRIMARY KEY NOT NULL DEFAULT nextval('federationsender_notary_server_keys_json_pkey'), + response_json TEXT NOT NULL, + server_name TEXT NOT NULL, + valid_until BIGINT NOT NULL +); +` + +const insertServerKeysJSONSQL = "" + + "INSERT INTO federationsender_notary_server_keys_json (response_json, server_name, valid_until) VALUES ($1, $2, $3)" + + " RETURNING notary_id" + +type notaryServerKeysStatements struct { + db *sql.DB + insertServerKeysJSONStmt *sql.Stmt +} + +func NewPostgresNotaryServerKeysTable(db *sql.DB) (s *notaryServerKeysStatements, err error) { + s = ¬aryServerKeysStatements{ + db: db, + } + _, err = db.Exec(notaryServerKeysJSONSchema) + if err != nil { + return + } + + if s.insertServerKeysJSONStmt, err = db.Prepare(insertServerKeysJSONSQL); err != nil { + return + } + return +} + +func (s *notaryServerKeysStatements) InsertJSONResponse( + ctx context.Context, txn *sql.Tx, keyQueryResponseJSON gomatrixserverlib.ServerKeys, serverName gomatrixserverlib.ServerName, validUntil gomatrixserverlib.Timestamp, +) (tables.NotaryID, error) { + var notaryID tables.NotaryID + return notaryID, txn.Stmt(s.insertServerKeysJSONStmt).QueryRowContext(ctx, string(keyQueryResponseJSON.Raw), serverName, validUntil).Scan(¬aryID) +} diff --git a/federationsender/storage/postgres/notary_server_keys_metadata_table.go b/federationsender/storage/postgres/notary_server_keys_metadata_table.go new file mode 100644 index 000000000..3c3e6d143 --- /dev/null +++ b/federationsender/storage/postgres/notary_server_keys_metadata_table.go @@ -0,0 +1,162 @@ +// Copyright 2021 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgres + +import ( + "context" + "database/sql" + + "github.com/lib/pq" + "github.com/matrix-org/dendrite/federationsender/storage/tables" + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/gomatrixserverlib" +) + +const notaryServerKeysMetadataSchema = ` +CREATE TABLE IF NOT EXISTS federationsender_notary_server_keys_metadata ( + notary_id BIGINT NOT NULL, + server_name TEXT NOT NULL, + key_id TEXT NOT NULL, + UNIQUE (server_name, key_id) +); +` + +const upsertServerKeysSQL = "" + + "INSERT INTO federationsender_notary_server_keys_metadata (notary_id, server_name, key_id) VALUES ($1, $2, $3)" + + " ON CONFLICT (server_name, key_id) DO UPDATE SET notary_id = $1" + +// for a given (server_name, key_id), find the existing notary ID and valid until. Used to check if we will replace it +// JOINs with the json table +const selectNotaryKeyMetadataSQL = ` + SELECT federationsender_notary_server_keys_metadata.notary_id, valid_until FROM federationsender_notary_server_keys_json + JOIN federationsender_notary_server_keys_metadata ON + federationsender_notary_server_keys_metadata.notary_id = federationsender_notary_server_keys_json.notary_id + WHERE federationsender_notary_server_keys_metadata.server_name = $1 AND federationsender_notary_server_keys_metadata.key_id = $2 +` + +// select the response which has the highest valid_until value +// JOINs with the json table +const selectNotaryKeyResponsesSQL = ` + SELECT response_json FROM federationsender_notary_server_keys_json + WHERE server_name = $1 AND valid_until = ( + SELECT MAX(valid_until) FROM federationsender_notary_server_keys_json WHERE server_name = $1 + ) +` + +// select the responses which have the given key IDs +// JOINs with the json table +const selectNotaryKeyResponsesWithKeyIDsSQL = ` + SELECT response_json FROM federationsender_notary_server_keys_json + JOIN federationsender_notary_server_keys_metadata ON + federationsender_notary_server_keys_metadata.notary_id = federationsender_notary_server_keys_json.notary_id + WHERE federationsender_notary_server_keys_json.server_name = $1 AND federationsender_notary_server_keys_metadata.key_id = ANY ($2) + GROUP BY federationsender_notary_server_keys_json.notary_id +` + +// JOINs with the metadata table +const deleteUnusedServerKeysJSONSQL = ` + DELETE FROM federationsender_notary_server_keys_json WHERE federationsender_notary_server_keys_json.notary_id NOT IN ( + SELECT DISTINCT notary_id FROM federationsender_notary_server_keys_metadata + ) +` + +type notaryServerKeysMetadataStatements struct { + db *sql.DB + upsertServerKeysStmt *sql.Stmt + selectNotaryKeyResponsesStmt *sql.Stmt + selectNotaryKeyResponsesWithKeyIDsStmt *sql.Stmt + selectNotaryKeyMetadataStmt *sql.Stmt + deleteUnusedServerKeysJSONStmt *sql.Stmt +} + +func NewPostgresNotaryServerKeysMetadataTable(db *sql.DB) (s *notaryServerKeysMetadataStatements, err error) { + s = ¬aryServerKeysMetadataStatements{ + db: db, + } + _, err = db.Exec(notaryServerKeysMetadataSchema) + if err != nil { + return + } + + if s.upsertServerKeysStmt, err = db.Prepare(upsertServerKeysSQL); err != nil { + return + } + if s.selectNotaryKeyResponsesStmt, err = db.Prepare(selectNotaryKeyResponsesSQL); err != nil { + return + } + if s.selectNotaryKeyResponsesWithKeyIDsStmt, err = db.Prepare(selectNotaryKeyResponsesWithKeyIDsSQL); err != nil { + return + } + if s.selectNotaryKeyMetadataStmt, err = db.Prepare(selectNotaryKeyMetadataSQL); err != nil { + return + } + if s.deleteUnusedServerKeysJSONStmt, err = db.Prepare(deleteUnusedServerKeysJSONSQL); err != nil { + return + } + return +} + +func (s *notaryServerKeysMetadataStatements) UpsertKey( + ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, keyID gomatrixserverlib.KeyID, newNotaryID tables.NotaryID, newValidUntil gomatrixserverlib.Timestamp, +) (tables.NotaryID, error) { + notaryID := newNotaryID + // see if the existing notary ID a) exists, b) has a longer valid_until + var existingNotaryID tables.NotaryID + var existingValidUntil gomatrixserverlib.Timestamp + if err := txn.Stmt(s.selectNotaryKeyMetadataStmt).QueryRowContext(ctx, serverName, keyID).Scan(&existingNotaryID, &existingValidUntil); err != nil { + if err != sql.ErrNoRows { + return 0, err + } + } + if existingValidUntil.Time().After(newValidUntil.Time()) { + // the existing valid_until is valid longer, so use that. + return existingNotaryID, nil + } + // overwrite the notary_id for this (server_name, key_id) tuple + _, err := txn.Stmt(s.upsertServerKeysStmt).ExecContext(ctx, notaryID, serverName, keyID) + return notaryID, err +} + +func (s *notaryServerKeysMetadataStatements) SelectKeys(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, keyIDs []gomatrixserverlib.KeyID) ([]gomatrixserverlib.ServerKeys, error) { + var rows *sql.Rows + var err error + if len(keyIDs) == 0 { + rows, err = txn.Stmt(s.selectNotaryKeyResponsesStmt).QueryContext(ctx, string(serverName)) + } else { + keyIDstr := make([]string, len(keyIDs)) + for i := range keyIDs { + keyIDstr[i] = string(keyIDs[i]) + } + rows, err = txn.Stmt(s.selectNotaryKeyResponsesWithKeyIDsStmt).QueryContext(ctx, string(serverName), pq.StringArray(keyIDstr)) + } + if err != nil { + return nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "selectNotaryKeyResponsesStmt close failed") + var results []gomatrixserverlib.ServerKeys + for rows.Next() { + var sk gomatrixserverlib.ServerKeys + if err := rows.Scan(&sk.Raw); err != nil { + return nil, err + } + results = append(results, sk) + } + return results, nil +} + +func (s *notaryServerKeysMetadataStatements) DeleteOldJSONResponses(ctx context.Context, txn *sql.Tx) error { + _, err := txn.Stmt(s.deleteUnusedServerKeysJSONStmt).ExecContext(ctx) + return err +} diff --git a/federationsender/storage/postgres/storage.go b/federationsender/storage/postgres/storage.go index 5edc08ad7..5507bad78 100644 --- a/federationsender/storage/postgres/storage.go +++ b/federationsender/storage/postgres/storage.go @@ -17,6 +17,7 @@ package postgres import ( "database/sql" + "fmt" "github.com/matrix-org/dendrite/federationsender/storage/postgres/deltas" "github.com/matrix-org/dendrite/federationsender/storage/shared" @@ -69,6 +70,14 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS if err != nil { return nil, err } + notaryJSON, err := NewPostgresNotaryServerKeysTable(d.db) + if err != nil { + return nil, fmt.Errorf("NewPostgresNotaryServerKeysTable: %s", err) + } + notaryMetadata, err := NewPostgresNotaryServerKeysMetadataTable(d.db) + if err != nil { + return nil, fmt.Errorf("NewPostgresNotaryServerKeysMetadataTable: %s", err) + } m := sqlutil.NewMigrations() deltas.LoadRemoveRoomsTable(m) if err = m.RunDeltas(d.db, dbProperties); err != nil { @@ -85,6 +94,8 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS FederationSenderBlacklist: blacklist, FederationSenderInboundPeeks: inboundPeeks, FederationSenderOutboundPeeks: outboundPeeks, + NotaryServerKeysJSON: notaryJSON, + NotaryServerKeysMetadata: notaryMetadata, } if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "federationsender"); err != nil { return nil, err diff --git a/federationsender/storage/shared/storage.go b/federationsender/storage/shared/storage.go index 2d4099229..6f4b1986a 100644 --- a/federationsender/storage/shared/storage.go +++ b/federationsender/storage/shared/storage.go @@ -18,6 +18,7 @@ import ( "context" "database/sql" "fmt" + "time" "github.com/matrix-org/dendrite/federationsender/storage/tables" "github.com/matrix-org/dendrite/federationsender/types" @@ -37,6 +38,8 @@ type Database struct { FederationSenderBlacklist tables.FederationSenderBlacklist FederationSenderOutboundPeeks tables.FederationSenderOutboundPeeks FederationSenderInboundPeeks tables.FederationSenderInboundPeeks + NotaryServerKeysJSON tables.FederationSenderNotaryServerKeysJSON + NotaryServerKeysMetadata tables.FederationSenderNotaryServerKeysMetadata } // An Receipt contains the NIDs of a call to GetNextTransactionPDUs/EDUs. @@ -197,3 +200,45 @@ func (d *Database) GetInboundPeek(ctx context.Context, serverName gomatrixserver func (d *Database) GetInboundPeeks(ctx context.Context, roomID string) ([]types.InboundPeek, error) { return d.FederationSenderInboundPeeks.SelectInboundPeeks(ctx, nil, roomID) } + +func (d *Database) UpdateNotaryKeys(ctx context.Context, serverName gomatrixserverlib.ServerName, serverKeys gomatrixserverlib.ServerKeys) error { + return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + validUntil := serverKeys.ValidUntilTS + // Servers MUST use the lesser of this field and 7 days into the future when determining if a key is valid. + // This is to avoid a situation where an attacker publishes a key which is valid for a significant amount of + // time without a way for the homeserver owner to revoke it. + // https://spec.matrix.org/unstable/server-server-api/#querying-keys-through-another-server + weekIntoFuture := time.Now().Add(7 * 24 * time.Hour) + if weekIntoFuture.Before(validUntil.Time()) { + validUntil = gomatrixserverlib.AsTimestamp(weekIntoFuture) + } + notaryID, err := d.NotaryServerKeysJSON.InsertJSONResponse(ctx, txn, serverKeys, serverName, validUntil) + if err != nil { + return err + } + // update the metadata for the keys + for keyID := range serverKeys.OldVerifyKeys { + _, err = d.NotaryServerKeysMetadata.UpsertKey(ctx, txn, serverName, keyID, notaryID, validUntil) + if err != nil { + return err + } + } + for keyID := range serverKeys.VerifyKeys { + _, err = d.NotaryServerKeysMetadata.UpsertKey(ctx, txn, serverName, keyID, notaryID, validUntil) + if err != nil { + return err + } + } + return nil + }) +} + +func (d *Database) GetNotaryKeys( + ctx context.Context, serverName gomatrixserverlib.ServerName, optKeyIDs []gomatrixserverlib.KeyID, +) (sks []gomatrixserverlib.ServerKeys, err error) { + err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + sks, err = d.NotaryServerKeysMetadata.SelectKeys(ctx, txn, serverName, optKeyIDs) + return err + }) + return sks, err +} diff --git a/federationsender/storage/tables/interface.go b/federationsender/storage/tables/interface.go index 995b6f47a..663a4cb20 100644 --- a/federationsender/storage/tables/interface.go +++ b/federationsender/storage/tables/interface.go @@ -22,6 +22,8 @@ import ( "github.com/matrix-org/gomatrixserverlib" ) +type NotaryID int64 + type FederationSenderQueuePDUs interface { InsertQueuePDU(ctx context.Context, txn *sql.Tx, transactionID gomatrixserverlib.TransactionID, serverName gomatrixserverlib.ServerName, nid int64) error DeleteQueuePDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, jsonNIDs []int64) error @@ -80,3 +82,25 @@ type FederationSenderInboundPeeks interface { DeleteInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (err error) DeleteInboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (err error) } + +// FederationSenderNotaryServerKeysJSON contains the byte-for-byte responses from servers which contain their keys and is signed by them. +type FederationSenderNotaryServerKeysJSON interface { + // InsertJSONResponse inserts a new response JSON. Useless on its own, needs querying via FederationSenderNotaryServerKeysMetadata + // `validUntil` should be the value of `valid_until_ts` with the 7-day check applied from: + // "Servers MUST use the lesser of this field and 7 days into the future when determining if a key is valid. + // This is to avoid a situation where an attacker publishes a key which is valid for a significant amount of time + // without a way for the homeserver owner to revoke it."" + InsertJSONResponse(ctx context.Context, txn *sql.Tx, keyQueryResponseJSON gomatrixserverlib.ServerKeys, serverName gomatrixserverlib.ServerName, validUntil gomatrixserverlib.Timestamp) (NotaryID, error) +} + +// FederationSenderNotaryServerKeysMetadata persists the metadata for FederationSenderNotaryServerKeysJSON +type FederationSenderNotaryServerKeysMetadata interface { + // UpsertKey updates or inserts a (server_name, key_id) tuple, pointing it via NotaryID at the the response which has the longest valid_until_ts + // `newNotaryID` and `newValidUntil` should be the notary ID / valid_until which has this (server_name, key_id) tuple already, e.g one you just inserted. + UpsertKey(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, keyID gomatrixserverlib.KeyID, newNotaryID NotaryID, newValidUntil gomatrixserverlib.Timestamp) (NotaryID, error) + // SelectKeys returns the signed JSON objects which contain the given key IDs. This will be at most the length of `keyIDs` and at least 1 (assuming + // the keys exist in the first place). If `keyIDs` is empty, the signed JSON object with the longest valid_until_ts will be returned. + SelectKeys(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, keyIDs []gomatrixserverlib.KeyID) ([]gomatrixserverlib.ServerKeys, error) + // DeleteOldJSONResponses removes all responses which are not referenced in FederationSenderNotaryServerKeysMetadata + DeleteOldJSONResponses(ctx context.Context, txn *sql.Tx) error +}