mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-06 13:43:09 -06:00
Use specific interfaces for syncapi-roomserver interactions
This commit is contained in:
parent
7e08c3a375
commit
fb708845d4
|
|
@ -12,6 +12,8 @@ import (
|
|||
|
||||
// RoomserverInputAPI is used to write events to the room server.
|
||||
type RoomserverInternalAPI interface {
|
||||
SyncRoomserverAPI
|
||||
|
||||
// needed to avoid chicken and egg scenario when setting up the
|
||||
// interdependencies between the roomserver and other input APIs
|
||||
SetFederationAPI(fsAPI fsAPI.FederationInternalAPI, keyRing *gomatrixserverlib.KeyRing)
|
||||
|
|
@ -78,34 +80,6 @@ type RoomserverInternalAPI interface {
|
|||
res *QueryPublishedRoomsResponse,
|
||||
) error
|
||||
|
||||
// Query the latest events and state for a room from the room server.
|
||||
QueryLatestEventsAndState(
|
||||
ctx context.Context,
|
||||
request *QueryLatestEventsAndStateRequest,
|
||||
response *QueryLatestEventsAndStateResponse,
|
||||
) error
|
||||
|
||||
// Query the state after a list of events in a room from the room server.
|
||||
QueryStateAfterEvents(
|
||||
ctx context.Context,
|
||||
request *QueryStateAfterEventsRequest,
|
||||
response *QueryStateAfterEventsResponse,
|
||||
) error
|
||||
|
||||
// Query a list of events by event ID.
|
||||
QueryEventsByID(
|
||||
ctx context.Context,
|
||||
request *QueryEventsByIDRequest,
|
||||
response *QueryEventsByIDResponse,
|
||||
) error
|
||||
|
||||
// Query the membership event for an user for a room.
|
||||
QueryMembershipForUser(
|
||||
ctx context.Context,
|
||||
request *QueryMembershipForUserRequest,
|
||||
response *QueryMembershipForUserResponse,
|
||||
) error
|
||||
|
||||
// Query a list of membership events for a room
|
||||
QueryMembershipsForRoom(
|
||||
ctx context.Context,
|
||||
|
|
@ -157,22 +131,11 @@ type RoomserverInternalAPI interface {
|
|||
QueryCurrentState(ctx context.Context, req *QueryCurrentStateRequest, res *QueryCurrentStateResponse) error
|
||||
// QueryRoomsForUser retrieves a list of room IDs matching the given query.
|
||||
QueryRoomsForUser(ctx context.Context, req *QueryRoomsForUserRequest, res *QueryRoomsForUserResponse) error
|
||||
// QueryBulkStateContent does a bulk query for state event content in the given rooms.
|
||||
QueryBulkStateContent(ctx context.Context, req *QueryBulkStateContentRequest, res *QueryBulkStateContentResponse) error
|
||||
// QuerySharedUsers returns a list of users who share at least 1 room in common with the given user.
|
||||
QuerySharedUsers(ctx context.Context, req *QuerySharedUsersRequest, res *QuerySharedUsersResponse) error
|
||||
// QueryKnownUsers returns a list of users that we know about from our joined rooms.
|
||||
QueryKnownUsers(ctx context.Context, req *QueryKnownUsersRequest, res *QueryKnownUsersResponse) error
|
||||
// QueryServerBannedFromRoom returns whether a server is banned from a room by server ACLs.
|
||||
QueryServerBannedFromRoom(ctx context.Context, req *QueryServerBannedFromRoomRequest, res *QueryServerBannedFromRoomResponse) error
|
||||
|
||||
// Query a given amount (or less) of events prior to a given set of events.
|
||||
PerformBackfill(
|
||||
ctx context.Context,
|
||||
request *PerformBackfillRequest,
|
||||
response *PerformBackfillResponse,
|
||||
) error
|
||||
|
||||
// PerformForget forgets a rooms history for a specific user
|
||||
PerformForget(ctx context.Context, req *PerformForgetRequest, resp *PerformForgetResponse) error
|
||||
|
||||
|
|
@ -228,3 +191,43 @@ type RoomserverInternalAPI interface {
|
|||
response *RemoveRoomAliasResponse,
|
||||
) error
|
||||
}
|
||||
|
||||
// API functions required by the syncapi
|
||||
type SyncRoomserverAPI interface {
|
||||
// Query the latest events and state for a room from the room server.
|
||||
QueryLatestEventsAndState(
|
||||
ctx context.Context,
|
||||
request *QueryLatestEventsAndStateRequest,
|
||||
response *QueryLatestEventsAndStateResponse,
|
||||
) error
|
||||
// QueryBulkStateContent does a bulk query for state event content in the given rooms.
|
||||
QueryBulkStateContent(ctx context.Context, req *QueryBulkStateContentRequest, res *QueryBulkStateContentResponse) error
|
||||
// QuerySharedUsers returns a list of users who share at least 1 room in common with the given user.
|
||||
QuerySharedUsers(ctx context.Context, req *QuerySharedUsersRequest, res *QuerySharedUsersResponse) error
|
||||
// Query a list of events by event ID.
|
||||
QueryEventsByID(
|
||||
ctx context.Context,
|
||||
request *QueryEventsByIDRequest,
|
||||
response *QueryEventsByIDResponse,
|
||||
) error
|
||||
// Query the membership event for an user for a room.
|
||||
QueryMembershipForUser(
|
||||
ctx context.Context,
|
||||
request *QueryMembershipForUserRequest,
|
||||
response *QueryMembershipForUserResponse,
|
||||
) error
|
||||
|
||||
// Query the state after a list of events in a room from the room server.
|
||||
QueryStateAfterEvents(
|
||||
ctx context.Context,
|
||||
request *QueryStateAfterEventsRequest,
|
||||
response *QueryStateAfterEventsResponse,
|
||||
) error
|
||||
|
||||
// Query a given amount (or less) of events prior to a given set of events.
|
||||
PerformBackfill(
|
||||
ctx context.Context,
|
||||
request *PerformBackfillRequest,
|
||||
response *PerformBackfillResponse,
|
||||
) error
|
||||
}
|
||||
|
|
|
|||
|
|
@ -69,6 +69,6 @@ func (m *Monolith) AddAllPublicRoutes(base *base.BaseDendrite) {
|
|||
base, m.UserAPI, m.Client,
|
||||
)
|
||||
syncapi.AddPublicRoutes(
|
||||
base, m.UserAPI, m.UserAPI, m.RoomserverAPI, m.KeyAPI,
|
||||
base, m.UserAPI, m.RoomserverAPI, m.KeyAPI,
|
||||
)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -42,7 +42,7 @@ type OutputKeyChangeEventConsumer struct {
|
|||
notifier *notifier.Notifier
|
||||
stream types.StreamProvider
|
||||
serverName gomatrixserverlib.ServerName // our server name
|
||||
rsAPI roomserverAPI.RoomserverInternalAPI
|
||||
rsAPI roomserverAPI.SyncRoomserverAPI
|
||||
}
|
||||
|
||||
// NewOutputKeyChangeEventConsumer creates a new OutputKeyChangeEventConsumer.
|
||||
|
|
@ -52,7 +52,7 @@ func NewOutputKeyChangeEventConsumer(
|
|||
cfg *config.SyncAPI,
|
||||
topic string,
|
||||
js nats.JetStreamContext,
|
||||
rsAPI roomserverAPI.RoomserverInternalAPI,
|
||||
rsAPI roomserverAPI.SyncRoomserverAPI,
|
||||
store storage.Database,
|
||||
notifier *notifier.Notifier,
|
||||
stream types.StreamProvider,
|
||||
|
|
|
|||
|
|
@ -41,7 +41,7 @@ type PresenceConsumer struct {
|
|||
db storage.Database
|
||||
stream types.StreamProvider
|
||||
notifier *notifier.Notifier
|
||||
deviceAPI api.UserDeviceAPI
|
||||
deviceAPI api.SyncUserAPI
|
||||
cfg *config.SyncAPI
|
||||
}
|
||||
|
||||
|
|
@ -55,7 +55,7 @@ func NewPresenceConsumer(
|
|||
db storage.Database,
|
||||
notifier *notifier.Notifier,
|
||||
stream types.StreamProvider,
|
||||
deviceAPI api.UserDeviceAPI,
|
||||
deviceAPI api.SyncUserAPI,
|
||||
) *PresenceConsumer {
|
||||
return &PresenceConsumer{
|
||||
ctx: process.Context(),
|
||||
|
|
|
|||
|
|
@ -38,7 +38,7 @@ import (
|
|||
type OutputRoomEventConsumer struct {
|
||||
ctx context.Context
|
||||
cfg *config.SyncAPI
|
||||
rsAPI api.RoomserverInternalAPI
|
||||
rsAPI api.SyncRoomserverAPI
|
||||
jetstream nats.JetStreamContext
|
||||
durable string
|
||||
topic string
|
||||
|
|
@ -58,7 +58,7 @@ func NewOutputRoomEventConsumer(
|
|||
notifier *notifier.Notifier,
|
||||
pduStream types.StreamProvider,
|
||||
inviteStream types.StreamProvider,
|
||||
rsAPI api.RoomserverInternalAPI,
|
||||
rsAPI api.SyncRoomserverAPI,
|
||||
producer *producers.UserAPIStreamEventProducer,
|
||||
) *OutputRoomEventConsumer {
|
||||
return &OutputRoomEventConsumer{
|
||||
|
|
|
|||
|
|
@ -46,7 +46,7 @@ func DeviceOTKCounts(ctx context.Context, keyAPI keyapi.SyncKeyAPI, userID, devi
|
|||
// was filled in, else false if there are no new device list changes because there is nothing to catch up on. The response MUST
|
||||
// be already filled in with join/leave information.
|
||||
func DeviceListCatchup(
|
||||
ctx context.Context, keyAPI keyapi.SyncKeyAPI, rsAPI roomserverAPI.RoomserverInternalAPI,
|
||||
ctx context.Context, keyAPI keyapi.SyncKeyAPI, rsAPI roomserverAPI.SyncRoomserverAPI,
|
||||
userID string, res *types.Response, from, to types.StreamPosition,
|
||||
) (newPos types.StreamPosition, hasNew bool, err error) {
|
||||
|
||||
|
|
@ -130,7 +130,7 @@ func DeviceListCatchup(
|
|||
|
||||
// TrackChangedUsers calculates the values of device_lists.changed|left in the /sync response.
|
||||
func TrackChangedUsers(
|
||||
ctx context.Context, rsAPI roomserverAPI.RoomserverInternalAPI, userID string, newlyJoinedRooms, newlyLeftRooms []string,
|
||||
ctx context.Context, rsAPI roomserverAPI.SyncRoomserverAPI, userID string, newlyJoinedRooms, newlyLeftRooms []string,
|
||||
) (changed, left []string, err error) {
|
||||
// process leaves first, then joins afterwards so if we join/leave/join/leave we err on the side of including users.
|
||||
|
||||
|
|
@ -216,7 +216,7 @@ func TrackChangedUsers(
|
|||
}
|
||||
|
||||
func filterSharedUsers(
|
||||
ctx context.Context, rsAPI roomserverAPI.RoomserverInternalAPI, userID string, usersWithChangedKeys []string,
|
||||
ctx context.Context, rsAPI roomserverAPI.SyncRoomserverAPI, userID string, usersWithChangedKeys []string,
|
||||
) (map[string]int, []string) {
|
||||
var result []string
|
||||
var sharedUsersRes roomserverAPI.QuerySharedUsersResponse
|
||||
|
|
|
|||
|
|
@ -42,7 +42,7 @@ type ContextRespsonse struct {
|
|||
|
||||
func Context(
|
||||
req *http.Request, device *userapi.Device,
|
||||
rsAPI roomserver.RoomserverInternalAPI,
|
||||
rsAPI roomserver.SyncRoomserverAPI,
|
||||
syncDB storage.Database,
|
||||
roomID, eventID string,
|
||||
lazyLoadCache *caching.LazyLoadCache,
|
||||
|
|
|
|||
|
|
@ -36,7 +36,7 @@ import (
|
|||
type messagesReq struct {
|
||||
ctx context.Context
|
||||
db storage.Database
|
||||
rsAPI api.RoomserverInternalAPI
|
||||
rsAPI api.SyncRoomserverAPI
|
||||
cfg *config.SyncAPI
|
||||
roomID string
|
||||
from *types.TopologyToken
|
||||
|
|
@ -60,7 +60,7 @@ type messagesResp struct {
|
|||
// See: https://matrix.org/docs/spec/client_server/latest.html#get-matrix-client-r0-rooms-roomid-messages
|
||||
func OnIncomingMessagesRequest(
|
||||
req *http.Request, db storage.Database, roomID string, device *userapi.Device,
|
||||
rsAPI api.RoomserverInternalAPI,
|
||||
rsAPI api.SyncRoomserverAPI,
|
||||
cfg *config.SyncAPI,
|
||||
srp *sync.RequestPool,
|
||||
lazyLoadCache *caching.LazyLoadCache,
|
||||
|
|
@ -244,7 +244,7 @@ func OnIncomingMessagesRequest(
|
|||
}
|
||||
}
|
||||
|
||||
func checkIsRoomForgotten(ctx context.Context, roomID, userID string, rsAPI api.RoomserverInternalAPI) (bool, error) {
|
||||
func checkIsRoomForgotten(ctx context.Context, roomID, userID string, rsAPI api.SyncRoomserverAPI) (bool, error) {
|
||||
req := api.QueryMembershipForUserRequest{
|
||||
RoomID: roomID,
|
||||
UserID: userID,
|
||||
|
|
|
|||
|
|
@ -36,8 +36,8 @@ import (
|
|||
// nolint: gocyclo
|
||||
func Setup(
|
||||
csMux *mux.Router, srp *sync.RequestPool, syncDB storage.Database,
|
||||
userAPI userapi.QueryAccountAPI,
|
||||
rsAPI api.RoomserverInternalAPI,
|
||||
userAPI userapi.SyncUserAPI,
|
||||
rsAPI api.SyncRoomserverAPI,
|
||||
cfg *config.SyncAPI,
|
||||
lazyLoadCache *caching.LazyLoadCache,
|
||||
) {
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ import (
|
|||
|
||||
type AccountDataStreamProvider struct {
|
||||
StreamProvider
|
||||
userAPI userapi.QueryAccountAPI
|
||||
userAPI userapi.SyncUserAPI
|
||||
}
|
||||
|
||||
func (p *AccountDataStreamProvider) Setup() {
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ import (
|
|||
|
||||
type DeviceListStreamProvider struct {
|
||||
StreamProvider
|
||||
rsAPI api.RoomserverInternalAPI
|
||||
rsAPI api.SyncRoomserverAPI
|
||||
keyAPI keyapi.SyncKeyAPI
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@ type PDUStreamProvider struct {
|
|||
workers atomic.Int32
|
||||
// userID+deviceID -> lazy loading cache
|
||||
lazyLoadCache *caching.LazyLoadCache
|
||||
rsAPI roomserverAPI.RoomserverInternalAPI
|
||||
rsAPI roomserverAPI.SyncRoomserverAPI
|
||||
}
|
||||
|
||||
func (p *PDUStreamProvider) worker() {
|
||||
|
|
|
|||
|
|
@ -25,8 +25,8 @@ type Streams struct {
|
|||
}
|
||||
|
||||
func NewSyncStreamProviders(
|
||||
d storage.Database, userAPI userapi.QueryAccountAPI,
|
||||
rsAPI rsapi.RoomserverInternalAPI, keyAPI keyapi.SyncKeyAPI,
|
||||
d storage.Database, userAPI userapi.SyncUserAPI,
|
||||
rsAPI rsapi.SyncRoomserverAPI, keyAPI keyapi.SyncKeyAPI,
|
||||
eduCache *caching.EDUCache, lazyLoadCache *caching.LazyLoadCache, notifier *notifier.Notifier,
|
||||
) *Streams {
|
||||
streams := &Streams{
|
||||
|
|
|
|||
|
|
@ -45,9 +45,9 @@ import (
|
|||
type RequestPool struct {
|
||||
db storage.Database
|
||||
cfg *config.SyncAPI
|
||||
userAPI userapi.UserDeviceAPI
|
||||
userAPI userapi.SyncUserAPI
|
||||
keyAPI keyapi.SyncKeyAPI
|
||||
rsAPI roomserverAPI.RoomserverInternalAPI
|
||||
rsAPI roomserverAPI.SyncRoomserverAPI
|
||||
lastseen *sync.Map
|
||||
presence *sync.Map
|
||||
streams *streams.Streams
|
||||
|
|
@ -62,8 +62,8 @@ type PresencePublisher interface {
|
|||
// NewRequestPool makes a new RequestPool
|
||||
func NewRequestPool(
|
||||
db storage.Database, cfg *config.SyncAPI,
|
||||
userAPI userapi.UserDeviceAPI, keyAPI keyapi.SyncKeyAPI,
|
||||
rsAPI roomserverAPI.RoomserverInternalAPI,
|
||||
userAPI userapi.SyncUserAPI, keyAPI keyapi.SyncKeyAPI,
|
||||
rsAPI roomserverAPI.SyncRoomserverAPI,
|
||||
streams *streams.Streams, notifier *notifier.Notifier,
|
||||
producer PresencePublisher,
|
||||
) *RequestPool {
|
||||
|
|
|
|||
|
|
@ -39,9 +39,8 @@ import (
|
|||
// component.
|
||||
func AddPublicRoutes(
|
||||
base *base.BaseDendrite,
|
||||
userDeviceAPI userapi.UserDeviceAPI,
|
||||
userQueryAPI userapi.QueryAccountAPI,
|
||||
rsAPI api.RoomserverInternalAPI,
|
||||
userAPI userapi.SyncUserAPI,
|
||||
rsAPI api.SyncRoomserverAPI,
|
||||
keyAPI keyapi.SyncKeyAPI,
|
||||
) {
|
||||
cfg := &base.Cfg.SyncAPI
|
||||
|
|
@ -59,7 +58,7 @@ func AddPublicRoutes(
|
|||
logrus.WithError(err).Panicf("failed to create lazy loading cache")
|
||||
}
|
||||
notifier := notifier.NewNotifier()
|
||||
streams := streams.NewSyncStreamProviders(syncDB, userQueryAPI, rsAPI, keyAPI, eduCache, lazyLoadCache, notifier)
|
||||
streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache, lazyLoadCache, notifier)
|
||||
notifier.SetCurrentPosition(streams.Latest(context.Background()))
|
||||
if err = notifier.Load(context.Background(), syncDB); err != nil {
|
||||
logrus.WithError(err).Panicf("failed to load notifier ")
|
||||
|
|
@ -70,7 +69,7 @@ func AddPublicRoutes(
|
|||
JetStream: js,
|
||||
}
|
||||
|
||||
requestPool := sync.NewRequestPool(syncDB, cfg, userDeviceAPI, keyAPI, rsAPI, streams, notifier, federationPresenceProducer)
|
||||
requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier, federationPresenceProducer)
|
||||
|
||||
userAPIStreamEventProducer := &producers.UserAPIStreamEventProducer{
|
||||
JetStream: js,
|
||||
|
|
@ -139,14 +138,14 @@ func AddPublicRoutes(
|
|||
presenceConsumer := consumers.NewPresenceConsumer(
|
||||
base.ProcessContext, cfg, js, natsClient, syncDB,
|
||||
notifier, streams.PresenceStreamProvider,
|
||||
userDeviceAPI,
|
||||
userAPI,
|
||||
)
|
||||
if err = presenceConsumer.Start(); err != nil {
|
||||
logrus.WithError(err).Panicf("failed to start presence consumer")
|
||||
}
|
||||
|
||||
routing.Setup(
|
||||
base.PublicClientAPIMux, requestPool, syncDB, userQueryAPI,
|
||||
base.PublicClientAPIMux, requestPool, syncDB, userAPI,
|
||||
rsAPI, cfg, lazyLoadCache,
|
||||
)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -31,8 +31,7 @@ type UserInternalAPI interface {
|
|||
UserRegisterAPI
|
||||
UserAccountAPI
|
||||
UserThreePIDAPI
|
||||
UserDeviceAPI
|
||||
QueryAccountAPI
|
||||
SyncUserAPI
|
||||
|
||||
InputAccountData(ctx context.Context, req *InputAccountDataRequest, res *InputAccountDataResponse) error
|
||||
|
||||
|
|
@ -50,12 +49,9 @@ type UserInternalAPI interface {
|
|||
QueryNotifications(ctx context.Context, req *QueryNotificationsRequest, res *QueryNotificationsResponse) error
|
||||
}
|
||||
|
||||
type QueryAccountAPI interface {
|
||||
type SyncUserAPI interface {
|
||||
QueryAccountData(ctx context.Context, req *QueryAccountDataRequest, res *QueryAccountDataResponse) error
|
||||
QueryAccessToken(ctx context.Context, req *QueryAccessTokenRequest, res *QueryAccessTokenResponse) error
|
||||
}
|
||||
|
||||
type UserDeviceAPI interface {
|
||||
PerformDeviceDeletion(ctx context.Context, req *PerformDeviceDeletionRequest, res *PerformDeviceDeletionResponse) error
|
||||
PerformLastSeenUpdate(ctx context.Context, req *PerformLastSeenUpdateRequest, res *PerformLastSeenUpdateResponse) error
|
||||
PerformDeviceUpdate(ctx context.Context, req *PerformDeviceUpdateRequest, res *PerformDeviceUpdateResponse) error
|
||||
|
|
|
|||
Loading…
Reference in a new issue