Add signing key server tables

This commit is contained in:
Neil Alexander 2021-11-22 13:42:21 +00:00
parent 849e00e464
commit 59852a28ae
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
9 changed files with 446 additions and 0 deletions

68
federationapi/storage/cache/keydb.go vendored Normal file
View file

@ -0,0 +1,68 @@
package cache
import (
"context"
"errors"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/gomatrixserverlib"
)
// A Database implements gomatrixserverlib.KeyDatabase and is used to store
// the public keys for other matrix servers.
type KeyDatabase struct {
inner gomatrixserverlib.KeyDatabase
cache caching.ServerKeyCache
}
func NewKeyDatabase(inner gomatrixserverlib.KeyDatabase, cache caching.ServerKeyCache) (*KeyDatabase, error) {
if inner == nil {
return nil, errors.New("inner database can't be nil")
}
if cache == nil {
return nil, errors.New("cache can't be nil")
}
return &KeyDatabase{
inner: inner,
cache: cache,
}, nil
}
// FetcherName implements KeyFetcher
func (d KeyDatabase) FetcherName() string {
return "InMemoryKeyCache"
}
// FetchKeys implements gomatrixserverlib.KeyDatabase
func (d *KeyDatabase) FetchKeys(
ctx context.Context,
requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp,
) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error) {
results := make(map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult)
for req, ts := range requests {
if res, cached := d.cache.GetServerKey(req, ts); cached {
results[req] = res
delete(requests, req)
}
}
fromDB, err := d.inner.FetchKeys(ctx, requests)
if err != nil {
return results, err
}
for req, res := range fromDB {
results[req] = res
d.cache.StoreServerKey(req, res)
}
return results, nil
}
// StoreKeys implements gomatrixserverlib.KeyDatabase
func (d *KeyDatabase) StoreKeys(
ctx context.Context,
keyMap map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult,
) error {
for req, res := range keyMap {
d.cache.StoreServerKey(req, res)
}
return d.inner.StoreKeys(ctx, keyMap)
}

View file

