From fb708845d419612179abc1c5c04aad4dca92f75f Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Wed, 4 May 2022 17:33:06 +0100 Subject: [PATCH] Use specific interfaces for syncapi-roomserver interactions --- roomserver/api/api.go | 81 ++++++++++++++------------- setup/monolith.go | 2 +- syncapi/consumers/keychange.go | 4 +- syncapi/consumers/presence.go | 4 +- syncapi/consumers/roomserver.go | 4 +- syncapi/internal/keychange.go | 6 +- syncapi/routing/context.go | 2 +- syncapi/routing/messages.go | 6 +- syncapi/routing/routing.go | 4 +- syncapi/streams/stream_accountdata.go | 2 +- syncapi/streams/stream_devicelist.go | 2 +- syncapi/streams/stream_pdu.go | 2 +- syncapi/streams/streams.go | 4 +- syncapi/sync/requestpool.go | 8 +-- syncapi/syncapi.go | 13 ++--- userapi/api/api.go | 8 +-- 16 files changed, 75 insertions(+), 77 deletions(-) diff --git a/roomserver/api/api.go b/roomserver/api/api.go index f0ca8a615..2e4ec3ffd 100644 --- a/roomserver/api/api.go +++ b/roomserver/api/api.go @@ -12,6 +12,8 @@ import ( // RoomserverInputAPI is used to write events to the room server. type RoomserverInternalAPI interface { + SyncRoomserverAPI + // needed to avoid chicken and egg scenario when setting up the // interdependencies between the roomserver and other input APIs SetFederationAPI(fsAPI fsAPI.FederationInternalAPI, keyRing *gomatrixserverlib.KeyRing) @@ -78,34 +80,6 @@ type RoomserverInternalAPI interface { res *QueryPublishedRoomsResponse, ) error - // Query the latest events and state for a room from the room server. - QueryLatestEventsAndState( - ctx context.Context, - request *QueryLatestEventsAndStateRequest, - response *QueryLatestEventsAndStateResponse, - ) error - - // Query the state after a list of events in a room from the room server. - QueryStateAfterEvents( - ctx context.Context, - request *QueryStateAfterEventsRequest, - response *QueryStateAfterEventsResponse, - ) error - - // Query a list of events by event ID. - QueryEventsByID( - ctx context.Context, - request *QueryEventsByIDRequest, - response *QueryEventsByIDResponse, - ) error - - // Query the membership event for an user for a room. - QueryMembershipForUser( - ctx context.Context, - request *QueryMembershipForUserRequest, - response *QueryMembershipForUserResponse, - ) error - // Query a list of membership events for a room QueryMembershipsForRoom( ctx context.Context, @@ -157,22 +131,11 @@ type RoomserverInternalAPI interface { QueryCurrentState(ctx context.Context, req *QueryCurrentStateRequest, res *QueryCurrentStateResponse) error // QueryRoomsForUser retrieves a list of room IDs matching the given query. QueryRoomsForUser(ctx context.Context, req *QueryRoomsForUserRequest, res *QueryRoomsForUserResponse) error - // QueryBulkStateContent does a bulk query for state event content in the given rooms. - QueryBulkStateContent(ctx context.Context, req *QueryBulkStateContentRequest, res *QueryBulkStateContentResponse) error - // QuerySharedUsers returns a list of users who share at least 1 room in common with the given user. - QuerySharedUsers(ctx context.Context, req *QuerySharedUsersRequest, res *QuerySharedUsersResponse) error // QueryKnownUsers returns a list of users that we know about from our joined rooms. QueryKnownUsers(ctx context.Context, req *QueryKnownUsersRequest, res *QueryKnownUsersResponse) error // QueryServerBannedFromRoom returns whether a server is banned from a room by server ACLs. QueryServerBannedFromRoom(ctx context.Context, req *QueryServerBannedFromRoomRequest, res *QueryServerBannedFromRoomResponse) error - // Query a given amount (or less) of events prior to a given set of events. - PerformBackfill( - ctx context.Context, - request *PerformBackfillRequest, - response *PerformBackfillResponse, - ) error - // PerformForget forgets a rooms history for a specific user PerformForget(ctx context.Context, req *PerformForgetRequest, resp *PerformForgetResponse) error @@ -228,3 +191,43 @@ type RoomserverInternalAPI interface { response *RemoveRoomAliasResponse, ) error } + +// API functions required by the syncapi +type SyncRoomserverAPI interface { + // Query the latest events and state for a room from the room server. + QueryLatestEventsAndState( + ctx context.Context, + request *QueryLatestEventsAndStateRequest, + response *QueryLatestEventsAndStateResponse, + ) error + // QueryBulkStateContent does a bulk query for state event content in the given rooms. + QueryBulkStateContent(ctx context.Context, req *QueryBulkStateContentRequest, res *QueryBulkStateContentResponse) error + // QuerySharedUsers returns a list of users who share at least 1 room in common with the given user. + QuerySharedUsers(ctx context.Context, req *QuerySharedUsersRequest, res *QuerySharedUsersResponse) error + // Query a list of events by event ID. + QueryEventsByID( + ctx context.Context, + request *QueryEventsByIDRequest, + response *QueryEventsByIDResponse, + ) error + // Query the membership event for an user for a room. + QueryMembershipForUser( + ctx context.Context, + request *QueryMembershipForUserRequest, + response *QueryMembershipForUserResponse, + ) error + + // Query the state after a list of events in a room from the room server. + QueryStateAfterEvents( + ctx context.Context, + request *QueryStateAfterEventsRequest, + response *QueryStateAfterEventsResponse, + ) error + + // Query a given amount (or less) of events prior to a given set of events. + PerformBackfill( + ctx context.Context, + request *PerformBackfillRequest, + response *PerformBackfillResponse, + ) error +} diff --git a/setup/monolith.go b/setup/monolith.go index 11a942cc3..e033c14d7 100644 --- a/setup/monolith.go +++ b/setup/monolith.go @@ -69,6 +69,6 @@ func (m *Monolith) AddAllPublicRoutes(base *base.BaseDendrite) { base, m.UserAPI, m.Client, ) syncapi.AddPublicRoutes( - base, m.UserAPI, m.UserAPI, m.RoomserverAPI, m.KeyAPI, + base, m.UserAPI, m.RoomserverAPI, m.KeyAPI, ) } diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go index d845e5012..c8d88ddac 100644 --- a/syncapi/consumers/keychange.go +++ b/syncapi/consumers/keychange.go @@ -42,7 +42,7 @@ type OutputKeyChangeEventConsumer struct { notifier *notifier.Notifier stream types.StreamProvider serverName gomatrixserverlib.ServerName // our server name - rsAPI roomserverAPI.RoomserverInternalAPI + rsAPI roomserverAPI.SyncRoomserverAPI } // NewOutputKeyChangeEventConsumer creates a new OutputKeyChangeEventConsumer. @@ -52,7 +52,7 @@ func NewOutputKeyChangeEventConsumer( cfg *config.SyncAPI, topic string, js nats.JetStreamContext, - rsAPI roomserverAPI.RoomserverInternalAPI, + rsAPI roomserverAPI.SyncRoomserverAPI, store storage.Database, notifier *notifier.Notifier, stream types.StreamProvider, diff --git a/syncapi/consumers/presence.go b/syncapi/consumers/presence.go index 6bcca48f4..388c08ff4 100644 --- a/syncapi/consumers/presence.go +++ b/syncapi/consumers/presence.go @@ -41,7 +41,7 @@ type PresenceConsumer struct { db storage.Database stream types.StreamProvider notifier *notifier.Notifier - deviceAPI api.UserDeviceAPI + deviceAPI api.SyncUserAPI cfg *config.SyncAPI } @@ -55,7 +55,7 @@ func NewPresenceConsumer( db storage.Database, notifier *notifier.Notifier, stream types.StreamProvider, - deviceAPI api.UserDeviceAPI, + deviceAPI api.SyncUserAPI, ) *PresenceConsumer { return &PresenceConsumer{ ctx: process.Context(), diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 5bdc0fad7..7712c8403 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -38,7 +38,7 @@ import ( type OutputRoomEventConsumer struct { ctx context.Context cfg *config.SyncAPI - rsAPI api.RoomserverInternalAPI + rsAPI api.SyncRoomserverAPI jetstream nats.JetStreamContext durable string topic string @@ -58,7 +58,7 @@ func NewOutputRoomEventConsumer( notifier *notifier.Notifier, pduStream types.StreamProvider, inviteStream types.StreamProvider, - rsAPI api.RoomserverInternalAPI, + rsAPI api.SyncRoomserverAPI, producer *producers.UserAPIStreamEventProducer, ) *OutputRoomEventConsumer { return &OutputRoomEventConsumer{ diff --git a/syncapi/internal/keychange.go b/syncapi/internal/keychange.go index 71a2d248c..d96718d20 100644 --- a/syncapi/internal/keychange.go +++ b/syncapi/internal/keychange.go @@ -46,7 +46,7 @@ func DeviceOTKCounts(ctx context.Context, keyAPI keyapi.SyncKeyAPI, userID, devi // was filled in, else false if there are no new device list changes because there is nothing to catch up on. The response MUST // be already filled in with join/leave information. func DeviceListCatchup( - ctx context.Context, keyAPI keyapi.SyncKeyAPI, rsAPI roomserverAPI.RoomserverInternalAPI, + ctx context.Context, keyAPI keyapi.SyncKeyAPI, rsAPI roomserverAPI.SyncRoomserverAPI, userID string, res *types.Response, from, to types.StreamPosition, ) (newPos types.StreamPosition, hasNew bool, err error) { @@ -130,7 +130,7 @@ func DeviceListCatchup( // TrackChangedUsers calculates the values of device_lists.changed|left in the /sync response. func TrackChangedUsers( - ctx context.Context, rsAPI roomserverAPI.RoomserverInternalAPI, userID string, newlyJoinedRooms, newlyLeftRooms []string, + ctx context.Context, rsAPI roomserverAPI.SyncRoomserverAPI, userID string, newlyJoinedRooms, newlyLeftRooms []string, ) (changed, left []string, err error) { // process leaves first, then joins afterwards so if we join/leave/join/leave we err on the side of including users. @@ -216,7 +216,7 @@ func TrackChangedUsers( } func filterSharedUsers( - ctx context.Context, rsAPI roomserverAPI.RoomserverInternalAPI, userID string, usersWithChangedKeys []string, + ctx context.Context, rsAPI roomserverAPI.SyncRoomserverAPI, userID string, usersWithChangedKeys []string, ) (map[string]int, []string) { var result []string var sharedUsersRes roomserverAPI.QuerySharedUsersResponse diff --git a/syncapi/routing/context.go b/syncapi/routing/context.go index 17215b669..f5f4b2dd0 100644 --- a/syncapi/routing/context.go +++ b/syncapi/routing/context.go @@ -42,7 +42,7 @@ type ContextRespsonse struct { func Context( req *http.Request, device *userapi.Device, - rsAPI roomserver.RoomserverInternalAPI, + rsAPI roomserver.SyncRoomserverAPI, syncDB storage.Database, roomID, eventID string, lazyLoadCache *caching.LazyLoadCache, diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index a71e47ca9..f19dfaed3 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -36,7 +36,7 @@ import ( type messagesReq struct { ctx context.Context db storage.Database - rsAPI api.RoomserverInternalAPI + rsAPI api.SyncRoomserverAPI cfg *config.SyncAPI roomID string from *types.TopologyToken @@ -60,7 +60,7 @@ type messagesResp struct { // See: https://matrix.org/docs/spec/client_server/latest.html#get-matrix-client-r0-rooms-roomid-messages func OnIncomingMessagesRequest( req *http.Request, db storage.Database, roomID string, device *userapi.Device, - rsAPI api.RoomserverInternalAPI, + rsAPI api.SyncRoomserverAPI, cfg *config.SyncAPI, srp *sync.RequestPool, lazyLoadCache *caching.LazyLoadCache, @@ -244,7 +244,7 @@ func OnIncomingMessagesRequest( } } -func checkIsRoomForgotten(ctx context.Context, roomID, userID string, rsAPI api.RoomserverInternalAPI) (bool, error) { +func checkIsRoomForgotten(ctx context.Context, roomID, userID string, rsAPI api.SyncRoomserverAPI) (bool, error) { req := api.QueryMembershipForUserRequest{ RoomID: roomID, UserID: userID, diff --git a/syncapi/routing/routing.go b/syncapi/routing/routing.go index 3df6a976c..245ee5b66 100644 --- a/syncapi/routing/routing.go +++ b/syncapi/routing/routing.go @@ -36,8 +36,8 @@ import ( // nolint: gocyclo func Setup( csMux *mux.Router, srp *sync.RequestPool, syncDB storage.Database, - userAPI userapi.QueryAccountAPI, - rsAPI api.RoomserverInternalAPI, + userAPI userapi.SyncUserAPI, + rsAPI api.SyncRoomserverAPI, cfg *config.SyncAPI, lazyLoadCache *caching.LazyLoadCache, ) { diff --git a/syncapi/streams/stream_accountdata.go b/syncapi/streams/stream_accountdata.go index dfbde4317..9c19b846b 100644 --- a/syncapi/streams/stream_accountdata.go +++ b/syncapi/streams/stream_accountdata.go @@ -10,7 +10,7 @@ import ( type AccountDataStreamProvider struct { StreamProvider - userAPI userapi.QueryAccountAPI + userAPI userapi.SyncUserAPI } func (p *AccountDataStreamProvider) Setup() { diff --git a/syncapi/streams/stream_devicelist.go b/syncapi/streams/stream_devicelist.go index 5527aebd1..f42099510 100644 --- a/syncapi/streams/stream_devicelist.go +++ b/syncapi/streams/stream_devicelist.go @@ -11,7 +11,7 @@ import ( type DeviceListStreamProvider struct { StreamProvider - rsAPI api.RoomserverInternalAPI + rsAPI api.SyncRoomserverAPI keyAPI keyapi.SyncKeyAPI } diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 0d033095d..f774a1af8 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -33,7 +33,7 @@ type PDUStreamProvider struct { workers atomic.Int32 // userID+deviceID -> lazy loading cache lazyLoadCache *caching.LazyLoadCache - rsAPI roomserverAPI.RoomserverInternalAPI + rsAPI roomserverAPI.SyncRoomserverAPI } func (p *PDUStreamProvider) worker() { diff --git a/syncapi/streams/streams.go b/syncapi/streams/streams.go index 5a3355e05..af2a0387e 100644 --- a/syncapi/streams/streams.go +++ b/syncapi/streams/streams.go @@ -25,8 +25,8 @@ type Streams struct { } func NewSyncStreamProviders( - d storage.Database, userAPI userapi.QueryAccountAPI, - rsAPI rsapi.RoomserverInternalAPI, keyAPI keyapi.SyncKeyAPI, + d storage.Database, userAPI userapi.SyncUserAPI, + rsAPI rsapi.SyncRoomserverAPI, keyAPI keyapi.SyncKeyAPI, eduCache *caching.EDUCache, lazyLoadCache *caching.LazyLoadCache, notifier *notifier.Notifier, ) *Streams { streams := &Streams{ diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index e31b1b37b..9616b3238 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -45,9 +45,9 @@ import ( type RequestPool struct { db storage.Database cfg *config.SyncAPI - userAPI userapi.UserDeviceAPI + userAPI userapi.SyncUserAPI keyAPI keyapi.SyncKeyAPI - rsAPI roomserverAPI.RoomserverInternalAPI + rsAPI roomserverAPI.SyncRoomserverAPI lastseen *sync.Map presence *sync.Map streams *streams.Streams @@ -62,8 +62,8 @@ type PresencePublisher interface { // NewRequestPool makes a new RequestPool func NewRequestPool( db storage.Database, cfg *config.SyncAPI, - userAPI userapi.UserDeviceAPI, keyAPI keyapi.SyncKeyAPI, - rsAPI roomserverAPI.RoomserverInternalAPI, + userAPI userapi.SyncUserAPI, keyAPI keyapi.SyncKeyAPI, + rsAPI roomserverAPI.SyncRoomserverAPI, streams *streams.Streams, notifier *notifier.Notifier, producer PresencePublisher, ) *RequestPool { diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index b39cfcafa..686e2044f 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -39,9 +39,8 @@ import ( // component. func AddPublicRoutes( base *base.BaseDendrite, - userDeviceAPI userapi.UserDeviceAPI, - userQueryAPI userapi.QueryAccountAPI, - rsAPI api.RoomserverInternalAPI, + userAPI userapi.SyncUserAPI, + rsAPI api.SyncRoomserverAPI, keyAPI keyapi.SyncKeyAPI, ) { cfg := &base.Cfg.SyncAPI @@ -59,7 +58,7 @@ func AddPublicRoutes( logrus.WithError(err).Panicf("failed to create lazy loading cache") } notifier := notifier.NewNotifier() - streams := streams.NewSyncStreamProviders(syncDB, userQueryAPI, rsAPI, keyAPI, eduCache, lazyLoadCache, notifier) + streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache, lazyLoadCache, notifier) notifier.SetCurrentPosition(streams.Latest(context.Background())) if err = notifier.Load(context.Background(), syncDB); err != nil { logrus.WithError(err).Panicf("failed to load notifier ") @@ -70,7 +69,7 @@ func AddPublicRoutes( JetStream: js, } - requestPool := sync.NewRequestPool(syncDB, cfg, userDeviceAPI, keyAPI, rsAPI, streams, notifier, federationPresenceProducer) + requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier, federationPresenceProducer) userAPIStreamEventProducer := &producers.UserAPIStreamEventProducer{ JetStream: js, @@ -139,14 +138,14 @@ func AddPublicRoutes( presenceConsumer := consumers.NewPresenceConsumer( base.ProcessContext, cfg, js, natsClient, syncDB, notifier, streams.PresenceStreamProvider, - userDeviceAPI, + userAPI, ) if err = presenceConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start presence consumer") } routing.Setup( - base.PublicClientAPIMux, requestPool, syncDB, userQueryAPI, + base.PublicClientAPIMux, requestPool, syncDB, userAPI, rsAPI, cfg, lazyLoadCache, ) } diff --git a/userapi/api/api.go b/userapi/api/api.go index 7efe4ed25..70b70800a 100644 --- a/userapi/api/api.go +++ b/userapi/api/api.go @@ -31,8 +31,7 @@ type UserInternalAPI interface { UserRegisterAPI UserAccountAPI UserThreePIDAPI - UserDeviceAPI - QueryAccountAPI + SyncUserAPI InputAccountData(ctx context.Context, req *InputAccountDataRequest, res *InputAccountDataResponse) error @@ -50,12 +49,9 @@ type UserInternalAPI interface { QueryNotifications(ctx context.Context, req *QueryNotificationsRequest, res *QueryNotificationsResponse) error } -type QueryAccountAPI interface { +type SyncUserAPI interface { QueryAccountData(ctx context.Context, req *QueryAccountDataRequest, res *QueryAccountDataResponse) error QueryAccessToken(ctx context.Context, req *QueryAccessTokenRequest, res *QueryAccessTokenResponse) error -} - -type UserDeviceAPI interface { PerformDeviceDeletion(ctx context.Context, req *PerformDeviceDeletionRequest, res *PerformDeviceDeletionResponse) error PerformLastSeenUpdate(ctx context.Context, req *PerformLastSeenUpdateRequest, res *PerformLastSeenUpdateResponse) error PerformDeviceUpdate(ctx context.Context, req *PerformDeviceUpdateRequest, res *PerformDeviceUpdateResponse) error