From eb0efa46361ca2d80050ef871d0a8f20652398af Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 4 Aug 2021 17:56:29 +0100 Subject: [PATCH] Cross-signing groundwork (#1953) * Cross-signing groundwork * Update to matrix-org/gomatrixserverlib#274 * Fix gobind builds, which stops unit tests in CI from yelling * Some changes from review comments * Fix build by passing in UIA * Update to matrix-org/gomatrixserverlib@bec8d22 * Process master/self-signing keys from devices call * nolint * Enum-ify the key type in the database * Process self-signing key too * Fix sanity check in device list updater * Fix check * Fix sytest, hopefully * Fix build --- build/gobind-pinecone/monolith.go | 2 +- build/gobind-yggdrasil/monolith.go | 2 +- clientapi/auth/password.go | 4 +- clientapi/auth/user_interactive.go | 4 +- clientapi/jsonerror/jsonerror.go | 12 + clientapi/routing/key_crosssigning.go | 125 ++++++ clientapi/routing/keys.go | 10 +- clientapi/routing/routing.go | 22 +- cmd/dendrite-demo-libp2p/main.go | 2 +- cmd/dendrite-demo-pinecone/main.go | 2 +- cmd/dendrite-demo-yggdrasil/main.go | 2 +- cmd/dendrite-monolith-server/main.go | 2 +- .../personalities/keyserver.go | 2 +- cmd/dendritejs-pinecone/main.go | 2 +- cmd/dendritejs/main.go | 2 +- keyserver/api/api.go | 33 +- keyserver/consumers/eduserver.go | 61 +++ keyserver/internal/cross_signing.go | 389 ++++++++++++++++++ keyserver/internal/device_list_update.go | 27 +- keyserver/internal/device_list_update_test.go | 13 +- keyserver/internal/internal.go | 83 +++- keyserver/inthttp/client.go | 50 ++- keyserver/inthttp/server.go | 22 + keyserver/keyserver.go | 29 +- setup/config/config_kafka.go | 1 + syncapi/internal/keychange_test.go | 4 + sytest-whitelist | 3 + 27 files changed, 860 insertions(+), 50 deletions(-) create mode 100644 clientapi/routing/key_crosssigning.go create mode 100644 keyserver/consumers/eduserver.go create mode 100644 keyserver/internal/cross_signing.go diff --git a/build/gobind-pinecone/monolith.go b/build/gobind-pinecone/monolith.go index e30057ed4..202284042 100644 --- a/build/gobind-pinecone/monolith.go +++ b/build/gobind-pinecone/monolith.go @@ -299,7 +299,7 @@ func (m *DendriteMonolith) Start() { base, federation, rsAPI, keyRing, true, ) - keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, fsAPI) + keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, fsAPI) m.userAPI = userapi.NewInternalAPI(accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI) keyAPI.SetUserAPI(m.userAPI) diff --git a/build/gobind-yggdrasil/monolith.go b/build/gobind-yggdrasil/monolith.go index d5aedeac6..6fc5f244b 100644 --- a/build/gobind-yggdrasil/monolith.go +++ b/build/gobind-yggdrasil/monolith.go @@ -119,7 +119,7 @@ func (m *DendriteMonolith) Start() { base, federation, rsAPI, keyRing, true, ) - keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation) + keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, federation) userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI) keyAPI.SetUserAPI(userAPI) diff --git a/clientapi/auth/password.go b/clientapi/auth/password.go index bf4a95366..a66e2fe76 100644 --- a/clientapi/auth/password.go +++ b/clientapi/auth/password.go @@ -52,7 +52,7 @@ func (t *LoginTypePassword) Login(ctx context.Context, req interface{}) (*Login, if username == "" { return nil, &util.JSONResponse{ Code: http.StatusUnauthorized, - JSON: jsonerror.BadJSON("'user' must be supplied."), + JSON: jsonerror.BadJSON("A username must be supplied."), } } localpart, err := userutil.ParseUsernameParam(username, &t.Config.Matrix.ServerName) @@ -68,7 +68,7 @@ func (t *LoginTypePassword) Login(ctx context.Context, req interface{}) (*Login, // but that would leak the existence of the user. return nil, &util.JSONResponse{ Code: http.StatusForbidden, - JSON: jsonerror.Forbidden("username or password was incorrect, or the account does not exist"), + JSON: jsonerror.Forbidden("The username or password was incorrect or the account does not exist."), } } return &r.Login, nil diff --git a/clientapi/auth/user_interactive.go b/clientapi/auth/user_interactive.go index 9e0db68e0..30469fc47 100644 --- a/clientapi/auth/user_interactive.go +++ b/clientapi/auth/user_interactive.go @@ -220,7 +220,7 @@ func (u *UserInteractive) Verify(ctx context.Context, bodyBytes []byte, device * if !ok { return nil, &util.JSONResponse{ Code: http.StatusBadRequest, - JSON: jsonerror.BadJSON("unknown auth.type: " + authType), + JSON: jsonerror.BadJSON("Unknown auth.type: " + authType), } } @@ -231,7 +231,7 @@ func (u *UserInteractive) Verify(ctx context.Context, bodyBytes []byte, device * if !u.IsSingleStageFlow(authType) { return nil, &util.JSONResponse{ Code: http.StatusBadRequest, - JSON: jsonerror.Unknown("missing or unknown auth.session"), + JSON: jsonerror.Unknown("The auth.session is missing or unknown."), } } } diff --git a/clientapi/jsonerror/jsonerror.go b/clientapi/jsonerror/jsonerror.go index 7f8f264b7..c42b25bea 100644 --- a/clientapi/jsonerror/jsonerror.go +++ b/clientapi/jsonerror/jsonerror.go @@ -125,6 +125,18 @@ func GuestAccessForbidden(msg string) *MatrixError { return &MatrixError{"M_GUEST_ACCESS_FORBIDDEN", msg} } +// InvalidSignature is an error which is returned when the client tries +// to upload invalid signatures. +func InvalidSignature(msg string) *MatrixError { + return &MatrixError{"M_INVALID_SIGNATURE", msg} +} + +// MissingParam is an error that is returned when a parameter was incorrect, +// traditionally with cross-signing. +func MissingParam(msg string) *MatrixError { + return &MatrixError{"M_MISSING_PARAM", msg} +} + type IncompatibleRoomVersionError struct { RoomVersion string `json:"room_version"` Error string `json:"error"` diff --git a/clientapi/routing/key_crosssigning.go b/clientapi/routing/key_crosssigning.go new file mode 100644 index 000000000..3c103fd72 --- /dev/null +++ b/clientapi/routing/key_crosssigning.go @@ -0,0 +1,125 @@ +// Copyright 2021 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package routing + +import ( + "encoding/json" + "io/ioutil" + "net/http" + + "github.com/matrix-org/dendrite/clientapi/auth" + "github.com/matrix-org/dendrite/clientapi/httputil" + "github.com/matrix-org/dendrite/clientapi/jsonerror" + "github.com/matrix-org/dendrite/keyserver/api" + "github.com/matrix-org/dendrite/setup/config" + userapi "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/dendrite/userapi/storage/accounts" + "github.com/matrix-org/util" +) + +func UploadCrossSigningDeviceKeys( + req *http.Request, userInteractiveAuth *auth.UserInteractive, + keyserverAPI api.KeyInternalAPI, device *userapi.Device, + accountDB accounts.Database, cfg *config.ClientAPI, +) util.JSONResponse { + uploadReq := &api.PerformUploadDeviceKeysRequest{} + uploadRes := &api.PerformUploadDeviceKeysResponse{} + + ctx := req.Context() + defer req.Body.Close() // nolint:errcheck + bodyBytes, err := ioutil.ReadAll(req.Body) + if err != nil { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.BadJSON("The request body could not be read: " + err.Error()), + } + } + + if _, err := userInteractiveAuth.Verify(ctx, bodyBytes, device); err != nil { + return *err + } + + if err = json.Unmarshal(bodyBytes, &uploadReq); err != nil { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.BadJSON("The request body could not be unmarshalled: " + err.Error()), + } + } + + uploadReq.UserID = device.UserID + keyserverAPI.PerformUploadDeviceKeys(req.Context(), uploadReq, uploadRes) + + if err := uploadRes.Error; err != nil { + switch { + case err.IsInvalidSignature: + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.InvalidSignature(err.Error()), + } + case err.IsMissingParam: + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.MissingParam(err.Error()), + } + default: + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.Unknown(err.Error()), + } + } + } + + return util.JSONResponse{ + Code: http.StatusOK, + JSON: struct{}{}, + } +} + +func UploadCrossSigningDeviceSignatures(req *http.Request, keyserverAPI api.KeyInternalAPI, device *userapi.Device) util.JSONResponse { + uploadReq := &api.PerformUploadDeviceSignaturesRequest{} + uploadRes := &api.PerformUploadDeviceSignaturesResponse{} + + if err := httputil.UnmarshalJSONRequest(req, &uploadReq.Signatures); err != nil { + return *err + } + + uploadReq.UserID = device.UserID + keyserverAPI.PerformUploadDeviceSignatures(req.Context(), uploadReq, uploadRes) + + if err := uploadRes.Error; err != nil { + switch { + case err.IsInvalidSignature: + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.InvalidSignature(err.Error()), + } + case err.IsMissingParam: + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.MissingParam(err.Error()), + } + default: + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.Unknown(err.Error()), + } + } + } + + return util.JSONResponse{ + Code: http.StatusOK, + JSON: struct{}{}, + } +} diff --git a/clientapi/routing/keys.go b/clientapi/routing/keys.go index e22336428..2d65ac353 100644 --- a/clientapi/routing/keys.go +++ b/clientapi/routing/keys.go @@ -100,7 +100,7 @@ func (r *queryKeysRequest) GetTimeout() time.Duration { return time.Duration(r.Timeout) * time.Millisecond } -func QueryKeys(req *http.Request, keyAPI api.KeyInternalAPI) util.JSONResponse { +func QueryKeys(req *http.Request, keyAPI api.KeyInternalAPI, device *userapi.Device) util.JSONResponse { var r queryKeysRequest resErr := httputil.UnmarshalJSONRequest(req, &r) if resErr != nil { @@ -108,6 +108,7 @@ func QueryKeys(req *http.Request, keyAPI api.KeyInternalAPI) util.JSONResponse { } queryRes := api.QueryKeysResponse{} keyAPI.QueryKeys(req.Context(), &api.QueryKeysRequest{ + UserID: device.UserID, UserToDevices: r.DeviceKeys, Timeout: r.GetTimeout(), // TODO: Token? @@ -115,8 +116,11 @@ func QueryKeys(req *http.Request, keyAPI api.KeyInternalAPI) util.JSONResponse { return util.JSONResponse{ Code: 200, JSON: map[string]interface{}{ - "device_keys": queryRes.DeviceKeys, - "failures": queryRes.Failures, + "device_keys": queryRes.DeviceKeys, + "master_keys": queryRes.MasterKeys, + "self_signing_keys": queryRes.SelfSigningKeys, + "user_signing_keys": queryRes.UserSigningKeys, + "failures": queryRes.Failures, }, } } diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go index 541c76282..c6be8939d 100644 --- a/clientapi/routing/routing.go +++ b/clientapi/routing/routing.go @@ -64,7 +64,9 @@ func Setup( rateLimits := newRateLimits(&cfg.RateLimiting) userInteractiveAuth := auth.NewUserInteractive(accountDB.GetAccountByPassword, cfg) - unstableFeatures := make(map[string]bool) + unstableFeatures := map[string]bool{ + //"org.matrix.e2e_cross_signing": true, + } for _, msc := range cfg.MSCs.MSCs { unstableFeatures["org.matrix."+msc] = true } @@ -1066,6 +1068,22 @@ func Setup( // Deleting E2E Backup Keys + // Cross-signing device keys + + postDeviceSigningKeys := httputil.MakeAuthAPI("post_device_signing_keys", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { + return UploadCrossSigningDeviceKeys(req, userInteractiveAuth, keyAPI, device, accountDB, cfg) + }) + + postDeviceSigningSignatures := httputil.MakeAuthAPI("post_device_signing_signatures", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { + return UploadCrossSigningDeviceSignatures(req, keyAPI, device) + }) + + r0mux.Handle("/keys/device_signing/upload", postDeviceSigningKeys).Methods(http.MethodPost, http.MethodOptions) + r0mux.Handle("/keys/signatures/upload", postDeviceSigningSignatures).Methods(http.MethodPost, http.MethodOptions) + + unstableMux.Handle("/keys/device_signing/upload", postDeviceSigningKeys).Methods(http.MethodPost, http.MethodOptions) + unstableMux.Handle("/keys/signatures/upload", postDeviceSigningSignatures).Methods(http.MethodPost, http.MethodOptions) + // Supplying a device ID is deprecated. r0mux.Handle("/keys/upload/{deviceID}", httputil.MakeAuthAPI("keys_upload", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { @@ -1079,7 +1097,7 @@ func Setup( ).Methods(http.MethodPost, http.MethodOptions) r0mux.Handle("/keys/query", httputil.MakeAuthAPI("keys_query", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { - return QueryKeys(req, keyAPI) + return QueryKeys(req, keyAPI, device) }), ).Methods(http.MethodPost, http.MethodOptions) r0mux.Handle("/keys/claim", diff --git a/cmd/dendrite-demo-libp2p/main.go b/cmd/dendrite-demo-libp2p/main.go index 1c511ce70..4caa617f1 100644 --- a/cmd/dendrite-demo-libp2p/main.go +++ b/cmd/dendrite-demo-libp2p/main.go @@ -147,7 +147,7 @@ func main() { accountDB := base.Base.CreateAccountsDB() federation := createFederationClient(base) - keyAPI := keyserver.NewInternalAPI(&base.Base.Cfg.KeyServer, federation) + keyAPI := keyserver.NewInternalAPI(&base.Base, &base.Base.Cfg.KeyServer, federation) userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI) keyAPI.SetUserAPI(userAPI) diff --git a/cmd/dendrite-demo-pinecone/main.go b/cmd/dendrite-demo-pinecone/main.go index d763f080e..1933c5ad8 100644 --- a/cmd/dendrite-demo-pinecone/main.go +++ b/cmd/dendrite-demo-pinecone/main.go @@ -179,7 +179,7 @@ func main() { base, federation, rsAPI, keyRing, true, ) - keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, fsAPI) + keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, fsAPI) userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI) keyAPI.SetUserAPI(userAPI) diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go index fc33a41f7..dbdb9a76a 100644 --- a/cmd/dendrite-demo-yggdrasil/main.go +++ b/cmd/dendrite-demo-yggdrasil/main.go @@ -102,7 +102,7 @@ func main() { serverKeyAPI := &signing.YggdrasilKeys{} keyRing := serverKeyAPI.KeyRing() - keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation) + keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, federation) userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI) keyAPI.SetUserAPI(userAPI) diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go index 9974fe666..bb313e57a 100644 --- a/cmd/dendrite-monolith-server/main.go +++ b/cmd/dendrite-monolith-server/main.go @@ -112,7 +112,7 @@ func main() { // This is different to rsAPI which can be the http client which doesn't need this dependency rsImpl.SetFederationSenderAPI(fsAPI) - keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, fsAPI) + keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, fsAPI) userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI) keyAPI.SetUserAPI(userAPI) if traceInternal { diff --git a/cmd/dendrite-polylith-multi/personalities/keyserver.go b/cmd/dendrite-polylith-multi/personalities/keyserver.go index d7fc9f4fb..8a99d7797 100644 --- a/cmd/dendrite-polylith-multi/personalities/keyserver.go +++ b/cmd/dendrite-polylith-multi/personalities/keyserver.go @@ -22,7 +22,7 @@ import ( func KeyServer(base *setup.BaseDendrite, cfg *config.Dendrite) { fsAPI := base.FederationSenderHTTPClient() - intAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, fsAPI) + intAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, fsAPI) intAPI.SetUserAPI(base.UserAPIClient()) keyserver.AddInternalRoutes(base.InternalAPIMux, intAPI) diff --git a/cmd/dendritejs-pinecone/main.go b/cmd/dendritejs-pinecone/main.go index 0201b2916..b44c609c6 100644 --- a/cmd/dendritejs-pinecone/main.go +++ b/cmd/dendritejs-pinecone/main.go @@ -184,7 +184,7 @@ func startup() { accountDB := base.CreateAccountsDB() federation := conn.CreateFederationClient(base, pSessions) - keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation) + keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, federation) userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI) keyAPI.SetUserAPI(userAPI) diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go index d5a845ae0..d8fc8b837 100644 --- a/cmd/dendritejs/main.go +++ b/cmd/dendritejs/main.go @@ -192,7 +192,7 @@ func main() { accountDB := base.CreateAccountsDB() federation := createFederationClient(cfg, node) - keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation) + keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, federation) userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI) keyAPI.SetUserAPI(userAPI) diff --git a/keyserver/api/api.go b/keyserver/api/api.go index 5cb287bc1..f46a9ee26 100644 --- a/keyserver/api/api.go +++ b/keyserver/api/api.go @@ -32,6 +32,8 @@ type KeyInternalAPI interface { PerformUploadKeys(ctx context.Context, req *PerformUploadKeysRequest, res *PerformUploadKeysResponse) // PerformClaimKeys claims one-time keys for use in pre-key messages PerformClaimKeys(ctx context.Context, req *PerformClaimKeysRequest, res *PerformClaimKeysResponse) + PerformUploadDeviceKeys(ctx context.Context, req *PerformUploadDeviceKeysRequest, res *PerformUploadDeviceKeysResponse) + PerformUploadDeviceSignatures(ctx context.Context, req *PerformUploadDeviceSignaturesRequest, res *PerformUploadDeviceSignaturesResponse) QueryKeys(ctx context.Context, req *QueryKeysRequest, res *QueryKeysResponse) QueryKeyChanges(ctx context.Context, req *QueryKeyChangesRequest, res *QueryKeyChangesResponse) QueryOneTimeKeys(ctx context.Context, req *QueryOneTimeKeysRequest, res *QueryOneTimeKeysResponse) @@ -40,7 +42,9 @@ type KeyInternalAPI interface { // KeyError is returned if there was a problem performing/querying the server type KeyError struct { - Err string + Err string `json:"error"` + IsInvalidSignature bool `json:"is_invalid_signature,omitempty"` // M_INVALID_SIGNATURE + IsMissingParam bool `json:"is_missing_param,omitempty"` // M_MISSING_PARAM } func (k *KeyError) Error() string { @@ -151,7 +155,30 @@ type PerformClaimKeysResponse struct { Error *KeyError } +type PerformUploadDeviceKeysRequest struct { + gomatrixserverlib.CrossSigningKeys + // The user that uploaded the key, should be populated by the clientapi. + UserID string `json:"user_id"` +} + +type PerformUploadDeviceKeysResponse struct { + Error *KeyError +} + +type PerformUploadDeviceSignaturesRequest struct { + Signatures map[string]map[gomatrixserverlib.KeyID]gomatrixserverlib.CrossSigningForKeyOrDevice + // The user that uploaded the sig, should be populated by the clientapi. + UserID string `json:"user_id"` +} + +type PerformUploadDeviceSignaturesResponse struct { + Error *KeyError +} + type QueryKeysRequest struct { + // The user ID asking for the keys, e.g. if from a client API request. + // Will not be populated if the key request came from federation. + UserID string // Maps user IDs to a list of devices UserToDevices map[string][]string Timeout time.Duration @@ -162,6 +189,10 @@ type QueryKeysResponse struct { Failures map[string]interface{} // Map of user_id to device_id to device_key DeviceKeys map[string]map[string]json.RawMessage + // Maps of user_id to cross signing key + MasterKeys map[string]gomatrixserverlib.CrossSigningKey + SelfSigningKeys map[string]gomatrixserverlib.CrossSigningKey + UserSigningKeys map[string]gomatrixserverlib.CrossSigningKey // Set if there was a fatal error processing this query Error *KeyError } diff --git a/keyserver/consumers/eduserver.go b/keyserver/consumers/eduserver.go new file mode 100644 index 000000000..d764950bc --- /dev/null +++ b/keyserver/consumers/eduserver.go @@ -0,0 +1,61 @@ +package consumers + +import ( + "fmt" + + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/keyserver/api" + "github.com/matrix-org/dendrite/keyserver/storage" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/process" + + "github.com/Shopify/sarama" +) + +type OutputSigningKeyUpdateConsumer struct { + eduServerConsumer *internal.ContinualConsumer + keyDB storage.Database + keyAPI api.KeyInternalAPI + serverName string +} + +func NewOutputSigningKeyUpdateConsumer( + process *process.ProcessContext, + cfg *config.Dendrite, + kafkaConsumer sarama.Consumer, + keyDB storage.Database, + keyAPI api.KeyInternalAPI, +) *OutputSigningKeyUpdateConsumer { + consumer := internal.ContinualConsumer{ + Process: process, + ComponentName: "keyserver/eduserver", + Topic: cfg.Global.Kafka.TopicFor(config.TopicOutputSigningKeyUpdate), + Consumer: kafkaConsumer, + PartitionStore: keyDB, + } + s := &OutputSigningKeyUpdateConsumer{ + eduServerConsumer: &consumer, + keyDB: keyDB, + keyAPI: keyAPI, + serverName: string(cfg.Global.ServerName), + } + consumer.ProcessMessage = s.onMessage + + return s +} + +func (s *OutputSigningKeyUpdateConsumer) Start() error { + return s.eduServerConsumer.Start() +} + +func (s *OutputSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMessage) error { + /* + var output eduapi.OutputSigningKeyUpdate + if err := json.Unmarshal(msg.Value, &output); err != nil { + log.WithError(err).Errorf("eduserver output log: message parse failure") + return nil + } + return nil + */ + return fmt.Errorf("TODO") +} diff --git a/keyserver/internal/cross_signing.go b/keyserver/internal/cross_signing.go new file mode 100644 index 000000000..802d02b36 --- /dev/null +++ b/keyserver/internal/cross_signing.go @@ -0,0 +1,389 @@ +// Copyright 2021 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "context" + "crypto/ed25519" + "encoding/json" + "fmt" + "strings" + + "github.com/matrix-org/dendrite/keyserver/api" + "github.com/matrix-org/dendrite/keyserver/types" + "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" +) + +func sanityCheckKey(key gomatrixserverlib.CrossSigningKey, userID string, purpose gomatrixserverlib.CrossSigningKeyPurpose) error { + // Is there exactly one key? + if len(key.Keys) != 1 { + return fmt.Errorf("should contain exactly one key") + } + + // Does the key ID match the key value? Iterates exactly once + for keyID, keyData := range key.Keys { + b64 := keyData.Encode() + tokens := strings.Split(string(keyID), ":") + if len(tokens) != 2 { + return fmt.Errorf("key ID is incorrectly formatted") + } + if tokens[1] != b64 { + return fmt.Errorf("key ID isn't correct") + } + } + + // Does the key claim to be from the right user? + if userID != key.UserID { + return fmt.Errorf("key has a user ID mismatch") + } + + // Does the key contain the correct purpose? + useful := false + for _, usage := range key.Usage { + if usage == purpose { + useful = true + break + } + } + if !useful { + return fmt.Errorf("key does not contain correct usage purpose") + } + + return nil +} + +// nolint:gocyclo +func (a *KeyInternalAPI) PerformUploadDeviceKeys(ctx context.Context, req *api.PerformUploadDeviceKeysRequest, res *api.PerformUploadDeviceKeysResponse) { + var masterKey gomatrixserverlib.Base64Bytes + hasMasterKey := false + + if len(req.MasterKey.Keys) > 0 { + if err := sanityCheckKey(req.MasterKey, req.UserID, gomatrixserverlib.CrossSigningKeyPurposeMaster); err != nil { + res.Error = &api.KeyError{ + Err: "Master key sanity check failed: " + err.Error(), + } + return + } + hasMasterKey = true + for _, keyData := range req.MasterKey.Keys { // iterates once, because sanityCheckKey requires one key + masterKey = keyData + } + } + + if len(req.SelfSigningKey.Keys) > 0 { + if err := sanityCheckKey(req.SelfSigningKey, req.UserID, gomatrixserverlib.CrossSigningKeyPurposeSelfSigning); err != nil { + res.Error = &api.KeyError{ + Err: "Self-signing key sanity check failed: " + err.Error(), + } + return + } + } + + if len(req.UserSigningKey.Keys) > 0 { + if err := sanityCheckKey(req.UserSigningKey, req.UserID, gomatrixserverlib.CrossSigningKeyPurposeUserSigning); err != nil { + res.Error = &api.KeyError{ + Err: "User-signing key sanity check failed: " + err.Error(), + } + return + } + } + + // If the user hasn't given a new master key, then let's go and get their + // existing keys from the database. + if !hasMasterKey { + existingKeys, err := a.DB.CrossSigningKeysForUser(ctx, req.UserID) + if err != nil { + res.Error = &api.KeyError{ + Err: "Retrieving cross-signing keys from database failed: " + err.Error(), + } + return + } + + masterKey, hasMasterKey = existingKeys[gomatrixserverlib.CrossSigningKeyPurposeMaster] + } + + // If the user isn't a local user and we haven't successfully found a key + // through any local means then ask over federation. + if !hasMasterKey { + _, host, err := gomatrixserverlib.SplitID('@', req.UserID) + if err != nil { + res.Error = &api.KeyError{ + Err: "Retrieving cross-signing keys from federation failed: " + err.Error(), + } + return + } + keys, err := a.FedClient.QueryKeys(ctx, host, map[string][]string{ + req.UserID: {}, + }) + if err != nil { + res.Error = &api.KeyError{ + Err: "Retrieving cross-signing keys from federation failed: " + err.Error(), + } + return + } + switch k := keys.MasterKeys[req.UserID].CrossSigningBody.(type) { + case *gomatrixserverlib.CrossSigningKey: + if err := sanityCheckKey(*k, req.UserID, gomatrixserverlib.CrossSigningKeyPurposeMaster); err != nil { + res.Error = &api.KeyError{ + Err: "Master key sanity check failed: " + err.Error(), + } + return + } + default: + res.Error = &api.KeyError{ + Err: "Unexpected type for master key retrieved from federation", + } + return + } + } + + // If we still don't have a master key at this point then there's nothing else + // we can do - we've checked both the request and the database. + if !hasMasterKey { + res.Error = &api.KeyError{ + Err: "No master key was found, either in the database or in the request!", + IsMissingParam: true, + } + return + } + + // The key ID is basically the key itself. + masterKeyID := gomatrixserverlib.KeyID(fmt.Sprintf("ed25519:%s", masterKey.Encode())) + + // Work out which things we need to verify the signatures for. + toVerify := make(map[gomatrixserverlib.CrossSigningKeyPurpose]gomatrixserverlib.CrossSigningKey, 3) + toStore := types.CrossSigningKeyMap{} + if len(req.MasterKey.Keys) > 0 { + toVerify[gomatrixserverlib.CrossSigningKeyPurposeMaster] = req.MasterKey + } + if len(req.SelfSigningKey.Keys) > 0 { + toVerify[gomatrixserverlib.CrossSigningKeyPurposeSelfSigning] = req.SelfSigningKey + } + if len(req.UserSigningKey.Keys) > 0 { + toVerify[gomatrixserverlib.CrossSigningKeyPurposeUserSigning] = req.UserSigningKey + } + for purpose, key := range toVerify { + // Collect together the key IDs we need to verify with. This will include + // all of the key IDs specified in the signatures. We don't do this for + // the master key because we have no means to verify the signatures - we + // instead just need to store them. + if purpose != gomatrixserverlib.CrossSigningKeyPurposeMaster { + // Marshal the specific key back into JSON so that we can verify the + // signature of it. + keyJSON, err := json.Marshal(key) + if err != nil { + res.Error = &api.KeyError{ + Err: fmt.Sprintf("The JSON of the key section is invalid: %s", err.Error()), + } + return + } + + // Now check if the subkey is signed by the master key. + if err := gomatrixserverlib.VerifyJSON(req.UserID, masterKeyID, ed25519.PublicKey(masterKey), keyJSON); err != nil { + res.Error = &api.KeyError{ + Err: fmt.Sprintf("The %q sub-key failed master key signature verification: %s", purpose, err.Error()), + IsInvalidSignature: true, + } + return + } + } + + // If we've reached this point then all the signatures are valid so + // add the key to the list of keys to store. + for _, keyData := range key.Keys { // iterates once, see sanityCheckKey + toStore[purpose] = keyData + } + } + + if err := a.DB.StoreCrossSigningKeysForUser(ctx, req.UserID, toStore); err != nil { + res.Error = &api.KeyError{ + Err: fmt.Sprintf("a.DB.StoreCrossSigningKeysForUser: %s", err), + } + } +} + +func (a *KeyInternalAPI) PerformUploadDeviceSignatures(ctx context.Context, req *api.PerformUploadDeviceSignaturesRequest, res *api.PerformUploadDeviceSignaturesResponse) { + selfSignatures := map[string]map[gomatrixserverlib.KeyID]gomatrixserverlib.CrossSigningForKeyOrDevice{} + otherSignatures := map[string]map[gomatrixserverlib.KeyID]gomatrixserverlib.CrossSigningForKeyOrDevice{} + + for userID, forUserID := range req.Signatures { + for keyID, keyOrDevice := range forUserID { + switch key := keyOrDevice.CrossSigningBody.(type) { + case *gomatrixserverlib.CrossSigningKey: + if key.UserID == req.UserID { + if _, ok := selfSignatures[userID]; !ok { + selfSignatures[userID] = map[gomatrixserverlib.KeyID]gomatrixserverlib.CrossSigningForKeyOrDevice{} + } + selfSignatures[userID][keyID] = keyOrDevice + } else { + if _, ok := otherSignatures[userID]; !ok { + otherSignatures[userID] = map[gomatrixserverlib.KeyID]gomatrixserverlib.CrossSigningForKeyOrDevice{} + } + otherSignatures[userID][keyID] = keyOrDevice + } + + case *gomatrixserverlib.DeviceKeys: + if key.UserID == req.UserID { + if _, ok := selfSignatures[userID]; !ok { + selfSignatures[userID] = map[gomatrixserverlib.KeyID]gomatrixserverlib.CrossSigningForKeyOrDevice{} + } + selfSignatures[userID][keyID] = keyOrDevice + } else { + if _, ok := otherSignatures[userID]; !ok { + otherSignatures[userID] = map[gomatrixserverlib.KeyID]gomatrixserverlib.CrossSigningForKeyOrDevice{} + } + otherSignatures[userID][keyID] = keyOrDevice + } + + default: + continue + } + } + } + + if err := a.processSelfSignatures(ctx, req.UserID, selfSignatures); err != nil { + res.Error = &api.KeyError{ + Err: fmt.Sprintf("a.processSelfSignatures: %s", err), + } + return + } + + if err := a.processOtherSignatures(ctx, req.UserID, otherSignatures); err != nil { + res.Error = &api.KeyError{ + Err: fmt.Sprintf("a.processOtherSignatures: %s", err), + } + return + } +} + +func (a *KeyInternalAPI) processSelfSignatures( + ctx context.Context, _ string, + signatures map[string]map[gomatrixserverlib.KeyID]gomatrixserverlib.CrossSigningForKeyOrDevice, +) error { + // Here we will process: + // * The user signing their own devices using their self-signing key + // * The user signing their master key using one of their devices + + for targetUserID, forTargetUserID := range signatures { + for targetKeyID, signature := range forTargetUserID { + switch sig := signature.CrossSigningBody.(type) { + case *gomatrixserverlib.CrossSigningKey: + for originUserID, forOriginUserID := range sig.Signatures { + for originKeyID, originSig := range forOriginUserID { + if err := a.DB.StoreCrossSigningSigsForTarget( + ctx, originUserID, originKeyID, targetUserID, targetKeyID, originSig, + ); err != nil { + return fmt.Errorf("a.DB.StoreCrossSigningKeysForTarget: %w", err) + } + } + } + + case *gomatrixserverlib.DeviceKeys: + for originUserID, forOriginUserID := range sig.Signatures { + for originKeyID, originSig := range forOriginUserID { + if err := a.DB.StoreCrossSigningSigsForTarget( + ctx, originUserID, originKeyID, targetUserID, targetKeyID, originSig, + ); err != nil { + return fmt.Errorf("a.DB.StoreCrossSigningKeysForTarget: %w", err) + } + } + } + + default: + return fmt.Errorf("unexpected type assertion") + } + } + } + + return nil +} + +func (a *KeyInternalAPI) processOtherSignatures( + ctx context.Context, userID string, + signatures map[string]map[gomatrixserverlib.KeyID]gomatrixserverlib.CrossSigningForKeyOrDevice, +) error { + // Here we will process: + // * A user signing someone else's master keys using their user-signing keys + + return nil +} + +func (a *KeyInternalAPI) crossSigningKeysFromDatabase( + ctx context.Context, req *api.QueryKeysRequest, res *api.QueryKeysResponse, +) { + for userID := range req.UserToDevices { + keys, err := a.DB.CrossSigningKeysForUser(ctx, userID) + if err != nil { + logrus.WithError(err).Errorf("Failed to get cross-signing keys for user %q", userID) + continue + } + + for keyType, keyData := range keys { + b64 := keyData.Encode() + keyID := gomatrixserverlib.KeyID("ed25519:" + b64) + key := gomatrixserverlib.CrossSigningKey{ + UserID: userID, + Usage: []gomatrixserverlib.CrossSigningKeyPurpose{ + keyType, + }, + Keys: map[gomatrixserverlib.KeyID]gomatrixserverlib.Base64Bytes{ + keyID: keyData, + }, + } + + sigs, err := a.DB.CrossSigningSigsForTarget(ctx, userID, keyID) + if err != nil { + logrus.WithError(err).Errorf("Failed to get cross-signing signatures for user %q key %q", userID, keyID) + continue + } + + appendSignature := func(originUserID string, originKeyID gomatrixserverlib.KeyID, signature gomatrixserverlib.Base64Bytes) { + if key.Signatures == nil { + key.Signatures = types.CrossSigningSigMap{} + } + if _, ok := key.Signatures[originUserID]; !ok { + key.Signatures[originUserID] = make(map[gomatrixserverlib.KeyID]gomatrixserverlib.Base64Bytes) + } + key.Signatures[originUserID][originKeyID] = signature + } + + for originUserID, forOrigin := range sigs { + for originKeyID, signature := range forOrigin { + switch { + case req.UserID != "" && originUserID == req.UserID: + // Include signatures that we created + appendSignature(originUserID, originKeyID, signature) + case originUserID == userID: + // Include signatures that were created by the person whose key + // we are processing + appendSignature(originUserID, originKeyID, signature) + } + } + } + + switch keyType { + case gomatrixserverlib.CrossSigningKeyPurposeMaster: + res.MasterKeys[userID] = key + + case gomatrixserverlib.CrossSigningKeyPurposeSelfSigning: + res.SelfSigningKeys[userID] = key + + case gomatrixserverlib.CrossSigningKeyPurposeUserSigning: + res.UserSigningKeys[userID] = key + } + } + } +} diff --git a/keyserver/internal/device_list_update.go b/keyserver/internal/device_list_update.go index 47bfb72c3..91d4b53d8 100644 --- a/keyserver/internal/device_list_update.go +++ b/keyserver/internal/device_list_update.go @@ -82,6 +82,7 @@ type DeviceListUpdater struct { mu *sync.Mutex // protects UserIDToMutex db DeviceListUpdaterDatabase + api DeviceListUpdaterAPI producer KeyChangeProducer fedClient fedsenderapi.FederationClient workerChans []chan gomatrixserverlib.ServerName @@ -114,6 +115,10 @@ type DeviceListUpdaterDatabase interface { DeviceKeysJSON(ctx context.Context, keys []api.DeviceMessage) error } +type DeviceListUpdaterAPI interface { + PerformUploadDeviceKeys(ctx context.Context, req *api.PerformUploadDeviceKeysRequest, res *api.PerformUploadDeviceKeysResponse) +} + // KeyChangeProducer is the interface for producers.KeyChange useful for testing. type KeyChangeProducer interface { ProduceKeyChanges(keys []api.DeviceMessage) error @@ -121,13 +126,14 @@ type KeyChangeProducer interface { // NewDeviceListUpdater creates a new updater which fetches fresh device lists when they go stale. func NewDeviceListUpdater( - db DeviceListUpdaterDatabase, producer KeyChangeProducer, fedClient fedsenderapi.FederationClient, - numWorkers int, + db DeviceListUpdaterDatabase, api DeviceListUpdaterAPI, producer KeyChangeProducer, + fedClient fedsenderapi.FederationClient, numWorkers int, ) *DeviceListUpdater { return &DeviceListUpdater{ userIDToMutex: make(map[string]*sync.Mutex), mu: &sync.Mutex{}, db: db, + api: api, producer: producer, fedClient: fedClient, workerChans: make([]chan gomatrixserverlib.ServerName, numWorkers), @@ -367,6 +373,23 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam } continue } + if res.MasterKey != nil || res.SelfSigningKey != nil { + uploadReq := &api.PerformUploadDeviceKeysRequest{ + UserID: userID, + } + uploadRes := &api.PerformUploadDeviceKeysResponse{} + if res.MasterKey != nil { + if err = sanityCheckKey(*res.MasterKey, userID, gomatrixserverlib.CrossSigningKeyPurposeMaster); err == nil { + uploadReq.MasterKey = *res.MasterKey + } + } + if res.SelfSigningKey != nil { + if err = sanityCheckKey(*res.SelfSigningKey, userID, gomatrixserverlib.CrossSigningKeyPurposeSelfSigning); err == nil { + uploadReq.SelfSigningKey = *res.SelfSigningKey + } + } + u.api.PerformUploadDeviceKeys(ctx, uploadReq, uploadRes) + } err = u.updateDeviceList(&res) if err != nil { logger.WithError(err).WithField("user_id", userID).Error("fetched device list but failed to store/emit it") diff --git a/keyserver/internal/device_list_update_test.go b/keyserver/internal/device_list_update_test.go index eab2a78d8..7c170de28 100644 --- a/keyserver/internal/device_list_update_test.go +++ b/keyserver/internal/device_list_update_test.go @@ -95,6 +95,13 @@ func (d *mockDeviceListUpdaterDatabase) DeviceKeysJSON(ctx context.Context, keys return nil } +type mockDeviceListUpdaterAPI struct { +} + +func (d *mockDeviceListUpdaterAPI) PerformUploadDeviceKeys(ctx context.Context, req *api.PerformUploadDeviceKeysRequest, res *api.PerformUploadDeviceKeysResponse) { + +} + type roundTripper struct { fn func(*http.Request) (*http.Response, error) } @@ -122,8 +129,9 @@ func TestUpdateHavePrevID(t *testing.T) { return true }, } + ap := &mockDeviceListUpdaterAPI{} producer := &mockKeyChangeProducer{} - updater := NewDeviceListUpdater(db, producer, nil, 1) + updater := NewDeviceListUpdater(db, ap, producer, nil, 1) event := gomatrixserverlib.DeviceListUpdateEvent{ DeviceDisplayName: "Foo Bar", Deleted: false, @@ -166,6 +174,7 @@ func TestUpdateNoPrevID(t *testing.T) { return false }, } + ap := &mockDeviceListUpdaterAPI{} producer := &mockKeyChangeProducer{} remoteUserID := "@alice:example.somewhere" var wg sync.WaitGroup @@ -193,7 +202,7 @@ func TestUpdateNoPrevID(t *testing.T) { `)), }, nil }) - updater := NewDeviceListUpdater(db, producer, fedClient, 2) + updater := NewDeviceListUpdater(db, ap, producer, fedClient, 2) if err := updater.Start(); err != nil { t.Fatalf("failed to start updater: %s", err) } diff --git a/keyserver/internal/internal.go b/keyserver/internal/internal.go index f53a07619..c5711e73c 100644 --- a/keyserver/internal/internal.go +++ b/keyserver/internal/internal.go @@ -221,9 +221,17 @@ func (a *KeyInternalAPI) QueryDeviceMessages(ctx context.Context, req *api.Query func (a *KeyInternalAPI) QueryKeys(ctx context.Context, req *api.QueryKeysRequest, res *api.QueryKeysResponse) { res.DeviceKeys = make(map[string]map[string]json.RawMessage) + res.MasterKeys = make(map[string]gomatrixserverlib.CrossSigningKey) + res.SelfSigningKeys = make(map[string]gomatrixserverlib.CrossSigningKey) + res.UserSigningKeys = make(map[string]gomatrixserverlib.CrossSigningKey) res.Failures = make(map[string]interface{}) + + // get cross-signing keys from the database + a.crossSigningKeysFromDatabase(ctx, req, res) + // make a map from domain to device keys domainToDeviceKeys := make(map[string]map[string][]string) + domainToCrossSigningKeys := make(map[string]map[string]struct{}) for userID, deviceIDs := range req.UserToDevices { _, serverName, err := gomatrixserverlib.SplitID('@', userID) if err != nil { @@ -274,16 +282,30 @@ func (a *KeyInternalAPI) QueryKeys(ctx context.Context, req *api.QueryKeysReques domainToDeviceKeys[domain] = make(map[string][]string) domainToDeviceKeys[domain][userID] = append(domainToDeviceKeys[domain][userID], deviceIDs...) } + // work out if our cross-signing request for this user was + // satisfied, if not add them to the list of things to fetch + if _, ok := res.MasterKeys[userID]; !ok { + if _, ok := domainToCrossSigningKeys[domain]; !ok { + domainToCrossSigningKeys[domain] = make(map[string]struct{}) + } + domainToCrossSigningKeys[domain][userID] = struct{}{} + } + if _, ok := res.SelfSigningKeys[userID]; !ok { + if _, ok := domainToCrossSigningKeys[domain]; !ok { + domainToCrossSigningKeys[domain] = make(map[string]struct{}) + } + domainToCrossSigningKeys[domain][userID] = struct{}{} + } } // attempt to satisfy key queries from the local database first as we should get device updates pushed to us domainToDeviceKeys = a.remoteKeysFromDatabase(ctx, res, domainToDeviceKeys) - if len(domainToDeviceKeys) == 0 { + if len(domainToDeviceKeys) == 0 && len(domainToCrossSigningKeys) == 0 { return // nothing to query } // perform key queries for remote devices - a.queryRemoteKeys(ctx, req.Timeout, res, domainToDeviceKeys) + a.queryRemoteKeys(ctx, req.Timeout, res, domainToDeviceKeys, domainToCrossSigningKeys) } func (a *KeyInternalAPI) remoteKeysFromDatabase( @@ -313,18 +335,30 @@ func (a *KeyInternalAPI) remoteKeysFromDatabase( } func (a *KeyInternalAPI) queryRemoteKeys( - ctx context.Context, timeout time.Duration, res *api.QueryKeysResponse, domainToDeviceKeys map[string]map[string][]string, + ctx context.Context, timeout time.Duration, res *api.QueryKeysResponse, + domainToDeviceKeys map[string]map[string][]string, domainToCrossSigningKeys map[string]map[string]struct{}, ) { resultCh := make(chan *gomatrixserverlib.RespQueryKeys, len(domainToDeviceKeys)) // allows us to wait until all federation servers have been poked var wg sync.WaitGroup - wg.Add(len(domainToDeviceKeys)) // mutex for writing directly to res (e.g failures) var respMu sync.Mutex + domains := map[string]struct{}{} + for domain := range domainToDeviceKeys { + domains[domain] = struct{}{} + } + for domain := range domainToCrossSigningKeys { + domains[domain] = struct{}{} + } + wg.Add(len(domains)) + // fan out - for domain, deviceKeys := range domainToDeviceKeys { - go a.queryRemoteKeysOnServer(ctx, domain, deviceKeys, &wg, &respMu, timeout, resultCh, res) + for domain := range domains { + go a.queryRemoteKeysOnServer( + ctx, domain, domainToDeviceKeys[domain], domainToCrossSigningKeys[domain], + &wg, &respMu, timeout, resultCh, res, + ) } // Close the result channel when the goroutines have quit so the for .. range exits @@ -344,12 +378,29 @@ func (a *KeyInternalAPI) queryRemoteKeys( res.DeviceKeys[userID][deviceID] = keyJSON } } + + for userID, body := range result.MasterKeys { + switch b := body.CrossSigningBody.(type) { + case *gomatrixserverlib.CrossSigningKey: + res.MasterKeys[userID] = *b + } + } + + for userID, body := range result.SelfSigningKeys { + switch b := body.CrossSigningBody.(type) { + case *gomatrixserverlib.CrossSigningKey: + res.SelfSigningKeys[userID] = *b + } + } + + // TODO: do we want to persist these somewhere now + // that we have fetched them? } } func (a *KeyInternalAPI) queryRemoteKeysOnServer( - ctx context.Context, serverName string, devKeys map[string][]string, wg *sync.WaitGroup, - respMu *sync.Mutex, timeout time.Duration, resultCh chan<- *gomatrixserverlib.RespQueryKeys, + ctx context.Context, serverName string, devKeys map[string][]string, crossSigningKeys map[string]struct{}, + wg *sync.WaitGroup, respMu *sync.Mutex, timeout time.Duration, resultCh chan<- *gomatrixserverlib.RespQueryKeys, res *api.QueryKeysResponse, ) { defer wg.Done() @@ -358,14 +409,24 @@ func (a *KeyInternalAPI) queryRemoteKeysOnServer( // for users who we do not have any knowledge about, try to start doing device list updates for them // by hitting /users/devices - otherwise fallback to /keys/query which has nicer bulk properties but // lack a stream ID. - var userIDsForAllDevices []string + userIDsForAllDevices := map[string]struct{}{} for userID, deviceIDs := range devKeys { if len(deviceIDs) == 0 { - userIDsForAllDevices = append(userIDsForAllDevices, userID) + userIDsForAllDevices[userID] = struct{}{} delete(devKeys, userID) } } - for _, userID := range userIDsForAllDevices { + // for cross-signing keys, it's probably easier just to hit /keys/query if we aren't already doing + // a device list update, so we'll populate those back into the /keys/query list if not + for userID := range crossSigningKeys { + if devKeys == nil { + devKeys = map[string][]string{} + } + if _, ok := userIDsForAllDevices[userID]; !ok { + devKeys[userID] = []string{} + } + } + for userID := range userIDsForAllDevices { err := a.Updater.ManualUpdate(context.Background(), gomatrixserverlib.ServerName(serverName), userID) if err != nil { logrus.WithFields(logrus.Fields{ diff --git a/keyserver/inthttp/client.go b/keyserver/inthttp/client.go index 982000222..b685ae1a5 100644 --- a/keyserver/inthttp/client.go +++ b/keyserver/inthttp/client.go @@ -27,13 +27,15 @@ import ( // HTTP paths for the internal HTTP APIs const ( - InputDeviceListUpdatePath = "/keyserver/inputDeviceListUpdate" - PerformUploadKeysPath = "/keyserver/performUploadKeys" - PerformClaimKeysPath = "/keyserver/performClaimKeys" - QueryKeysPath = "/keyserver/queryKeys" - QueryKeyChangesPath = "/keyserver/queryKeyChanges" - QueryOneTimeKeysPath = "/keyserver/queryOneTimeKeys" - QueryDeviceMessagesPath = "/keyserver/queryDeviceMessages" + InputDeviceListUpdatePath = "/keyserver/inputDeviceListUpdate" + PerformUploadKeysPath = "/keyserver/performUploadKeys" + PerformClaimKeysPath = "/keyserver/performClaimKeys" + PerformUploadDeviceKeysPath = "/keyserver/performUploadDeviceKeys" + PerformUploadDeviceSignaturesPath = "/keyserver/performUploadDeviceSignatures" + QueryKeysPath = "/keyserver/queryKeys" + QueryKeyChangesPath = "/keyserver/queryKeyChanges" + QueryOneTimeKeysPath = "/keyserver/queryOneTimeKeys" + QueryDeviceMessagesPath = "/keyserver/queryDeviceMessages" ) // NewKeyServerClient creates a KeyInternalAPI implemented by talking to a HTTP POST API. @@ -175,3 +177,37 @@ func (h *httpKeyInternalAPI) QueryKeyChanges( } } } + +func (h *httpKeyInternalAPI) PerformUploadDeviceKeys( + ctx context.Context, + request *api.PerformUploadDeviceKeysRequest, + response *api.PerformUploadDeviceKeysResponse, +) { + span, ctx := opentracing.StartSpanFromContext(ctx, "PerformUploadDeviceKeys") + defer span.Finish() + + apiURL := h.apiURL + PerformUploadDeviceKeysPath + err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) + if err != nil { + response.Error = &api.KeyError{ + Err: err.Error(), + } + } +} + +func (h *httpKeyInternalAPI) PerformUploadDeviceSignatures( + ctx context.Context, + request *api.PerformUploadDeviceSignaturesRequest, + response *api.PerformUploadDeviceSignaturesResponse, +) { + span, ctx := opentracing.StartSpanFromContext(ctx, "PerformUploadDeviceSignatures") + defer span.Finish() + + apiURL := h.apiURL + PerformUploadDeviceSignaturesPath + err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) + if err != nil { + response.Error = &api.KeyError{ + Err: err.Error(), + } + } +} diff --git a/keyserver/inthttp/server.go b/keyserver/inthttp/server.go index 7dfaed2e7..ab096c156 100644 --- a/keyserver/inthttp/server.go +++ b/keyserver/inthttp/server.go @@ -58,6 +58,28 @@ func AddRoutes(internalAPIMux *mux.Router, s api.KeyInternalAPI) { return util.JSONResponse{Code: http.StatusOK, JSON: &response} }), ) + internalAPIMux.Handle(PerformUploadDeviceKeysPath, + httputil.MakeInternalAPI("performUploadDeviceKeys", func(req *http.Request) util.JSONResponse { + request := api.PerformUploadDeviceKeysRequest{} + response := api.PerformUploadDeviceKeysResponse{} + if err := json.NewDecoder(req.Body).Decode(&request.CrossSigningKeys); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + s.PerformUploadDeviceKeys(req.Context(), &request, &response) + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) + internalAPIMux.Handle(PerformUploadDeviceSignaturesPath, + httputil.MakeInternalAPI("performUploadDeviceSignatures", func(req *http.Request) util.JSONResponse { + request := api.PerformUploadDeviceSignaturesRequest{} + response := api.PerformUploadDeviceSignaturesResponse{} + if err := json.NewDecoder(req.Body).Decode(&request.Signatures); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + s.PerformUploadDeviceSignatures(req.Context(), &request, &response) + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) internalAPIMux.Handle(QueryKeysPath, httputil.MakeInternalAPI("queryKeys", func(req *http.Request) util.JSONResponse { request := api.QueryKeysRequest{} diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go index 7e8fc2e0d..fcfe24de8 100644 --- a/keyserver/keyserver.go +++ b/keyserver/keyserver.go @@ -18,10 +18,12 @@ import ( "github.com/gorilla/mux" fedsenderapi "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/keyserver/api" + "github.com/matrix-org/dendrite/keyserver/consumers" "github.com/matrix-org/dendrite/keyserver/internal" "github.com/matrix-org/dendrite/keyserver/inthttp" "github.com/matrix-org/dendrite/keyserver/producers" "github.com/matrix-org/dendrite/keyserver/storage" + "github.com/matrix-org/dendrite/setup" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/kafka" "github.com/sirupsen/logrus" @@ -36,9 +38,9 @@ func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) { // NewInternalAPI returns a concerete implementation of the internal API. Callers // can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes. func NewInternalAPI( - cfg *config.KeyServer, fedClient fedsenderapi.FederationClient, + base *setup.BaseDendrite, cfg *config.KeyServer, fedClient fedsenderapi.FederationClient, ) api.KeyInternalAPI { - _, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) + consumer, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) db, err := storage.NewDatabase(&cfg.Database) if err != nil { @@ -49,17 +51,26 @@ func NewInternalAPI( Producer: producer, DB: db, } - updater := internal.NewDeviceListUpdater(db, keyChangeProducer, fedClient, 8) // 8 workers TODO: configurable + ap := &internal.KeyInternalAPI{ + DB: db, + ThisServer: cfg.Matrix.ServerName, + FedClient: fedClient, + Producer: keyChangeProducer, + } + updater := internal.NewDeviceListUpdater(db, ap, keyChangeProducer, fedClient, 8) // 8 workers TODO: configurable + ap.Updater = updater go func() { if err := updater.Start(); err != nil { logrus.WithError(err).Panicf("failed to start device list updater") } }() - return &internal.KeyInternalAPI{ - DB: db, - ThisServer: cfg.Matrix.ServerName, - FedClient: fedClient, - Producer: keyChangeProducer, - Updater: updater, + + keyconsumer := consumers.NewOutputSigningKeyUpdateConsumer( + base.ProcessContext, base.Cfg, consumer, db, ap, + ) + if err := keyconsumer.Start(); err != nil { + logrus.WithError(err).Panicf("failed to start keyserver EDU server consumer") } + + return ap } diff --git a/setup/config/config_kafka.go b/setup/config/config_kafka.go index 361914287..15b3ad713 100644 --- a/setup/config/config_kafka.go +++ b/setup/config/config_kafka.go @@ -10,6 +10,7 @@ const ( TopicOutputRoomEvent = "OutputRoomEvent" TopicOutputClientData = "OutputClientData" TopicOutputReceiptEvent = "OutputReceiptEvent" + TopicOutputSigningKeyUpdate = "OutputSigningKeyUpdate" ) type Kafka struct { diff --git a/syncapi/internal/keychange_test.go b/syncapi/internal/keychange_test.go index 44c4a4dd3..38be34f65 100644 --- a/syncapi/internal/keychange_test.go +++ b/syncapi/internal/keychange_test.go @@ -33,6 +33,10 @@ func (k *mockKeyAPI) SetUserAPI(i userapi.UserInternalAPI) {} // PerformClaimKeys claims one-time keys for use in pre-key messages func (k *mockKeyAPI) PerformClaimKeys(ctx context.Context, req *keyapi.PerformClaimKeysRequest, res *keyapi.PerformClaimKeysResponse) { } +func (k *mockKeyAPI) PerformUploadDeviceKeys(ctx context.Context, req *keyapi.PerformUploadDeviceKeysRequest, res *keyapi.PerformUploadDeviceKeysResponse) { +} +func (k *mockKeyAPI) PerformUploadDeviceSignatures(ctx context.Context, req *keyapi.PerformUploadDeviceSignaturesRequest, res *keyapi.PerformUploadDeviceSignaturesResponse) { +} func (k *mockKeyAPI) QueryKeys(ctx context.Context, req *keyapi.QueryKeysRequest, res *keyapi.QueryKeysResponse) { } func (k *mockKeyAPI) QueryKeyChanges(ctx context.Context, req *keyapi.QueryKeyChangesRequest, res *keyapi.QueryKeyChangesResponse) { diff --git a/sytest-whitelist b/sytest-whitelist index 6bb3d770a..27109e602 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -550,3 +550,6 @@ Will not back up to an old backup version Can create more than 10 backup versions Can delete backup Deleted & recreated backups are empty +Can upload self-signing keys +Fails to upload self-signing keys with no auth +Fails to upload self-signing key without master key