From 470933789b4ae08cd33c0a1de3656673eb0ebe70 Mon Sep 17 00:00:00 2001 From: Kegsay Date: Tue, 21 Jul 2020 17:46:47 +0100 Subject: [PATCH] Perform outbound federation hits for querying/claiming E2E keys (#1212) * Perform outbound federation hits for querying/claiming E2E keys Untested currently because we need the receiving end to work before sytest will be happy. * Linting --- cmd/dendrite-demo-libp2p/main.go | 2 +- cmd/dendrite-demo-yggdrasil/main.go | 2 +- cmd/dendrite-key-server/main.go | 2 +- cmd/dendrite-monolith-server/main.go | 2 +- cmd/dendritejs/main.go | 2 +- go.mod | 3 +- go.sum | 19 +---- keyserver/internal/internal.go | 120 +++++++++++++++++++++++++-- keyserver/keyserver.go | 4 +- 9 files changed, 122 insertions(+), 34 deletions(-) diff --git a/cmd/dendrite-demo-libp2p/main.go b/cmd/dendrite-demo-libp2p/main.go index 432df4abc..79c331424 100644 --- a/cmd/dendrite-demo-libp2p/main.go +++ b/cmd/dendrite-demo-libp2p/main.go @@ -186,7 +186,7 @@ func main() { ServerKeyAPI: serverKeyAPI, StateAPI: stateAPI, UserAPI: userAPI, - KeyAPI: keyserver.NewInternalAPI(base.Base.Cfg), + KeyAPI: keyserver.NewInternalAPI(base.Base.Cfg, federation), ExtPublicRoomsProvider: provider, } monolith.AddAllPublicRoutes(base.Base.PublicAPIMux) diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go index 7ab900000..4e4855b6e 100644 --- a/cmd/dendrite-demo-yggdrasil/main.go +++ b/cmd/dendrite-demo-yggdrasil/main.go @@ -140,7 +140,7 @@ func main() { RoomserverAPI: rsAPI, UserAPI: userAPI, StateAPI: stateAPI, - KeyAPI: keyserver.NewInternalAPI(base.Cfg), + KeyAPI: keyserver.NewInternalAPI(base.Cfg, federation), //ServerKeyAPI: serverKeyAPI, ExtPublicRoomsProvider: yggrooms.NewYggdrasilRoomProvider( ygg, fsAPI, federation, diff --git a/cmd/dendrite-key-server/main.go b/cmd/dendrite-key-server/main.go index 04f42d8b6..7dabc258a 100644 --- a/cmd/dendrite-key-server/main.go +++ b/cmd/dendrite-key-server/main.go @@ -24,7 +24,7 @@ func main() { base := setup.NewBaseDendrite(cfg, "KeyServer", true) defer base.Close() // nolint: errcheck - intAPI := keyserver.NewInternalAPI(base.Cfg) + intAPI := keyserver.NewInternalAPI(base.Cfg, base.CreateFederationClient()) keyserver.AddInternalRoutes(base.InternalAPIMux, intAPI) diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go index e21c9a50a..93d62343d 100644 --- a/cmd/dendrite-monolith-server/main.go +++ b/cmd/dendrite-monolith-server/main.go @@ -119,7 +119,7 @@ func main() { rsImpl.SetFederationSenderAPI(fsAPI) stateAPI := currentstateserver.NewInternalAPI(base.Cfg, base.KafkaConsumer) - keyAPI := keyserver.NewInternalAPI(base.Cfg) + keyAPI := keyserver.NewInternalAPI(base.Cfg, federation) monolith := setup.Monolith{ Config: base.Cfg, diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go index 55023a42c..3d58d957f 100644 --- a/cmd/dendritejs/main.go +++ b/cmd/dendritejs/main.go @@ -233,7 +233,7 @@ func main() { RoomserverAPI: rsAPI, StateAPI: stateAPI, UserAPI: userAPI, - KeyAPI: keyserver.NewInternalAPI(base.Cfg), + KeyAPI: keyserver.NewInternalAPI(base.Cfg, federation), //ServerKeyAPI: serverKeyAPI, ExtPublicRoomsProvider: p2pPublicRoomProvider, } diff --git a/go.mod b/go.mod index 19ea0e907..dfdc66444 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4 github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 - github.com/matrix-org/gomatrixserverlib v0.0.0-20200714093724-6c9a3db6c0ed + github.com/matrix-org/gomatrixserverlib v0.0.0-20200721145051-cea6eafced2b github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 github.com/mattn/go-sqlite3 v2.0.2+incompatible @@ -39,7 +39,6 @@ require ( github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200715104113-1046b00c3be3 go.uber.org/atomic v1.4.0 golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5 - golang.org/x/mobile v0.0.0-20200629153529-33b80540585f // indirect gopkg.in/h2non/bimg.v1 v1.0.18 gopkg.in/yaml.v2 v2.2.8 ) diff --git a/go.sum b/go.sum index 0bb79dc9f..a7c8a05b4 100644 --- a/go.sum +++ b/go.sum @@ -12,7 +12,6 @@ github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOv github.com/Arceliar/phony v0.0.0-20191006174943-d0c68492aca0 h1:p3puK8Sl2xK+2FnnIvY/C0N1aqJo2kbEsdAzU+Tnv48= github.com/Arceliar/phony v0.0.0-20191006174943-d0c68492aca0/go.mod h1:6Lkn+/zJilRMsKmbmG1RPoamiArC6HS73xbwRyp3UyI= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y= github.com/Shopify/sarama v1.26.1 h1:3jnfWKD7gVwbB1KSy/lE0szA9duPuSFLViK0o/d3DgA= github.com/Shopify/sarama v1.26.1/go.mod h1:NbSGBSSndYaIhRcBtY9V0U7AyH+x71bG668AuWys/yU= @@ -422,8 +421,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3 h1:Yb+Wlf github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3/go.mod h1:e+cg2q7C7yE5QnAXgzo512tgFh1RbQLC0+jozuegKgo= github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 h1:Hr3zjRsq2bhrnp3Ky1qgx/fzCtCALOoGYylh2tpS9K4= github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= -github.com/matrix-org/gomatrixserverlib v0.0.0-20200714093724-6c9a3db6c0ed h1:b3PPbP+vzI68obhizd9O8/NzpiGOdj3uJXYz9S62PfE= -github.com/matrix-org/gomatrixserverlib v0.0.0-20200714093724-6c9a3db6c0ed/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200721145051-cea6eafced2b h1:ul/Jc5q5+QBHNvhd9idfglOwyGf/Tc3ittINEbKJPsQ= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200721145051-cea6eafced2b/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f h1:pRz4VTiRCO4zPlEMc3ESdUOcW4PXHH4Kj+YDz1XyE+Y= github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f/go.mod h1:y0oDTjZDv5SM9a2rp3bl+CU+bvTRINQsdb7YlDql5Go= github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 h1:ntrLa/8xVzeSs8vHFHK25k0C+NV74sYMJnNSg5NoSRo= @@ -680,7 +679,6 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90Pveol golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190313024323-a1f597ede03a/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190426145343-a29dc8fdc734/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -693,19 +691,10 @@ golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5 h1:Q7tZBpemrlsc2I7IyODzhtallWRSm4Q0d09pL6XbQtU= golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= -golang.org/x/exp v0.0.0-20190731235908-ec7cb31e5a56/go.mod h1:JhuoJpWY28nO4Vef9tZUw9qufEGTyX1+7lmHxV5q5G4= -golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= -golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= -golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE= -golang.org/x/mobile v0.0.0-20200629153529-33b80540585f h1:9MxnlCHwn6IfUTinHBBzcBhmrX4OXfRmi954tWGKq+M= -golang.org/x/mobile v0.0.0-20200629153529-33b80540585f/go.mod h1:skQtrUTUwhdJvXM/2KKJzY8pDgNr9I/FOMqDVRPBUS4= -golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY= -golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= -golang.org/x/mod v0.1.1-0.20191209134235-331c550502dd/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -792,11 +781,7 @@ golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= golang.org/x/tools v0.0.0-20190311212946-11955173bddd h1:/e+gpKk9r3dJobndpTytxS2gOy6m5uvpg+ISQoEcusQ= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= -golang.org/x/tools v0.0.0-20190312151545-0bb0c0a6e846/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= -golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.0.0-20200117012304-6edc0a871e69 h1:yBHHx+XZqXJBm6Exke3N7V9gnlsyXxoCPEb1yVenjfk= -golang.org/x/tools v0.0.0-20200117012304-6edc0a871e69/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= diff --git a/keyserver/internal/internal.go b/keyserver/internal/internal.go index 041732dc4..e406dab4f 100644 --- a/keyserver/internal/internal.go +++ b/keyserver/internal/internal.go @@ -19,6 +19,8 @@ import ( "context" "encoding/json" "fmt" + "sync" + "time" "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/storage" @@ -30,6 +32,7 @@ import ( type KeyInternalAPI struct { DB storage.Database ThisServer gomatrixserverlib.ServerName + FedClient *gomatrixserverlib.FederationClient } func (a *KeyInternalAPI) PerformUploadKeys(ctx context.Context, req *api.PerformUploadKeysRequest, res *api.PerformUploadKeysResponse) { @@ -66,15 +69,67 @@ func (a *KeyInternalAPI) PerformClaimKeys(ctx context.Context, req *api.PerformC mergeInto(res.OneTimeKeys, keys) delete(domainToDeviceKeys, string(a.ThisServer)) } - // TODO: claim remote keys + // claim remote keys + a.claimRemoteKeys(ctx, req.Timeout, res, domainToDeviceKeys) +} +func (a *KeyInternalAPI) claimRemoteKeys( + ctx context.Context, timeout time.Duration, res *api.PerformClaimKeysResponse, domainToDeviceKeys map[string]map[string]map[string]string, +) { + resultCh := make(chan *gomatrixserverlib.RespClaimKeys, len(domainToDeviceKeys)) + // allows us to wait until all federation servers have been poked + var wg sync.WaitGroup + wg.Add(len(domainToDeviceKeys)) + // mutex for failures + var failMu sync.Mutex + + // fan out + for d, k := range domainToDeviceKeys { + go func(domain string, keysToClaim map[string]map[string]string) { + defer wg.Done() + fedCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + claimKeyRes, err := a.FedClient.ClaimKeys(fedCtx, gomatrixserverlib.ServerName(domain), keysToClaim) + if err != nil { + failMu.Lock() + res.Failures[domain] = map[string]interface{}{ + "message": err.Error(), + } + failMu.Unlock() + return + } + resultCh <- &claimKeyRes + }(d, k) + } + + // Close the result channel when the goroutines have quit so the for .. range exits + go func() { + wg.Wait() + close(resultCh) + }() + + for result := range resultCh { + for userID, nest := range result.OneTimeKeys { + res.OneTimeKeys[userID] = make(map[string]map[string]json.RawMessage) + for deviceID, nest2 := range nest { + res.OneTimeKeys[userID][deviceID] = make(map[string]json.RawMessage) + for keyIDWithAlgo, otk := range nest2 { + keyJSON, err := json.Marshal(otk) + if err != nil { + continue + } + res.OneTimeKeys[userID][deviceID][keyIDWithAlgo] = keyJSON + } + } + } + } } func (a *KeyInternalAPI) QueryKeys(ctx context.Context, req *api.QueryKeysRequest, res *api.QueryKeysResponse) { res.DeviceKeys = make(map[string]map[string]json.RawMessage) res.Failures = make(map[string]interface{}) // make a map from domain to device keys - domainToUserToDevice := make(map[string][]api.DeviceKeys) + domainToDeviceKeys := make(map[string]map[string][]string) for userID, deviceIDs := range req.UserToDevices { _, serverName, err := gomatrixserverlib.SplitID('@', userID) if err != nil { @@ -100,16 +155,63 @@ func (a *KeyInternalAPI) QueryKeys(ctx context.Context, req *api.QueryKeysReques res.DeviceKeys[userID][dk.DeviceID] = dk.KeyJSON } } else { - for _, deviceID := range deviceIDs { - domainToUserToDevice[domain] = append(domainToUserToDevice[domain], api.DeviceKeys{ - UserID: userID, - DeviceID: deviceID, - }) - } + domainToDeviceKeys[domain] = make(map[string][]string) + domainToDeviceKeys[domain][userID] = append(domainToDeviceKeys[domain][userID], deviceIDs...) } } // TODO: set device display names when they are known - // TODO: perform key queries for remote devices + + // perform key queries for remote devices + a.queryRemoteKeys(ctx, req.Timeout, res, domainToDeviceKeys) +} + +func (a *KeyInternalAPI) queryRemoteKeys( + ctx context.Context, timeout time.Duration, res *api.QueryKeysResponse, domainToDeviceKeys map[string]map[string][]string, +) { + resultCh := make(chan *gomatrixserverlib.RespQueryKeys, len(domainToDeviceKeys)) + // allows us to wait until all federation servers have been poked + var wg sync.WaitGroup + wg.Add(len(domainToDeviceKeys)) + // mutex for failures + var failMu sync.Mutex + + // fan out + for domain, deviceKeys := range domainToDeviceKeys { + go func(serverName string, devKeys map[string][]string) { + defer wg.Done() + fedCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + queryKeysResp, err := a.FedClient.QueryKeys(fedCtx, gomatrixserverlib.ServerName(serverName), devKeys) + if err != nil { + failMu.Lock() + res.Failures[serverName] = map[string]interface{}{ + "message": err.Error(), + } + failMu.Unlock() + return + } + resultCh <- &queryKeysResp + }(domain, deviceKeys) + } + + // Close the result channel when the goroutines have quit so the for .. range exits + go func() { + wg.Wait() + close(resultCh) + }() + + for result := range resultCh { + for userID, nest := range result.DeviceKeys { + res.DeviceKeys[userID] = make(map[string]json.RawMessage) + for deviceID, deviceKey := range nest { + keyJSON, err := json.Marshal(deviceKey) + if err != nil { + continue + } + res.DeviceKeys[userID][deviceID] = keyJSON + } + } + } } func (a *KeyInternalAPI) uploadDeviceKeys(ctx context.Context, req *api.PerformUploadKeysRequest, res *api.PerformUploadKeysResponse) { diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go index 3c70fc21b..714b59f0b 100644 --- a/keyserver/keyserver.go +++ b/keyserver/keyserver.go @@ -21,6 +21,7 @@ import ( "github.com/matrix-org/dendrite/keyserver/internal" "github.com/matrix-org/dendrite/keyserver/inthttp" "github.com/matrix-org/dendrite/keyserver/storage" + "github.com/matrix-org/gomatrixserverlib" "github.com/sirupsen/logrus" ) @@ -32,7 +33,7 @@ func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) { // NewInternalAPI returns a concerete implementation of the internal API. Callers // can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes. -func NewInternalAPI(cfg *config.Dendrite) api.KeyInternalAPI { +func NewInternalAPI(cfg *config.Dendrite, fedClient *gomatrixserverlib.FederationClient) api.KeyInternalAPI { db, err := storage.NewDatabase( string(cfg.Database.E2EKey), cfg.DbProperties(), @@ -43,5 +44,6 @@ func NewInternalAPI(cfg *config.Dendrite) api.KeyInternalAPI { return &internal.KeyInternalAPI{ DB: db, ThisServer: cfg.Matrix.ServerName, + FedClient: fedClient, } }