From 83391da0e04dda7a52589ee7ec6df2b615571894 Mon Sep 17 00:00:00 2001 From: Kegsay Date: Tue, 16 Jun 2020 17:05:38 +0100 Subject: [PATCH 1/2] Make syncapi use userapi (#1136) * Make syncapi use userapi * Unbreak things * Fix tests * Lint --- cmd/dendrite-sync-api-server/main.go | 3 +- docs/WIRING-Current.md | 2 ++ internal/setup/monolith.go | 2 +- syncapi/sync/requestpool.go | 45 +++++++++++++++------------- syncapi/syncapi.go | 4 +-- userapi/api/api.go | 23 +++++++++++++- userapi/internal/api.go | 33 ++++++++++++++++++++ userapi/inthttp/client.go | 9 ++++++ userapi/inthttp/server.go | 13 ++++++++ 9 files changed, 106 insertions(+), 28 deletions(-) diff --git a/cmd/dendrite-sync-api-server/main.go b/cmd/dendrite-sync-api-server/main.go index a17b648d6..d67395fb3 100644 --- a/cmd/dendrite-sync-api-server/main.go +++ b/cmd/dendrite-sync-api-server/main.go @@ -25,12 +25,11 @@ func main() { defer base.Close() // nolint: errcheck userAPI := base.UserAPIClient() - accountDB := base.CreateAccountsDB() federation := base.CreateFederationClient() rsAPI := base.RoomserverHTTPClient() - syncapi.AddPublicRoutes(base.PublicAPIMux, base.KafkaConsumer, userAPI, accountDB, rsAPI, federation, cfg) + syncapi.AddPublicRoutes(base.PublicAPIMux, base.KafkaConsumer, userAPI, rsAPI, federation, cfg) base.SetupAndServeHTTP(string(base.Cfg.Bind.SyncAPI), string(base.Cfg.Listen.SyncAPI)) diff --git a/docs/WIRING-Current.md b/docs/WIRING-Current.md index 62450f2f8..ec539d4e9 100644 --- a/docs/WIRING-Current.md +++ b/docs/WIRING-Current.md @@ -39,6 +39,8 @@ Internal only | `------------------- - 12 (FedSender -> ServerKeyAPI): Verifying event signatures of responses (e.g from send_join) - 13 (Roomserver -> ServerKeyAPI): Verifying event signatures of backfilled events +In addition to this, all public facing components (Tier 1) talk to the `UserAPI` to verify access tokens and extract profile information where needed. + ## Kafka logs ``` diff --git a/internal/setup/monolith.go b/internal/setup/monolith.go index aec14aa72..bb81f7403 100644 --- a/internal/setup/monolith.go +++ b/internal/setup/monolith.go @@ -87,6 +87,6 @@ func (m *Monolith) AddAllPublicRoutes(publicMux *mux.Router) { m.ExtPublicRoomsProvider, ) syncapi.AddPublicRoutes( - publicMux, m.KafkaConsumer, m.UserAPI, m.AccountDB, m.RoomserverAPI, m.FedClient, m.Config, + publicMux, m.KafkaConsumer, m.UserAPI, m.RoomserverAPI, m.FedClient, m.Config, ) } diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index ec22a05f4..26b925eac 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -21,7 +21,6 @@ import ( "net/http" "time" - "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" @@ -33,14 +32,14 @@ import ( // RequestPool manages HTTP long-poll connections for /sync type RequestPool struct { - db storage.Database - accountDB accounts.Database - notifier *Notifier + db storage.Database + userAPI userapi.UserInternalAPI + notifier *Notifier } // NewRequestPool makes a new RequestPool -func NewRequestPool(db storage.Database, n *Notifier, adb accounts.Database) *RequestPool { - return &RequestPool{db, adb, n} +func NewRequestPool(db storage.Database, n *Notifier, userAPI userapi.UserInternalAPI) *RequestPool { + return &RequestPool{db, userAPI, n} } // OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be @@ -193,6 +192,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea return } +// nolint:gocyclo func (rp *RequestPool) appendAccountData( data *types.Response, userID string, req syncRequest, currentPos types.StreamPosition, accountDataFilter *gomatrixserverlib.EventFilter, @@ -202,25 +202,21 @@ func (rp *RequestPool) appendAccountData( // data keys were set between two message. This isn't a huge issue since the // duplicate data doesn't represent a huge quantity of data, but an optimisation // here would be making sure each data is sent only once to the client. - localpart, _, err := gomatrixserverlib.SplitID('@', userID) - if err != nil { - return nil, err - } - if req.since == nil { // If this is the initial sync, we don't need to check if a data has // already been sent. Instead, we send the whole batch. - var global []gomatrixserverlib.ClientEvent - var rooms map[string][]gomatrixserverlib.ClientEvent - global, rooms, err = rp.accountDB.GetAccountData(req.ctx, localpart) + var res userapi.QueryAccountDataResponse + err := rp.userAPI.QueryAccountData(req.ctx, &userapi.QueryAccountDataRequest{ + UserID: userID, + }, &res) if err != nil { return nil, err } - data.AccountData.Events = global + data.AccountData.Events = res.GlobalAccountData for r, j := range data.Rooms.Join { - if len(rooms[r]) > 0 { - j.AccountData.Events = rooms[r] + if len(res.RoomAccountData[r]) > 0 { + j.AccountData.Events = res.RoomAccountData[r] data.Rooms.Join[r] = j } } @@ -256,13 +252,20 @@ func (rp *RequestPool) appendAccountData( events := []gomatrixserverlib.ClientEvent{} // Request the missing data from the database for _, dataType := range dataTypes { - event, err := rp.accountDB.GetAccountDataByType( - req.ctx, localpart, roomID, dataType, - ) + var res userapi.QueryAccountDataResponse + err = rp.userAPI.QueryAccountData(req.ctx, &userapi.QueryAccountDataRequest{ + UserID: userID, + RoomID: roomID, + DataType: dataType, + }, &res) if err != nil { return nil, err } - events = append(events, *event) + if len(res.RoomAccountData[roomID]) > 0 { + events = append(events, res.RoomAccountData[roomID]...) + } else if len(res.GlobalAccountData) > 0 { + events = append(events, res.GlobalAccountData...) + } } // Append the data to the response diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 2351ee4d8..caf91e27e 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -21,7 +21,6 @@ import ( "github.com/gorilla/mux" "github.com/sirupsen/logrus" - "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/roomserver/api" userapi "github.com/matrix-org/dendrite/userapi/api" @@ -39,7 +38,6 @@ func AddPublicRoutes( router *mux.Router, consumer sarama.Consumer, userAPI userapi.UserInternalAPI, - accountsDB accounts.Database, rsAPI api.RoomserverInternalAPI, federation *gomatrixserverlib.FederationClient, cfg *config.Dendrite, @@ -60,7 +58,7 @@ func AddPublicRoutes( logrus.WithError(err).Panicf("failed to start notifier") } - requestPool := sync.NewRequestPool(syncDB, notifier, accountsDB) + requestPool := sync.NewRequestPool(syncDB, notifier, userAPI) roomConsumer := consumers.NewOutputRoomEventConsumer( cfg, consumer, notifier, syncDB, rsAPI, diff --git a/userapi/api/api.go b/userapi/api/api.go index 3ed9252cb..1578268ac 100644 --- a/userapi/api/api.go +++ b/userapi/api/api.go @@ -14,13 +14,18 @@ package api -import "context" +import ( + "context" + + "github.com/matrix-org/gomatrixserverlib" +) // UserInternalAPI is the internal API for information about users and devices. type UserInternalAPI interface { QueryProfile(ctx context.Context, req *QueryProfileRequest, res *QueryProfileResponse) error QueryAccessToken(ctx context.Context, req *QueryAccessTokenRequest, res *QueryAccessTokenResponse) error QueryDevices(ctx context.Context, req *QueryDevicesRequest, res *QueryDevicesResponse) error + QueryAccountData(ctx context.Context, req *QueryAccountDataRequest, res *QueryAccountDataResponse) error } // QueryAccessTokenRequest is the request for QueryAccessToken @@ -37,6 +42,22 @@ type QueryAccessTokenResponse struct { Err error // e.g ErrorForbidden } +// QueryAccountDataRequest is the request for QueryAccountData +type QueryAccountDataRequest struct { + UserID string // required: the user to get account data for. + // TODO: This is a terribly confusing API shape :/ + DataType string // optional: if specified returns only a single event matching this data type. + // optional: Only used if DataType is set. If blank returns global account data matching the data type. + // If set, returns only room account data matching this data type. + RoomID string +} + +// QueryAccountDataResponse is the response for QueryAccountData +type QueryAccountDataResponse struct { + GlobalAccountData []gomatrixserverlib.ClientEvent + RoomAccountData map[string][]gomatrixserverlib.ClientEvent +} + // QueryDevicesRequest is the request for QueryDevices type QueryDevicesRequest struct { UserID string diff --git a/userapi/internal/api.go b/userapi/internal/api.go index d8dec11af..6e737b81f 100644 --- a/userapi/internal/api.go +++ b/userapi/internal/api.go @@ -73,6 +73,39 @@ func (a *UserInternalAPI) QueryDevices(ctx context.Context, req *api.QueryDevice return nil } +func (a *UserInternalAPI) QueryAccountData(ctx context.Context, req *api.QueryAccountDataRequest, res *api.QueryAccountDataResponse) error { + local, domain, err := gomatrixserverlib.SplitID('@', req.UserID) + if err != nil { + return err + } + if domain != a.ServerName { + return fmt.Errorf("cannot query account data of remote users: got %s want %s", domain, a.ServerName) + } + if req.DataType != "" { + var event *gomatrixserverlib.ClientEvent + event, err = a.AccountDB.GetAccountDataByType(ctx, local, req.RoomID, req.DataType) + if err != nil { + return err + } + if event != nil { + if req.RoomID != "" { + res.RoomAccountData = make(map[string][]gomatrixserverlib.ClientEvent) + res.RoomAccountData[req.RoomID] = []gomatrixserverlib.ClientEvent{*event} + } else { + res.GlobalAccountData = append(res.GlobalAccountData, *event) + } + } + return nil + } + global, rooms, err := a.AccountDB.GetAccountData(ctx, local) + if err != nil { + return err + } + res.RoomAccountData = rooms + res.GlobalAccountData = global + return nil +} + func (a *UserInternalAPI) QueryAccessToken(ctx context.Context, req *api.QueryAccessTokenRequest, res *api.QueryAccessTokenResponse) error { if req.AppServiceUserID != "" { appServiceDevice, err := a.queryAppServiceToken(ctx, req.AccessToken, req.AppServiceUserID) diff --git a/userapi/inthttp/client.go b/userapi/inthttp/client.go index 638a7e9b6..48e6d7d72 100644 --- a/userapi/inthttp/client.go +++ b/userapi/inthttp/client.go @@ -29,6 +29,7 @@ const ( QueryProfilePath = "/userapi/queryProfile" QueryAccessTokenPath = "/userapi/queryAccessToken" QueryDevicesPath = "/userapi/queryDevices" + QueryAccountDataPath = "/userapi/queryAccountData" ) // NewUserAPIClient creates a UserInternalAPI implemented by talking to a HTTP POST API. @@ -82,3 +83,11 @@ func (h *httpUserInternalAPI) QueryDevices(ctx context.Context, req *api.QueryDe apiURL := h.apiURL + QueryDevicesPath return httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res) } + +func (h *httpUserInternalAPI) QueryAccountData(ctx context.Context, req *api.QueryAccountDataRequest, res *api.QueryAccountDataResponse) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "QueryAccountData") + defer span.Finish() + + apiURL := h.apiURL + QueryAccountDataPath + return httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res) +} diff --git a/userapi/inthttp/server.go b/userapi/inthttp/server.go index 19b0e40b4..8bf2efc01 100644 --- a/userapi/inthttp/server.go +++ b/userapi/inthttp/server.go @@ -64,4 +64,17 @@ func AddRoutes(internalAPIMux *mux.Router, s api.UserInternalAPI) { return util.JSONResponse{Code: http.StatusOK, JSON: &response} }), ) + internalAPIMux.Handle(QueryAccountDataPath, + httputil.MakeInternalAPI("queryAccountData", func(req *http.Request) util.JSONResponse { + request := api.QueryAccountDataRequest{} + response := api.QueryAccountDataResponse{} + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + if err := s.QueryAccountData(req.Context(), &request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) } From e15a8042a19b270060beef1358f90cda075ddd38 Mon Sep 17 00:00:00 2001 From: Kegsay Date: Tue, 16 Jun 2020 17:39:56 +0100 Subject: [PATCH 2/2] BREAKING: Make eduserver/appservice use userapi (#1138) * BREAKING: Make eduserver/appservice use userapi This is a breaking change because this PR restructures how the AS API tracks its position in Kafka streams. Previously, it used the account DB to store partition offsets. However, this is also being used by `clientapi` for the same purpose, which is bad (each component needs to store offsets independently or else you might lose messages across restarts). This PR changes this behaviour to now store partition offsets in the `appservice` database. This means that: - Upon restart, the `appservice` component will attempt to replay all room events from the beginning of time. - An additional table will be created in the appservice database, which in and of itself is backwards compatible. * Return ErrorConflict --- appservice/appservice.go | 38 +++++++------- appservice/consumers/roomserver.go | 6 +-- appservice/storage/interface.go | 2 + appservice/storage/postgres/storage.go | 4 ++ appservice/storage/sqlite3/storage.go | 4 ++ clientapi/auth/storage/accounts/interface.go | 4 ++ clientapi/auth/storage/devices/interface.go | 6 +++ cmd/dendrite-appservice-server/main.go | 5 +- cmd/dendrite-demo-libp2p/main.go | 6 +-- cmd/dendrite-demo-yggdrasil/main.go | 7 +-- cmd/dendrite-edu-server/main.go | 3 +- cmd/dendrite-monolith-server/main.go | 7 ++- cmd/dendritejs/main.go | 7 ++- eduserver/eduserver.go | 6 +-- eduserver/input/input.go | 15 +++--- userapi/api/api.go | 53 ++++++++++++++++++++ userapi/internal/api.go | 34 +++++++++++++ userapi/inthttp/client.go | 27 ++++++++++ userapi/inthttp/server.go | 26 ++++++++++ 19 files changed, 207 insertions(+), 53 deletions(-) diff --git a/appservice/appservice.go b/appservice/appservice.go index bd261ff9b..84a6a9b12 100644 --- a/appservice/appservice.go +++ b/appservice/appservice.go @@ -16,7 +16,6 @@ package appservice import ( "context" - "errors" "net/http" "sync" "time" @@ -29,12 +28,10 @@ import ( "github.com/matrix-org/dendrite/appservice/storage" "github.com/matrix-org/dendrite/appservice/types" "github.com/matrix-org/dendrite/appservice/workers" - "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" - "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/setup" - "github.com/matrix-org/dendrite/internal/sqlutil" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" + userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/sirupsen/logrus" ) @@ -47,8 +44,7 @@ func AddInternalRoutes(router *mux.Router, queryAPI appserviceAPI.AppServiceQuer // can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes. func NewInternalAPI( base *setup.BaseDendrite, - accountsDB accounts.Database, - deviceDB devices.Database, + userAPI userapi.UserInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI, ) appserviceAPI.AppServiceQueryAPI { // Create a connection to the appservice postgres DB @@ -70,7 +66,7 @@ func NewInternalAPI( workerStates[i] = ws // Create bot account for this AS if it doesn't already exist - if err = generateAppServiceAccount(accountsDB, deviceDB, appservice); err != nil { + if err = generateAppServiceAccount(userAPI, appservice); err != nil { logrus.WithFields(logrus.Fields{ "appservice": appservice.ID, }).WithError(err).Panicf("failed to generate bot account for appservice") @@ -90,7 +86,7 @@ func NewInternalAPI( // We can't add ASes at runtime so this is safe to do. if len(workerStates) > 0 { consumer := consumers.NewOutputRoomEventConsumer( - base.Cfg, base.KafkaConsumer, accountsDB, appserviceDB, + base.Cfg, base.KafkaConsumer, appserviceDB, rsAPI, workerStates, ) if err := consumer.Start(); err != nil { @@ -109,22 +105,24 @@ func NewInternalAPI( // `sender_localpart` field of each application service if it doesn't // exist already func generateAppServiceAccount( - accountsDB accounts.Database, - deviceDB devices.Database, + userAPI userapi.UserInternalAPI, as config.ApplicationService, ) error { - ctx := context.Background() - - // Create an account for the application service - _, err := accountsDB.CreateAccount(ctx, as.SenderLocalpart, "", as.ID) + var accRes userapi.PerformAccountCreationResponse + err := userAPI.PerformAccountCreation(context.Background(), &userapi.PerformAccountCreationRequest{ + Localpart: as.SenderLocalpart, + AppServiceID: as.ID, + OnConflict: userapi.ConflictUpdate, + }, &accRes) if err != nil { - if errors.Is(err, sqlutil.ErrUserExists) { // This account already exists - return nil - } return err } - - // Create a dummy device with a dummy token for the application service - _, err = deviceDB.CreateDevice(ctx, as.SenderLocalpart, nil, as.ASToken, &as.SenderLocalpart) + var devRes userapi.PerformDeviceCreationResponse + err = userAPI.PerformDeviceCreation(context.Background(), &userapi.PerformDeviceCreationRequest{ + Localpart: as.SenderLocalpart, + AccessToken: as.ASToken, + DeviceID: &as.SenderLocalpart, + DeviceDisplayName: &as.SenderLocalpart, + }, &devRes) return err } diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index 1657fe542..4c0156b2c 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -20,7 +20,6 @@ import ( "github.com/matrix-org/dendrite/appservice/storage" "github.com/matrix-org/dendrite/appservice/types" - "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/roomserver/api" @@ -33,7 +32,6 @@ import ( // OutputRoomEventConsumer consumes events that originated in the room server. type OutputRoomEventConsumer struct { roomServerConsumer *internal.ContinualConsumer - db accounts.Database asDB storage.Database rsAPI api.RoomserverInternalAPI serverName string @@ -45,7 +43,6 @@ type OutputRoomEventConsumer struct { func NewOutputRoomEventConsumer( cfg *config.Dendrite, kafkaConsumer sarama.Consumer, - store accounts.Database, appserviceDB storage.Database, rsAPI api.RoomserverInternalAPI, workerStates []types.ApplicationServiceWorkerState, @@ -53,11 +50,10 @@ func NewOutputRoomEventConsumer( consumer := internal.ContinualConsumer{ Topic: string(cfg.Kafka.Topics.OutputRoomEvent), Consumer: kafkaConsumer, - PartitionStore: store, + PartitionStore: appserviceDB, } s := &OutputRoomEventConsumer{ roomServerConsumer: &consumer, - db: store, asDB: appserviceDB, rsAPI: rsAPI, serverName: string(cfg.Matrix.ServerName), diff --git a/appservice/storage/interface.go b/appservice/storage/interface.go index 25d35af6c..735e2f90a 100644 --- a/appservice/storage/interface.go +++ b/appservice/storage/interface.go @@ -17,10 +17,12 @@ package storage import ( "context" + "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/gomatrixserverlib" ) type Database interface { + internal.PartitionStorer StoreEvent(ctx context.Context, appServiceID string, event *gomatrixserverlib.HeaderedEvent) error GetEventsWithAppServiceID(ctx context.Context, appServiceID string, limit int) (int, int, []gomatrixserverlib.HeaderedEvent, bool, error) CountEventsWithAppServiceID(ctx context.Context, appServiceID string) (int, error) diff --git a/appservice/storage/postgres/storage.go b/appservice/storage/postgres/storage.go index 3e12f3a0d..03f331d64 100644 --- a/appservice/storage/postgres/storage.go +++ b/appservice/storage/postgres/storage.go @@ -27,6 +27,7 @@ import ( // Database stores events intended to be later sent to application services type Database struct { + sqlutil.PartitionOffsetStatements events eventsStatements txnID txnStatements db *sql.DB @@ -42,6 +43,9 @@ func NewDatabase(dataSourceName string, dbProperties sqlutil.DbProperties) (*Dat if err = result.prepare(); err != nil { return nil, err } + if err = result.PartitionOffsetStatements.Prepare(result.db, "appservice"); err != nil { + return nil, err + } return &result, nil } diff --git a/appservice/storage/sqlite3/storage.go b/appservice/storage/sqlite3/storage.go index 44dcba4ed..cb55c8d94 100644 --- a/appservice/storage/sqlite3/storage.go +++ b/appservice/storage/sqlite3/storage.go @@ -27,6 +27,7 @@ import ( // Database stores events intended to be later sent to application services type Database struct { + sqlutil.PartitionOffsetStatements events eventsStatements txnID txnStatements db *sql.DB @@ -46,6 +47,9 @@ func NewDatabase(dataSourceName string) (*Database, error) { if err = result.prepare(); err != nil { return nil, err } + if err = result.PartitionOffsetStatements.Prepare(result.db, "appservice"); err != nil { + return nil, err + } return &result, nil } diff --git a/clientapi/auth/storage/accounts/interface.go b/clientapi/auth/storage/accounts/interface.go index 4d1941a23..3391ccbfc 100644 --- a/clientapi/auth/storage/accounts/interface.go +++ b/clientapi/auth/storage/accounts/interface.go @@ -40,6 +40,10 @@ type Database interface { GetMembershipsByLocalpart(ctx context.Context, localpart string) (memberships []authtypes.Membership, err error) SaveAccountData(ctx context.Context, localpart, roomID, dataType, content string) error GetAccountData(ctx context.Context, localpart string) (global []gomatrixserverlib.ClientEvent, rooms map[string][]gomatrixserverlib.ClientEvent, err error) + // GetAccountDataByType returns account data matching a given + // localpart, room ID and type. + // If no account data could be found, returns nil + // Returns an error if there was an issue with the retrieval GetAccountDataByType(ctx context.Context, localpart, roomID, dataType string) (data *gomatrixserverlib.ClientEvent, err error) GetNewNumericLocalpart(ctx context.Context) (int64, error) SaveThreePIDAssociation(ctx context.Context, threepid, localpart, medium string) (err error) diff --git a/clientapi/auth/storage/devices/interface.go b/clientapi/auth/storage/devices/interface.go index fc2f4a320..4bdb57850 100644 --- a/clientapi/auth/storage/devices/interface.go +++ b/clientapi/auth/storage/devices/interface.go @@ -24,6 +24,12 @@ type Database interface { GetDeviceByAccessToken(ctx context.Context, token string) (*api.Device, error) GetDeviceByID(ctx context.Context, localpart, deviceID string) (*api.Device, error) GetDevicesByLocalpart(ctx context.Context, localpart string) ([]api.Device, error) + // CreateDevice makes a new device associated with the given user ID localpart. + // If there is already a device with the same device ID for this user, that access token will be revoked + // and replaced with the given accessToken. If the given accessToken is already in use for another device, + // an error will be returned. + // If no device ID is given one is generated. + // Returns the device on success. CreateDevice(ctx context.Context, localpart string, deviceID *string, accessToken string, displayName *string) (dev *api.Device, returnErr error) UpdateDevice(ctx context.Context, localpart, deviceID string, displayName *string) error RemoveDevice(ctx context.Context, deviceID, localpart string) error diff --git a/cmd/dendrite-appservice-server/main.go b/cmd/dendrite-appservice-server/main.go index ec68940af..6719d0471 100644 --- a/cmd/dendrite-appservice-server/main.go +++ b/cmd/dendrite-appservice-server/main.go @@ -24,11 +24,10 @@ func main() { base := setup.NewBaseDendrite(cfg, "AppServiceAPI", true) defer base.Close() // nolint: errcheck - accountDB := base.CreateAccountsDB() - deviceDB := base.CreateDeviceDB() + userAPI := base.UserAPIClient() rsAPI := base.RoomserverHTTPClient() - intAPI := appservice.NewInternalAPI(base, accountDB, deviceDB, rsAPI) + intAPI := appservice.NewInternalAPI(base, userAPI, rsAPI) appservice.AddInternalRoutes(base.InternalAPIMux, intAPI) base.SetupAndServeHTTP(string(base.Cfg.Bind.AppServiceAPI), string(base.Cfg.Listen.AppServiceAPI)) diff --git a/cmd/dendrite-demo-libp2p/main.go b/cmd/dendrite-demo-libp2p/main.go index 6fb3003c2..356ab5a7f 100644 --- a/cmd/dendrite-demo-libp2p/main.go +++ b/cmd/dendrite-demo-libp2p/main.go @@ -141,6 +141,7 @@ func main() { accountDB := base.Base.CreateAccountsDB() deviceDB := base.Base.CreateDeviceDB() federation := createFederationClient(base) + userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, nil) serverKeyAPI := serverkeyapi.NewInternalAPI( base.Base.Cfg, federation, base.Base.Caches, @@ -154,9 +155,9 @@ func main() { &base.Base, keyRing, federation, ) eduInputAPI := eduserver.NewInternalAPI( - &base.Base, cache.New(), deviceDB, + &base.Base, cache.New(), userAPI, ) - asAPI := appservice.NewInternalAPI(&base.Base, accountDB, deviceDB, rsAPI) + asAPI := appservice.NewInternalAPI(&base.Base, userAPI, rsAPI) fsAPI := federationsender.NewInternalAPI( &base.Base, federation, rsAPI, keyRing, ) @@ -165,7 +166,6 @@ func main() { if err != nil { logrus.WithError(err).Panicf("failed to connect to public rooms db") } - userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, nil) monolith := setup.Monolith{ Config: base.Base.Cfg, diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go index 87e2246f2..be15da472 100644 --- a/cmd/dendrite-demo-yggdrasil/main.go +++ b/cmd/dendrite-demo-yggdrasil/main.go @@ -130,16 +130,18 @@ func main() { serverKeyAPI := &signing.YggdrasilKeys{} keyRing := serverKeyAPI.KeyRing() + userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, nil) + rsComponent := roomserver.NewInternalAPI( base, keyRing, federation, ) rsAPI := rsComponent eduInputAPI := eduserver.NewInternalAPI( - base, cache.New(), deviceDB, + base, cache.New(), userAPI, ) - asAPI := appservice.NewInternalAPI(base, accountDB, deviceDB, rsAPI) + asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI) fsAPI := federationsender.NewInternalAPI( base, federation, rsAPI, keyRing, @@ -153,7 +155,6 @@ func main() { } embed.Embed(*instancePort, "Yggdrasil Demo") - userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, nil) monolith := setup.Monolith{ Config: base.Cfg, diff --git a/cmd/dendrite-edu-server/main.go b/cmd/dendrite-edu-server/main.go index 1ecce884d..6704ebd09 100644 --- a/cmd/dendrite-edu-server/main.go +++ b/cmd/dendrite-edu-server/main.go @@ -29,9 +29,8 @@ func main() { logrus.WithError(err).Warn("BaseDendrite close failed") } }() - deviceDB := base.CreateDeviceDB() - intAPI := eduserver.NewInternalAPI(base, cache.New(), deviceDB) + intAPI := eduserver.NewInternalAPI(base, cache.New(), base.UserAPIClient()) eduserver.AddInternalRoutes(base.InternalAPIMux, intAPI) base.SetupAndServeHTTP(string(base.Cfg.Bind.EDUServer), string(base.Cfg.Listen.EDUServer)) diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go index 16e274fc9..339bbe699 100644 --- a/cmd/dendrite-monolith-server/main.go +++ b/cmd/dendrite-monolith-server/main.go @@ -75,6 +75,7 @@ func main() { serverKeyAPI = base.ServerKeyAPIClient() } keyRing := serverKeyAPI.KeyRing() + userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, cfg.Derived.ApplicationServices) rsImpl := roomserver.NewInternalAPI( base, keyRing, federation, @@ -92,14 +93,14 @@ func main() { } eduInputAPI := eduserver.NewInternalAPI( - base, cache.New(), deviceDB, + base, cache.New(), userAPI, ) if base.UseHTTPAPIs { eduserver.AddInternalRoutes(base.InternalAPIMux, eduInputAPI) eduInputAPI = base.EDUServerClient() } - asAPI := appservice.NewInternalAPI(base, accountDB, deviceDB, rsAPI) + asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI) if base.UseHTTPAPIs { appservice.AddInternalRoutes(base.InternalAPIMux, asAPI) asAPI = base.AppserviceHTTPClient() @@ -121,8 +122,6 @@ func main() { logrus.WithError(err).Panicf("failed to connect to public rooms db") } - userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, cfg.Derived.ApplicationServices) - monolith := setup.Monolith{ Config: base.Cfg, AccountDB: accountDB, diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go index aa9192129..883b0fad0 100644 --- a/cmd/dendritejs/main.go +++ b/cmd/dendritejs/main.go @@ -194,6 +194,7 @@ func main() { accountDB := base.CreateAccountsDB() deviceDB := base.CreateDeviceDB() federation := createFederationClient(cfg, node) + userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, nil) fetcher := &libp2pKeyFetcher{} keyRing := gomatrixserverlib.KeyRing{ @@ -204,9 +205,9 @@ func main() { } rsAPI := roomserver.NewInternalAPI(base, keyRing, federation) - eduInputAPI := eduserver.NewInternalAPI(base, cache.New(), deviceDB) + eduInputAPI := eduserver.NewInternalAPI(base, cache.New(), userAPI) asQuery := appservice.NewInternalAPI( - base, accountDB, deviceDB, rsAPI, + base, userAPI, rsAPI, ) fedSenderAPI := federationsender.NewInternalAPI(base, federation, rsAPI, &keyRing) rsAPI.SetFederationSenderAPI(fedSenderAPI) @@ -217,8 +218,6 @@ func main() { logrus.WithError(err).Panicf("failed to connect to public rooms db") } - userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, nil) - monolith := setup.Monolith{ Config: base.Cfg, AccountDB: accountDB, diff --git a/eduserver/eduserver.go b/eduserver/eduserver.go index aa65ff239..2e6ba0c85 100644 --- a/eduserver/eduserver.go +++ b/eduserver/eduserver.go @@ -18,12 +18,12 @@ package eduserver import ( "github.com/gorilla/mux" - "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/eduserver/input" "github.com/matrix-org/dendrite/eduserver/inthttp" "github.com/matrix-org/dendrite/internal/setup" + userapi "github.com/matrix-org/dendrite/userapi/api" ) // AddInternalRoutes registers HTTP handlers for the internal API. Invokes functions @@ -37,11 +37,11 @@ func AddInternalRoutes(internalMux *mux.Router, inputAPI api.EDUServerInputAPI) func NewInternalAPI( base *setup.BaseDendrite, eduCache *cache.EDUCache, - deviceDB devices.Database, + userAPI userapi.UserInternalAPI, ) api.EDUServerInputAPI { return &input.EDUServerInputAPI{ Cache: eduCache, - DeviceDB: deviceDB, + UserAPI: userAPI, Producer: base.KafkaProducer, OutputTypingEventTopic: string(base.Cfg.Kafka.Topics.OutputTypingEvent), OutputSendToDeviceEventTopic: string(base.Cfg.Kafka.Topics.OutputSendToDeviceEvent), diff --git a/eduserver/input/input.go b/eduserver/input/input.go index 6eafce42f..e3d2c55e3 100644 --- a/eduserver/input/input.go +++ b/eduserver/input/input.go @@ -22,9 +22,9 @@ import ( "time" "github.com/Shopify/sarama" - "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/eduserver/cache" + userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" "github.com/sirupsen/logrus" ) @@ -39,8 +39,8 @@ type EDUServerInputAPI struct { OutputSendToDeviceEventTopic string // kafka producer Producer sarama.SyncProducer - // device database - DeviceDB devices.Database + // Internal user query API + UserAPI userapi.UserInternalAPI // our server name ServerName gomatrixserverlib.ServerName } @@ -115,7 +115,7 @@ func (t *EDUServerInputAPI) sendTypingEvent(ite *api.InputTypingEvent) error { func (t *EDUServerInputAPI) sendToDeviceEvent(ise *api.InputSendToDeviceEvent) error { devices := []string{} - localpart, domain, err := gomatrixserverlib.SplitID('@', ise.UserID) + _, domain, err := gomatrixserverlib.SplitID('@', ise.UserID) if err != nil { return err } @@ -126,11 +126,14 @@ func (t *EDUServerInputAPI) sendToDeviceEvent(ise *api.InputSendToDeviceEvent) e // wildcard as we don't know about the remote devices, so instead we leave it // as-is, so that the federation sender can send it on with the wildcard intact. if domain == t.ServerName && ise.DeviceID == "*" { - devs, err := t.DeviceDB.GetDevicesByLocalpart(context.TODO(), localpart) + var res userapi.QueryDevicesResponse + err = t.UserAPI.QueryDevices(context.TODO(), &userapi.QueryDevicesRequest{ + UserID: ise.UserID, + }, &res) if err != nil { return err } - for _, dev := range devs { + for _, dev := range res.Devices { devices = append(devices, dev.ID) } } else { diff --git a/userapi/api/api.go b/userapi/api/api.go index 1578268ac..34c74bb3c 100644 --- a/userapi/api/api.go +++ b/userapi/api/api.go @@ -22,6 +22,8 @@ import ( // UserInternalAPI is the internal API for information about users and devices. type UserInternalAPI interface { + PerformAccountCreation(ctx context.Context, req *PerformAccountCreationRequest, res *PerformAccountCreationResponse) error + PerformDeviceCreation(ctx context.Context, req *PerformDeviceCreationRequest, res *PerformDeviceCreationResponse) error QueryProfile(ctx context.Context, req *QueryProfileRequest, res *QueryProfileResponse) error QueryAccessToken(ctx context.Context, req *QueryAccessTokenRequest, res *QueryAccessTokenResponse) error QueryDevices(ctx context.Context, req *QueryDevicesRequest, res *QueryDevicesResponse) error @@ -85,6 +87,38 @@ type QueryProfileResponse struct { AvatarURL string } +// PerformAccountCreationRequest is the request for PerformAccountCreation +type PerformAccountCreationRequest struct { + Localpart string + AppServiceID string + Password string + OnConflict Conflict +} + +// PerformAccountCreationResponse is the response for PerformAccountCreation +type PerformAccountCreationResponse struct { + AccountCreated bool + UserID string +} + +// PerformDeviceCreationRequest is the request for PerformDeviceCreation +type PerformDeviceCreationRequest struct { + Localpart string + AccessToken string // optional: if blank one will be made on your behalf + // optional: if nil an ID is generated for you. If set, replaces any existing device session, + // which will generate a new access token and invalidate the old one. + DeviceID *string + // optional: if nil no display name will be associated with this device. + DeviceDisplayName *string +} + +// PerformDeviceCreationResponse is the response for PerformDeviceCreation +type PerformDeviceCreationResponse struct { + DeviceCreated bool + AccessToken string + DeviceID string +} + // Device represents a client's device (mobile, web, etc) type Device struct { ID string @@ -108,3 +142,22 @@ type ErrorForbidden struct { func (e *ErrorForbidden) Error() string { return "Forbidden: " + e.Message } + +// ErrorConflict is an error indicating that there was a conflict which resulted in the request being aborted. +type ErrorConflict struct { + Message string +} + +func (e *ErrorConflict) Error() string { + return "Conflict: " + e.Message +} + +// Conflict is an enum representing what to do when encountering conflicting when creating profiles/devices +type Conflict int + +const ( + // ConflictUpdate will update matching records returning no error + ConflictUpdate Conflict = 1 + // ConflictAbort will reject the request with ErrorConflict + ConflictAbort Conflict = 2 +) diff --git a/userapi/internal/api.go b/userapi/internal/api.go index 6e737b81f..1b34dc7b7 100644 --- a/userapi/internal/api.go +++ b/userapi/internal/api.go @@ -17,6 +17,7 @@ package internal import ( "context" "database/sql" + "errors" "fmt" "github.com/matrix-org/dendrite/appservice/types" @@ -24,6 +25,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" "github.com/matrix-org/dendrite/clientapi/userutil" "github.com/matrix-org/dendrite/internal/config" + "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" ) @@ -36,6 +38,38 @@ type UserInternalAPI struct { AppServices []config.ApplicationService } +func (a *UserInternalAPI) PerformAccountCreation(ctx context.Context, req *api.PerformAccountCreationRequest, res *api.PerformAccountCreationResponse) error { + acc, err := a.AccountDB.CreateAccount(ctx, req.Localpart, req.Password, req.AppServiceID) + if err != nil { + if errors.Is(err, sqlutil.ErrUserExists) { // This account already exists + switch req.OnConflict { + case api.ConflictUpdate: + break + case api.ConflictAbort: + return &api.ErrorConflict{ + Message: err.Error(), + } + } + } + res.AccountCreated = false + res.UserID = fmt.Sprintf("@%s:%s", req.Localpart, a.ServerName) + return nil + } + res.AccountCreated = true + res.UserID = acc.UserID + return nil +} +func (a *UserInternalAPI) PerformDeviceCreation(ctx context.Context, req *api.PerformDeviceCreationRequest, res *api.PerformDeviceCreationResponse) error { + dev, err := a.DeviceDB.CreateDevice(ctx, req.Localpart, req.DeviceID, req.AccessToken, req.DeviceDisplayName) + if err != nil { + return err + } + res.DeviceCreated = true + res.AccessToken = dev.AccessToken + res.DeviceID = dev.ID + return nil +} + func (a *UserInternalAPI) QueryProfile(ctx context.Context, req *api.QueryProfileRequest, res *api.QueryProfileResponse) error { local, domain, err := gomatrixserverlib.SplitID('@', req.UserID) if err != nil { diff --git a/userapi/inthttp/client.go b/userapi/inthttp/client.go index 48e6d7d72..0e9628c58 100644 --- a/userapi/inthttp/client.go +++ b/userapi/inthttp/client.go @@ -26,6 +26,9 @@ import ( // HTTP paths for the internal HTTP APIs const ( + PerformDeviceCreationPath = "/userapi/performDeviceCreation" + PerformAccountCreationPath = "/userapi/performAccountCreation" + QueryProfilePath = "/userapi/queryProfile" QueryAccessTokenPath = "/userapi/queryAccessToken" QueryDevicesPath = "/userapi/queryDevices" @@ -52,6 +55,30 @@ type httpUserInternalAPI struct { httpClient *http.Client } +func (h *httpUserInternalAPI) PerformAccountCreation( + ctx context.Context, + request *api.PerformAccountCreationRequest, + response *api.PerformAccountCreationResponse, +) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "PerformAccountCreation") + defer span.Finish() + + apiURL := h.apiURL + PerformAccountCreationPath + return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) +} + +func (h *httpUserInternalAPI) PerformDeviceCreation( + ctx context.Context, + request *api.PerformDeviceCreationRequest, + response *api.PerformDeviceCreationResponse, +) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "PerformDeviceCreation") + defer span.Finish() + + apiURL := h.apiURL + PerformDeviceCreationPath + return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) +} + func (h *httpUserInternalAPI) QueryProfile( ctx context.Context, request *api.QueryProfileRequest, diff --git a/userapi/inthttp/server.go b/userapi/inthttp/server.go index 8bf2efc01..8f3be7738 100644 --- a/userapi/inthttp/server.go +++ b/userapi/inthttp/server.go @@ -25,6 +25,32 @@ import ( ) func AddRoutes(internalAPIMux *mux.Router, s api.UserInternalAPI) { + internalAPIMux.Handle(PerformAccountCreationPath, + httputil.MakeInternalAPI("performAccountCreation", func(req *http.Request) util.JSONResponse { + request := api.PerformAccountCreationRequest{} + response := api.PerformAccountCreationResponse{} + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + if err := s.PerformAccountCreation(req.Context(), &request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) + internalAPIMux.Handle(PerformDeviceCreationPath, + httputil.MakeInternalAPI("performDeviceCreation", func(req *http.Request) util.JSONResponse { + request := api.PerformDeviceCreationRequest{} + response := api.PerformDeviceCreationResponse{} + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + if err := s.PerformDeviceCreation(req.Context(), &request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) internalAPIMux.Handle(QueryProfilePath, httputil.MakeInternalAPI("queryProfile", func(req *http.Request) util.JSONResponse { request := api.QueryProfileRequest{}