Notify clients when devices are deleted

This commit is contained in:
Kegan Dougal 2020-07-30 17:22:54 +01:00
parent 5b193ca674
commit 3d877d5e58
11 changed files with 66 additions and 14 deletions

View file

@ -76,7 +76,9 @@ func main() {
serverKeyAPI = base.ServerKeyAPIClient() serverKeyAPI = base.ServerKeyAPIClient()
} }
keyRing := serverKeyAPI.KeyRing() keyRing := serverKeyAPI.KeyRing()
userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, cfg.Derived.ApplicationServices) keyAPI := keyserver.NewInternalAPI(base.Cfg, federation, base.KafkaProducer)
userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, cfg.Derived.ApplicationServices, keyAPI)
keyAPI.SetUserAPI(userAPI)
rsImpl := roomserver.NewInternalAPI( rsImpl := roomserver.NewInternalAPI(
base, keyRing, federation, base, keyRing, federation,
@ -119,7 +121,6 @@ func main() {
rsImpl.SetFederationSenderAPI(fsAPI) rsImpl.SetFederationSenderAPI(fsAPI)
stateAPI := currentstateserver.NewInternalAPI(base.Cfg, base.KafkaConsumer) stateAPI := currentstateserver.NewInternalAPI(base.Cfg, base.KafkaConsumer)
keyAPI := keyserver.NewInternalAPI(base.Cfg, federation, userAPI, base.KafkaProducer)
monolith := setup.Monolith{ monolith := setup.Monolith{
Config: base.Cfg, Config: base.Cfg,

View file

@ -27,7 +27,7 @@ func main() {
accountDB := base.CreateAccountsDB() accountDB := base.CreateAccountsDB()
deviceDB := base.CreateDeviceDB() deviceDB := base.CreateDeviceDB()
userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, cfg.Derived.ApplicationServices) userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, cfg.Derived.ApplicationServices, base.KeyServerHTTPClient())
userapi.AddInternalRoutes(base.InternalAPIMux, userAPI) userapi.AddInternalRoutes(base.InternalAPIMux, userAPI)

View file

@ -19,9 +19,13 @@ import (
"encoding/json" "encoding/json"
"strings" "strings"
"time" "time"
userapi "github.com/matrix-org/dendrite/userapi/api"
) )
type KeyInternalAPI interface { type KeyInternalAPI interface {
// SetUserAPI assigns a user API to query when extracting device names.
SetUserAPI(i userapi.UserInternalAPI)
PerformUploadKeys(ctx context.Context, req *PerformUploadKeysRequest, res *PerformUploadKeysResponse) PerformUploadKeys(ctx context.Context, req *PerformUploadKeysRequest, res *PerformUploadKeysResponse)
// PerformClaimKeys claims one-time keys for use in pre-key messages // PerformClaimKeys claims one-time keys for use in pre-key messages
PerformClaimKeys(ctx context.Context, req *PerformClaimKeysRequest, res *PerformClaimKeysResponse) PerformClaimKeys(ctx context.Context, req *PerformClaimKeysRequest, res *PerformClaimKeysResponse)

View file

@ -40,6 +40,10 @@ type KeyInternalAPI struct {
Producer *producers.KeyChange Producer *producers.KeyChange
} }
func (a *KeyInternalAPI) SetUserAPI(i userapi.UserInternalAPI) {
a.UserAPI = i
}
func (a *KeyInternalAPI) QueryKeyChanges(ctx context.Context, req *api.QueryKeyChangesRequest, res *api.QueryKeyChangesResponse) { func (a *KeyInternalAPI) QueryKeyChanges(ctx context.Context, req *api.QueryKeyChangesRequest, res *api.QueryKeyChangesResponse) {
if req.Partition < 0 { if req.Partition < 0 {
req.Partition = a.Producer.DefaultPartition() req.Partition = a.Producer.DefaultPartition()
@ -272,6 +276,10 @@ func (a *KeyInternalAPI) uploadDeviceKeys(ctx context.Context, req *api.PerformU
var keysToStore []api.DeviceKeys var keysToStore []api.DeviceKeys
// assert that the user ID / device ID are not lying for each key // assert that the user ID / device ID are not lying for each key
for _, key := range req.DeviceKeys { for _, key := range req.DeviceKeys {
if len(key.KeyJSON) == 0 {
keysToStore = append(keysToStore, key)
continue // deleted keys don't need sanity checking
}
gotUserID := gjson.GetBytes(key.KeyJSON, "user_id").Str gotUserID := gjson.GetBytes(key.KeyJSON, "user_id").Str
gotDeviceID := gjson.GetBytes(key.KeyJSON, "device_id").Str gotDeviceID := gjson.GetBytes(key.KeyJSON, "device_id").Str
if gotUserID == key.UserID && gotDeviceID == key.DeviceID { if gotUserID == key.UserID && gotDeviceID == key.DeviceID {
@ -286,6 +294,7 @@ func (a *KeyInternalAPI) uploadDeviceKeys(ctx context.Context, req *api.PerformU
), ),
}) })
} }
util.GetLogger(ctx).Infof("upload device keys: %+v", keysToStore)
// get existing device keys so we can check for changes // get existing device keys so we can check for changes
existingKeys := make([]api.DeviceKeys, len(keysToStore)) existingKeys := make([]api.DeviceKeys, len(keysToStore))
for i := range keysToStore { for i := range keysToStore {
@ -358,7 +367,9 @@ func (a *KeyInternalAPI) emitDeviceKeyChanges(existing, new []api.DeviceKeys) er
for _, newKey := range new { for _, newKey := range new {
exists := false exists := false
for _, existingKey := range existing { for _, existingKey := range existing {
if bytes.Equal(existingKey.KeyJSON, newKey.KeyJSON) { // Do not treat the absence of keys as equal, or else we will not emit key changes
// when users delete devices which never had a key to begin with as both KeyJSONs are nil.
if bytes.Equal(existingKey.KeyJSON, newKey.KeyJSON) && len(existingKey.KeyJSON) > 0 {
exists = true exists = true
break break
} }

View file

@ -21,6 +21,7 @@ import (
"github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/api"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go"
) )
@ -52,6 +53,10 @@ type httpKeyInternalAPI struct {
httpClient *http.Client httpClient *http.Client
} }
func (h *httpKeyInternalAPI) SetUserAPI(i userapi.UserInternalAPI) {
// no-op: doesn't need it
}
func (h *httpKeyInternalAPI) PerformClaimKeys( func (h *httpKeyInternalAPI) PerformClaimKeys(
ctx context.Context, ctx context.Context,
request *api.PerformClaimKeysRequest, request *api.PerformClaimKeysRequest,

View file

@ -23,7 +23,6 @@ import (
"github.com/matrix-org/dendrite/keyserver/inthttp" "github.com/matrix-org/dendrite/keyserver/inthttp"
"github.com/matrix-org/dendrite/keyserver/producers" "github.com/matrix-org/dendrite/keyserver/producers"
"github.com/matrix-org/dendrite/keyserver/storage" "github.com/matrix-org/dendrite/keyserver/storage"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
@ -37,7 +36,7 @@ func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) {
// NewInternalAPI returns a concerete implementation of the internal API. Callers // NewInternalAPI returns a concerete implementation of the internal API. Callers
// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes. // can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes.
func NewInternalAPI( func NewInternalAPI(
cfg *config.Dendrite, fedClient *gomatrixserverlib.FederationClient, userAPI userapi.UserInternalAPI, producer sarama.SyncProducer, cfg *config.Dendrite, fedClient *gomatrixserverlib.FederationClient, producer sarama.SyncProducer,
) api.KeyInternalAPI { ) api.KeyInternalAPI {
db, err := storage.NewDatabase( db, err := storage.NewDatabase(
string(cfg.Database.E2EKey), string(cfg.Database.E2EKey),
@ -55,7 +54,6 @@ func NewInternalAPI(
DB: db, DB: db,
ThisServer: cfg.Matrix.ServerName, ThisServer: cfg.Matrix.ServerName,
FedClient: fedClient, FedClient: fedClient,
UserAPI: userAPI,
Producer: keyChangeProducer, Producer: keyChangeProducer,
} }
} }

View file

@ -67,6 +67,7 @@ func (p *KeyChange) ProduceKeyChanges(keys []api.DeviceKeys) error {
"device_id": key.DeviceID, "device_id": key.DeviceID,
"partition": partition, "partition": partition,
"offset": offset, "offset": offset,
"len_key_bytes": len(key.KeyJSON),
}).Infof("Produced to key change topic '%s'", p.Topic) }).Infof("Produced to key change topic '%s'", p.Topic)
} }
return nil return nil

View file

@ -32,7 +32,8 @@ type Database interface {
// DeviceKeysJSON populates the KeyJSON for the given keys. If any proided `keys` have a `KeyJSON` already then it will be replaced. // DeviceKeysJSON populates the KeyJSON for the given keys. If any proided `keys` have a `KeyJSON` already then it will be replaced.
DeviceKeysJSON(ctx context.Context, keys []api.DeviceKeys) error DeviceKeysJSON(ctx context.Context, keys []api.DeviceKeys) error
// StoreDeviceKeys persists the given keys. Keys with the same user ID and device ID will be replaced. // StoreDeviceKeys persists the given keys. Keys with the same user ID and device ID will be replaced. An empty KeyJSON removes the key
// for this (user, device).
// Returns an error if there was a problem storing the keys. // Returns an error if there was a problem storing the keys.
StoreDeviceKeys(ctx context.Context, keys []api.DeviceKeys) error StoreDeviceKeys(ctx context.Context, keys []api.DeviceKeys) error

View file

@ -129,6 +129,7 @@ Can claim one time key using POST
Can claim remote one time key using POST Can claim remote one time key using POST
Local device key changes appear in v2 /sync Local device key changes appear in v2 /sync
Local device key changes appear in /keys/changes Local device key changes appear in /keys/changes
Local delete device changes appear in v2 /sync
Get left notifs for other users in sync and /keys/changes when user leaves Get left notifs for other users in sync and /keys/changes when user leaves
Can add account data Can add account data
Can add account data to room Can add account data to room

View file

@ -25,10 +25,12 @@ import (
"github.com/matrix-org/dendrite/clientapi/userutil" "github.com/matrix-org/dendrite/clientapi/userutil"
"github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/storage/accounts" "github.com/matrix-org/dendrite/userapi/storage/accounts"
"github.com/matrix-org/dendrite/userapi/storage/devices" "github.com/matrix-org/dendrite/userapi/storage/devices"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
) )
type UserInternalAPI struct { type UserInternalAPI struct {
@ -37,6 +39,7 @@ type UserInternalAPI struct {
ServerName gomatrixserverlib.ServerName ServerName gomatrixserverlib.ServerName
// AppServices is the list of all registered AS // AppServices is the list of all registered AS
AppServices []config.ApplicationService AppServices []config.ApplicationService
KeyAPI keyapi.KeyInternalAPI
} }
func (a *UserInternalAPI) InputAccountData(ctx context.Context, req *api.InputAccountDataRequest, res *api.InputAccountDataResponse) error { func (a *UserInternalAPI) InputAccountData(ctx context.Context, req *api.InputAccountDataRequest, res *api.InputAccountDataResponse) error {
@ -105,6 +108,7 @@ func (a *UserInternalAPI) PerformDeviceCreation(ctx context.Context, req *api.Pe
} }
func (a *UserInternalAPI) PerformDeviceDeletion(ctx context.Context, req *api.PerformDeviceDeletionRequest, res *api.PerformDeviceDeletionResponse) error { func (a *UserInternalAPI) PerformDeviceDeletion(ctx context.Context, req *api.PerformDeviceDeletionRequest, res *api.PerformDeviceDeletionResponse) error {
util.GetLogger(ctx).WithField("user_id", req.UserID).WithField("devices", req.DeviceIDs).Info("PerformDeviceDeletion")
local, domain, err := gomatrixserverlib.SplitID('@', req.UserID) local, domain, err := gomatrixserverlib.SplitID('@', req.UserID)
if err != nil { if err != nil {
return err return err
@ -112,7 +116,31 @@ func (a *UserInternalAPI) PerformDeviceDeletion(ctx context.Context, req *api.Pe
if domain != a.ServerName { if domain != a.ServerName {
return fmt.Errorf("cannot PerformDeviceDeletion of remote users: got %s want %s", domain, a.ServerName) return fmt.Errorf("cannot PerformDeviceDeletion of remote users: got %s want %s", domain, a.ServerName)
} }
return a.DeviceDB.RemoveDevices(ctx, local, req.DeviceIDs) err = a.DeviceDB.RemoveDevices(ctx, local, req.DeviceIDs)
if err != nil {
return err
}
// create empty device keys and upload them to delete what was once there and trigger device list changes
deviceKeys := make([]keyapi.DeviceKeys, len(req.DeviceIDs))
for i, did := range req.DeviceIDs {
deviceKeys[i] = keyapi.DeviceKeys{
UserID: req.UserID,
DeviceID: did,
KeyJSON: nil,
}
}
var uploadRes keyapi.PerformUploadKeysResponse
a.KeyAPI.PerformUploadKeys(context.Background(), &keyapi.PerformUploadKeysRequest{
DeviceKeys: deviceKeys,
}, &uploadRes)
if uploadRes.Error != nil {
return fmt.Errorf("Failed to delete device keys: %v", uploadRes.Error)
}
if len(uploadRes.KeyErrors) > 0 {
return fmt.Errorf("Failed to delete device keys, key errors: %+v", uploadRes.KeyErrors)
}
return nil
} }
func (a *UserInternalAPI) QueryProfile(ctx context.Context, req *api.QueryProfileRequest, res *api.QueryProfileResponse) error { func (a *UserInternalAPI) QueryProfile(ctx context.Context, req *api.QueryProfileRequest, res *api.QueryProfileResponse) error {

View file

@ -17,6 +17,7 @@ package userapi
import ( import (
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/config"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/internal" "github.com/matrix-org/dendrite/userapi/internal"
"github.com/matrix-org/dendrite/userapi/inthttp" "github.com/matrix-org/dendrite/userapi/inthttp"
@ -34,12 +35,13 @@ func AddInternalRoutes(router *mux.Router, intAPI api.UserInternalAPI) {
// NewInternalAPI returns a concerete implementation of the internal API. Callers // NewInternalAPI returns a concerete implementation of the internal API. Callers
// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes. // can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes.
func NewInternalAPI(accountDB accounts.Database, deviceDB devices.Database, func NewInternalAPI(accountDB accounts.Database, deviceDB devices.Database,
serverName gomatrixserverlib.ServerName, appServices []config.ApplicationService) api.UserInternalAPI { serverName gomatrixserverlib.ServerName, appServices []config.ApplicationService, keyAPI keyapi.KeyInternalAPI) api.UserInternalAPI {
return &internal.UserInternalAPI{ return &internal.UserInternalAPI{
AccountDB: accountDB, AccountDB: accountDB,
DeviceDB: deviceDB, DeviceDB: deviceDB,
ServerName: serverName, ServerName: serverName,
AppServices: appServices, AppServices: appServices,
KeyAPI: keyAPI,
} }
} }