@ -25,6 +25,8 @@ import (
type Database interface { type Database interface {
internal.PartitionStorer internal.PartitionStorer
gomatrixserverlib.KeyDatabase
gomatrixserverlib.KeyFetcher
UpdateRoom(ctx context.Context, roomID, oldEventID, newEventID string, addHosts []types.JoinedHost, removeHosts []string) (joinedHosts []types.JoinedHost, err error) UpdateRoom(ctx context.Context, roomID, oldEventID, newEventID string, addHosts []types.JoinedHost, removeHosts []string) (joinedHosts []types.JoinedHost, err error)

View file

@ -0,0 +1,146 @@
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package postgres
import (
"context"
"database/sql"
"github.com/lib/pq"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib"
)
const serverSigningKeysSchema = `
-- A cache of signing keys downloaded from remote servers.
CREATE TABLE IF NOT EXISTS keydb_server_keys (
-- The name of the matrix server the key is for.
server_name TEXT NOT NULL,
-- The ID of the server key.
server_key_id TEXT NOT NULL,
-- Combined server name and key ID separated by the ASCII unit separator
-- to make it easier to run bulk queries.
server_name_and_key_id TEXT NOT NULL,
-- When the key is valid until as a millisecond timestamp.
-- 0 if this is an expired key (in which case expired_ts will be non-zero)
valid_until_ts BIGINT NOT NULL,
-- When the key expired as a millisecond timestamp.
-- 0 if this is an active key (in which case valid_until_ts will be non-zero)
expired_ts BIGINT NOT NULL,
-- The base64-encoded public key.
server_key TEXT NOT NULL,
CONSTRAINT keydb_server_keys_unique UNIQUE (server_name, server_key_id)
);
CREATE INDEX IF NOT EXISTS keydb_server_name_and_key_id ON keydb_server_keys (server_name_and_key_id);
`
const bulkSelectServerSigningKeysSQL = "" +
"SELECT server_name, server_key_id, valid_until_ts, expired_ts, " +
" server_key FROM keydb_server_keys" +
" WHERE server_name_and_key_id = ANY($1)"
const upsertServerSigningKeysSQL = "" +
"INSERT INTO keydb_server_keys (server_name, server_key_id," +
" server_name_and_key_id, valid_until_ts, expired_ts, server_key)" +
" VALUES ($1, $2, $3, $4, $5, $6)" +
" ON CONFLICT ON CONSTRAINT keydb_server_keys_unique" +
" DO UPDATE SET valid_until_ts = $4, expired_ts = $5, server_key = $6"
type serverSigningKeyStatements struct {
bulkSelectServerKeysStmt *sql.Stmt
upsertServerKeysStmt *sql.Stmt
}
func NewPostgresServerSigningKeysTable(db *sql.DB) (s *serverSigningKeyStatements, err error) {
s = &serverSigningKeyStatements{}
_, err = db.Exec(serverSigningKeysSchema)
if err != nil {
return
}
if s.bulkSelectServerKeysStmt, err = db.Prepare(bulkSelectServerSigningKeysSQL); err != nil {
return
}
if s.upsertServerKeysStmt, err = db.Prepare(upsertServerSigningKeysSQL); err != nil {
return
}
return s, nil
}
func (s *serverSigningKeyStatements) BulkSelectServerKeys(
ctx context.Context, txn *sql.Tx,
requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp,
) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error) {
var nameAndKeyIDs []string
for request := range requests {
nameAndKeyIDs = append(nameAndKeyIDs, nameAndKeyID(request))
}
stmt := s.bulkSelectServerKeysStmt
rows, err := stmt.QueryContext(ctx, pq.StringArray(nameAndKeyIDs))
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectServerKeys: rows.close() failed")
results := map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult{}
for rows.Next() {
var serverName string
var keyID string
var key string
var validUntilTS int64
var expiredTS int64
if err = rows.Scan(&serverName, &keyID, &validUntilTS, &expiredTS, &key); err != nil {
return nil, err
}
r := gomatrixserverlib.PublicKeyLookupRequest{
ServerName: gomatrixserverlib.ServerName(serverName),
KeyID: gomatrixserverlib.KeyID(keyID),
}
vk := gomatrixserverlib.VerifyKey{}
err = vk.Key.Decode(key)
if err != nil {
return nil, err
}
results[r] = gomatrixserverlib.PublicKeyLookupResult{
VerifyKey: vk,
ValidUntilTS: gomatrixserverlib.Timestamp(validUntilTS),
ExpiredTS: gomatrixserverlib.Timestamp(expiredTS),
}
}
return results, rows.Err()
}
func (s *serverSigningKeyStatements) UpsertServerKeys(
ctx context.Context, txn *sql.Tx,
request gomatrixserverlib.PublicKeyLookupRequest,
key gomatrixserverlib.PublicKeyLookupResult,
) error {
stmt := sqlutil.TxStmt(txn, s.upsertServerKeysStmt)
_, err := stmt.ExecContext(
ctx,
string(request.ServerName),
string(request.KeyID),
nameAndKeyID(request),
key.ValidUntilTS,
key.ExpiredTS,
key.Key.Encode(),
)
return err
}
func nameAndKeyID(request gomatrixserverlib.PublicKeyLookupRequest) string {
return string(request.ServerName) + "\x1F" + string(request.KeyID)
}

View file

@ -78,6 +78,10 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS
if err != nil { if err != nil {
return nil, fmt.Errorf("NewPostgresNotaryServerKeysMetadataTable: %s", err) return nil, fmt.Errorf("NewPostgresNotaryServerKeysMetadataTable: %s", err)
} }
serverSigningKeys, err := NewPostgresServerSigningKeysTable(d.db)
if err != nil {
return nil, err
}
m := sqlutil.NewMigrations() m := sqlutil.NewMigrations()
deltas.LoadRemoveRoomsTable(m) deltas.LoadRemoveRoomsTable(m)
if err = m.RunDeltas(d.db, dbProperties); err != nil { if err = m.RunDeltas(d.db, dbProperties); err != nil {
@ -96,6 +100,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS
FederationSenderOutboundPeeks: outboundPeeks, FederationSenderOutboundPeeks: outboundPeeks,
NotaryServerKeysJSON: notaryJSON, NotaryServerKeysJSON: notaryJSON,
NotaryServerKeysMetadata: notaryMetadata, NotaryServerKeysMetadata: notaryMetadata,
ServerSigningKeys: serverSigningKeys,
} }
if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "federationapi"); err != nil { if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "federationapi"); err != nil {
return nil, err return nil, err

View file

@ -40,6 +40,7 @@ type Database struct {
FederationSenderInboundPeeks tables.FederationSenderInboundPeeks FederationSenderInboundPeeks tables.FederationSenderInboundPeeks
NotaryServerKeysJSON tables.FederationSenderNotaryServerKeysJSON NotaryServerKeysJSON tables.FederationSenderNotaryServerKeysJSON
NotaryServerKeysMetadata tables.FederationSenderNotaryServerKeysMetadata NotaryServerKeysMetadata tables.FederationSenderNotaryServerKeysMetadata
ServerSigningKeys tables.FederationSenderServerSigningKeys
} }
// An Receipt contains the NIDs of a call to GetNextTransactionPDUs/EDUs. // An Receipt contains the NIDs of a call to GetNextTransactionPDUs/EDUs.

View file

@ -0,0 +1,59 @@
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package shared
import (
"context"
"database/sql"
"github.com/matrix-org/gomatrixserverlib"
)
// FetcherName implements KeyFetcher
func (d Database) FetcherName() string {
return "FederationAPIKeyDatabase"
}
// FetchKeys implements gomatrixserverlib.KeyDatabase
func (d *Database) FetchKeys(
ctx context.Context,
requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp,
) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error) {
return d.ServerSigningKeys.BulkSelectServerKeys(ctx, nil, requests)
}
// StoreKeys implements gomatrixserverlib.KeyDatabase
func (d *Database) StoreKeys(
ctx context.Context,
keyMap map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult,
) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
var lastErr error
for request, keys := range keyMap {
if err := d.ServerSigningKeys.UpsertServerKeys(ctx, txn, request, keys); err != nil {
// Rather than returning immediately on error we try to insert the
// remaining keys.
// Since we are inserting the keys outside of a transaction it is
// possible for some of the inserts to succeed even though some
// of the inserts have failed.
// Ensuring that we always insert all the keys we can means that
// this behaviour won't depend on the iteration order of the map.
lastErr = err
}
}
return lastErr
})
}

