From 7e08c3a375629baf95d8e009e7e79b91b20a8444 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Wed, 4 May 2022 17:00:45 +0100 Subject: [PATCH] syncapi: use finer-grained interfaces when making the syncapi --- clientapi/auth/auth.go | 2 +- .../personalities/syncapi.go | 4 +--- internal/httputil/httpapi.go | 2 +- keyserver/api/api.go | 9 +++++++-- setup/monolith.go | 2 +- syncapi/consumers/keychange.go | 3 --- syncapi/internal/keychange.go | 4 ++-- syncapi/routing/messages.go | 3 --- syncapi/routing/routing.go | 4 ++-- syncapi/streams/stream_accountdata.go | 2 +- syncapi/streams/stream_devicelist.go | 2 +- syncapi/streams/streams.go | 4 ++-- syncapi/sync/requestpool.go | 6 +++--- syncapi/syncapi.go | 19 +++++++++---------- userapi/api/api.go | 9 +++++++-- 15 files changed, 38 insertions(+), 37 deletions(-) diff --git a/clientapi/auth/auth.go b/clientapi/auth/auth.go index 575c5377f..569060f01 100644 --- a/clientapi/auth/auth.go +++ b/clientapi/auth/auth.go @@ -51,7 +51,7 @@ type AccountDatabase interface { // Note: For an AS user, AS dummy device is returned. // On failure returns an JSON error response which can be sent to the client. func VerifyUserFromRequest( - req *http.Request, userAPI api.UserInternalAPI, + req *http.Request, userAPI api.QueryAccountAPI, ) (*api.Device, *util.JSONResponse) { // Try to find the Application Service user token, err := ExtractAccessToken(req) diff --git a/cmd/dendrite-polylith-multi/personalities/syncapi.go b/cmd/dendrite-polylith-multi/personalities/syncapi.go index 2245b9b54..e22eeae9f 100644 --- a/cmd/dendrite-polylith-multi/personalities/syncapi.go +++ b/cmd/dendrite-polylith-multi/personalities/syncapi.go @@ -22,15 +22,13 @@ import ( func SyncAPI(base *basepkg.BaseDendrite, cfg *config.Dendrite) { userAPI := base.UserAPIClient() - federation := base.CreateFederationClient() rsAPI := base.RoomserverHTTPClient() syncapi.AddPublicRoutes( base, - userAPI, rsAPI, + userAPI, userAPI, rsAPI, base.KeyServerHTTPClient(), - federation, ) base.SetupAndServeHTTP( diff --git a/internal/httputil/httpapi.go b/internal/httputil/httpapi.go index 5fcacd2ad..6ef2dbf7a 100644 --- a/internal/httputil/httpapi.go +++ b/internal/httputil/httpapi.go @@ -49,7 +49,7 @@ type BasicAuth struct { // MakeAuthAPI turns a util.JSONRequestHandler function into an http.Handler which authenticates the request. func MakeAuthAPI( - metricsName string, userAPI userapi.UserInternalAPI, + metricsName string, userAPI userapi.QueryAccountAPI, f func(*http.Request, *userapi.Device) util.JSONResponse, ) http.Handler { h := func(req *http.Request) util.JSONResponse { diff --git a/keyserver/api/api.go b/keyserver/api/api.go index 429617b10..ce651ba4e 100644 --- a/keyserver/api/api.go +++ b/keyserver/api/api.go @@ -27,6 +27,7 @@ import ( ) type KeyInternalAPI interface { + SyncKeyAPI // SetUserAPI assigns a user API to query when extracting device names. SetUserAPI(i userapi.UserInternalAPI) // InputDeviceListUpdate from a federated server EDU @@ -38,12 +39,16 @@ type KeyInternalAPI interface { PerformUploadDeviceKeys(ctx context.Context, req *PerformUploadDeviceKeysRequest, res *PerformUploadDeviceKeysResponse) PerformUploadDeviceSignatures(ctx context.Context, req *PerformUploadDeviceSignaturesRequest, res *PerformUploadDeviceSignaturesResponse) QueryKeys(ctx context.Context, req *QueryKeysRequest, res *QueryKeysResponse) - QueryKeyChanges(ctx context.Context, req *QueryKeyChangesRequest, res *QueryKeyChangesResponse) - QueryOneTimeKeys(ctx context.Context, req *QueryOneTimeKeysRequest, res *QueryOneTimeKeysResponse) QueryDeviceMessages(ctx context.Context, req *QueryDeviceMessagesRequest, res *QueryDeviceMessagesResponse) QuerySignatures(ctx context.Context, req *QuerySignaturesRequest, res *QuerySignaturesResponse) } +// API functions required by the syncapi +type SyncKeyAPI interface { + QueryKeyChanges(ctx context.Context, req *QueryKeyChangesRequest, res *QueryKeyChangesResponse) + QueryOneTimeKeys(ctx context.Context, req *QueryOneTimeKeysRequest, res *QueryOneTimeKeysResponse) +} + // KeyError is returned if there was a problem performing/querying the server type KeyError struct { Err string `json:"error"` diff --git a/setup/monolith.go b/setup/monolith.go index 23bd2fb52..11a942cc3 100644 --- a/setup/monolith.go +++ b/setup/monolith.go @@ -69,6 +69,6 @@ func (m *Monolith) AddAllPublicRoutes(base *base.BaseDendrite) { base, m.UserAPI, m.Client, ) syncapi.AddPublicRoutes( - base, m.UserAPI, m.RoomserverAPI, m.KeyAPI, m.FedClient, + base, m.UserAPI, m.UserAPI, m.RoomserverAPI, m.KeyAPI, ) } diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go index e806f76e6..d845e5012 100644 --- a/syncapi/consumers/keychange.go +++ b/syncapi/consumers/keychange.go @@ -43,7 +43,6 @@ type OutputKeyChangeEventConsumer struct { stream types.StreamProvider serverName gomatrixserverlib.ServerName // our server name rsAPI roomserverAPI.RoomserverInternalAPI - keyAPI api.KeyInternalAPI } // NewOutputKeyChangeEventConsumer creates a new OutputKeyChangeEventConsumer. @@ -53,7 +52,6 @@ func NewOutputKeyChangeEventConsumer( cfg *config.SyncAPI, topic string, js nats.JetStreamContext, - keyAPI api.KeyInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI, store storage.Database, notifier *notifier.Notifier, @@ -66,7 +64,6 @@ func NewOutputKeyChangeEventConsumer( topic: topic, db: store, serverName: cfg.Matrix.ServerName, - keyAPI: keyAPI, rsAPI: rsAPI, notifier: notifier, stream: stream, diff --git a/syncapi/internal/keychange.go b/syncapi/internal/keychange.go index dc4acd8da..71a2d248c 100644 --- a/syncapi/internal/keychange.go +++ b/syncapi/internal/keychange.go @@ -29,7 +29,7 @@ import ( const DeviceListLogName = "dl" // DeviceOTKCounts adds one-time key counts to the /sync response -func DeviceOTKCounts(ctx context.Context, keyAPI keyapi.KeyInternalAPI, userID, deviceID string, res *types.Response) error { +func DeviceOTKCounts(ctx context.Context, keyAPI keyapi.SyncKeyAPI, userID, deviceID string, res *types.Response) error { var queryRes keyapi.QueryOneTimeKeysResponse keyAPI.QueryOneTimeKeys(ctx, &keyapi.QueryOneTimeKeysRequest{ UserID: userID, @@ -46,7 +46,7 @@ func DeviceOTKCounts(ctx context.Context, keyAPI keyapi.KeyInternalAPI, userID, // was filled in, else false if there are no new device list changes because there is nothing to catch up on. The response MUST // be already filled in with join/leave information. func DeviceListCatchup( - ctx context.Context, keyAPI keyapi.KeyInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI, + ctx context.Context, keyAPI keyapi.SyncKeyAPI, rsAPI roomserverAPI.RoomserverInternalAPI, userID string, res *types.Response, from, to types.StreamPosition, ) (newPos types.StreamPosition, hasNew bool, err error) { diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index f34901bf2..a71e47ca9 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -37,7 +37,6 @@ type messagesReq struct { ctx context.Context db storage.Database rsAPI api.RoomserverInternalAPI - federation *gomatrixserverlib.FederationClient cfg *config.SyncAPI roomID string from *types.TopologyToken @@ -61,7 +60,6 @@ type messagesResp struct { // See: https://matrix.org/docs/spec/client_server/latest.html#get-matrix-client-r0-rooms-roomid-messages func OnIncomingMessagesRequest( req *http.Request, db storage.Database, roomID string, device *userapi.Device, - federation *gomatrixserverlib.FederationClient, rsAPI api.RoomserverInternalAPI, cfg *config.SyncAPI, srp *sync.RequestPool, @@ -180,7 +178,6 @@ func OnIncomingMessagesRequest( ctx: req.Context(), db: db, rsAPI: rsAPI, - federation: federation, cfg: cfg, roomID: roomID, from: &from, diff --git a/syncapi/routing/routing.go b/syncapi/routing/routing.go index 4102cf073..3df6a976c 100644 --- a/syncapi/routing/routing.go +++ b/syncapi/routing/routing.go @@ -36,7 +36,7 @@ import ( // nolint: gocyclo func Setup( csMux *mux.Router, srp *sync.RequestPool, syncDB storage.Database, - userAPI userapi.UserInternalAPI, federation *gomatrixserverlib.FederationClient, + userAPI userapi.QueryAccountAPI, rsAPI api.RoomserverInternalAPI, cfg *config.SyncAPI, lazyLoadCache *caching.LazyLoadCache, @@ -53,7 +53,7 @@ func Setup( if err != nil { return util.ErrorResponse(err) } - return OnIncomingMessagesRequest(req, syncDB, vars["roomID"], device, federation, rsAPI, cfg, srp, lazyLoadCache) + return OnIncomingMessagesRequest(req, syncDB, vars["roomID"], device, rsAPI, cfg, srp, lazyLoadCache) })).Methods(http.MethodGet, http.MethodOptions) v3mux.Handle("/user/{userId}/filter", diff --git a/syncapi/streams/stream_accountdata.go b/syncapi/streams/stream_accountdata.go index 2cddbcf04..dfbde4317 100644 --- a/syncapi/streams/stream_accountdata.go +++ b/syncapi/streams/stream_accountdata.go @@ -10,7 +10,7 @@ import ( type AccountDataStreamProvider struct { StreamProvider - userAPI userapi.UserInternalAPI + userAPI userapi.QueryAccountAPI } func (p *AccountDataStreamProvider) Setup() { diff --git a/syncapi/streams/stream_devicelist.go b/syncapi/streams/stream_devicelist.go index 6ff8a7fd5..5527aebd1 100644 --- a/syncapi/streams/stream_devicelist.go +++ b/syncapi/streams/stream_devicelist.go @@ -12,7 +12,7 @@ import ( type DeviceListStreamProvider struct { StreamProvider rsAPI api.RoomserverInternalAPI - keyAPI keyapi.KeyInternalAPI + keyAPI keyapi.SyncKeyAPI } func (p *DeviceListStreamProvider) CompleteSync( diff --git a/syncapi/streams/streams.go b/syncapi/streams/streams.go index a18a0cc41..5a3355e05 100644 --- a/syncapi/streams/streams.go +++ b/syncapi/streams/streams.go @@ -25,8 +25,8 @@ type Streams struct { } func NewSyncStreamProviders( - d storage.Database, userAPI userapi.UserInternalAPI, - rsAPI rsapi.RoomserverInternalAPI, keyAPI keyapi.KeyInternalAPI, + d storage.Database, userAPI userapi.QueryAccountAPI, + rsAPI rsapi.RoomserverInternalAPI, keyAPI keyapi.SyncKeyAPI, eduCache *caching.EDUCache, lazyLoadCache *caching.LazyLoadCache, notifier *notifier.Notifier, ) *Streams { streams := &Streams{ diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 76d550a65..e31b1b37b 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -45,8 +45,8 @@ import ( type RequestPool struct { db storage.Database cfg *config.SyncAPI - userAPI userapi.UserInternalAPI - keyAPI keyapi.KeyInternalAPI + userAPI userapi.UserDeviceAPI + keyAPI keyapi.SyncKeyAPI rsAPI roomserverAPI.RoomserverInternalAPI lastseen *sync.Map presence *sync.Map @@ -62,7 +62,7 @@ type PresencePublisher interface { // NewRequestPool makes a new RequestPool func NewRequestPool( db storage.Database, cfg *config.SyncAPI, - userAPI userapi.UserInternalAPI, keyAPI keyapi.KeyInternalAPI, + userAPI userapi.UserDeviceAPI, keyAPI keyapi.SyncKeyAPI, rsAPI roomserverAPI.RoomserverInternalAPI, streams *streams.Streams, notifier *notifier.Notifier, producer PresencePublisher, diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index d8becb6ed..b39cfcafa 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -25,7 +25,6 @@ import ( "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/jetstream" userapi "github.com/matrix-org/dendrite/userapi/api" - "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/dendrite/syncapi/consumers" "github.com/matrix-org/dendrite/syncapi/notifier" @@ -40,10 +39,10 @@ import ( // component. func AddPublicRoutes( base *base.BaseDendrite, - userAPI userapi.UserInternalAPI, + userDeviceAPI userapi.UserDeviceAPI, + userQueryAPI userapi.QueryAccountAPI, rsAPI api.RoomserverInternalAPI, - keyAPI keyapi.KeyInternalAPI, - federation *gomatrixserverlib.FederationClient, + keyAPI keyapi.SyncKeyAPI, ) { cfg := &base.Cfg.SyncAPI @@ -60,7 +59,7 @@ func AddPublicRoutes( logrus.WithError(err).Panicf("failed to create lazy loading cache") } notifier := notifier.NewNotifier() - streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache, lazyLoadCache, notifier) + streams := streams.NewSyncStreamProviders(syncDB, userQueryAPI, rsAPI, keyAPI, eduCache, lazyLoadCache, notifier) notifier.SetCurrentPosition(streams.Latest(context.Background())) if err = notifier.Load(context.Background(), syncDB); err != nil { logrus.WithError(err).Panicf("failed to load notifier ") @@ -71,7 +70,7 @@ func AddPublicRoutes( JetStream: js, } - requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier, federationPresenceProducer) + requestPool := sync.NewRequestPool(syncDB, cfg, userDeviceAPI, keyAPI, rsAPI, streams, notifier, federationPresenceProducer) userAPIStreamEventProducer := &producers.UserAPIStreamEventProducer{ JetStream: js, @@ -85,7 +84,7 @@ func AddPublicRoutes( keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer( base.ProcessContext, cfg, cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent), - js, keyAPI, rsAPI, syncDB, notifier, + js, rsAPI, syncDB, notifier, streams.DeviceListStreamProvider, ) if err = keyChangeConsumer.Start(); err != nil { @@ -140,14 +139,14 @@ func AddPublicRoutes( presenceConsumer := consumers.NewPresenceConsumer( base.ProcessContext, cfg, js, natsClient, syncDB, notifier, streams.PresenceStreamProvider, - userAPI, + userDeviceAPI, ) if err = presenceConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start presence consumer") } routing.Setup( - base.PublicClientAPIMux, requestPool, syncDB, userAPI, - federation, rsAPI, cfg, lazyLoadCache, + base.PublicClientAPIMux, requestPool, syncDB, userQueryAPI, + rsAPI, cfg, lazyLoadCache, ) } diff --git a/userapi/api/api.go b/userapi/api/api.go index 6aa6a6842..7efe4ed25 100644 --- a/userapi/api/api.go +++ b/userapi/api/api.go @@ -32,6 +32,7 @@ type UserInternalAPI interface { UserAccountAPI UserThreePIDAPI UserDeviceAPI + QueryAccountAPI InputAccountData(ctx context.Context, req *InputAccountDataRequest, res *InputAccountDataResponse) error @@ -42,14 +43,18 @@ type UserInternalAPI interface { PerformPushRulesPut(ctx context.Context, req *PerformPushRulesPutRequest, res *struct{}) error QueryKeyBackup(ctx context.Context, req *QueryKeyBackupRequest, res *QueryKeyBackupResponse) - QueryAccessToken(ctx context.Context, req *QueryAccessTokenRequest, res *QueryAccessTokenResponse) error - QueryAccountData(ctx context.Context, req *QueryAccountDataRequest, res *QueryAccountDataResponse) error + QueryOpenIDToken(ctx context.Context, req *QueryOpenIDTokenRequest, res *QueryOpenIDTokenResponse) error QueryPushers(ctx context.Context, req *QueryPushersRequest, res *QueryPushersResponse) error QueryPushRules(ctx context.Context, req *QueryPushRulesRequest, res *QueryPushRulesResponse) error QueryNotifications(ctx context.Context, req *QueryNotificationsRequest, res *QueryNotificationsResponse) error } +type QueryAccountAPI interface { + QueryAccountData(ctx context.Context, req *QueryAccountDataRequest, res *QueryAccountDataResponse) error + QueryAccessToken(ctx context.Context, req *QueryAccessTokenRequest, res *QueryAccessTokenResponse) error +} + type UserDeviceAPI interface { PerformDeviceDeletion(ctx context.Context, req *PerformDeviceDeletionRequest, res *PerformDeviceDeletionResponse) error PerformLastSeenUpdate(ctx context.Context, req *PerformLastSeenUpdateRequest, res *PerformLastSeenUpdateResponse) error