syncapi: use finer-grained interfaces when making the syncapi

This commit is contained in:
Kegan Dougal 2022-05-04 17:00:45 +01:00
parent b0a9e85c4a
commit 7e08c3a375
15 changed files with 38 additions and 37 deletions

View file

@ -51,7 +51,7 @@ type AccountDatabase interface {
// Note: For an AS user, AS dummy device is returned. // Note: For an AS user, AS dummy device is returned.
// On failure returns an JSON error response which can be sent to the client. // On failure returns an JSON error response which can be sent to the client.
func VerifyUserFromRequest( func VerifyUserFromRequest(
req *http.Request, userAPI api.UserInternalAPI, req *http.Request, userAPI api.QueryAccountAPI,
) (*api.Device, *util.JSONResponse) { ) (*api.Device, *util.JSONResponse) {
// Try to find the Application Service user // Try to find the Application Service user
token, err := ExtractAccessToken(req) token, err := ExtractAccessToken(req)

View file

@ -22,15 +22,13 @@ import (
func SyncAPI(base *basepkg.BaseDendrite, cfg *config.Dendrite) { func SyncAPI(base *basepkg.BaseDendrite, cfg *config.Dendrite) {
userAPI := base.UserAPIClient() userAPI := base.UserAPIClient()
federation := base.CreateFederationClient()
rsAPI := base.RoomserverHTTPClient() rsAPI := base.RoomserverHTTPClient()
syncapi.AddPublicRoutes( syncapi.AddPublicRoutes(
base, base,
userAPI, rsAPI, userAPI, userAPI, rsAPI,
base.KeyServerHTTPClient(), base.KeyServerHTTPClient(),
federation,
) )
base.SetupAndServeHTTP( base.SetupAndServeHTTP(

View file

@ -49,7 +49,7 @@ type BasicAuth struct {
// MakeAuthAPI turns a util.JSONRequestHandler function into an http.Handler which authenticates the request. // MakeAuthAPI turns a util.JSONRequestHandler function into an http.Handler which authenticates the request.
func MakeAuthAPI( func MakeAuthAPI(
metricsName string, userAPI userapi.UserInternalAPI, metricsName string, userAPI userapi.QueryAccountAPI,
f func(*http.Request, *userapi.Device) util.JSONResponse, f func(*http.Request, *userapi.Device) util.JSONResponse,
) http.Handler { ) http.Handler {
h := func(req *http.Request) util.JSONResponse { h := func(req *http.Request) util.JSONResponse {

View file

@ -27,6 +27,7 @@ import (
) )
type KeyInternalAPI interface { type KeyInternalAPI interface {
SyncKeyAPI
// SetUserAPI assigns a user API to query when extracting device names. // SetUserAPI assigns a user API to query when extracting device names.
SetUserAPI(i userapi.UserInternalAPI) SetUserAPI(i userapi.UserInternalAPI)
// InputDeviceListUpdate from a federated server EDU // InputDeviceListUpdate from a federated server EDU
@ -38,12 +39,16 @@ type KeyInternalAPI interface {
PerformUploadDeviceKeys(ctx context.Context, req *PerformUploadDeviceKeysRequest, res *PerformUploadDeviceKeysResponse) PerformUploadDeviceKeys(ctx context.Context, req *PerformUploadDeviceKeysRequest, res *PerformUploadDeviceKeysResponse)
PerformUploadDeviceSignatures(ctx context.Context, req *PerformUploadDeviceSignaturesRequest, res *PerformUploadDeviceSignaturesResponse) PerformUploadDeviceSignatures(ctx context.Context, req *PerformUploadDeviceSignaturesRequest, res *PerformUploadDeviceSignaturesResponse)
QueryKeys(ctx context.Context, req *QueryKeysRequest, res *QueryKeysResponse) QueryKeys(ctx context.Context, req *QueryKeysRequest, res *QueryKeysResponse)
QueryKeyChanges(ctx context.Context, req *QueryKeyChangesRequest, res *QueryKeyChangesResponse)
QueryOneTimeKeys(ctx context.Context, req *QueryOneTimeKeysRequest, res *QueryOneTimeKeysResponse)
QueryDeviceMessages(ctx context.Context, req *QueryDeviceMessagesRequest, res *QueryDeviceMessagesResponse) QueryDeviceMessages(ctx context.Context, req *QueryDeviceMessagesRequest, res *QueryDeviceMessagesResponse)
QuerySignatures(ctx context.Context, req *QuerySignaturesRequest, res *QuerySignaturesResponse) QuerySignatures(ctx context.Context, req *QuerySignaturesRequest, res *QuerySignaturesResponse)
} }
// API functions required by the syncapi
type SyncKeyAPI interface {
QueryKeyChanges(ctx context.Context, req *QueryKeyChangesRequest, res *QueryKeyChangesResponse)
QueryOneTimeKeys(ctx context.Context, req *QueryOneTimeKeysRequest, res *QueryOneTimeKeysResponse)
}
// KeyError is returned if there was a problem performing/querying the server // KeyError is returned if there was a problem performing/querying the server
type KeyError struct { type KeyError struct {
Err string `json:"error"` Err string `json:"error"`

View file

@ -69,6 +69,6 @@ func (m *Monolith) AddAllPublicRoutes(base *base.BaseDendrite) {
base, m.UserAPI, m.Client, base, m.UserAPI, m.Client,
) )
syncapi.AddPublicRoutes( syncapi.AddPublicRoutes(
base, m.UserAPI, m.RoomserverAPI, m.KeyAPI, m.FedClient, base, m.UserAPI, m.UserAPI, m.RoomserverAPI, m.KeyAPI,
) )
} }

View file

@ -43,7 +43,6 @@ type OutputKeyChangeEventConsumer struct {
stream types.StreamProvider stream types.StreamProvider
serverName gomatrixserverlib.ServerName // our server name serverName gomatrixserverlib.ServerName // our server name
rsAPI roomserverAPI.RoomserverInternalAPI rsAPI roomserverAPI.RoomserverInternalAPI
keyAPI api.KeyInternalAPI
} }
// NewOutputKeyChangeEventConsumer creates a new OutputKeyChangeEventConsumer. // NewOutputKeyChangeEventConsumer creates a new OutputKeyChangeEventConsumer.
@ -53,7 +52,6 @@ func NewOutputKeyChangeEventConsumer(
cfg *config.SyncAPI, cfg *config.SyncAPI,
topic string, topic string,
js nats.JetStreamContext, js nats.JetStreamContext,
keyAPI api.KeyInternalAPI,
rsAPI roomserverAPI.RoomserverInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI,
store storage.Database, store storage.Database,
notifier *notifier.Notifier, notifier *notifier.Notifier,
@ -66,7 +64,6 @@ func NewOutputKeyChangeEventConsumer(
topic: topic, topic: topic,
db: store, db: store,
serverName: cfg.Matrix.ServerName, serverName: cfg.Matrix.ServerName,
keyAPI: keyAPI,
rsAPI: rsAPI, rsAPI: rsAPI,
notifier: notifier, notifier: notifier,
stream: stream, stream: stream,

View file

@ -29,7 +29,7 @@ import (
const DeviceListLogName = "dl" const DeviceListLogName = "dl"
// DeviceOTKCounts adds one-time key counts to the /sync response // DeviceOTKCounts adds one-time key counts to the /sync response
func DeviceOTKCounts(ctx context.Context, keyAPI keyapi.KeyInternalAPI, userID, deviceID string, res *types.Response) error { func DeviceOTKCounts(ctx context.Context, keyAPI keyapi.SyncKeyAPI, userID, deviceID string, res *types.Response) error {
var queryRes keyapi.QueryOneTimeKeysResponse var queryRes keyapi.QueryOneTimeKeysResponse
keyAPI.QueryOneTimeKeys(ctx, &keyapi.QueryOneTimeKeysRequest{ keyAPI.QueryOneTimeKeys(ctx, &keyapi.QueryOneTimeKeysRequest{
UserID: userID, UserID: userID,
@ -46,7 +46,7 @@ func DeviceOTKCounts(ctx context.Context, keyAPI keyapi.KeyInternalAPI, userID,
// was filled in, else false if there are no new device list changes because there is nothing to catch up on. The response MUST // was filled in, else false if there are no new device list changes because there is nothing to catch up on. The response MUST
// be already filled in with join/leave information. // be already filled in with join/leave information.
func DeviceListCatchup( func DeviceListCatchup(
ctx context.Context, keyAPI keyapi.KeyInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI, ctx context.Context, keyAPI keyapi.SyncKeyAPI, rsAPI roomserverAPI.RoomserverInternalAPI,
userID string, res *types.Response, from, to types.StreamPosition, userID string, res *types.Response, from, to types.StreamPosition,
) (newPos types.StreamPosition, hasNew bool, err error) { ) (newPos types.StreamPosition, hasNew bool, err error) {

View file

@ -37,7 +37,6 @@ type messagesReq struct {
ctx context.Context ctx context.Context
db storage.Database db storage.Database
rsAPI api.RoomserverInternalAPI rsAPI api.RoomserverInternalAPI
federation *gomatrixserverlib.FederationClient
cfg *config.SyncAPI cfg *config.SyncAPI
roomID string roomID string
from *types.TopologyToken from *types.TopologyToken
@ -61,7 +60,6 @@ type messagesResp struct {
// See: https://matrix.org/docs/spec/client_server/latest.html#get-matrix-client-r0-rooms-roomid-messages // See: https://matrix.org/docs/spec/client_server/latest.html#get-matrix-client-r0-rooms-roomid-messages
func OnIncomingMessagesRequest( func OnIncomingMessagesRequest(
req *http.Request, db storage.Database, roomID string, device *userapi.Device, req *http.Request, db storage.Database, roomID string, device *userapi.Device,
federation *gomatrixserverlib.FederationClient,
rsAPI api.RoomserverInternalAPI, rsAPI api.RoomserverInternalAPI,
cfg *config.SyncAPI, cfg *config.SyncAPI,
srp *sync.RequestPool, srp *sync.RequestPool,
@ -180,7 +178,6 @@ func OnIncomingMessagesRequest(
ctx: req.Context(), ctx: req.Context(),
db: db, db: db,
rsAPI: rsAPI, rsAPI: rsAPI,
federation: federation,
cfg: cfg, cfg: cfg,
roomID: roomID, roomID: roomID,
from: &from, from: &from,

View file

@ -36,7 +36,7 @@ import (
// nolint: gocyclo // nolint: gocyclo
func Setup( func Setup(
csMux *mux.Router, srp *sync.RequestPool, syncDB storage.Database, csMux *mux.Router, srp *sync.RequestPool, syncDB storage.Database,
userAPI userapi.UserInternalAPI, federation *gomatrixserverlib.FederationClient, userAPI userapi.QueryAccountAPI,
rsAPI api.RoomserverInternalAPI, rsAPI api.RoomserverInternalAPI,
cfg *config.SyncAPI, cfg *config.SyncAPI,
lazyLoadCache *caching.LazyLoadCache, lazyLoadCache *caching.LazyLoadCache,
@ -53,7 +53,7 @@ func Setup(
if err != nil { if err != nil {
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
return OnIncomingMessagesRequest(req, syncDB, vars["roomID"], device, federation, rsAPI, cfg, srp, lazyLoadCache) return OnIncomingMessagesRequest(req, syncDB, vars["roomID"], device, rsAPI, cfg, srp, lazyLoadCache)
})).Methods(http.MethodGet, http.MethodOptions) })).Methods(http.MethodGet, http.MethodOptions)
v3mux.Handle("/user/{userId}/filter", v3mux.Handle("/user/{userId}/filter",

View file

@ -10,7 +10,7 @@ import (
type AccountDataStreamProvider struct { type AccountDataStreamProvider struct {
StreamProvider StreamProvider
userAPI userapi.UserInternalAPI userAPI userapi.QueryAccountAPI
} }
func (p *AccountDataStreamProvider) Setup() { func (p *AccountDataStreamProvider) Setup() {

View file

@ -12,7 +12,7 @@ import (
type DeviceListStreamProvider struct { type DeviceListStreamProvider struct {
StreamProvider StreamProvider
rsAPI api.RoomserverInternalAPI rsAPI api.RoomserverInternalAPI
keyAPI keyapi.KeyInternalAPI keyAPI keyapi.SyncKeyAPI
} }
func (p *DeviceListStreamProvider) CompleteSync( func (p *DeviceListStreamProvider) CompleteSync(

View file

@ -25,8 +25,8 @@ type Streams struct {
} }
func NewSyncStreamProviders( func NewSyncStreamProviders(
d storage.Database, userAPI userapi.UserInternalAPI, d storage.Database, userAPI userapi.QueryAccountAPI,
rsAPI rsapi.RoomserverInternalAPI, keyAPI keyapi.KeyInternalAPI, rsAPI rsapi.RoomserverInternalAPI, keyAPI keyapi.SyncKeyAPI,
eduCache *caching.EDUCache, lazyLoadCache *caching.LazyLoadCache, notifier *notifier.Notifier, eduCache *caching.EDUCache, lazyLoadCache *caching.LazyLoadCache, notifier *notifier.Notifier,
) *Streams { ) *Streams {
streams := &Streams{ streams := &Streams{

View file

@ -45,8 +45,8 @@ import (
type RequestPool struct { type RequestPool struct {
db storage.Database db storage.Database
cfg *config.SyncAPI cfg *config.SyncAPI
userAPI userapi.UserInternalAPI userAPI userapi.UserDeviceAPI
keyAPI keyapi.KeyInternalAPI keyAPI keyapi.SyncKeyAPI
rsAPI roomserverAPI.RoomserverInternalAPI rsAPI roomserverAPI.RoomserverInternalAPI
lastseen *sync.Map lastseen *sync.Map
presence *sync.Map presence *sync.Map
@ -62,7 +62,7 @@ type PresencePublisher interface {
// NewRequestPool makes a new RequestPool // NewRequestPool makes a new RequestPool
func NewRequestPool( func NewRequestPool(
db storage.Database, cfg *config.SyncAPI, db storage.Database, cfg *config.SyncAPI,
userAPI userapi.UserInternalAPI, keyAPI keyapi.KeyInternalAPI, userAPI userapi.UserDeviceAPI, keyAPI keyapi.SyncKeyAPI,
rsAPI roomserverAPI.RoomserverInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI,
streams *streams.Streams, notifier *notifier.Notifier, streams *streams.Streams, notifier *notifier.Notifier,
producer PresencePublisher, producer PresencePublisher,

View file

@ -25,7 +25,6 @@ import (
"github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/jetstream"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/syncapi/consumers" "github.com/matrix-org/dendrite/syncapi/consumers"
"github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/notifier"
@ -40,10 +39,10 @@ import (
// component. // component.
func AddPublicRoutes( func AddPublicRoutes(
base *base.BaseDendrite, base *base.BaseDendrite,
userAPI userapi.UserInternalAPI, userDeviceAPI userapi.UserDeviceAPI,
userQueryAPI userapi.QueryAccountAPI,
rsAPI api.RoomserverInternalAPI, rsAPI api.RoomserverInternalAPI,
keyAPI keyapi.KeyInternalAPI, keyAPI keyapi.SyncKeyAPI,
federation *gomatrixserverlib.FederationClient,
) { ) {
cfg := &base.Cfg.SyncAPI cfg := &base.Cfg.SyncAPI
@ -60,7 +59,7 @@ func AddPublicRoutes(
logrus.WithError(err).Panicf("failed to create lazy loading cache") logrus.WithError(err).Panicf("failed to create lazy loading cache")
} }
notifier := notifier.NewNotifier() notifier := notifier.NewNotifier()
streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache, lazyLoadCache, notifier) streams := streams.NewSyncStreamProviders(syncDB, userQueryAPI, rsAPI, keyAPI, eduCache, lazyLoadCache, notifier)
notifier.SetCurrentPosition(streams.Latest(context.Background())) notifier.SetCurrentPosition(streams.Latest(context.Background()))
if err = notifier.Load(context.Background(), syncDB); err != nil { if err = notifier.Load(context.Background(), syncDB); err != nil {
logrus.WithError(err).Panicf("failed to load notifier ") logrus.WithError(err).Panicf("failed to load notifier ")
@ -71,7 +70,7 @@ func AddPublicRoutes(
JetStream: js, JetStream: js,
} }
requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier, federationPresenceProducer) requestPool := sync.NewRequestPool(syncDB, cfg, userDeviceAPI, keyAPI, rsAPI, streams, notifier, federationPresenceProducer)
userAPIStreamEventProducer := &producers.UserAPIStreamEventProducer{ userAPIStreamEventProducer := &producers.UserAPIStreamEventProducer{
JetStream: js, JetStream: js,
@ -85,7 +84,7 @@ func AddPublicRoutes(
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer( keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
base.ProcessContext, cfg, cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent), base.ProcessContext, cfg, cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent),
js, keyAPI, rsAPI, syncDB, notifier, js, rsAPI, syncDB, notifier,
streams.DeviceListStreamProvider, streams.DeviceListStreamProvider,
) )
if err = keyChangeConsumer.Start(); err != nil { if err = keyChangeConsumer.Start(); err != nil {
@ -140,14 +139,14 @@ func AddPublicRoutes(
presenceConsumer := consumers.NewPresenceConsumer( presenceConsumer := consumers.NewPresenceConsumer(
base.ProcessContext, cfg, js, natsClient, syncDB, base.ProcessContext, cfg, js, natsClient, syncDB,
notifier, streams.PresenceStreamProvider, notifier, streams.PresenceStreamProvider,
userAPI, userDeviceAPI,
) )
if err = presenceConsumer.Start(); err != nil { if err = presenceConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start presence consumer") logrus.WithError(err).Panicf("failed to start presence consumer")
} }
routing.Setup( routing.Setup(
base.PublicClientAPIMux, requestPool, syncDB, userAPI, base.PublicClientAPIMux, requestPool, syncDB, userQueryAPI,
federation, rsAPI, cfg, lazyLoadCache, rsAPI, cfg, lazyLoadCache,
) )
} }

View file

@ -32,6 +32,7 @@ type UserInternalAPI interface {
UserAccountAPI UserAccountAPI
UserThreePIDAPI UserThreePIDAPI
UserDeviceAPI UserDeviceAPI
QueryAccountAPI
InputAccountData(ctx context.Context, req *InputAccountDataRequest, res *InputAccountDataResponse) error InputAccountData(ctx context.Context, req *InputAccountDataRequest, res *InputAccountDataResponse) error
@ -42,14 +43,18 @@ type UserInternalAPI interface {
PerformPushRulesPut(ctx context.Context, req *PerformPushRulesPutRequest, res *struct{}) error PerformPushRulesPut(ctx context.Context, req *PerformPushRulesPutRequest, res *struct{}) error
QueryKeyBackup(ctx context.Context, req *QueryKeyBackupRequest, res *QueryKeyBackupResponse) QueryKeyBackup(ctx context.Context, req *QueryKeyBackupRequest, res *QueryKeyBackupResponse)
QueryAccessToken(ctx context.Context, req *QueryAccessTokenRequest, res *QueryAccessTokenResponse) error
QueryAccountData(ctx context.Context, req *QueryAccountDataRequest, res *QueryAccountDataResponse) error
QueryOpenIDToken(ctx context.Context, req *QueryOpenIDTokenRequest, res *QueryOpenIDTokenResponse) error QueryOpenIDToken(ctx context.Context, req *QueryOpenIDTokenRequest, res *QueryOpenIDTokenResponse) error
QueryPushers(ctx context.Context, req *QueryPushersRequest, res *QueryPushersResponse) error QueryPushers(ctx context.Context, req *QueryPushersRequest, res *QueryPushersResponse) error
QueryPushRules(ctx context.Context, req *QueryPushRulesRequest, res *QueryPushRulesResponse) error QueryPushRules(ctx context.Context, req *QueryPushRulesRequest, res *QueryPushRulesResponse) error
QueryNotifications(ctx context.Context, req *QueryNotificationsRequest, res *QueryNotificationsResponse) error QueryNotifications(ctx context.Context, req *QueryNotificationsRequest, res *QueryNotificationsResponse) error
} }
type QueryAccountAPI interface {
QueryAccountData(ctx context.Context, req *QueryAccountDataRequest, res *QueryAccountDataResponse) error
QueryAccessToken(ctx context.Context, req *QueryAccessTokenRequest, res *QueryAccessTokenResponse) error
}
type UserDeviceAPI interface { type UserDeviceAPI interface {
PerformDeviceDeletion(ctx context.Context, req *PerformDeviceDeletionRequest, res *PerformDeviceDeletionResponse) error PerformDeviceDeletion(ctx context.Context, req *PerformDeviceDeletionRequest, res *PerformDeviceDeletionResponse) error
PerformLastSeenUpdate(ctx context.Context, req *PerformLastSeenUpdateRequest, res *PerformLastSeenUpdateResponse) error PerformLastSeenUpdate(ctx context.Context, req *PerformLastSeenUpdateRequest, res *PerformLastSeenUpdateResponse) error