Merge branch 'main' of github.com:matrix-org/dendrite into s7evink/roomservertests

This commit is contained in:
Till Faelligen 2022-05-05 12:11:27 +02:00
commit da49cd282c
23 changed files with 228 additions and 216 deletions

View file

@ -51,7 +51,7 @@ type AccountDatabase interface {
// Note: For an AS user, AS dummy device is returned. // Note: For an AS user, AS dummy device is returned.
// On failure returns an JSON error response which can be sent to the client. // On failure returns an JSON error response which can be sent to the client.
func VerifyUserFromRequest( func VerifyUserFromRequest(
req *http.Request, userAPI api.UserInternalAPI, req *http.Request, userAPI api.QueryAcccessTokenAPI,
) (*api.Device, *util.JSONResponse) { ) (*api.Device, *util.JSONResponse) {
// Try to find the Application Service user // Try to find the Application Service user
token, err := ExtractAccessToken(req) token, err := ExtractAccessToken(req)

View file

@ -21,6 +21,7 @@ import (
"net/http" "net/http"
"time" "time"
"github.com/matrix-org/dendrite/roomserver/version"
"github.com/matrix-org/gomatrix" "github.com/matrix-org/gomatrix"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/gomatrixserverlib/tokens" "github.com/matrix-org/gomatrixserverlib/tokens"
@ -95,29 +96,16 @@ func SendServerNotice(
// get rooms for specified user // get rooms for specified user
allUserRooms := []string{} allUserRooms := []string{}
userRooms := api.QueryRoomsForUserResponse{} userRooms := api.QueryRoomsForUserResponse{}
// Get rooms the user is either joined, invited or has left.
for _, membership := range []string{"join", "invite", "leave"} {
if err := rsAPI.QueryRoomsForUser(ctx, &api.QueryRoomsForUserRequest{ if err := rsAPI.QueryRoomsForUser(ctx, &api.QueryRoomsForUserRequest{
UserID: r.UserID, UserID: r.UserID,
WantMembership: "join", WantMembership: membership,
}, &userRooms); err != nil { }, &userRooms); err != nil {
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
allUserRooms = append(allUserRooms, userRooms.RoomIDs...) allUserRooms = append(allUserRooms, userRooms.RoomIDs...)
// get invites for specified user
if err := rsAPI.QueryRoomsForUser(ctx, &api.QueryRoomsForUserRequest{
UserID: r.UserID,
WantMembership: "invite",
}, &userRooms); err != nil {
return util.ErrorResponse(err)
} }
allUserRooms = append(allUserRooms, userRooms.RoomIDs...)
// get left rooms for specified user
if err := rsAPI.QueryRoomsForUser(ctx, &api.QueryRoomsForUserRequest{
UserID: r.UserID,
WantMembership: "leave",
}, &userRooms); err != nil {
return util.ErrorResponse(err)
}
allUserRooms = append(allUserRooms, userRooms.RoomIDs...)
// get rooms of the sender // get rooms of the sender
senderUserID := fmt.Sprintf("@%s:%s", cfgNotices.LocalPart, cfgClient.Matrix.ServerName) senderUserID := fmt.Sprintf("@%s:%s", cfgNotices.LocalPart, cfgClient.Matrix.ServerName)
@ -145,7 +133,7 @@ func SendServerNotice(
var ( var (
roomID string roomID string
roomVersion = gomatrixserverlib.RoomVersionV6 roomVersion = version.DefaultRoomVersion()
) )
// create a new room for the user // create a new room for the user
@ -194,16 +182,23 @@ func SendServerNotice(
// if we didn't get a createRoomResponse, we probably received an error, so return that. // if we didn't get a createRoomResponse, we probably received an error, so return that.
return roomRes return roomRes
} }
} else { } else {
// we've found a room in common, check the membership // we've found a room in common, check the membership
roomID = commonRooms[0] roomID = commonRooms[0]
membershipRes := api.QueryMembershipForUserResponse{}
err := rsAPI.QueryMembershipForUser(ctx, &api.QueryMembershipForUserRequest{UserID: r.UserID, RoomID: roomID}, &membershipRes)
if err != nil {
util.GetLogger(ctx).WithError(err).Error("unable to query membership for user")
return jsonerror.InternalServerError()
}
if !membershipRes.IsInRoom {
// re-invite the user // re-invite the user
res, err := sendInvite(ctx, userAPI, senderDevice, roomID, r.UserID, "Server notice room", cfgClient, rsAPI, asAPI, time.Now()) res, err := sendInvite(ctx, userAPI, senderDevice, roomID, r.UserID, "Server notice room", cfgClient, rsAPI, asAPI, time.Now())
if err != nil { if err != nil {
return res return res
} }
} }
}
startedGeneratingEvent := time.Now() startedGeneratingEvent := time.Now()

View file

@ -22,7 +22,6 @@ import (
func SyncAPI(base *basepkg.BaseDendrite, cfg *config.Dendrite) { func SyncAPI(base *basepkg.BaseDendrite, cfg *config.Dendrite) {
userAPI := base.UserAPIClient() userAPI := base.UserAPIClient()
federation := base.CreateFederationClient()
rsAPI := base.RoomserverHTTPClient() rsAPI := base.RoomserverHTTPClient()
@ -30,7 +29,6 @@ func SyncAPI(base *basepkg.BaseDendrite, cfg *config.Dendrite) {
base, base,
userAPI, rsAPI, userAPI, rsAPI,
base.KeyServerHTTPClient(), base.KeyServerHTTPClient(),
federation,
) )
base.SetupAndServeHTTP( base.SetupAndServeHTTP(

View file

@ -49,7 +49,7 @@ type BasicAuth struct {
// MakeAuthAPI turns a util.JSONRequestHandler function into an http.Handler which authenticates the request. // MakeAuthAPI turns a util.JSONRequestHandler function into an http.Handler which authenticates the request.
func MakeAuthAPI( func MakeAuthAPI(
metricsName string, userAPI userapi.UserInternalAPI, metricsName string, userAPI userapi.QueryAcccessTokenAPI,
f func(*http.Request, *userapi.Device) util.JSONResponse, f func(*http.Request, *userapi.Device) util.JSONResponse,
) http.Handler { ) http.Handler {
h := func(req *http.Request) util.JSONResponse { h := func(req *http.Request) util.JSONResponse {

View file

@ -27,6 +27,7 @@ import (
) )
type KeyInternalAPI interface { type KeyInternalAPI interface {
SyncKeyAPI
// SetUserAPI assigns a user API to query when extracting device names. // SetUserAPI assigns a user API to query when extracting device names.
SetUserAPI(i userapi.UserInternalAPI) SetUserAPI(i userapi.UserInternalAPI)
// InputDeviceListUpdate from a federated server EDU // InputDeviceListUpdate from a federated server EDU
@ -38,12 +39,16 @@ type KeyInternalAPI interface {
PerformUploadDeviceKeys(ctx context.Context, req *PerformUploadDeviceKeysRequest, res *PerformUploadDeviceKeysResponse) PerformUploadDeviceKeys(ctx context.Context, req *PerformUploadDeviceKeysRequest, res *PerformUploadDeviceKeysResponse)
PerformUploadDeviceSignatures(ctx context.Context, req *PerformUploadDeviceSignaturesRequest, res *PerformUploadDeviceSignaturesResponse) PerformUploadDeviceSignatures(ctx context.Context, req *PerformUploadDeviceSignaturesRequest, res *PerformUploadDeviceSignaturesResponse)
QueryKeys(ctx context.Context, req *QueryKeysRequest, res *QueryKeysResponse) QueryKeys(ctx context.Context, req *QueryKeysRequest, res *QueryKeysResponse)
QueryKeyChanges(ctx context.Context, req *QueryKeyChangesRequest, res *QueryKeyChangesResponse)
QueryOneTimeKeys(ctx context.Context, req *QueryOneTimeKeysRequest, res *QueryOneTimeKeysResponse)
QueryDeviceMessages(ctx context.Context, req *QueryDeviceMessagesRequest, res *QueryDeviceMessagesResponse) QueryDeviceMessages(ctx context.Context, req *QueryDeviceMessagesRequest, res *QueryDeviceMessagesResponse)
QuerySignatures(ctx context.Context, req *QuerySignaturesRequest, res *QuerySignaturesResponse) QuerySignatures(ctx context.Context, req *QuerySignaturesRequest, res *QuerySignaturesResponse)
} }
// API functions required by the syncapi
type SyncKeyAPI interface {
QueryKeyChanges(ctx context.Context, req *QueryKeyChangesRequest, res *QueryKeyChangesResponse)
QueryOneTimeKeys(ctx context.Context, req *QueryOneTimeKeysRequest, res *QueryOneTimeKeysResponse)
}
// KeyError is returned if there was a problem performing/querying the server // KeyError is returned if there was a problem performing/querying the server
type KeyError struct { type KeyError struct {
Err string `json:"error"` Err string `json:"error"`

View file

@ -12,6 +12,8 @@ import (
// RoomserverInputAPI is used to write events to the room server. // RoomserverInputAPI is used to write events to the room server.
type RoomserverInternalAPI interface { type RoomserverInternalAPI interface {
SyncRoomserverAPI
// needed to avoid chicken and egg scenario when setting up the // needed to avoid chicken and egg scenario when setting up the
// interdependencies between the roomserver and other input APIs // interdependencies between the roomserver and other input APIs
SetFederationAPI(fsAPI fsAPI.FederationInternalAPI, keyRing *gomatrixserverlib.KeyRing) SetFederationAPI(fsAPI fsAPI.FederationInternalAPI, keyRing *gomatrixserverlib.KeyRing)
@ -78,34 +80,6 @@ type RoomserverInternalAPI interface {
res *QueryPublishedRoomsResponse, res *QueryPublishedRoomsResponse,
) error ) error
// Query the latest events and state for a room from the room server.
QueryLatestEventsAndState(
ctx context.Context,
request *QueryLatestEventsAndStateRequest,
response *QueryLatestEventsAndStateResponse,
) error
// Query the state after a list of events in a room from the room server.
QueryStateAfterEvents(
ctx context.Context,
request *QueryStateAfterEventsRequest,
response *QueryStateAfterEventsResponse,
) error
// Query a list of events by event ID.
QueryEventsByID(
ctx context.Context,
request *QueryEventsByIDRequest,
response *QueryEventsByIDResponse,
) error
// Query the membership event for an user for a room.
QueryMembershipForUser(
ctx context.Context,
request *QueryMembershipForUserRequest,
response *QueryMembershipForUserResponse,
) error
// Query a list of membership events for a room // Query a list of membership events for a room
QueryMembershipsForRoom( QueryMembershipsForRoom(
ctx context.Context, ctx context.Context,
@ -157,22 +131,11 @@ type RoomserverInternalAPI interface {
QueryCurrentState(ctx context.Context, req *QueryCurrentStateRequest, res *QueryCurrentStateResponse) error QueryCurrentState(ctx context.Context, req *QueryCurrentStateRequest, res *QueryCurrentStateResponse) error
// QueryRoomsForUser retrieves a list of room IDs matching the given query. // QueryRoomsForUser retrieves a list of room IDs matching the given query.
QueryRoomsForUser(ctx context.Context, req *QueryRoomsForUserRequest, res *QueryRoomsForUserResponse) error QueryRoomsForUser(ctx context.Context, req *QueryRoomsForUserRequest, res *QueryRoomsForUserResponse) error
// QueryBulkStateContent does a bulk query for state event content in the given rooms.
QueryBulkStateContent(ctx context.Context, req *QueryBulkStateContentRequest, res *QueryBulkStateContentResponse) error
// QuerySharedUsers returns a list of users who share at least 1 room in common with the given user.
QuerySharedUsers(ctx context.Context, req *QuerySharedUsersRequest, res *QuerySharedUsersResponse) error
// QueryKnownUsers returns a list of users that we know about from our joined rooms. // QueryKnownUsers returns a list of users that we know about from our joined rooms.
QueryKnownUsers(ctx context.Context, req *QueryKnownUsersRequest, res *QueryKnownUsersResponse) error QueryKnownUsers(ctx context.Context, req *QueryKnownUsersRequest, res *QueryKnownUsersResponse) error
// QueryServerBannedFromRoom returns whether a server is banned from a room by server ACLs. // QueryServerBannedFromRoom returns whether a server is banned from a room by server ACLs.
QueryServerBannedFromRoom(ctx context.Context, req *QueryServerBannedFromRoomRequest, res *QueryServerBannedFromRoomResponse) error QueryServerBannedFromRoom(ctx context.Context, req *QueryServerBannedFromRoomRequest, res *QueryServerBannedFromRoomResponse) error
// Query a given amount (or less) of events prior to a given set of events.
PerformBackfill(
ctx context.Context,
request *PerformBackfillRequest,
response *PerformBackfillResponse,
) error
// PerformForget forgets a rooms history for a specific user // PerformForget forgets a rooms history for a specific user
PerformForget(ctx context.Context, req *PerformForgetRequest, resp *PerformForgetResponse) error PerformForget(ctx context.Context, req *PerformForgetRequest, resp *PerformForgetResponse) error
@ -228,3 +191,43 @@ type RoomserverInternalAPI interface {
response *RemoveRoomAliasResponse, response *RemoveRoomAliasResponse,
) error ) error
} }
// API functions required by the syncapi
type SyncRoomserverAPI interface {
// Query the latest events and state for a room from the room server.
QueryLatestEventsAndState(
ctx context.Context,
request *QueryLatestEventsAndStateRequest,
response *QueryLatestEventsAndStateResponse,
) error
// QueryBulkStateContent does a bulk query for state event content in the given rooms.
QueryBulkStateContent(ctx context.Context, req *QueryBulkStateContentRequest, res *QueryBulkStateContentResponse) error
// QuerySharedUsers returns a list of users who share at least 1 room in common with the given user.
QuerySharedUsers(ctx context.Context, req *QuerySharedUsersRequest, res *QuerySharedUsersResponse) error
// Query a list of events by event ID.
QueryEventsByID(
ctx context.Context,
request *QueryEventsByIDRequest,
response *QueryEventsByIDResponse,
) error
// Query the membership event for an user for a room.
QueryMembershipForUser(
ctx context.Context,
request *QueryMembershipForUserRequest,
response *QueryMembershipForUserResponse,
) error
// Query the state after a list of events in a room from the room server.
QueryStateAfterEvents(
ctx context.Context,
request *QueryStateAfterEventsRequest,
response *QueryStateAfterEventsResponse,
) error
// Query a given amount (or less) of events prior to a given set of events.
PerformBackfill(
ctx context.Context,
request *PerformBackfillRequest,
response *PerformBackfillResponse,
) error
}

View file

@ -69,6 +69,6 @@ func (m *Monolith) AddAllPublicRoutes(base *base.BaseDendrite) {
base, m.UserAPI, m.Client, base, m.UserAPI, m.Client,
) )
syncapi.AddPublicRoutes( syncapi.AddPublicRoutes(
base, m.UserAPI, m.RoomserverAPI, m.KeyAPI, m.FedClient, base, m.UserAPI, m.RoomserverAPI, m.KeyAPI,
) )
} }

View file

@ -42,8 +42,7 @@ type OutputKeyChangeEventConsumer struct {
notifier *notifier.Notifier notifier *notifier.Notifier
stream types.StreamProvider stream types.StreamProvider
serverName gomatrixserverlib.ServerName // our server name serverName gomatrixserverlib.ServerName // our server name
rsAPI roomserverAPI.RoomserverInternalAPI rsAPI roomserverAPI.SyncRoomserverAPI
keyAPI api.KeyInternalAPI
} }
// NewOutputKeyChangeEventConsumer creates a new OutputKeyChangeEventConsumer. // NewOutputKeyChangeEventConsumer creates a new OutputKeyChangeEventConsumer.
@ -53,8 +52,7 @@ func NewOutputKeyChangeEventConsumer(
cfg *config.SyncAPI, cfg *config.SyncAPI,
topic string, topic string,
js nats.JetStreamContext, js nats.JetStreamContext,
keyAPI api.KeyInternalAPI, rsAPI roomserverAPI.SyncRoomserverAPI,
rsAPI roomserverAPI.RoomserverInternalAPI,
store storage.Database, store storage.Database,
notifier *notifier.Notifier, notifier *notifier.Notifier,
stream types.StreamProvider, stream types.StreamProvider,
@ -66,7 +64,6 @@ func NewOutputKeyChangeEventConsumer(
topic: topic, topic: topic,
db: store, db: store,
serverName: cfg.Matrix.ServerName, serverName: cfg.Matrix.ServerName,
keyAPI: keyAPI,
rsAPI: rsAPI, rsAPI: rsAPI,
notifier: notifier, notifier: notifier,
stream: stream, stream: stream,

View file

@ -41,7 +41,7 @@ type PresenceConsumer struct {
db storage.Database db storage.Database
stream types.StreamProvider stream types.StreamProvider
notifier *notifier.Notifier notifier *notifier.Notifier
deviceAPI api.UserDeviceAPI deviceAPI api.SyncUserAPI
cfg *config.SyncAPI cfg *config.SyncAPI
} }
@ -55,7 +55,7 @@ func NewPresenceConsumer(
db storage.Database, db storage.Database,
notifier *notifier.Notifier, notifier *notifier.Notifier,
stream types.StreamProvider, stream types.StreamProvider,
deviceAPI api.UserDeviceAPI, deviceAPI api.SyncUserAPI,
) *PresenceConsumer { ) *PresenceConsumer {
return &PresenceConsumer{ return &PresenceConsumer{
ctx: process.Context(), ctx: process.Context(),

View file

@ -38,7 +38,7 @@ import (
type OutputRoomEventConsumer struct { type OutputRoomEventConsumer struct {
ctx context.Context ctx context.Context
cfg *config.SyncAPI cfg *config.SyncAPI
rsAPI api.RoomserverInternalAPI rsAPI api.SyncRoomserverAPI
jetstream nats.JetStreamContext jetstream nats.JetStreamContext
durable string durable string
topic string topic string
@ -58,7 +58,7 @@ func NewOutputRoomEventConsumer(
notifier *notifier.Notifier, notifier *notifier.Notifier,
pduStream types.StreamProvider, pduStream types.StreamProvider,
inviteStream types.StreamProvider, inviteStream types.StreamProvider,
rsAPI api.RoomserverInternalAPI, rsAPI api.SyncRoomserverAPI,
producer *producers.UserAPIStreamEventProducer, producer *producers.UserAPIStreamEventProducer,
) *OutputRoomEventConsumer { ) *OutputRoomEventConsumer {
return &OutputRoomEventConsumer{ return &OutputRoomEventConsumer{

View file

@ -29,7 +29,7 @@ import (
const DeviceListLogName = "dl" const DeviceListLogName = "dl"
// DeviceOTKCounts adds one-time key counts to the /sync response // DeviceOTKCounts adds one-time key counts to the /sync response
func DeviceOTKCounts(ctx context.Context, keyAPI keyapi.KeyInternalAPI, userID, deviceID string, res *types.Response) error { func DeviceOTKCounts(ctx context.Context, keyAPI keyapi.SyncKeyAPI, userID, deviceID string, res *types.Response) error {
var queryRes keyapi.QueryOneTimeKeysResponse var queryRes keyapi.QueryOneTimeKeysResponse
keyAPI.QueryOneTimeKeys(ctx, &keyapi.QueryOneTimeKeysRequest{ keyAPI.QueryOneTimeKeys(ctx, &keyapi.QueryOneTimeKeysRequest{
UserID: userID, UserID: userID,
@ -46,7 +46,7 @@ func DeviceOTKCounts(ctx context.Context, keyAPI keyapi.KeyInternalAPI, userID,
// was filled in, else false if there are no new device list changes because there is nothing to catch up on. The response MUST // was filled in, else false if there are no new device list changes because there is nothing to catch up on. The response MUST
// be already filled in with join/leave information. // be already filled in with join/leave information.
func DeviceListCatchup( func DeviceListCatchup(
ctx context.Context, keyAPI keyapi.KeyInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI, ctx context.Context, keyAPI keyapi.SyncKeyAPI, rsAPI roomserverAPI.SyncRoomserverAPI,
userID string, res *types.Response, from, to types.StreamPosition, userID string, res *types.Response, from, to types.StreamPosition,
) (newPos types.StreamPosition, hasNew bool, err error) { ) (newPos types.StreamPosition, hasNew bool, err error) {
@ -130,7 +130,7 @@ func DeviceListCatchup(
// TrackChangedUsers calculates the values of device_lists.changed|left in the /sync response. // TrackChangedUsers calculates the values of device_lists.changed|left in the /sync response.
func TrackChangedUsers( func TrackChangedUsers(
ctx context.Context, rsAPI roomserverAPI.RoomserverInternalAPI, userID string, newlyJoinedRooms, newlyLeftRooms []string, ctx context.Context, rsAPI roomserverAPI.SyncRoomserverAPI, userID string, newlyJoinedRooms, newlyLeftRooms []string,
) (changed, left []string, err error) { ) (changed, left []string, err error) {
// process leaves first, then joins afterwards so if we join/leave/join/leave we err on the side of including users. // process leaves first, then joins afterwards so if we join/leave/join/leave we err on the side of including users.
@ -216,7 +216,7 @@ func TrackChangedUsers(
} }
func filterSharedUsers( func filterSharedUsers(
ctx context.Context, rsAPI roomserverAPI.RoomserverInternalAPI, userID string, usersWithChangedKeys []string, ctx context.Context, rsAPI roomserverAPI.SyncRoomserverAPI, userID string, usersWithChangedKeys []string,
) (map[string]int, []string) { ) (map[string]int, []string) {
var result []string var result []string
var sharedUsersRes roomserverAPI.QuerySharedUsersResponse var sharedUsersRes roomserverAPI.QuerySharedUsersResponse

View file

@ -42,7 +42,7 @@ type ContextRespsonse struct {
func Context( func Context(
req *http.Request, device *userapi.Device, req *http.Request, device *userapi.Device,
rsAPI roomserver.RoomserverInternalAPI, rsAPI roomserver.SyncRoomserverAPI,
syncDB storage.Database, syncDB storage.Database,
roomID, eventID string, roomID, eventID string,
lazyLoadCache *caching.LazyLoadCache, lazyLoadCache *caching.LazyLoadCache,

View file

@ -36,8 +36,7 @@ import (
type messagesReq struct { type messagesReq struct {
ctx context.Context ctx context.Context
db storage.Database db storage.Database
rsAPI api.RoomserverInternalAPI rsAPI api.SyncRoomserverAPI
federation *gomatrixserverlib.FederationClient
cfg *config.SyncAPI cfg *config.SyncAPI
roomID string roomID string
from *types.TopologyToken from *types.TopologyToken
@ -61,8 +60,7 @@ type messagesResp struct {
// See: https://matrix.org/docs/spec/client_server/latest.html#get-matrix-client-r0-rooms-roomid-messages // See: https://matrix.org/docs/spec/client_server/latest.html#get-matrix-client-r0-rooms-roomid-messages
func OnIncomingMessagesRequest( func OnIncomingMessagesRequest(
req *http.Request, db storage.Database, roomID string, device *userapi.Device, req *http.Request, db storage.Database, roomID string, device *userapi.Device,
federation *gomatrixserverlib.FederationClient, rsAPI api.SyncRoomserverAPI,
rsAPI api.RoomserverInternalAPI,
cfg *config.SyncAPI, cfg *config.SyncAPI,
srp *sync.RequestPool, srp *sync.RequestPool,
lazyLoadCache *caching.LazyLoadCache, lazyLoadCache *caching.LazyLoadCache,
@ -180,7 +178,6 @@ func OnIncomingMessagesRequest(
ctx: req.Context(), ctx: req.Context(),
db: db, db: db,
rsAPI: rsAPI, rsAPI: rsAPI,
federation: federation,
cfg: cfg, cfg: cfg,
roomID: roomID, roomID: roomID,
from: &from, from: &from,
@ -247,7 +244,7 @@ func OnIncomingMessagesRequest(
} }
} }
func checkIsRoomForgotten(ctx context.Context, roomID, userID string, rsAPI api.RoomserverInternalAPI) (bool, error) { func checkIsRoomForgotten(ctx context.Context, roomID, userID string, rsAPI api.SyncRoomserverAPI) (bool, error) {
req := api.QueryMembershipForUserRequest{ req := api.QueryMembershipForUserRequest{
RoomID: roomID, RoomID: roomID,
UserID: userID, UserID: userID,

View file

@ -36,8 +36,8 @@ import (
// nolint: gocyclo // nolint: gocyclo
func Setup( func Setup(
csMux *mux.Router, srp *sync.RequestPool, syncDB storage.Database, csMux *mux.Router, srp *sync.RequestPool, syncDB storage.Database,
userAPI userapi.UserInternalAPI, federation *gomatrixserverlib.FederationClient, userAPI userapi.SyncUserAPI,
rsAPI api.RoomserverInternalAPI, rsAPI api.SyncRoomserverAPI,
cfg *config.SyncAPI, cfg *config.SyncAPI,
lazyLoadCache *caching.LazyLoadCache, lazyLoadCache *caching.LazyLoadCache,
) { ) {
@ -53,7 +53,7 @@ func Setup(
if err != nil { if err != nil {
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
return OnIncomingMessagesRequest(req, syncDB, vars["roomID"], device, federation, rsAPI, cfg, srp, lazyLoadCache) return OnIncomingMessagesRequest(req, syncDB, vars["roomID"], device, rsAPI, cfg, srp, lazyLoadCache)
})).Methods(http.MethodGet, http.MethodOptions) })).Methods(http.MethodGet, http.MethodOptions)
v3mux.Handle("/user/{userId}/filter", v3mux.Handle("/user/{userId}/filter",

View file

@ -10,7 +10,7 @@ import (
type AccountDataStreamProvider struct { type AccountDataStreamProvider struct {
StreamProvider StreamProvider
userAPI userapi.UserInternalAPI userAPI userapi.SyncUserAPI
} }
func (p *AccountDataStreamProvider) Setup() { func (p *AccountDataStreamProvider) Setup() {

View file

@ -11,8 +11,8 @@ import (
type DeviceListStreamProvider struct { type DeviceListStreamProvider struct {
StreamProvider StreamProvider
rsAPI api.RoomserverInternalAPI rsAPI api.SyncRoomserverAPI
keyAPI keyapi.KeyInternalAPI keyAPI keyapi.SyncKeyAPI
} }
func (p *DeviceListStreamProvider) CompleteSync( func (p *DeviceListStreamProvider) CompleteSync(

View file

@ -33,7 +33,7 @@ type PDUStreamProvider struct {
workers atomic.Int32 workers atomic.Int32
// userID+deviceID -> lazy loading cache // userID+deviceID -> lazy loading cache
lazyLoadCache *caching.LazyLoadCache lazyLoadCache *caching.LazyLoadCache
rsAPI roomserverAPI.RoomserverInternalAPI rsAPI roomserverAPI.SyncRoomserverAPI
} }
func (p *PDUStreamProvider) worker() { func (p *PDUStreamProvider) worker() {

View file

@ -25,8 +25,8 @@ type Streams struct {
} }
func NewSyncStreamProviders( func NewSyncStreamProviders(
d storage.Database, userAPI userapi.UserInternalAPI, d storage.Database, userAPI userapi.SyncUserAPI,
rsAPI rsapi.RoomserverInternalAPI, keyAPI keyapi.KeyInternalAPI, rsAPI rsapi.SyncRoomserverAPI, keyAPI keyapi.SyncKeyAPI,
eduCache *caching.EDUCache, lazyLoadCache *caching.LazyLoadCache, notifier *notifier.Notifier, eduCache *caching.EDUCache, lazyLoadCache *caching.LazyLoadCache, notifier *notifier.Notifier,
) *Streams { ) *Streams {
streams := &Streams{ streams := &Streams{

View file

@ -45,9 +45,9 @@ import (
type RequestPool struct { type RequestPool struct {
db storage.Database db storage.Database
cfg *config.SyncAPI cfg *config.SyncAPI
userAPI userapi.UserInternalAPI userAPI userapi.SyncUserAPI
keyAPI keyapi.KeyInternalAPI keyAPI keyapi.SyncKeyAPI
rsAPI roomserverAPI.RoomserverInternalAPI rsAPI roomserverAPI.SyncRoomserverAPI
lastseen *sync.Map lastseen *sync.Map
presence *sync.Map presence *sync.Map
streams *streams.Streams streams *streams.Streams
@ -62,8 +62,8 @@ type PresencePublisher interface {
// NewRequestPool makes a new RequestPool // NewRequestPool makes a new RequestPool
func NewRequestPool( func NewRequestPool(
db storage.Database, cfg *config.SyncAPI, db storage.Database, cfg *config.SyncAPI,
userAPI userapi.UserInternalAPI, keyAPI keyapi.KeyInternalAPI, userAPI userapi.SyncUserAPI, keyAPI keyapi.SyncKeyAPI,
rsAPI roomserverAPI.RoomserverInternalAPI, rsAPI roomserverAPI.SyncRoomserverAPI,
streams *streams.Streams, notifier *notifier.Notifier, streams *streams.Streams, notifier *notifier.Notifier,
producer PresencePublisher, producer PresencePublisher,
) *RequestPool { ) *RequestPool {

View file

@ -25,7 +25,6 @@ import (
"github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/jetstream"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/syncapi/consumers" "github.com/matrix-org/dendrite/syncapi/consumers"
"github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/notifier"
@ -40,10 +39,9 @@ import (
// component. // component.
func AddPublicRoutes( func AddPublicRoutes(
base *base.BaseDendrite, base *base.BaseDendrite,
userAPI userapi.UserInternalAPI, userAPI userapi.SyncUserAPI,
rsAPI api.RoomserverInternalAPI, rsAPI api.SyncRoomserverAPI,
keyAPI keyapi.KeyInternalAPI, keyAPI keyapi.SyncKeyAPI,
federation *gomatrixserverlib.FederationClient,
) { ) {
cfg := &base.Cfg.SyncAPI cfg := &base.Cfg.SyncAPI
@ -85,7 +83,7 @@ func AddPublicRoutes(
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer( keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
base.ProcessContext, cfg, cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent), base.ProcessContext, cfg, cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent),
js, keyAPI, rsAPI, syncDB, notifier, js, rsAPI, syncDB, notifier,
streams.DeviceListStreamProvider, streams.DeviceListStreamProvider,
) )
if err = keyChangeConsumer.Start(); err != nil { if err = keyChangeConsumer.Start(); err != nil {
@ -148,6 +146,6 @@ func AddPublicRoutes(
routing.Setup( routing.Setup(
base.PublicClientAPIMux, requestPool, syncDB, userAPI, base.PublicClientAPIMux, requestPool, syncDB, userAPI,
federation, rsAPI, cfg, lazyLoadCache, rsAPI, cfg, lazyLoadCache,
) )
} }

View file

@ -53,8 +53,8 @@ func createLocalDB(t *testing.T, dbName string) {
createDB.Stderr = os.Stderr createDB.Stderr = os.Stderr
} }
err := createDB.Run() err := createDB.Run()
if err != nil { if err != nil && !Quiet {
fatalError(t, "createLocalDB returned error: %s", err) fmt.Println("createLocalDB returned error:", err)
} }
} }

View file

@ -31,7 +31,8 @@ type UserInternalAPI interface {
UserRegisterAPI UserRegisterAPI
UserAccountAPI UserAccountAPI
UserThreePIDAPI UserThreePIDAPI
UserDeviceAPI QueryAcccessTokenAPI
SyncUserAPI
InputAccountData(ctx context.Context, req *InputAccountDataRequest, res *InputAccountDataResponse) error InputAccountData(ctx context.Context, req *InputAccountDataRequest, res *InputAccountDataResponse) error
@ -42,15 +43,20 @@ type UserInternalAPI interface {
PerformPushRulesPut(ctx context.Context, req *PerformPushRulesPutRequest, res *struct{}) error PerformPushRulesPut(ctx context.Context, req *PerformPushRulesPutRequest, res *struct{}) error
QueryKeyBackup(ctx context.Context, req *QueryKeyBackupRequest, res *QueryKeyBackupResponse) QueryKeyBackup(ctx context.Context, req *QueryKeyBackupRequest, res *QueryKeyBackupResponse)
QueryAccessToken(ctx context.Context, req *QueryAccessTokenRequest, res *QueryAccessTokenResponse) error
QueryAccountData(ctx context.Context, req *QueryAccountDataRequest, res *QueryAccountDataResponse) error
QueryOpenIDToken(ctx context.Context, req *QueryOpenIDTokenRequest, res *QueryOpenIDTokenResponse) error QueryOpenIDToken(ctx context.Context, req *QueryOpenIDTokenRequest, res *QueryOpenIDTokenResponse) error
QueryPushers(ctx context.Context, req *QueryPushersRequest, res *QueryPushersResponse) error QueryPushers(ctx context.Context, req *QueryPushersRequest, res *QueryPushersResponse) error
QueryPushRules(ctx context.Context, req *QueryPushRulesRequest, res *QueryPushRulesResponse) error QueryPushRules(ctx context.Context, req *QueryPushRulesRequest, res *QueryPushRulesResponse) error
QueryNotifications(ctx context.Context, req *QueryNotificationsRequest, res *QueryNotificationsResponse) error QueryNotifications(ctx context.Context, req *QueryNotificationsRequest, res *QueryNotificationsResponse) error
} }
type UserDeviceAPI interface { type QueryAcccessTokenAPI interface {
QueryAccessToken(ctx context.Context, req *QueryAccessTokenRequest, res *QueryAccessTokenResponse) error
}
type SyncUserAPI interface {
QueryAccountData(ctx context.Context, req *QueryAccountDataRequest, res *QueryAccountDataResponse) error
QueryAccessToken(ctx context.Context, req *QueryAccessTokenRequest, res *QueryAccessTokenResponse) error
PerformDeviceDeletion(ctx context.Context, req *PerformDeviceDeletionRequest, res *PerformDeviceDeletionResponse) error PerformDeviceDeletion(ctx context.Context, req *PerformDeviceDeletionRequest, res *PerformDeviceDeletionResponse) error
PerformLastSeenUpdate(ctx context.Context, req *PerformLastSeenUpdateRequest, res *PerformLastSeenUpdateResponse) error PerformLastSeenUpdate(ctx context.Context, req *PerformLastSeenUpdateRequest, res *PerformLastSeenUpdateResponse) error
PerformDeviceUpdate(ctx context.Context, req *PerformDeviceUpdateRequest, res *PerformDeviceUpdateResponse) error PerformDeviceUpdate(ctx context.Context, req *PerformDeviceUpdateRequest, res *PerformDeviceUpdateResponse) error

View file

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package userapi package userapi_test
import ( import (
"context" "context"
@ -23,15 +23,17 @@ import (
"time" "time"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/matrix-org/dendrite/internal/httputil"
internalTest "github.com/matrix-org/dendrite/internal/test"
"github.com/matrix-org/dendrite/test"
"github.com/matrix-org/dendrite/userapi"
"github.com/matrix-org/dendrite/userapi/inthttp"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"golang.org/x/crypto/bcrypt" "golang.org/x/crypto/bcrypt"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/internal/test"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/internal" "github.com/matrix-org/dendrite/userapi/internal"
"github.com/matrix-org/dendrite/userapi/inthttp"
"github.com/matrix-org/dendrite/userapi/storage" "github.com/matrix-org/dendrite/userapi/storage"
) )
@ -43,16 +45,15 @@ type apiTestOpts struct {
loginTokenLifetime time.Duration loginTokenLifetime time.Duration
} }
func MustMakeInternalAPI(t *testing.T, opts apiTestOpts) (api.UserInternalAPI, storage.Database) { func MustMakeInternalAPI(t *testing.T, opts apiTestOpts, dbType test.DBType) (api.UserInternalAPI, storage.Database, func()) {
if opts.loginTokenLifetime == 0 { if opts.loginTokenLifetime == 0 {
opts.loginTokenLifetime = api.DefaultLoginTokenLifetime * time.Millisecond opts.loginTokenLifetime = api.DefaultLoginTokenLifetime * time.Millisecond
} }
dbopts := &config.DatabaseOptions{ connStr, close := test.PrepareDBConnectionString(t, dbType)
ConnectionString: "file::memory:",
MaxOpenConnections: 1, accountDB, err := storage.NewUserAPIDatabase(nil, &config.DatabaseOptions{
MaxIdleConnections: 1, ConnectionString: config.DataSource(connStr),
} }, serverName, bcrypt.MinCost, config.DefaultOpenIDTokenLifetimeMS, opts.loginTokenLifetime, "")
accountDB, err := storage.NewUserAPIDatabase(nil, dbopts, serverName, bcrypt.MinCost, config.DefaultOpenIDTokenLifetimeMS, opts.loginTokenLifetime, "")
if err != nil { if err != nil {
t.Fatalf("failed to create account DB: %s", err) t.Fatalf("failed to create account DB: %s", err)
} }
@ -66,13 +67,15 @@ func MustMakeInternalAPI(t *testing.T, opts apiTestOpts) (api.UserInternalAPI, s
return &internal.UserInternalAPI{ return &internal.UserInternalAPI{
DB: accountDB, DB: accountDB,
ServerName: cfg.Matrix.ServerName, ServerName: cfg.Matrix.ServerName,
}, accountDB }, accountDB, close
} }
func TestQueryProfile(t *testing.T) { func TestQueryProfile(t *testing.T) {
aliceAvatarURL := "mxc://example.com/alice" aliceAvatarURL := "mxc://example.com/alice"
aliceDisplayName := "Alice" aliceDisplayName := "Alice"
userAPI, accountDB := MustMakeInternalAPI(t, apiTestOpts{}) // only one DBType, since userapi.AddInternalRoutes complains about multiple prometheus counters added
userAPI, accountDB, close := MustMakeInternalAPI(t, apiTestOpts{}, test.DBTypeSQLite)
defer close()
_, err := accountDB.CreateAccount(context.TODO(), "alice", "foobar", "", api.AccountTypeUser) _, err := accountDB.CreateAccount(context.TODO(), "alice", "foobar", "", api.AccountTypeUser)
if err != nil { if err != nil {
t.Fatalf("failed to make account: %s", err) t.Fatalf("failed to make account: %s", err)
@ -131,8 +134,8 @@ func TestQueryProfile(t *testing.T) {
t.Run("HTTP API", func(t *testing.T) { t.Run("HTTP API", func(t *testing.T) {
router := mux.NewRouter().PathPrefix(httputil.InternalPathPrefix).Subrouter() router := mux.NewRouter().PathPrefix(httputil.InternalPathPrefix).Subrouter()
AddInternalRoutes(router, userAPI) userapi.AddInternalRoutes(router, userAPI)
apiURL, cancel := test.ListenAndServe(t, router, false) apiURL, cancel := internalTest.ListenAndServe(t, router, false)
defer cancel() defer cancel()
httpAPI, err := inthttp.NewUserAPIClient(apiURL, &http.Client{}) httpAPI, err := inthttp.NewUserAPIClient(apiURL, &http.Client{})
if err != nil { if err != nil {
@ -149,8 +152,9 @@ func TestLoginToken(t *testing.T) {
ctx := context.Background() ctx := context.Background()
t.Run("tokenLoginFlow", func(t *testing.T) { t.Run("tokenLoginFlow", func(t *testing.T) {
userAPI, accountDB := MustMakeInternalAPI(t, apiTestOpts{}) test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
userAPI, accountDB, close := MustMakeInternalAPI(t, apiTestOpts{}, dbType)
defer close()
_, err := accountDB.CreateAccount(ctx, "auser", "apassword", "", api.AccountTypeUser) _, err := accountDB.CreateAccount(ctx, "auser", "apassword", "", api.AccountTypeUser)
if err != nil { if err != nil {
t.Fatalf("failed to make account: %s", err) t.Fatalf("failed to make account: %s", err)
@ -195,9 +199,12 @@ func TestLoginToken(t *testing.T) {
t.Fatalf("PerformLoginTokenDeletion failed: %v", err) t.Fatalf("PerformLoginTokenDeletion failed: %v", err)
} }
}) })
})
t.Run("expiredTokenIsNotReturned", func(t *testing.T) { t.Run("expiredTokenIsNotReturned", func(t *testing.T) {
userAPI, _ := MustMakeInternalAPI(t, apiTestOpts{loginTokenLifetime: -1 * time.Second}) test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
userAPI, _, close := MustMakeInternalAPI(t, apiTestOpts{loginTokenLifetime: -1 * time.Second}, dbType)
defer close()
creq := api.PerformLoginTokenCreationRequest{ creq := api.PerformLoginTokenCreationRequest{
Data: api.LoginTokenData{UserID: "@auser:example.com"}, Data: api.LoginTokenData{UserID: "@auser:example.com"},
@ -217,9 +224,12 @@ func TestLoginToken(t *testing.T) {
t.Errorf("QueryLoginToken Data: got %v, want nil", qresp.Data) t.Errorf("QueryLoginToken Data: got %v, want nil", qresp.Data)
} }
}) })
})
t.Run("deleteWorks", func(t *testing.T) { t.Run("deleteWorks", func(t *testing.T) {
userAPI, _ := MustMakeInternalAPI(t, apiTestOpts{}) test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
userAPI, _, close := MustMakeInternalAPI(t, apiTestOpts{}, dbType)
defer close()
creq := api.PerformLoginTokenCreationRequest{ creq := api.PerformLoginTokenCreationRequest{
Data: api.LoginTokenData{UserID: "@auser:example.com"}, Data: api.LoginTokenData{UserID: "@auser:example.com"},
@ -245,14 +255,17 @@ func TestLoginToken(t *testing.T) {
t.Errorf("QueryLoginToken Data: got %v, want nil", qresp.Data) t.Errorf("QueryLoginToken Data: got %v, want nil", qresp.Data)
} }
}) })
})
t.Run("deleteUnknownIsNoOp", func(t *testing.T) { t.Run("deleteUnknownIsNoOp", func(t *testing.T) {
userAPI, _ := MustMakeInternalAPI(t, apiTestOpts{}) test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
userAPI, _, close := MustMakeInternalAPI(t, apiTestOpts{}, dbType)
defer close()
dreq := api.PerformLoginTokenDeletionRequest{Token: "non-existent token"} dreq := api.PerformLoginTokenDeletionRequest{Token: "non-existent token"}
var dresp api.PerformLoginTokenDeletionResponse var dresp api.PerformLoginTokenDeletionResponse
if err := userAPI.PerformLoginTokenDeletion(ctx, &dreq, &dresp); err != nil { if err := userAPI.PerformLoginTokenDeletion(ctx, &dreq, &dresp); err != nil {
t.Fatalf("PerformLoginTokenDeletion failed: %v", err) t.Fatalf("PerformLoginTokenDeletion failed: %v", err)
} }
}) })
})
} }