fedsender: try to satisfy all notary key requests from the cache first (#1925)

* fedsender: try to satisfy all notary key requests from the cache first

* Linting
This commit is contained in:
kegsay 2021-07-16 11:35:42 +01:00 committed by GitHub
parent c102adaf43
commit 728061db03
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 82 additions and 23 deletions

View file

@ -197,16 +197,10 @@ func NotaryKeys(
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
} else { } else {
kids := make([]gomatrixserverlib.KeyID, len(kidToCriteria))
i := 0
for kid := range kidToCriteria {
kids[i] = kid
i++
}
var resp federationSenderAPI.QueryServerKeysResponse var resp federationSenderAPI.QueryServerKeysResponse
err := fsAPI.QueryServerKeys(httpReq.Context(), &federationSenderAPI.QueryServerKeysRequest{ err := fsAPI.QueryServerKeys(httpReq.Context(), &federationSenderAPI.QueryServerKeysRequest{
ServerName: serverName, ServerName: serverName,
OptionalKeyIDs: kids, KeyIDToCriteria: kidToCriteria,
}, &resp) }, &resp)
if err != nil { if err != nil {
return util.ErrorResponse(err) return util.ErrorResponse(err)

View file

@ -97,7 +97,17 @@ type FederationSenderInternalAPI interface {
type QueryServerKeysRequest struct { type QueryServerKeysRequest struct {
ServerName gomatrixserverlib.ServerName ServerName gomatrixserverlib.ServerName
OptionalKeyIDs []gomatrixserverlib.KeyID KeyIDToCriteria map[gomatrixserverlib.KeyID]gomatrixserverlib.PublicKeyNotaryQueryCriteria
}
func (q *QueryServerKeysRequest) KeyIDs() []gomatrixserverlib.KeyID {
kids := make([]gomatrixserverlib.KeyID, len(q.KeyIDToCriteria))
i := 0
for keyID := range q.KeyIDToCriteria {
kids[i] = keyID
i++
}
return kids
} }
type QueryServerKeysResponse struct { type QueryServerKeysResponse struct {

View file

@ -25,29 +25,73 @@ func (f *FederationSenderInternalAPI) QueryJoinedHostServerNamesInRoom(
return return
} }
func (a *FederationSenderInternalAPI) fetchServerKeysDirectly(ctx context.Context, serverName gomatrixserverlib.ServerName) (*gomatrixserverlib.ServerKeys, error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
ires, err := a.doRequest(serverName, func() (interface{}, error) {
return a.federation.GetServerKeys(ctx, serverName)
})
if err != nil {
return nil, err
}
sks := ires.(gomatrixserverlib.ServerKeys)
return &sks, nil
}
func (a *FederationSenderInternalAPI) fetchServerKeysFromCache(
ctx context.Context, req *api.QueryServerKeysRequest,
) ([]gomatrixserverlib.ServerKeys, error) {
var results []gomatrixserverlib.ServerKeys
for keyID, criteria := range req.KeyIDToCriteria {
serverKeysResponses, _ := a.db.GetNotaryKeys(ctx, req.ServerName, []gomatrixserverlib.KeyID{keyID})
if len(serverKeysResponses) == 0 {
return nil, fmt.Errorf("failed to find server key response for key ID %s", keyID)
}
// we should only get 1 result as we only gave 1 key ID
sk := serverKeysResponses[0]
util.GetLogger(ctx).Infof("fetchServerKeysFromCache: minvalid:%v keys: %+v", criteria.MinimumValidUntilTS, sk)
if criteria.MinimumValidUntilTS != 0 {
// check if it's still valid. if they have the same value that's also valid
if sk.ValidUntilTS < criteria.MinimumValidUntilTS {
return nil, fmt.Errorf(
"found server response for key ID %s but it is no longer valid, min: %v valid_until: %v",
keyID, criteria.MinimumValidUntilTS, sk.ValidUntilTS,
)
}
}
results = append(results, sk)
}
return results, nil
}
func (a *FederationSenderInternalAPI) QueryServerKeys( func (a *FederationSenderInternalAPI) QueryServerKeys(
ctx context.Context, req *api.QueryServerKeysRequest, res *api.QueryServerKeysResponse, ctx context.Context, req *api.QueryServerKeysRequest, res *api.QueryServerKeysResponse,
) error { ) error {
ctx, cancel := context.WithTimeout(ctx, time.Second*30) // attempt to satisfy the entire request from the cache first
defer cancel() results, err := a.fetchServerKeysFromCache(ctx, req)
ires, err := a.doRequest(req.ServerName, func() (interface{}, error) { if err == nil {
return a.federation.GetServerKeys(ctx, req.ServerName) // satisfied entirely from cache, return it
}) res.ServerKeys = results
return nil
}
util.GetLogger(ctx).WithField("server", req.ServerName).WithError(err).Warn("notary: failed to satisfy keys request entirely from cache, hitting direct")
serverKeys, err := a.fetchServerKeysDirectly(ctx, req.ServerName)
if err != nil { if err != nil {
// try to load from the cache // try to load as much as we can from the cache in a best effort basis
serverKeysResponses, dbErr := a.db.GetNotaryKeys(ctx, req.ServerName, req.OptionalKeyIDs) util.GetLogger(ctx).WithField("server", req.ServerName).WithError(err).Warn("notary: failed to ask server for keys, returning best effort keys")
serverKeysResponses, dbErr := a.db.GetNotaryKeys(ctx, req.ServerName, req.KeyIDs())
if dbErr != nil { if dbErr != nil {
return fmt.Errorf("server returned %s, and db returned %s", err, dbErr) return fmt.Errorf("notary: server returned %s, and db returned %s", err, dbErr)
} }
res.ServerKeys = serverKeysResponses res.ServerKeys = serverKeysResponses
return nil return nil
} }
serverKeys := ires.(gomatrixserverlib.ServerKeys)
// cache it! // cache it!
if err = a.db.UpdateNotaryKeys(context.Background(), req.ServerName, serverKeys); err != nil { if err = a.db.UpdateNotaryKeys(context.Background(), req.ServerName, *serverKeys); err != nil {
// non-fatal, still return the response // non-fatal, still return the response
util.GetLogger(ctx).WithError(err).Warn("failed to UpdateNotaryKeys") util.GetLogger(ctx).WithError(err).Warn("failed to UpdateNotaryKeys")
} }
res.ServerKeys = []gomatrixserverlib.ServerKeys{serverKeys} res.ServerKeys = []gomatrixserverlib.ServerKeys{*serverKeys}
return nil return nil
} }

View file

@ -17,6 +17,7 @@ package postgres
import ( import (
"context" "context"
"database/sql" "database/sql"
"encoding/json"
"github.com/lib/pq" "github.com/lib/pq"
"github.com/matrix-org/dendrite/federationsender/storage/tables" "github.com/matrix-org/dendrite/federationsender/storage/tables"
@ -148,7 +149,11 @@ func (s *notaryServerKeysMetadataStatements) SelectKeys(ctx context.Context, txn
var results []gomatrixserverlib.ServerKeys var results []gomatrixserverlib.ServerKeys
for rows.Next() { for rows.Next() {
var sk gomatrixserverlib.ServerKeys var sk gomatrixserverlib.ServerKeys
if err := rows.Scan(&sk.Raw); err != nil { var raw string
if err = rows.Scan(&raw); err != nil {
return nil, err
}
if err = json.Unmarshal([]byte(raw), &sk); err != nil {
return nil, err return nil, err
} }
results = append(results, sk) results = append(results, sk)

View file

@ -17,6 +17,7 @@ package sqlite3
import ( import (
"context" "context"
"database/sql" "database/sql"
"encoding/json"
"fmt" "fmt"
"strings" "strings"
@ -150,7 +151,11 @@ func (s *notaryServerKeysMetadataStatements) SelectKeys(ctx context.Context, txn
var results []gomatrixserverlib.ServerKeys var results []gomatrixserverlib.ServerKeys
for rows.Next() { for rows.Next() {
var sk gomatrixserverlib.ServerKeys var sk gomatrixserverlib.ServerKeys
if err := rows.Scan(&sk.Raw); err != nil { var raw string
if err = rows.Scan(&raw); err != nil {
return nil, err
}
if err = json.Unmarshal([]byte(raw), &sk); err != nil {
return nil, err return nil, err
} }
results = append(results, sk) results = append(results, sk)

View file

@ -534,3 +534,4 @@ Remote servers cannot set power levels in rooms without existing powerlevels
Remote servers should reject attempts by non-creators to set the power levels Remote servers should reject attempts by non-creators to set the power levels
Federation handles empty auth_events in state_ids sanely Federation handles empty auth_events in state_ids sanely
Key notary server should return an expired key if it can't find any others Key notary server should return an expired key if it can't find any others
Key notary server must not overwrite a valid key with a spurious result from the origin server