From 59852a28aeee7f1c0831f523e78f139065c84c4e Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 22 Nov 2021 13:42:21 +0000 Subject: [PATCH] Add signing key server tables --- federationapi/storage/cache/keydb.go | 68 ++++++++ federationapi/storage/interface.go | 2 + .../storage/postgres/server_key_table.go | 146 +++++++++++++++++ federationapi/storage/postgres/storage.go | 5 + federationapi/storage/shared/storage.go | 1 + federationapi/storage/shared/storage_keys.go | 59 +++++++ .../storage/sqlite3/server_key_table.go | 155 ++++++++++++++++++ federationapi/storage/sqlite3/storage.go | 5 + federationapi/storage/tables/interface.go | 5 + 9 files changed, 446 insertions(+) create mode 100644 federationapi/storage/cache/keydb.go create mode 100644 federationapi/storage/postgres/server_key_table.go create mode 100644 federationapi/storage/shared/storage_keys.go create mode 100644 federationapi/storage/sqlite3/server_key_table.go diff --git a/federationapi/storage/cache/keydb.go b/federationapi/storage/cache/keydb.go new file mode 100644 index 000000000..2063dfc55 --- /dev/null +++ b/federationapi/storage/cache/keydb.go @@ -0,0 +1,68 @@ +package cache + +import ( + "context" + "errors" + + "github.com/matrix-org/dendrite/internal/caching" + "github.com/matrix-org/gomatrixserverlib" +) + +// A Database implements gomatrixserverlib.KeyDatabase and is used to store +// the public keys for other matrix servers. +type KeyDatabase struct { + inner gomatrixserverlib.KeyDatabase + cache caching.ServerKeyCache +} + +func NewKeyDatabase(inner gomatrixserverlib.KeyDatabase, cache caching.ServerKeyCache) (*KeyDatabase, error) { + if inner == nil { + return nil, errors.New("inner database can't be nil") + } + if cache == nil { + return nil, errors.New("cache can't be nil") + } + return &KeyDatabase{ + inner: inner, + cache: cache, + }, nil +} + +// FetcherName implements KeyFetcher +func (d KeyDatabase) FetcherName() string { + return "InMemoryKeyCache" +} + +// FetchKeys implements gomatrixserverlib.KeyDatabase +func (d *KeyDatabase) FetchKeys( + ctx context.Context, + requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp, +) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error) { + results := make(map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult) + for req, ts := range requests { + if res, cached := d.cache.GetServerKey(req, ts); cached { + results[req] = res + delete(requests, req) + } + } + fromDB, err := d.inner.FetchKeys(ctx, requests) + if err != nil { + return results, err + } + for req, res := range fromDB { + results[req] = res + d.cache.StoreServerKey(req, res) + } + return results, nil +} + +// StoreKeys implements gomatrixserverlib.KeyDatabase +func (d *KeyDatabase) StoreKeys( + ctx context.Context, + keyMap map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, +) error { + for req, res := range keyMap { + d.cache.StoreServerKey(req, res) + } + return d.inner.StoreKeys(ctx, keyMap) +} diff --git a/federationapi/storage/interface.go b/federationapi/storage/interface.go index d55c66f85..3dfd69923 100644 --- a/federationapi/storage/interface.go +++ b/federationapi/storage/interface.go @@ -25,6 +25,8 @@ import ( type Database interface { internal.PartitionStorer + gomatrixserverlib.KeyDatabase + gomatrixserverlib.KeyFetcher UpdateRoom(ctx context.Context, roomID, oldEventID, newEventID string, addHosts []types.JoinedHost, removeHosts []string) (joinedHosts []types.JoinedHost, err error) diff --git a/federationapi/storage/postgres/server_key_table.go b/federationapi/storage/postgres/server_key_table.go new file mode 100644 index 000000000..16e294e05 --- /dev/null +++ b/federationapi/storage/postgres/server_key_table.go @@ -0,0 +1,146 @@ +// Copyright 2017-2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgres + +import ( + "context" + "database/sql" + + "github.com/lib/pq" + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/gomatrixserverlib" +) + +const serverSigningKeysSchema = ` +-- A cache of signing keys downloaded from remote servers. +CREATE TABLE IF NOT EXISTS keydb_server_keys ( + -- The name of the matrix server the key is for. + server_name TEXT NOT NULL, + -- The ID of the server key. + server_key_id TEXT NOT NULL, + -- Combined server name and key ID separated by the ASCII unit separator + -- to make it easier to run bulk queries. + server_name_and_key_id TEXT NOT NULL, + -- When the key is valid until as a millisecond timestamp. + -- 0 if this is an expired key (in which case expired_ts will be non-zero) + valid_until_ts BIGINT NOT NULL, + -- When the key expired as a millisecond timestamp. + -- 0 if this is an active key (in which case valid_until_ts will be non-zero) + expired_ts BIGINT NOT NULL, + -- The base64-encoded public key. + server_key TEXT NOT NULL, + CONSTRAINT keydb_server_keys_unique UNIQUE (server_name, server_key_id) +); + +CREATE INDEX IF NOT EXISTS keydb_server_name_and_key_id ON keydb_server_keys (server_name_and_key_id); +` + +const bulkSelectServerSigningKeysSQL = "" + + "SELECT server_name, server_key_id, valid_until_ts, expired_ts, " + + " server_key FROM keydb_server_keys" + + " WHERE server_name_and_key_id = ANY($1)" + +const upsertServerSigningKeysSQL = "" + + "INSERT INTO keydb_server_keys (server_name, server_key_id," + + " server_name_and_key_id, valid_until_ts, expired_ts, server_key)" + + " VALUES ($1, $2, $3, $4, $5, $6)" + + " ON CONFLICT ON CONSTRAINT keydb_server_keys_unique" + + " DO UPDATE SET valid_until_ts = $4, expired_ts = $5, server_key = $6" + +type serverSigningKeyStatements struct { + bulkSelectServerKeysStmt *sql.Stmt + upsertServerKeysStmt *sql.Stmt +} + +func NewPostgresServerSigningKeysTable(db *sql.DB) (s *serverSigningKeyStatements, err error) { + s = &serverSigningKeyStatements{} + _, err = db.Exec(serverSigningKeysSchema) + if err != nil { + return + } + if s.bulkSelectServerKeysStmt, err = db.Prepare(bulkSelectServerSigningKeysSQL); err != nil { + return + } + if s.upsertServerKeysStmt, err = db.Prepare(upsertServerSigningKeysSQL); err != nil { + return + } + return s, nil +} + +func (s *serverSigningKeyStatements) BulkSelectServerKeys( + ctx context.Context, txn *sql.Tx, + requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp, +) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error) { + var nameAndKeyIDs []string + for request := range requests { + nameAndKeyIDs = append(nameAndKeyIDs, nameAndKeyID(request)) + } + stmt := s.bulkSelectServerKeysStmt + rows, err := stmt.QueryContext(ctx, pq.StringArray(nameAndKeyIDs)) + if err != nil { + return nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectServerKeys: rows.close() failed") + results := map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult{} + for rows.Next() { + var serverName string + var keyID string + var key string + var validUntilTS int64 + var expiredTS int64 + if err = rows.Scan(&serverName, &keyID, &validUntilTS, &expiredTS, &key); err != nil { + return nil, err + } + r := gomatrixserverlib.PublicKeyLookupRequest{ + ServerName: gomatrixserverlib.ServerName(serverName), + KeyID: gomatrixserverlib.KeyID(keyID), + } + vk := gomatrixserverlib.VerifyKey{} + err = vk.Key.Decode(key) + if err != nil { + return nil, err + } + results[r] = gomatrixserverlib.PublicKeyLookupResult{ + VerifyKey: vk, + ValidUntilTS: gomatrixserverlib.Timestamp(validUntilTS), + ExpiredTS: gomatrixserverlib.Timestamp(expiredTS), + } + } + return results, rows.Err() +} + +func (s *serverSigningKeyStatements) UpsertServerKeys( + ctx context.Context, txn *sql.Tx, + request gomatrixserverlib.PublicKeyLookupRequest, + key gomatrixserverlib.PublicKeyLookupResult, +) error { + stmt := sqlutil.TxStmt(txn, s.upsertServerKeysStmt) + _, err := stmt.ExecContext( + ctx, + string(request.ServerName), + string(request.KeyID), + nameAndKeyID(request), + key.ValidUntilTS, + key.ExpiredTS, + key.Key.Encode(), + ) + return err +} + +func nameAndKeyID(request gomatrixserverlib.PublicKeyLookupRequest) string { + return string(request.ServerName) + "\x1F" + string(request.KeyID) +} diff --git a/federationapi/storage/postgres/storage.go b/federationapi/storage/postgres/storage.go index aecbedebe..33103615b 100644 --- a/federationapi/storage/postgres/storage.go +++ b/federationapi/storage/postgres/storage.go @@ -78,6 +78,10 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS if err != nil { return nil, fmt.Errorf("NewPostgresNotaryServerKeysMetadataTable: %s", err) } + serverSigningKeys, err := NewPostgresServerSigningKeysTable(d.db) + if err != nil { + return nil, err + } m := sqlutil.NewMigrations() deltas.LoadRemoveRoomsTable(m) if err = m.RunDeltas(d.db, dbProperties); err != nil { @@ -96,6 +100,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS FederationSenderOutboundPeeks: outboundPeeks, NotaryServerKeysJSON: notaryJSON, NotaryServerKeysMetadata: notaryMetadata, + ServerSigningKeys: serverSigningKeys, } if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "federationapi"); err != nil { return nil, err diff --git a/federationapi/storage/shared/storage.go b/federationapi/storage/shared/storage.go index ee36a6a81..2cc02396f 100644 --- a/federationapi/storage/shared/storage.go +++ b/federationapi/storage/shared/storage.go @@ -40,6 +40,7 @@ type Database struct { FederationSenderInboundPeeks tables.FederationSenderInboundPeeks NotaryServerKeysJSON tables.FederationSenderNotaryServerKeysJSON NotaryServerKeysMetadata tables.FederationSenderNotaryServerKeysMetadata + ServerSigningKeys tables.FederationSenderServerSigningKeys } // An Receipt contains the NIDs of a call to GetNextTransactionPDUs/EDUs. diff --git a/federationapi/storage/shared/storage_keys.go b/federationapi/storage/shared/storage_keys.go new file mode 100644 index 000000000..3222b1224 --- /dev/null +++ b/federationapi/storage/shared/storage_keys.go @@ -0,0 +1,59 @@ +// Copyright 2017-2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package shared + +import ( + "context" + "database/sql" + + "github.com/matrix-org/gomatrixserverlib" +) + +// FetcherName implements KeyFetcher +func (d Database) FetcherName() string { + return "FederationAPIKeyDatabase" +} + +// FetchKeys implements gomatrixserverlib.KeyDatabase +func (d *Database) FetchKeys( + ctx context.Context, + requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp, +) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error) { + return d.ServerSigningKeys.BulkSelectServerKeys(ctx, nil, requests) +} + +// StoreKeys implements gomatrixserverlib.KeyDatabase +func (d *Database) StoreKeys( + ctx context.Context, + keyMap map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, +) error { + return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + var lastErr error + for request, keys := range keyMap { + if err := d.ServerSigningKeys.UpsertServerKeys(ctx, txn, request, keys); err != nil { + // Rather than returning immediately on error we try to insert the + // remaining keys. + // Since we are inserting the keys outside of a transaction it is + // possible for some of the inserts to succeed even though some + // of the inserts have failed. + // Ensuring that we always insert all the keys we can means that + // this behaviour won't depend on the iteration order of the map. + lastErr = err + } + } + return lastErr + }) +} diff --git a/federationapi/storage/sqlite3/server_key_table.go b/federationapi/storage/sqlite3/server_key_table.go new file mode 100644 index 000000000..a7c5ab42f --- /dev/null +++ b/federationapi/storage/sqlite3/server_key_table.go @@ -0,0 +1,155 @@ +// Copyright 2017-2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "context" + "database/sql" + "fmt" + + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/gomatrixserverlib" +) + +const serverSigningKeysSchema = ` +-- A cache of signing keys downloaded from remote servers. +CREATE TABLE IF NOT EXISTS keydb_server_keys ( + -- The name of the matrix server the key is for. + server_name TEXT NOT NULL, + -- The ID of the server key. + server_key_id TEXT NOT NULL, + -- Combined server name and key ID separated by the ASCII unit separator + -- to make it easier to run bulk queries. + server_name_and_key_id TEXT NOT NULL, + -- When the key is valid until as a millisecond timestamp. + -- 0 if this is an expired key (in which case expired_ts will be non-zero) + valid_until_ts BIGINT NOT NULL, + -- When the key expired as a millisecond timestamp. + -- 0 if this is an active key (in which case valid_until_ts will be non-zero) + expired_ts BIGINT NOT NULL, + -- The base64-encoded public key. + server_key TEXT NOT NULL, + UNIQUE (server_name, server_key_id) +); + +CREATE INDEX IF NOT EXISTS keydb_server_name_and_key_id ON keydb_server_keys (server_name_and_key_id); +` + +const bulkSelectServerSigningKeysSQL = "" + + "SELECT server_name, server_key_id, valid_until_ts, expired_ts, " + + " server_key FROM keydb_server_keys" + + " WHERE server_name_and_key_id IN ($1)" + +const upsertServerSigningKeysSQL = "" + + "INSERT INTO keydb_server_keys (server_name, server_key_id," + + " server_name_and_key_id, valid_until_ts, expired_ts, server_key)" + + " VALUES ($1, $2, $3, $4, $5, $6)" + + " ON CONFLICT (server_name, server_key_id)" + + " DO UPDATE SET valid_until_ts = $4, expired_ts = $5, server_key = $6" + +type serverSigningKeyStatements struct { + db *sql.DB + bulkSelectServerKeysStmt *sql.Stmt + upsertServerKeysStmt *sql.Stmt +} + +func NewSQLiteServerSigningKeysTable(db *sql.DB) (s *serverSigningKeyStatements, err error) { + s = &serverSigningKeyStatements{} + _, err = db.Exec(serverSigningKeysSchema) + if err != nil { + return + } + if s.bulkSelectServerKeysStmt, err = db.Prepare(bulkSelectServerSigningKeysSQL); err != nil { + return + } + if s.upsertServerKeysStmt, err = db.Prepare(upsertServerSigningKeysSQL); err != nil { + return + } + return s, nil +} + +func (s *serverSigningKeyStatements) BulkSelectServerKeys( + ctx context.Context, txn *sql.Tx, + requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp, +) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error) { + nameAndKeyIDs := make([]string, 0, len(requests)) + for request := range requests { + nameAndKeyIDs = append(nameAndKeyIDs, nameAndKeyID(request)) + } + results := make(map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, len(requests)) + iKeyIDs := make([]interface{}, len(nameAndKeyIDs)) + for i, v := range nameAndKeyIDs { + iKeyIDs[i] = v + } + + err := sqlutil.RunLimitedVariablesQuery( + ctx, bulkSelectServerSigningKeysSQL, s.db, iKeyIDs, sqlutil.SQLite3MaxVariables, + func(rows *sql.Rows) error { + for rows.Next() { + var serverName string + var keyID string + var key string + var validUntilTS int64 + var expiredTS int64 + if err := rows.Scan(&serverName, &keyID, &validUntilTS, &expiredTS, &key); err != nil { + return fmt.Errorf("bulkSelectServerKeys: %v", err) + } + r := gomatrixserverlib.PublicKeyLookupRequest{ + ServerName: gomatrixserverlib.ServerName(serverName), + KeyID: gomatrixserverlib.KeyID(keyID), + } + vk := gomatrixserverlib.VerifyKey{} + err := vk.Key.Decode(key) + if err != nil { + return fmt.Errorf("bulkSelectServerKeys: %v", err) + } + results[r] = gomatrixserverlib.PublicKeyLookupResult{ + VerifyKey: vk, + ValidUntilTS: gomatrixserverlib.Timestamp(validUntilTS), + ExpiredTS: gomatrixserverlib.Timestamp(expiredTS), + } + } + return nil + }, + ) + + if err != nil { + return nil, err + } + return results, nil +} + +func (s *serverSigningKeyStatements) UpsertServerKeys( + ctx context.Context, txn *sql.Tx, + request gomatrixserverlib.PublicKeyLookupRequest, + key gomatrixserverlib.PublicKeyLookupResult, +) error { + stmt := sqlutil.TxStmt(txn, s.upsertServerKeysStmt) + _, err := stmt.ExecContext( + ctx, + string(request.ServerName), + string(request.KeyID), + nameAndKeyID(request), + key.ValidUntilTS, + key.ExpiredTS, + key.Key.Encode(), + ) + return err +} + +func nameAndKeyID(request gomatrixserverlib.PublicKeyLookupRequest) string { + return string(request.ServerName) + "\x1F" + string(request.KeyID) +} diff --git a/federationapi/storage/sqlite3/storage.go b/federationapi/storage/sqlite3/storage.go index cdf071fc9..967db4d60 100644 --- a/federationapi/storage/sqlite3/storage.go +++ b/federationapi/storage/sqlite3/storage.go @@ -77,6 +77,10 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS if err != nil { return nil, err } + serverSigningKeys, err := NewSQLiteServerSigningKeysTable(d.db) + if err != nil { + return nil, err + } m := sqlutil.NewMigrations() deltas.LoadRemoveRoomsTable(m) if err = m.RunDeltas(d.db, dbProperties); err != nil { @@ -95,6 +99,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS FederationSenderInboundPeeks: inboundPeeks, NotaryServerKeysJSON: notaryKeys, NotaryServerKeysMetadata: notaryKeysMetadata, + ServerSigningKeys: serverSigningKeys, } if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "federationapi"); err != nil { return nil, err diff --git a/federationapi/storage/tables/interface.go b/federationapi/storage/tables/interface.go index be993daed..72cd7f36d 100644 --- a/federationapi/storage/tables/interface.go +++ b/federationapi/storage/tables/interface.go @@ -104,3 +104,8 @@ type FederationSenderNotaryServerKeysMetadata interface { // DeleteOldJSONResponses removes all responses which are not referenced in FederationSenderNotaryServerKeysMetadata DeleteOldJSONResponses(ctx context.Context, txn *sql.Tx) error } + +type FederationSenderServerSigningKeys interface { + BulkSelectServerKeys(ctx context.Context, txn *sql.Tx, requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error) + UpsertServerKeys(ctx context.Context, txn *sql.Tx, request gomatrixserverlib.PublicKeyLookupRequest, key gomatrixserverlib.PublicKeyLookupResult) error +}