View file

@ -0,0 +1,155 @@
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sqlite3
import (
"context"
"database/sql"
"fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib"
)
const serverSigningKeysSchema = `
-- A cache of signing keys downloaded from remote servers.
CREATE TABLE IF NOT EXISTS keydb_server_keys (
-- The name of the matrix server the key is for.
server_name TEXT NOT NULL,
-- The ID of the server key.
server_key_id TEXT NOT NULL,
-- Combined server name and key ID separated by the ASCII unit separator
-- to make it easier to run bulk queries.
server_name_and_key_id TEXT NOT NULL,
-- When the key is valid until as a millisecond timestamp.
-- 0 if this is an expired key (in which case expired_ts will be non-zero)
valid_until_ts BIGINT NOT NULL,
-- When the key expired as a millisecond timestamp.
-- 0 if this is an active key (in which case valid_until_ts will be non-zero)
expired_ts BIGINT NOT NULL,
-- The base64-encoded public key.
server_key TEXT NOT NULL,
UNIQUE (server_name, server_key_id)
);
CREATE INDEX IF NOT EXISTS keydb_server_name_and_key_id ON keydb_server_keys (server_name_and_key_id);
`
const bulkSelectServerSigningKeysSQL = "" +
"SELECT server_name, server_key_id, valid_until_ts, expired_ts, " +
" server_key FROM keydb_server_keys" +
" WHERE server_name_and_key_id IN ($1)"
const upsertServerSigningKeysSQL = "" +
"INSERT INTO keydb_server_keys (server_name, server_key_id," +
" server_name_and_key_id, valid_until_ts, expired_ts, server_key)" +
" VALUES ($1, $2, $3, $4, $5, $6)" +
" ON CONFLICT (server_name, server_key_id)" +
" DO UPDATE SET valid_until_ts = $4, expired_ts = $5, server_key = $6"
type serverSigningKeyStatements struct {
db *sql.DB
bulkSelectServerKeysStmt *sql.Stmt
upsertServerKeysStmt *sql.Stmt
}
func NewSQLiteServerSigningKeysTable(db *sql.DB) (s *serverSigningKeyStatements, err error) {
s = &serverSigningKeyStatements{}
_, err = db.Exec(serverSigningKeysSchema)
if err != nil {
return
}
if s.bulkSelectServerKeysStmt, err = db.Prepare(bulkSelectServerSigningKeysSQL); err != nil {
return
}
if s.upsertServerKeysStmt, err = db.Prepare(upsertServerSigningKeysSQL); err != nil {
return
}
return s, nil
}
func (s *serverSigningKeyStatements) BulkSelectServerKeys(
ctx context.Context, txn *sql.Tx,
requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp,
) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error) {
nameAndKeyIDs := make([]string, 0, len(requests))
for request := range requests {
nameAndKeyIDs = append(nameAndKeyIDs, nameAndKeyID(request))
}
results := make(map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, len(requests))
iKeyIDs := make([]interface{}, len(nameAndKeyIDs))
for i, v := range nameAndKeyIDs {
iKeyIDs[i] = v
}
err := sqlutil.RunLimitedVariablesQuery(
ctx, bulkSelectServerSigningKeysSQL, s.db, iKeyIDs, sqlutil.SQLite3MaxVariables,
func(rows *sql.Rows) error {
for rows.Next() {
var serverName string
var keyID string
var key string
var validUntilTS int64
var expiredTS int64
if err := rows.Scan(&serverName, &keyID, &validUntilTS, &expiredTS, &key); err != nil {
return fmt.Errorf("bulkSelectServerKeys: %v", err)
}
r := gomatrixserverlib.PublicKeyLookupRequest{
ServerName: gomatrixserverlib.ServerName(serverName),
KeyID: gomatrixserverlib.KeyID(keyID),
}
vk := gomatrixserverlib.VerifyKey{}
err := vk.Key.Decode(key)
if err != nil {
return fmt.Errorf("bulkSelectServerKeys: %v", err)
}
results[r] = gomatrixserverlib.PublicKeyLookupResult{
VerifyKey: vk,
ValidUntilTS: gomatrixserverlib.Timestamp(validUntilTS),
ExpiredTS: gomatrixserverlib.Timestamp(expiredTS),
}
}
return nil
},
)
if err != nil {
return nil, err
}
return results, nil
}
func (s *serverSigningKeyStatements) UpsertServerKeys(
ctx context.Context, txn *sql.Tx,
request gomatrixserverlib.PublicKeyLookupRequest,
key gomatrixserverlib.PublicKeyLookupResult,
) error {
stmt := sqlutil.TxStmt(txn, s.upsertServerKeysStmt)
_, err := stmt.ExecContext(
ctx,
string(request.ServerName),
string(request.KeyID),
nameAndKeyID(request),
key.ValidUntilTS,
key.ExpiredTS,
key.Key.Encode(),
)
return err
}
func nameAndKeyID(request gomatrixserverlib.PublicKeyLookupRequest) string {
return string(request.ServerName) + "\x1F" + string(request.KeyID)
}

