From a7e67e65a8662387f1a5ba6860698743f9dbd60f Mon Sep 17 00:00:00 2001 From: Kegsay Date: Thu, 30 Jul 2020 18:00:56 +0100 Subject: [PATCH] Notify clients when devices are deleted (#1233) * Recheck device lists when join/leave events come in * Add PerformDeviceDeletion * Notify clients when devices are deleted * Unbreak things * Remove debug logging --- build/gobind/monolith.go | 6 +++-- clientapi/routing/device.go | 28 ++++++++++---------- clientapi/routing/routing.go | 4 +-- cmd/dendrite-demo-libp2p/main.go | 6 +++-- cmd/dendrite-demo-yggdrasil/main.go | 6 +++-- cmd/dendrite-key-server/main.go | 3 ++- cmd/dendrite-monolith-server/main.go | 5 ++-- cmd/dendrite-user-api-server/main.go | 2 +- cmd/dendritejs/main.go | 6 +++-- keyserver/api/api.go | 4 +++ keyserver/internal/internal.go | 13 +++++++++- keyserver/inthttp/client.go | 5 ++++ keyserver/keyserver.go | 4 +-- keyserver/producers/keychange.go | 9 ++++--- keyserver/storage/interface.go | 3 ++- syncapi/consumers/roomserver.go | 23 ++++++++++++++++ syncapi/internal/keychange_test.go | 3 +++ syncapi/syncapi.go | 18 ++++++------- sytest-whitelist | 1 + userapi/api/api.go | 10 +++++++ userapi/internal/api.go | 39 ++++++++++++++++++++++++++++ userapi/inthttp/client.go | 13 ++++++++++ userapi/inthttp/server.go | 13 ++++++++++ userapi/userapi.go | 4 ++- userapi/userapi_test.go | 2 +- 25 files changed, 183 insertions(+), 47 deletions(-) diff --git a/build/gobind/monolith.go b/build/gobind/monolith.go index 5d0d11fda..e2ff79c34 100644 --- a/build/gobind/monolith.go +++ b/build/gobind/monolith.go @@ -118,7 +118,9 @@ func (m *DendriteMonolith) Start() { serverKeyAPI := &signing.YggdrasilKeys{} keyRing := serverKeyAPI.KeyRing() - userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, cfg.Derived.ApplicationServices) + keyAPI := keyserver.NewInternalAPI(base.Cfg, federation, base.KafkaProducer) + userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, cfg.Derived.ApplicationServices, keyAPI) + keyAPI.SetUserAPI(userAPI) rsAPI := roomserver.NewInternalAPI( base, keyRing, federation, @@ -156,7 +158,7 @@ func (m *DendriteMonolith) Start() { RoomserverAPI: rsAPI, UserAPI: userAPI, StateAPI: stateAPI, - KeyAPI: keyserver.NewInternalAPI(base.Cfg, federation, userAPI, base.KafkaProducer), + KeyAPI: keyAPI, ExtPublicRoomsProvider: yggrooms.NewYggdrasilRoomProvider( ygg, fsAPI, federation, ), diff --git a/clientapi/routing/device.go b/clientapi/routing/device.go index 01310400a..11c6c7827 100644 --- a/clientapi/routing/device.go +++ b/clientapi/routing/device.go @@ -165,7 +165,7 @@ func UpdateDeviceByID( // DeleteDeviceById handles DELETE requests to /devices/{deviceId} func DeleteDeviceById( - req *http.Request, userInteractiveAuth *auth.UserInteractive, deviceDB devices.Database, device *api.Device, + req *http.Request, userInteractiveAuth *auth.UserInteractive, userAPI api.UserInternalAPI, device *api.Device, deviceID string, ) util.JSONResponse { ctx := req.Context() @@ -197,8 +197,12 @@ func DeleteDeviceById( } } - if err := deviceDB.RemoveDevice(ctx, deviceID, localpart); err != nil { - util.GetLogger(ctx).WithError(err).Error("deviceDB.RemoveDevice failed") + var res api.PerformDeviceDeletionResponse + if err := userAPI.PerformDeviceDeletion(ctx, &api.PerformDeviceDeletionRequest{ + UserID: device.UserID, + DeviceIDs: []string{deviceID}, + }, &res); err != nil { + util.GetLogger(ctx).WithError(err).Error("userAPI.PerformDeviceDeletion failed") return jsonerror.InternalServerError() } @@ -210,26 +214,24 @@ func DeleteDeviceById( // DeleteDevices handles POST requests to /delete_devices func DeleteDevices( - req *http.Request, deviceDB devices.Database, device *api.Device, + req *http.Request, userAPI api.UserInternalAPI, device *api.Device, ) util.JSONResponse { - localpart, _, err := gomatrixserverlib.SplitID('@', device.UserID) - if err != nil { - util.GetLogger(req.Context()).WithError(err).Error("gomatrixserverlib.SplitID failed") - return jsonerror.InternalServerError() - } - ctx := req.Context() payload := devicesDeleteJSON{} if err := json.NewDecoder(req.Body).Decode(&payload); err != nil { - util.GetLogger(req.Context()).WithError(err).Error("json.NewDecoder.Decode failed") + util.GetLogger(ctx).WithError(err).Error("json.NewDecoder.Decode failed") return jsonerror.InternalServerError() } defer req.Body.Close() // nolint: errcheck - if err := deviceDB.RemoveDevices(ctx, localpart, payload.Devices); err != nil { - util.GetLogger(req.Context()).WithError(err).Error("deviceDB.RemoveDevices failed") + var res api.PerformDeviceDeletionResponse + if err := userAPI.PerformDeviceDeletion(ctx, &api.PerformDeviceDeletionRequest{ + UserID: device.UserID, + DeviceIDs: payload.Devices, + }, &res); err != nil { + util.GetLogger(ctx).WithError(err).Error("userAPI.PerformDeviceDeletion failed") return jsonerror.InternalServerError() } diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go index ebb141ef8..6c40db865 100644 --- a/clientapi/routing/routing.go +++ b/clientapi/routing/routing.go @@ -654,13 +654,13 @@ func Setup( if err != nil { return util.ErrorResponse(err) } - return DeleteDeviceById(req, userInteractiveAuth, deviceDB, device, vars["deviceID"]) + return DeleteDeviceById(req, userInteractiveAuth, userAPI, device, vars["deviceID"]) }), ).Methods(http.MethodDelete, http.MethodOptions) r0mux.Handle("/delete_devices", httputil.MakeAuthAPI("delete_devices", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { - return DeleteDevices(req, deviceDB, device) + return DeleteDevices(req, userAPI, device) }), ).Methods(http.MethodPost, http.MethodOptions) diff --git a/cmd/dendrite-demo-libp2p/main.go b/cmd/dendrite-demo-libp2p/main.go index ed7310181..4e774acc9 100644 --- a/cmd/dendrite-demo-libp2p/main.go +++ b/cmd/dendrite-demo-libp2p/main.go @@ -141,7 +141,9 @@ func main() { accountDB := base.Base.CreateAccountsDB() deviceDB := base.Base.CreateDeviceDB() federation := createFederationClient(base) - userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, nil) + keyAPI := keyserver.NewInternalAPI(base.Base.Cfg, federation, base.Base.KafkaProducer) + userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, nil, keyAPI) + keyAPI.SetUserAPI(userAPI) serverKeyAPI := serverkeyapi.NewInternalAPI( base.Base.Cfg, federation, base.Base.Caches, @@ -186,7 +188,7 @@ func main() { ServerKeyAPI: serverKeyAPI, StateAPI: stateAPI, UserAPI: userAPI, - KeyAPI: keyserver.NewInternalAPI(base.Base.Cfg, federation, userAPI, base.Base.KafkaProducer), + KeyAPI: keyAPI, ExtPublicRoomsProvider: provider, } monolith.AddAllPublicRoutes(base.Base.PublicAPIMux) diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go index 5fd761029..0655a2a3b 100644 --- a/cmd/dendrite-demo-yggdrasil/main.go +++ b/cmd/dendrite-demo-yggdrasil/main.go @@ -103,7 +103,9 @@ func main() { serverKeyAPI := &signing.YggdrasilKeys{} keyRing := serverKeyAPI.KeyRing() - userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, nil) + keyAPI := keyserver.NewInternalAPI(base.Cfg, federation, base.KafkaProducer) + userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, nil, keyAPI) + keyAPI.SetUserAPI(userAPI) rsComponent := roomserver.NewInternalAPI( base, keyRing, federation, @@ -142,7 +144,7 @@ func main() { RoomserverAPI: rsAPI, UserAPI: userAPI, StateAPI: stateAPI, - KeyAPI: keyserver.NewInternalAPI(base.Cfg, federation, userAPI, base.KafkaProducer), + KeyAPI: keyAPI, //ServerKeyAPI: serverKeyAPI, ExtPublicRoomsProvider: yggrooms.NewYggdrasilRoomProvider( ygg, fsAPI, federation, diff --git a/cmd/dendrite-key-server/main.go b/cmd/dendrite-key-server/main.go index d58d475a2..94ea819f8 100644 --- a/cmd/dendrite-key-server/main.go +++ b/cmd/dendrite-key-server/main.go @@ -24,7 +24,8 @@ func main() { base := setup.NewBaseDendrite(cfg, "KeyServer", true) defer base.Close() // nolint: errcheck - intAPI := keyserver.NewInternalAPI(base.Cfg, base.CreateFederationClient(), base.UserAPIClient(), base.KafkaProducer) + intAPI := keyserver.NewInternalAPI(base.Cfg, base.CreateFederationClient(), base.KafkaProducer) + intAPI.SetUserAPI(base.UserAPIClient()) keyserver.AddInternalRoutes(base.InternalAPIMux, intAPI) diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go index b312579cb..bce5fce08 100644 --- a/cmd/dendrite-monolith-server/main.go +++ b/cmd/dendrite-monolith-server/main.go @@ -76,7 +76,9 @@ func main() { serverKeyAPI = base.ServerKeyAPIClient() } keyRing := serverKeyAPI.KeyRing() - userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, cfg.Derived.ApplicationServices) + keyAPI := keyserver.NewInternalAPI(base.Cfg, federation, base.KafkaProducer) + userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, cfg.Derived.ApplicationServices, keyAPI) + keyAPI.SetUserAPI(userAPI) rsImpl := roomserver.NewInternalAPI( base, keyRing, federation, @@ -119,7 +121,6 @@ func main() { rsImpl.SetFederationSenderAPI(fsAPI) stateAPI := currentstateserver.NewInternalAPI(base.Cfg, base.KafkaConsumer) - keyAPI := keyserver.NewInternalAPI(base.Cfg, federation, userAPI, base.KafkaProducer) monolith := setup.Monolith{ Config: base.Cfg, diff --git a/cmd/dendrite-user-api-server/main.go b/cmd/dendrite-user-api-server/main.go index 4257da3f3..e6d61da10 100644 --- a/cmd/dendrite-user-api-server/main.go +++ b/cmd/dendrite-user-api-server/main.go @@ -27,7 +27,7 @@ func main() { accountDB := base.CreateAccountsDB() deviceDB := base.CreateDeviceDB() - userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, cfg.Derived.ApplicationServices) + userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, cfg.Derived.ApplicationServices, base.KeyServerHTTPClient()) userapi.AddInternalRoutes(base.InternalAPIMux, userAPI) diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go index 70e5279cf..0df53e06d 100644 --- a/cmd/dendritejs/main.go +++ b/cmd/dendritejs/main.go @@ -196,7 +196,9 @@ func main() { accountDB := base.CreateAccountsDB() deviceDB := base.CreateDeviceDB() federation := createFederationClient(cfg, node) - userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, nil) + keyAPI := keyserver.NewInternalAPI(base.Cfg, federation, base.KafkaProducer) + userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, nil, keyAPI) + keyAPI.SetUserAPI(userAPI) fetcher := &libp2pKeyFetcher{} keyRing := gomatrixserverlib.KeyRing{ @@ -233,7 +235,7 @@ func main() { RoomserverAPI: rsAPI, StateAPI: stateAPI, UserAPI: userAPI, - KeyAPI: keyserver.NewInternalAPI(base.Cfg, federation, userAPI, base.KafkaProducer), + KeyAPI: keyAPI, //ServerKeyAPI: serverKeyAPI, ExtPublicRoomsProvider: p2pPublicRoomProvider, } diff --git a/keyserver/api/api.go b/keyserver/api/api.go index 98bcd9442..6795498fe 100644 --- a/keyserver/api/api.go +++ b/keyserver/api/api.go @@ -19,9 +19,13 @@ import ( "encoding/json" "strings" "time" + + userapi "github.com/matrix-org/dendrite/userapi/api" ) type KeyInternalAPI interface { + // SetUserAPI assigns a user API to query when extracting device names. + SetUserAPI(i userapi.UserInternalAPI) PerformUploadKeys(ctx context.Context, req *PerformUploadKeysRequest, res *PerformUploadKeysResponse) // PerformClaimKeys claims one-time keys for use in pre-key messages PerformClaimKeys(ctx context.Context, req *PerformClaimKeysRequest, res *PerformClaimKeysResponse) diff --git a/keyserver/internal/internal.go b/keyserver/internal/internal.go index 703713538..480d1084e 100644 --- a/keyserver/internal/internal.go +++ b/keyserver/internal/internal.go @@ -40,6 +40,10 @@ type KeyInternalAPI struct { Producer *producers.KeyChange } +func (a *KeyInternalAPI) SetUserAPI(i userapi.UserInternalAPI) { + a.UserAPI = i +} + func (a *KeyInternalAPI) QueryKeyChanges(ctx context.Context, req *api.QueryKeyChangesRequest, res *api.QueryKeyChangesResponse) { if req.Partition < 0 { req.Partition = a.Producer.DefaultPartition() @@ -272,6 +276,10 @@ func (a *KeyInternalAPI) uploadDeviceKeys(ctx context.Context, req *api.PerformU var keysToStore []api.DeviceKeys // assert that the user ID / device ID are not lying for each key for _, key := range req.DeviceKeys { + if len(key.KeyJSON) == 0 { + keysToStore = append(keysToStore, key) + continue // deleted keys don't need sanity checking + } gotUserID := gjson.GetBytes(key.KeyJSON, "user_id").Str gotDeviceID := gjson.GetBytes(key.KeyJSON, "device_id").Str if gotUserID == key.UserID && gotDeviceID == key.DeviceID { @@ -286,6 +294,7 @@ func (a *KeyInternalAPI) uploadDeviceKeys(ctx context.Context, req *api.PerformU ), }) } + // get existing device keys so we can check for changes existingKeys := make([]api.DeviceKeys, len(keysToStore)) for i := range keysToStore { @@ -358,7 +367,9 @@ func (a *KeyInternalAPI) emitDeviceKeyChanges(existing, new []api.DeviceKeys) er for _, newKey := range new { exists := false for _, existingKey := range existing { - if bytes.Equal(existingKey.KeyJSON, newKey.KeyJSON) { + // Do not treat the absence of keys as equal, or else we will not emit key changes + // when users delete devices which never had a key to begin with as both KeyJSONs are nil. + if bytes.Equal(existingKey.KeyJSON, newKey.KeyJSON) && len(existingKey.KeyJSON) > 0 { exists = true break } diff --git a/keyserver/inthttp/client.go b/keyserver/inthttp/client.go index cd9cf70d4..3f9690b51 100644 --- a/keyserver/inthttp/client.go +++ b/keyserver/inthttp/client.go @@ -21,6 +21,7 @@ import ( "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/keyserver/api" + userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/opentracing/opentracing-go" ) @@ -52,6 +53,10 @@ type httpKeyInternalAPI struct { httpClient *http.Client } +func (h *httpKeyInternalAPI) SetUserAPI(i userapi.UserInternalAPI) { + // no-op: doesn't need it +} + func (h *httpKeyInternalAPI) PerformClaimKeys( ctx context.Context, request *api.PerformClaimKeysRequest, diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go index c748d7ce3..36bedf34b 100644 --- a/keyserver/keyserver.go +++ b/keyserver/keyserver.go @@ -23,7 +23,6 @@ import ( "github.com/matrix-org/dendrite/keyserver/inthttp" "github.com/matrix-org/dendrite/keyserver/producers" "github.com/matrix-org/dendrite/keyserver/storage" - userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" "github.com/sirupsen/logrus" ) @@ -37,7 +36,7 @@ func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) { // NewInternalAPI returns a concerete implementation of the internal API. Callers // can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes. func NewInternalAPI( - cfg *config.Dendrite, fedClient *gomatrixserverlib.FederationClient, userAPI userapi.UserInternalAPI, producer sarama.SyncProducer, + cfg *config.Dendrite, fedClient *gomatrixserverlib.FederationClient, producer sarama.SyncProducer, ) api.KeyInternalAPI { db, err := storage.NewDatabase( string(cfg.Database.E2EKey), @@ -55,7 +54,6 @@ func NewInternalAPI( DB: db, ThisServer: cfg.Matrix.ServerName, FedClient: fedClient, - UserAPI: userAPI, Producer: keyChangeProducer, } } diff --git a/keyserver/producers/keychange.go b/keyserver/producers/keychange.go index c51d9f55d..6035b67bd 100644 --- a/keyserver/producers/keychange.go +++ b/keyserver/producers/keychange.go @@ -63,10 +63,11 @@ func (p *KeyChange) ProduceKeyChanges(keys []api.DeviceKeys) error { return err } logrus.WithFields(logrus.Fields{ - "user_id": key.UserID, - "device_id": key.DeviceID, - "partition": partition, - "offset": offset, + "user_id": key.UserID, + "device_id": key.DeviceID, + "partition": partition, + "offset": offset, + "len_key_bytes": len(key.KeyJSON), }).Infof("Produced to key change topic '%s'", p.Topic) } return nil diff --git a/keyserver/storage/interface.go b/keyserver/storage/interface.go index 7a4fce6f5..fade75228 100644 --- a/keyserver/storage/interface.go +++ b/keyserver/storage/interface.go @@ -32,7 +32,8 @@ type Database interface { // DeviceKeysJSON populates the KeyJSON for the given keys. If any proided `keys` have a `KeyJSON` already then it will be replaced. DeviceKeysJSON(ctx context.Context, keys []api.DeviceKeys) error - // StoreDeviceKeys persists the given keys. Keys with the same user ID and device ID will be replaced. + // StoreDeviceKeys persists the given keys. Keys with the same user ID and device ID will be replaced. An empty KeyJSON removes the key + // for this (user, device). // Returns an error if there was a problem storing the keys. StoreDeviceKeys(ctx context.Context, keys []api.DeviceKeys) error diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index da4a5366c..f8cdcd5c1 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -35,6 +35,7 @@ type OutputRoomEventConsumer struct { rsConsumer *internal.ContinualConsumer db storage.Database notifier *sync.Notifier + keyChanges *OutputKeyChangeEventConsumer } // NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers. @@ -44,6 +45,7 @@ func NewOutputRoomEventConsumer( n *sync.Notifier, store storage.Database, rsAPI api.RoomserverInternalAPI, + keyChanges *OutputKeyChangeEventConsumer, ) *OutputRoomEventConsumer { consumer := internal.ContinualConsumer{ @@ -56,6 +58,7 @@ func NewOutputRoomEventConsumer( db: store, notifier: n, rsAPI: rsAPI, + keyChanges: keyChanges, } consumer.ProcessMessage = s.onMessage @@ -160,9 +163,29 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( } s.notifier.OnNewEvent(&ev, "", nil, types.NewStreamToken(pduPos, 0, nil)) + s.notifyKeyChanges(&ev) + return nil } +func (s *OutputRoomEventConsumer) notifyKeyChanges(ev *gomatrixserverlib.HeaderedEvent) { + if ev.Type() != gomatrixserverlib.MRoomMember || ev.StateKey() == nil { + return + } + membership, err := ev.Membership() + if err != nil { + return + } + switch membership { + case gomatrixserverlib.Join: + s.keyChanges.OnJoinEvent(ev) + case gomatrixserverlib.Ban: + fallthrough + case gomatrixserverlib.Leave: + s.keyChanges.OnLeaveEvent(ev) + } +} + func (s *OutputRoomEventConsumer) onNewInviteEvent( ctx context.Context, msg api.OutputNewInviteEvent, ) error { diff --git a/syncapi/internal/keychange_test.go b/syncapi/internal/keychange_test.go index 3f18696c4..2c3d154d5 100644 --- a/syncapi/internal/keychange_test.go +++ b/syncapi/internal/keychange_test.go @@ -10,6 +10,7 @@ import ( "github.com/matrix-org/dendrite/currentstateserver/api" keyapi "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/syncapi/types" + userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" ) @@ -29,6 +30,8 @@ type mockKeyAPI struct{} func (k *mockKeyAPI) PerformUploadKeys(ctx context.Context, req *keyapi.PerformUploadKeysRequest, res *keyapi.PerformUploadKeysResponse) { } +func (k *mockKeyAPI) SetUserAPI(i userapi.UserInternalAPI) {} + // PerformClaimKeys claims one-time keys for use in pre-key messages func (k *mockKeyAPI) PerformClaimKeys(ctx context.Context, req *keyapi.PerformClaimKeysRequest, res *keyapi.PerformClaimKeysResponse) { } diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 754cd5026..5198d59b1 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -64,8 +64,16 @@ func AddPublicRoutes( requestPool := sync.NewRequestPool(syncDB, notifier, userAPI, keyAPI, currentStateAPI) + keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer( + cfg.Matrix.ServerName, string(cfg.Kafka.Topics.OutputKeyChangeEvent), + consumer, notifier, keyAPI, currentStateAPI, syncDB, + ) + if err = keyChangeConsumer.Start(); err != nil { + logrus.WithError(err).Panicf("failed to start key change consumer") + } + roomConsumer := consumers.NewOutputRoomEventConsumer( - cfg, consumer, notifier, syncDB, rsAPI, + cfg, consumer, notifier, syncDB, rsAPI, keyChangeConsumer, ) if err = roomConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start room server consumer") @@ -92,13 +100,5 @@ func AddPublicRoutes( logrus.WithError(err).Panicf("failed to start send-to-device consumer") } - keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer( - cfg.Matrix.ServerName, string(cfg.Kafka.Topics.OutputKeyChangeEvent), - consumer, notifier, keyAPI, currentStateAPI, syncDB, - ) - if err = keyChangeConsumer.Start(); err != nil { - logrus.WithError(err).Panicf("failed to start key change consumer") - } - routing.Setup(router, requestPool, syncDB, userAPI, federation, rsAPI, cfg) } diff --git a/sytest-whitelist b/sytest-whitelist index 341df8a9a..03baf4d44 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -129,6 +129,7 @@ Can claim one time key using POST Can claim remote one time key using POST Local device key changes appear in v2 /sync Local device key changes appear in /keys/changes +Local delete device changes appear in v2 /sync Get left notifs for other users in sync and /keys/changes when user leaves Can add account data Can add account data to room diff --git a/userapi/api/api.go b/userapi/api/api.go index 5791403ff..5c964c4fd 100644 --- a/userapi/api/api.go +++ b/userapi/api/api.go @@ -27,6 +27,7 @@ type UserInternalAPI interface { InputAccountData(ctx context.Context, req *InputAccountDataRequest, res *InputAccountDataResponse) error PerformAccountCreation(ctx context.Context, req *PerformAccountCreationRequest, res *PerformAccountCreationResponse) error PerformDeviceCreation(ctx context.Context, req *PerformDeviceCreationRequest, res *PerformDeviceCreationResponse) error + PerformDeviceDeletion(ctx context.Context, req *PerformDeviceDeletionRequest, res *PerformDeviceDeletionResponse) error QueryProfile(ctx context.Context, req *QueryProfileRequest, res *QueryProfileResponse) error QueryAccessToken(ctx context.Context, req *QueryAccessTokenRequest, res *QueryAccessTokenResponse) error QueryDevices(ctx context.Context, req *QueryDevicesRequest, res *QueryDevicesResponse) error @@ -47,6 +48,15 @@ type InputAccountDataRequest struct { type InputAccountDataResponse struct { } +type PerformDeviceDeletionRequest struct { + UserID string + // The devices to delete + DeviceIDs []string +} + +type PerformDeviceDeletionResponse struct { +} + // QueryDeviceInfosRequest is the request to QueryDeviceInfos type QueryDeviceInfosRequest struct { DeviceIDs []string diff --git a/userapi/internal/api.go b/userapi/internal/api.go index 5b1541967..738023dd6 100644 --- a/userapi/internal/api.go +++ b/userapi/internal/api.go @@ -25,10 +25,12 @@ import ( "github.com/matrix-org/dendrite/clientapi/userutil" "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/sqlutil" + keyapi "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/storage/accounts" "github.com/matrix-org/dendrite/userapi/storage/devices" "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" ) type UserInternalAPI struct { @@ -37,6 +39,7 @@ type UserInternalAPI struct { ServerName gomatrixserverlib.ServerName // AppServices is the list of all registered AS AppServices []config.ApplicationService + KeyAPI keyapi.KeyInternalAPI } func (a *UserInternalAPI) InputAccountData(ctx context.Context, req *api.InputAccountDataRequest, res *api.InputAccountDataResponse) error { @@ -104,6 +107,42 @@ func (a *UserInternalAPI) PerformDeviceCreation(ctx context.Context, req *api.Pe return nil } +func (a *UserInternalAPI) PerformDeviceDeletion(ctx context.Context, req *api.PerformDeviceDeletionRequest, res *api.PerformDeviceDeletionResponse) error { + util.GetLogger(ctx).WithField("user_id", req.UserID).WithField("devices", req.DeviceIDs).Info("PerformDeviceDeletion") + local, domain, err := gomatrixserverlib.SplitID('@', req.UserID) + if err != nil { + return err + } + if domain != a.ServerName { + return fmt.Errorf("cannot PerformDeviceDeletion of remote users: got %s want %s", domain, a.ServerName) + } + err = a.DeviceDB.RemoveDevices(ctx, local, req.DeviceIDs) + if err != nil { + return err + } + // create empty device keys and upload them to delete what was once there and trigger device list changes + deviceKeys := make([]keyapi.DeviceKeys, len(req.DeviceIDs)) + for i, did := range req.DeviceIDs { + deviceKeys[i] = keyapi.DeviceKeys{ + UserID: req.UserID, + DeviceID: did, + KeyJSON: nil, + } + } + + var uploadRes keyapi.PerformUploadKeysResponse + a.KeyAPI.PerformUploadKeys(context.Background(), &keyapi.PerformUploadKeysRequest{ + DeviceKeys: deviceKeys, + }, &uploadRes) + if uploadRes.Error != nil { + return fmt.Errorf("Failed to delete device keys: %v", uploadRes.Error) + } + if len(uploadRes.KeyErrors) > 0 { + return fmt.Errorf("Failed to delete device keys, key errors: %+v", uploadRes.KeyErrors) + } + return nil +} + func (a *UserInternalAPI) QueryProfile(ctx context.Context, req *api.QueryProfileRequest, res *api.QueryProfileResponse) error { local, domain, err := gomatrixserverlib.SplitID('@', req.UserID) if err != nil { diff --git a/userapi/inthttp/client.go b/userapi/inthttp/client.go index 3e1ac0662..47e2110f9 100644 --- a/userapi/inthttp/client.go +++ b/userapi/inthttp/client.go @@ -30,6 +30,7 @@ const ( PerformDeviceCreationPath = "/userapi/performDeviceCreation" PerformAccountCreationPath = "/userapi/performAccountCreation" + PerformDeviceDeletionPath = "/userapi/performDeviceDeletion" QueryProfilePath = "/userapi/queryProfile" QueryAccessTokenPath = "/userapi/queryAccessToken" @@ -91,6 +92,18 @@ func (h *httpUserInternalAPI) PerformDeviceCreation( return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) } +func (h *httpUserInternalAPI) PerformDeviceDeletion( + ctx context.Context, + request *api.PerformDeviceDeletionRequest, + response *api.PerformDeviceDeletionResponse, +) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "PerformDeviceDeletion") + defer span.Finish() + + apiURL := h.apiURL + PerformDeviceDeletionPath + return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) +} + func (h *httpUserInternalAPI) QueryProfile( ctx context.Context, request *api.QueryProfileRequest, diff --git a/userapi/inthttp/server.go b/userapi/inthttp/server.go index d29f4d442..ebb9bf4e8 100644 --- a/userapi/inthttp/server.go +++ b/userapi/inthttp/server.go @@ -52,6 +52,19 @@ func AddRoutes(internalAPIMux *mux.Router, s api.UserInternalAPI) { return util.JSONResponse{Code: http.StatusOK, JSON: &response} }), ) + internalAPIMux.Handle(PerformDeviceDeletionPath, + httputil.MakeInternalAPI("performDeviceDeletion", func(req *http.Request) util.JSONResponse { + request := api.PerformDeviceDeletionRequest{} + response := api.PerformDeviceDeletionResponse{} + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + if err := s.PerformDeviceDeletion(req.Context(), &request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) internalAPIMux.Handle(QueryProfilePath, httputil.MakeInternalAPI("queryProfile", func(req *http.Request) util.JSONResponse { request := api.QueryProfileRequest{} diff --git a/userapi/userapi.go b/userapi/userapi.go index 7aadec06a..c4ab90bac 100644 --- a/userapi/userapi.go +++ b/userapi/userapi.go @@ -17,6 +17,7 @@ package userapi import ( "github.com/gorilla/mux" "github.com/matrix-org/dendrite/internal/config" + keyapi "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/internal" "github.com/matrix-org/dendrite/userapi/inthttp" @@ -34,12 +35,13 @@ func AddInternalRoutes(router *mux.Router, intAPI api.UserInternalAPI) { // NewInternalAPI returns a concerete implementation of the internal API. Callers // can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes. func NewInternalAPI(accountDB accounts.Database, deviceDB devices.Database, - serverName gomatrixserverlib.ServerName, appServices []config.ApplicationService) api.UserInternalAPI { + serverName gomatrixserverlib.ServerName, appServices []config.ApplicationService, keyAPI keyapi.KeyInternalAPI) api.UserInternalAPI { return &internal.UserInternalAPI{ AccountDB: accountDB, DeviceDB: deviceDB, ServerName: serverName, AppServices: appServices, + KeyAPI: keyAPI, } } diff --git a/userapi/userapi_test.go b/userapi/userapi_test.go index 163b10ec7..dab1ec716 100644 --- a/userapi/userapi_test.go +++ b/userapi/userapi_test.go @@ -32,7 +32,7 @@ func MustMakeInternalAPI(t *testing.T) (api.UserInternalAPI, accounts.Database, t.Fatalf("failed to create device DB: %s", err) } - return userapi.NewInternalAPI(accountDB, deviceDB, serverName, nil), accountDB, deviceDB + return userapi.NewInternalAPI(accountDB, deviceDB, serverName, nil, nil), accountDB, deviceDB } func TestQueryProfile(t *testing.T) {