View file

@ -77,6 +77,10 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS
if err != nil { if err != nil {
return nil, err return nil, err
} }
serverSigningKeys, err := NewSQLiteServerSigningKeysTable(d.db)
if err != nil {
return nil, err
}
m := sqlutil.NewMigrations() m := sqlutil.NewMigrations()
deltas.LoadRemoveRoomsTable(m) deltas.LoadRemoveRoomsTable(m)
if err = m.RunDeltas(d.db, dbProperties); err != nil { if err = m.RunDeltas(d.db, dbProperties); err != nil {
@ -95,6 +99,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS
FederationSenderInboundPeeks: inboundPeeks, FederationSenderInboundPeeks: inboundPeeks,
NotaryServerKeysJSON: notaryKeys, NotaryServerKeysJSON: notaryKeys,
NotaryServerKeysMetadata: notaryKeysMetadata, NotaryServerKeysMetadata: notaryKeysMetadata,
ServerSigningKeys: serverSigningKeys,
} }
if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "federationapi"); err != nil { if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "federationapi"); err != nil {
return nil, err return nil, err

View file

@ -104,3 +104,8 @@ type FederationSenderNotaryServerKeysMetadata interface {
// DeleteOldJSONResponses removes all responses which are not referenced in FederationSenderNotaryServerKeysMetadata // DeleteOldJSONResponses removes all responses which are not referenced in FederationSenderNotaryServerKeysMetadata
DeleteOldJSONResponses(ctx context.Context, txn *sql.Tx) error DeleteOldJSONResponses(ctx context.Context, txn *sql.Tx) error
} }
type FederationSenderServerSigningKeys interface {
BulkSelectServerKeys(ctx context.Context, txn *sql.Tx, requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error)
UpsertServerKeys(ctx context.Context, txn *sql.Tx, request gomatrixserverlib.PublicKeyLookupRequest, key gomatrixserverlib.PublicKeyLookupResult) error
}