mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-07 15:03:09 -06:00
Use IsBlacklistedOrBackingOff from the fed API to check if we should
query the given server or not
This commit is contained in:
parent
be0c6995e9
commit
b82ac59cf0
|
|
@ -14,6 +14,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/federationapi/statistics"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/appservice"
|
"github.com/matrix-org/dendrite/appservice"
|
||||||
|
|
@ -32,6 +33,10 @@ import (
|
||||||
"github.com/matrix-org/dendrite/test/testrig"
|
"github.com/matrix-org/dendrite/test/testrig"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var testIsBlacklistedOrBackingOff = func(s spec.ServerName) (*statistics.ServerStatistics, error) {
|
||||||
|
return &statistics.ServerStatistics{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
func TestAppserviceInternalAPI(t *testing.T) {
|
func TestAppserviceInternalAPI(t *testing.T) {
|
||||||
|
|
||||||
// Set expected results
|
// Set expected results
|
||||||
|
|
@ -144,7 +149,7 @@ func TestAppserviceInternalAPI(t *testing.T) {
|
||||||
cm := sqlutil.NewConnectionManager(ctx, cfg.Global.DatabaseOptions)
|
cm := sqlutil.NewConnectionManager(ctx, cfg.Global.DatabaseOptions)
|
||||||
rsAPI := roomserver.NewInternalAPI(ctx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
rsAPI := roomserver.NewInternalAPI(ctx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
||||||
rsAPI.SetFederationAPI(nil, nil)
|
rsAPI.SetFederationAPI(nil, nil)
|
||||||
usrAPI := userapi.NewInternalAPI(ctx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
|
usrAPI := userapi.NewInternalAPI(ctx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
|
||||||
asAPI := appservice.NewInternalAPI(ctx, cfg, &natsInstance, usrAPI, rsAPI)
|
asAPI := appservice.NewInternalAPI(ctx, cfg, &natsInstance, usrAPI, rsAPI)
|
||||||
|
|
||||||
runCases(t, asAPI)
|
runCases(t, asAPI)
|
||||||
|
|
@ -239,7 +244,7 @@ func TestAppserviceInternalAPI_UnixSocket_Simple(t *testing.T) {
|
||||||
cm := sqlutil.NewConnectionManager(ctx, cfg.Global.DatabaseOptions)
|
cm := sqlutil.NewConnectionManager(ctx, cfg.Global.DatabaseOptions)
|
||||||
rsAPI := roomserver.NewInternalAPI(ctx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
rsAPI := roomserver.NewInternalAPI(ctx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
||||||
rsAPI.SetFederationAPI(nil, nil)
|
rsAPI.SetFederationAPI(nil, nil)
|
||||||
usrAPI := userapi.NewInternalAPI(ctx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
|
usrAPI := userapi.NewInternalAPI(ctx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
|
||||||
asAPI := appservice.NewInternalAPI(ctx, cfg, &natsInstance, usrAPI, rsAPI)
|
asAPI := appservice.NewInternalAPI(ctx, cfg, &natsInstance, usrAPI, rsAPI)
|
||||||
|
|
||||||
t.Run("UserIDExists", func(t *testing.T) {
|
t.Run("UserIDExists", func(t *testing.T) {
|
||||||
|
|
@ -378,7 +383,7 @@ func TestRoomserverConsumerOneInvite(t *testing.T) {
|
||||||
// Create required internal APIs
|
// Create required internal APIs
|
||||||
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, natsInstance, caches, caching.DisableMetrics)
|
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, natsInstance, caches, caching.DisableMetrics)
|
||||||
rsAPI.SetFederationAPI(nil, nil)
|
rsAPI.SetFederationAPI(nil, nil)
|
||||||
usrAPI := userapi.NewInternalAPI(processCtx, cfg, cm, natsInstance, rsAPI, nil, caching.DisableMetrics)
|
usrAPI := userapi.NewInternalAPI(processCtx, cfg, cm, natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
|
||||||
// start the consumer
|
// start the consumer
|
||||||
appservice.NewInternalAPI(processCtx, cfg, natsInstance, usrAPI, rsAPI)
|
appservice.NewInternalAPI(processCtx, cfg, natsInstance, usrAPI, rsAPI)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -191,13 +191,13 @@ func startup() {
|
||||||
serverKeyAPI := &signing.YggdrasilKeys{}
|
serverKeyAPI := &signing.YggdrasilKeys{}
|
||||||
keyRing := serverKeyAPI.KeyRing()
|
keyRing := serverKeyAPI.KeyRing()
|
||||||
|
|
||||||
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, federation, caching.EnableMetrics)
|
fedSenderAPI := federationapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, federation, rsAPI, caches, keyRing, true)
|
||||||
|
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, federation, caching.EnableMetrics, fedSenderAPI.IsBlacklistedOrBackingOff)
|
||||||
|
|
||||||
asQuery := appservice.NewInternalAPI(
|
asQuery := appservice.NewInternalAPI(
|
||||||
processCtx, cfg, &natsInstance, userAPI, rsAPI,
|
processCtx, cfg, &natsInstance, userAPI, rsAPI,
|
||||||
)
|
)
|
||||||
rsAPI.SetAppserviceAPI(asQuery)
|
rsAPI.SetAppserviceAPI(asQuery)
|
||||||
fedSenderAPI := federationapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, federation, rsAPI, caches, keyRing, true)
|
|
||||||
rsAPI.SetFederationAPI(fedSenderAPI, keyRing)
|
rsAPI.SetFederationAPI(fedSenderAPI, keyRing)
|
||||||
|
|
||||||
monolith := setup.Monolith{
|
monolith := setup.Monolith{
|
||||||
|
|
|
||||||
|
|
@ -216,7 +216,7 @@ func (m *DendriteMonolith) Start() {
|
||||||
processCtx, cfg, cm, &natsInstance, federation, rsAPI, caches, keyRing, true,
|
processCtx, cfg, cm, &natsInstance, federation, rsAPI, caches, keyRing, true,
|
||||||
)
|
)
|
||||||
|
|
||||||
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, federation, caching.EnableMetrics)
|
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, federation, caching.EnableMetrics, fsAPI.IsBlacklistedOrBackingOff)
|
||||||
|
|
||||||
asAPI := appservice.NewInternalAPI(processCtx, cfg, &natsInstance, userAPI, rsAPI)
|
asAPI := appservice.NewInternalAPI(processCtx, cfg, &natsInstance, userAPI, rsAPI)
|
||||||
rsAPI.SetAppserviceAPI(asAPI)
|
rsAPI.SetAppserviceAPI(asAPI)
|
||||||
|
|
|
||||||
|
|
@ -45,7 +45,7 @@ func TestAdminCreateToken(t *testing.T) {
|
||||||
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
|
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
|
||||||
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
||||||
rsAPI.SetFederationAPI(nil, nil)
|
rsAPI.SetFederationAPI(nil, nil)
|
||||||
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
|
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
|
||||||
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
|
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
|
||||||
accessTokens := map[*test.User]userDevice{
|
accessTokens := map[*test.User]userDevice{
|
||||||
aliceAdmin: {},
|
aliceAdmin: {},
|
||||||
|
|
@ -196,7 +196,7 @@ func TestAdminListRegistrationTokens(t *testing.T) {
|
||||||
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
|
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
|
||||||
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
||||||
rsAPI.SetFederationAPI(nil, nil)
|
rsAPI.SetFederationAPI(nil, nil)
|
||||||
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
|
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
|
||||||
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
|
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
|
||||||
accessTokens := map[*test.User]userDevice{
|
accessTokens := map[*test.User]userDevice{
|
||||||
aliceAdmin: {},
|
aliceAdmin: {},
|
||||||
|
|
@ -314,7 +314,7 @@ func TestAdminGetRegistrationToken(t *testing.T) {
|
||||||
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
|
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
|
||||||
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
||||||
rsAPI.SetFederationAPI(nil, nil)
|
rsAPI.SetFederationAPI(nil, nil)
|
||||||
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
|
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
|
||||||
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
|
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
|
||||||
accessTokens := map[*test.User]userDevice{
|
accessTokens := map[*test.User]userDevice{
|
||||||
aliceAdmin: {},
|
aliceAdmin: {},
|
||||||
|
|
@ -415,7 +415,7 @@ func TestAdminDeleteRegistrationToken(t *testing.T) {
|
||||||
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
|
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
|
||||||
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
||||||
rsAPI.SetFederationAPI(nil, nil)
|
rsAPI.SetFederationAPI(nil, nil)
|
||||||
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
|
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
|
||||||
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
|
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
|
||||||
accessTokens := map[*test.User]userDevice{
|
accessTokens := map[*test.User]userDevice{
|
||||||
aliceAdmin: {},
|
aliceAdmin: {},
|
||||||
|
|
@ -509,7 +509,7 @@ func TestAdminUpdateRegistrationToken(t *testing.T) {
|
||||||
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
|
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
|
||||||
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
||||||
rsAPI.SetFederationAPI(nil, nil)
|
rsAPI.SetFederationAPI(nil, nil)
|
||||||
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
|
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
|
||||||
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
|
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
|
||||||
accessTokens := map[*test.User]userDevice{
|
accessTokens := map[*test.User]userDevice{
|
||||||
aliceAdmin: {},
|
aliceAdmin: {},
|
||||||
|
|
@ -693,7 +693,7 @@ func TestAdminResetPassword(t *testing.T) {
|
||||||
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
||||||
rsAPI.SetFederationAPI(nil, nil)
|
rsAPI.SetFederationAPI(nil, nil)
|
||||||
// Needed for changing the password/login
|
// Needed for changing the password/login
|
||||||
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
|
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
|
||||||
// We mostly need the userAPI for this test, so nil for other APIs/caches etc.
|
// We mostly need the userAPI for this test, so nil for other APIs/caches etc.
|
||||||
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
|
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
|
||||||
|
|
||||||
|
|
@ -791,7 +791,7 @@ func TestPurgeRoom(t *testing.T) {
|
||||||
fsAPI := federationapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, rsAPI, caches, nil, true)
|
fsAPI := federationapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, rsAPI, caches, nil, true)
|
||||||
rsAPI.SetFederationAPI(fsAPI, nil)
|
rsAPI.SetFederationAPI(fsAPI, nil)
|
||||||
|
|
||||||
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
|
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
|
||||||
syncapi.AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, userAPI, rsAPI, caches, caching.DisableMetrics)
|
syncapi.AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, userAPI, rsAPI, caches, caching.DisableMetrics)
|
||||||
|
|
||||||
// Create the room
|
// Create the room
|
||||||
|
|
@ -863,7 +863,7 @@ func TestAdminEvacuateRoom(t *testing.T) {
|
||||||
fsAPI := federationapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, rsAPI, caches, nil, true)
|
fsAPI := federationapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, rsAPI, caches, nil, true)
|
||||||
rsAPI.SetFederationAPI(fsAPI, nil)
|
rsAPI.SetFederationAPI(fsAPI, nil)
|
||||||
|
|
||||||
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
|
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
|
||||||
|
|
||||||
// Create the room
|
// Create the room
|
||||||
if err := api.SendEvents(ctx, rsAPI, api.KindNew, room.Events(), "test", "test", api.DoNotSendToOtherServers, nil, false); err != nil {
|
if err := api.SendEvents(ctx, rsAPI, api.KindNew, room.Events(), "test", "test", api.DoNotSendToOtherServers, nil, false); err != nil {
|
||||||
|
|
@ -964,7 +964,7 @@ func TestAdminEvacuateUser(t *testing.T) {
|
||||||
fsAPI := federationapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, basepkg.CreateFederationClient(cfg, nil), rsAPI, caches, nil, true)
|
fsAPI := federationapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, basepkg.CreateFederationClient(cfg, nil), rsAPI, caches, nil, true)
|
||||||
rsAPI.SetFederationAPI(fsAPI, nil)
|
rsAPI.SetFederationAPI(fsAPI, nil)
|
||||||
|
|
||||||
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
|
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
|
||||||
|
|
||||||
// Create the room
|
// Create the room
|
||||||
if err := api.SendEvents(ctx, rsAPI, api.KindNew, room.Events(), "test", "test", api.DoNotSendToOtherServers, nil, false); err != nil {
|
if err := api.SendEvents(ctx, rsAPI, api.KindNew, room.Events(), "test", "test", api.DoNotSendToOtherServers, nil, false); err != nil {
|
||||||
|
|
@ -1055,7 +1055,7 @@ func TestAdminMarkAsStale(t *testing.T) {
|
||||||
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
|
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
|
||||||
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
||||||
rsAPI.SetFederationAPI(nil, nil)
|
rsAPI.SetFederationAPI(nil, nil)
|
||||||
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
|
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
|
||||||
|
|
||||||
// We mostly need the rsAPI for this test, so nil for other APIs/caches etc.
|
// We mostly need the rsAPI for this test, so nil for other APIs/caches etc.
|
||||||
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
|
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
|
||||||
|
|
|
||||||
|
|
@ -17,6 +17,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||||
"github.com/matrix-org/dendrite/clientapi/routing"
|
"github.com/matrix-org/dendrite/clientapi/routing"
|
||||||
"github.com/matrix-org/dendrite/clientapi/threepid"
|
"github.com/matrix-org/dendrite/clientapi/threepid"
|
||||||
|
"github.com/matrix-org/dendrite/federationapi/statistics"
|
||||||
"github.com/matrix-org/dendrite/internal/caching"
|
"github.com/matrix-org/dendrite/internal/caching"
|
||||||
"github.com/matrix-org/dendrite/internal/httputil"
|
"github.com/matrix-org/dendrite/internal/httputil"
|
||||||
"github.com/matrix-org/dendrite/internal/pushrules"
|
"github.com/matrix-org/dendrite/internal/pushrules"
|
||||||
|
|
@ -49,6 +50,10 @@ type userDevice struct {
|
||||||
password string
|
password string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var testIsBlacklistedOrBackingOff = func(s spec.ServerName) (*statistics.ServerStatistics, error) {
|
||||||
|
return &statistics.ServerStatistics{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
func TestGetPutDevices(t *testing.T) {
|
func TestGetPutDevices(t *testing.T) {
|
||||||
alice := test.NewUser(t)
|
alice := test.NewUser(t)
|
||||||
bob := test.NewUser(t)
|
bob := test.NewUser(t)
|
||||||
|
|
@ -121,7 +126,7 @@ func TestGetPutDevices(t *testing.T) {
|
||||||
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
|
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
|
||||||
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
||||||
rsAPI.SetFederationAPI(nil, nil)
|
rsAPI.SetFederationAPI(nil, nil)
|
||||||
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
|
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
|
||||||
|
|
||||||
// We mostly need the rsAPI for this test, so nil for other APIs/caches etc.
|
// We mostly need the rsAPI for this test, so nil for other APIs/caches etc.
|
||||||
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
|
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
|
||||||
|
|
@ -170,7 +175,7 @@ func TestDeleteDevice(t *testing.T) {
|
||||||
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
|
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
|
||||||
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
||||||
rsAPI.SetFederationAPI(nil, nil)
|
rsAPI.SetFederationAPI(nil, nil)
|
||||||
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
|
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
|
||||||
|
|
||||||
// We mostly need the rsAPI/ for this test, so nil for other APIs/caches etc.
|
// We mostly need the rsAPI/ for this test, so nil for other APIs/caches etc.
|
||||||
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
|
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
|
||||||
|
|
@ -275,7 +280,7 @@ func TestDeleteDevices(t *testing.T) {
|
||||||
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
|
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
|
||||||
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
||||||
rsAPI.SetFederationAPI(nil, nil)
|
rsAPI.SetFederationAPI(nil, nil)
|
||||||
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
|
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
|
||||||
|
|
||||||
// We mostly need the rsAPI/ for this test, so nil for other APIs/caches etc.
|
// We mostly need the rsAPI/ for this test, so nil for other APIs/caches etc.
|
||||||
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
|
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
|
||||||
|
|
@ -442,7 +447,7 @@ func TestSetDisplayname(t *testing.T) {
|
||||||
|
|
||||||
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, natsInstance, caches, caching.DisableMetrics)
|
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, natsInstance, caches, caching.DisableMetrics)
|
||||||
rsAPI.SetFederationAPI(nil, nil)
|
rsAPI.SetFederationAPI(nil, nil)
|
||||||
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, natsInstance, rsAPI, nil, caching.DisableMetrics)
|
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
|
||||||
asPI := appservice.NewInternalAPI(processCtx, cfg, natsInstance, userAPI, rsAPI)
|
asPI := appservice.NewInternalAPI(processCtx, cfg, natsInstance, userAPI, rsAPI)
|
||||||
|
|
||||||
AddPublicRoutes(processCtx, routers, cfg, natsInstance, base.CreateFederationClient(cfg, nil), rsAPI, asPI, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
|
AddPublicRoutes(processCtx, routers, cfg, natsInstance, base.CreateFederationClient(cfg, nil), rsAPI, asPI, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
|
||||||
|
|
@ -554,7 +559,7 @@ func TestSetAvatarURL(t *testing.T) {
|
||||||
|
|
||||||
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, natsInstance, caches, caching.DisableMetrics)
|
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, natsInstance, caches, caching.DisableMetrics)
|
||||||
rsAPI.SetFederationAPI(nil, nil)
|
rsAPI.SetFederationAPI(nil, nil)
|
||||||
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, natsInstance, rsAPI, nil, caching.DisableMetrics)
|
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
|
||||||
asPI := appservice.NewInternalAPI(processCtx, cfg, natsInstance, userAPI, rsAPI)
|
asPI := appservice.NewInternalAPI(processCtx, cfg, natsInstance, userAPI, rsAPI)
|
||||||
|
|
||||||
AddPublicRoutes(processCtx, routers, cfg, natsInstance, base.CreateFederationClient(cfg, nil), rsAPI, asPI, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
|
AddPublicRoutes(processCtx, routers, cfg, natsInstance, base.CreateFederationClient(cfg, nil), rsAPI, asPI, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
|
||||||
|
|
@ -632,7 +637,7 @@ func TestTyping(t *testing.T) {
|
||||||
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
||||||
rsAPI.SetFederationAPI(nil, nil)
|
rsAPI.SetFederationAPI(nil, nil)
|
||||||
// Needed to create accounts
|
// Needed to create accounts
|
||||||
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
|
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
|
||||||
// We mostly need the rsAPI/userAPI for this test, so nil for other APIs etc.
|
// We mostly need the rsAPI/userAPI for this test, so nil for other APIs etc.
|
||||||
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
|
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
|
||||||
|
|
||||||
|
|
@ -716,7 +721,7 @@ func TestMembership(t *testing.T) {
|
||||||
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
||||||
rsAPI.SetFederationAPI(nil, nil)
|
rsAPI.SetFederationAPI(nil, nil)
|
||||||
// Needed to create accounts
|
// Needed to create accounts
|
||||||
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
|
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
|
||||||
rsAPI.SetUserAPI(userAPI)
|
rsAPI.SetUserAPI(userAPI)
|
||||||
// We mostly need the rsAPI/userAPI for this test, so nil for other APIs etc.
|
// We mostly need the rsAPI/userAPI for this test, so nil for other APIs etc.
|
||||||
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
|
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
|
||||||
|
|
@ -955,7 +960,7 @@ func TestCapabilities(t *testing.T) {
|
||||||
// Needed to create accounts
|
// Needed to create accounts
|
||||||
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, caching.DisableMetrics)
|
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, caching.DisableMetrics)
|
||||||
rsAPI.SetFederationAPI(nil, nil)
|
rsAPI.SetFederationAPI(nil, nil)
|
||||||
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
|
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
|
||||||
// We mostly need the rsAPI/userAPI for this test, so nil for other APIs etc.
|
// We mostly need the rsAPI/userAPI for this test, so nil for other APIs etc.
|
||||||
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
|
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
|
||||||
|
|
||||||
|
|
@ -1002,7 +1007,7 @@ func TestTurnserver(t *testing.T) {
|
||||||
// Needed to create accounts
|
// Needed to create accounts
|
||||||
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, caching.DisableMetrics)
|
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, caching.DisableMetrics)
|
||||||
rsAPI.SetFederationAPI(nil, nil)
|
rsAPI.SetFederationAPI(nil, nil)
|
||||||
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
|
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
|
||||||
//rsAPI.SetUserAPI(userAPI)
|
//rsAPI.SetUserAPI(userAPI)
|
||||||
// We mostly need the rsAPI/userAPI for this test, so nil for other APIs etc.
|
// We mostly need the rsAPI/userAPI for this test, so nil for other APIs etc.
|
||||||
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
|
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
|
||||||
|
|
@ -1100,7 +1105,7 @@ func Test3PID(t *testing.T) {
|
||||||
// Needed to create accounts
|
// Needed to create accounts
|
||||||
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, caching.DisableMetrics)
|
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, caching.DisableMetrics)
|
||||||
rsAPI.SetFederationAPI(nil, nil)
|
rsAPI.SetFederationAPI(nil, nil)
|
||||||
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
|
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
|
||||||
// We mostly need the rsAPI/userAPI for this test, so nil for other APIs etc.
|
// We mostly need the rsAPI/userAPI for this test, so nil for other APIs etc.
|
||||||
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
|
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
|
||||||
|
|
||||||
|
|
@ -1276,7 +1281,7 @@ func TestPushRules(t *testing.T) {
|
||||||
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
|
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
|
||||||
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
||||||
rsAPI.SetFederationAPI(nil, nil)
|
rsAPI.SetFederationAPI(nil, nil)
|
||||||
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
|
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
|
||||||
|
|
||||||
// We mostly need the rsAPI for this test, so nil for other APIs/caches etc.
|
// We mostly need the rsAPI for this test, so nil for other APIs/caches etc.
|
||||||
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
|
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
|
||||||
|
|
@ -1663,7 +1668,7 @@ func TestKeys(t *testing.T) {
|
||||||
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
|
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
|
||||||
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
||||||
rsAPI.SetFederationAPI(nil, nil)
|
rsAPI.SetFederationAPI(nil, nil)
|
||||||
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
|
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
|
||||||
|
|
||||||
// We mostly need the rsAPI for this test, so nil for other APIs/caches etc.
|
// We mostly need the rsAPI for this test, so nil for other APIs/caches etc.
|
||||||
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
|
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
|
||||||
|
|
@ -2125,7 +2130,7 @@ func TestKeyBackup(t *testing.T) {
|
||||||
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
|
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
|
||||||
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
||||||
rsAPI.SetFederationAPI(nil, nil)
|
rsAPI.SetFederationAPI(nil, nil)
|
||||||
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
|
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
|
||||||
|
|
||||||
// We mostly need the rsAPI for this test, so nil for other APIs/caches etc.
|
// We mostly need the rsAPI for this test, so nil for other APIs/caches etc.
|
||||||
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
|
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/federationapi/statistics"
|
||||||
"github.com/matrix-org/dendrite/internal/caching"
|
"github.com/matrix-org/dendrite/internal/caching"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||||
|
|
@ -21,6 +22,10 @@ import (
|
||||||
uapi "github.com/matrix-org/dendrite/userapi/api"
|
uapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var testIsBlacklistedOrBackingOff = func(s spec.ServerName) (*statistics.ServerStatistics, error) {
|
||||||
|
return &statistics.ServerStatistics{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
func TestJoinRoomByIDOrAlias(t *testing.T) {
|
func TestJoinRoomByIDOrAlias(t *testing.T) {
|
||||||
alice := test.NewUser(t)
|
alice := test.NewUser(t)
|
||||||
bob := test.NewUser(t)
|
bob := test.NewUser(t)
|
||||||
|
|
@ -36,7 +41,7 @@ func TestJoinRoomByIDOrAlias(t *testing.T) {
|
||||||
natsInstance := jetstream.NATSInstance{}
|
natsInstance := jetstream.NATSInstance{}
|
||||||
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
||||||
rsAPI.SetFederationAPI(nil, nil) // creates the rs.Inputer etc
|
rsAPI.SetFederationAPI(nil, nil) // creates the rs.Inputer etc
|
||||||
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
|
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
|
||||||
asAPI := appservice.NewInternalAPI(processCtx, cfg, &natsInstance, userAPI, rsAPI)
|
asAPI := appservice.NewInternalAPI(processCtx, cfg, &natsInstance, userAPI, rsAPI)
|
||||||
|
|
||||||
// Create the users in the userapi
|
// Create the users in the userapi
|
||||||
|
|
|
||||||
|
|
@ -49,7 +49,7 @@ func TestLogin(t *testing.T) {
|
||||||
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
||||||
rsAPI.SetFederationAPI(nil, nil)
|
rsAPI.SetFederationAPI(nil, nil)
|
||||||
// Needed for /login
|
// Needed for /login
|
||||||
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
|
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
|
||||||
|
|
||||||
// We mostly need the userAPI for this test, so nil for other APIs/caches etc.
|
// We mostly need the userAPI for this test, so nil for other APIs/caches etc.
|
||||||
Setup(routers, cfg, nil, nil, userAPI, nil, nil, nil, nil, nil, nil, nil, caching.DisableMetrics)
|
Setup(routers, cfg, nil, nil, userAPI, nil, nil, nil, nil, nil, nil, nil, caching.DisableMetrics)
|
||||||
|
|
|
||||||
|
|
@ -416,7 +416,7 @@ func Test_register(t *testing.T) {
|
||||||
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
|
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
|
||||||
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
||||||
rsAPI.SetFederationAPI(nil, nil)
|
rsAPI.SetFederationAPI(nil, nil)
|
||||||
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
|
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
|
||||||
|
|
||||||
for _, tc := range testCases {
|
for _, tc := range testCases {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
|
@ -596,7 +596,7 @@ func TestRegisterUserWithDisplayName(t *testing.T) {
|
||||||
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
|
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
|
||||||
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
||||||
rsAPI.SetFederationAPI(nil, nil)
|
rsAPI.SetFederationAPI(nil, nil)
|
||||||
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
|
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
|
||||||
deviceName, deviceID := "deviceName", "deviceID"
|
deviceName, deviceID := "deviceName", "deviceID"
|
||||||
expectedDisplayName := "DisplayName"
|
expectedDisplayName := "DisplayName"
|
||||||
response := completeRegistration(
|
response := completeRegistration(
|
||||||
|
|
@ -637,7 +637,7 @@ func TestRegisterAdminUsingSharedSecret(t *testing.T) {
|
||||||
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
|
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
|
||||||
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
||||||
rsAPI.SetFederationAPI(nil, nil)
|
rsAPI.SetFederationAPI(nil, nil)
|
||||||
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
|
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
|
||||||
|
|
||||||
expectedDisplayName := "rabbit"
|
expectedDisplayName := "rabbit"
|
||||||
jsonStr := []byte(`{"admin":true,"mac":"24dca3bba410e43fe64b9b5c28306693bf3baa9f","nonce":"759f047f312b99ff428b21d581256f8592b8976e58bc1b543972dc6147e529a79657605b52d7becd160ff5137f3de11975684319187e06901955f79e5a6c5a79","password":"wonderland","username":"alice","displayname":"rabbit"}`)
|
jsonStr := []byte(`{"admin":true,"mac":"24dca3bba410e43fe64b9b5c28306693bf3baa9f","nonce":"759f047f312b99ff428b21d581256f8592b8976e58bc1b543972dc6147e529a79657605b52d7becd160ff5137f3de11975684319187e06901955f79e5a6c5a79","password":"wonderland","username":"alice","displayname":"rabbit"}`)
|
||||||
|
|
|
||||||
|
|
@ -145,7 +145,7 @@ func (p *P2PMonolith) SetupDendrite(
|
||||||
)
|
)
|
||||||
rsAPI.SetFederationAPI(fsAPI, keyRing)
|
rsAPI.SetFederationAPI(fsAPI, keyRing)
|
||||||
|
|
||||||
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, federation, enableMetrics)
|
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, federation, enableMetrics, fsAPI.IsBlacklistedOrBackingOff)
|
||||||
|
|
||||||
asAPI := appservice.NewInternalAPI(processCtx, cfg, &natsInstance, userAPI, rsAPI)
|
asAPI := appservice.NewInternalAPI(processCtx, cfg, &natsInstance, userAPI, rsAPI)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -213,14 +213,15 @@ func main() {
|
||||||
natsInstance := jetstream.NATSInstance{}
|
natsInstance := jetstream.NATSInstance{}
|
||||||
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.EnableMetrics)
|
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.EnableMetrics)
|
||||||
|
|
||||||
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, federation, caching.EnableMetrics)
|
|
||||||
|
|
||||||
asAPI := appservice.NewInternalAPI(processCtx, cfg, &natsInstance, userAPI, rsAPI)
|
|
||||||
rsAPI.SetAppserviceAPI(asAPI)
|
|
||||||
fsAPI := federationapi.NewInternalAPI(
|
fsAPI := federationapi.NewInternalAPI(
|
||||||
processCtx, cfg, cm, &natsInstance, federation, rsAPI, caches, keyRing, true,
|
processCtx, cfg, cm, &natsInstance, federation, rsAPI, caches, keyRing, true,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, federation, caching.EnableMetrics, fsAPI.IsBlacklistedOrBackingOff)
|
||||||
|
|
||||||
|
asAPI := appservice.NewInternalAPI(processCtx, cfg, &natsInstance, userAPI, rsAPI)
|
||||||
|
rsAPI.SetAppserviceAPI(asAPI)
|
||||||
|
|
||||||
rsAPI.SetFederationAPI(fsAPI, keyRing)
|
rsAPI.SetFederationAPI(fsAPI, keyRing)
|
||||||
|
|
||||||
monolith := setup.Monolith{
|
monolith := setup.Monolith{
|
||||||
|
|
|
||||||
|
|
@ -162,7 +162,7 @@ func main() {
|
||||||
// dependency. Other components also need updating after their dependencies are up.
|
// dependency. Other components also need updating after their dependencies are up.
|
||||||
rsAPI.SetFederationAPI(fsAPI, keyRing)
|
rsAPI.SetFederationAPI(fsAPI, keyRing)
|
||||||
|
|
||||||
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, federationClient, caching.EnableMetrics)
|
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, federationClient, caching.EnableMetrics, fsAPI.IsBlacklistedOrBackingOff)
|
||||||
asAPI := appservice.NewInternalAPI(processCtx, cfg, &natsInstance, userAPI, rsAPI)
|
asAPI := appservice.NewInternalAPI(processCtx, cfg, &natsInstance, userAPI, rsAPI)
|
||||||
|
|
||||||
rsAPI.SetAppserviceAPI(asAPI)
|
rsAPI.SetAppserviceAPI(asAPI)
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,6 @@ import (
|
||||||
"github.com/matrix-org/gomatrixserverlib/fclient"
|
"github.com/matrix-org/gomatrixserverlib/fclient"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/federationapi/api"
|
|
||||||
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
|
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
|
||||||
"github.com/matrix-org/dendrite/federationapi/consumers"
|
"github.com/matrix-org/dendrite/federationapi/consumers"
|
||||||
"github.com/matrix-org/dendrite/federationapi/internal"
|
"github.com/matrix-org/dendrite/federationapi/internal"
|
||||||
|
|
@ -102,7 +101,7 @@ func NewInternalAPI(
|
||||||
caches *caching.Caches,
|
caches *caching.Caches,
|
||||||
keyRing *gomatrixserverlib.KeyRing,
|
keyRing *gomatrixserverlib.KeyRing,
|
||||||
resetBlacklist bool,
|
resetBlacklist bool,
|
||||||
) api.FederationInternalAPI {
|
) *internal.FederationInternalAPI {
|
||||||
cfg := &dendriteCfg.FederationAPI
|
cfg := &dendriteCfg.FederationAPI
|
||||||
|
|
||||||
federationDB, err := storage.NewDatabase(processContext.Context(), cm, &cfg.Database, caches, dendriteCfg.Global.IsLocalServerName)
|
federationDB, err := storage.NewDatabase(processContext.Context(), cm, &cfg.Database, caches, dendriteCfg.Global.IsLocalServerName)
|
||||||
|
|
|
||||||
|
|
@ -112,7 +112,7 @@ func NewFederationInternalAPI(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *FederationInternalAPI) isBlacklistedOrBackingOff(s spec.ServerName) (*statistics.ServerStatistics, error) {
|
func (a *FederationInternalAPI) IsBlacklistedOrBackingOff(s spec.ServerName) (*statistics.ServerStatistics, error) {
|
||||||
stats := a.statistics.ForServer(s)
|
stats := a.statistics.ForServer(s)
|
||||||
if stats.Blacklisted() {
|
if stats.Blacklisted() {
|
||||||
return stats, &api.FederationClientError{
|
return stats, &api.FederationClientError{
|
||||||
|
|
@ -151,7 +151,7 @@ func failBlacklistableError(err error, stats *statistics.ServerStatistics) (unti
|
||||||
func (a *FederationInternalAPI) doRequestIfNotBackingOffOrBlacklisted(
|
func (a *FederationInternalAPI) doRequestIfNotBackingOffOrBlacklisted(
|
||||||
s spec.ServerName, request func() (interface{}, error),
|
s spec.ServerName, request func() (interface{}, error),
|
||||||
) (interface{}, error) {
|
) (interface{}, error) {
|
||||||
stats, err := a.isBlacklistedOrBackingOff(s)
|
stats, err := a.IsBlacklistedOrBackingOff(s)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -26,7 +26,6 @@ import (
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||||
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
|
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
|
||||||
fedAPI "github.com/matrix-org/dendrite/federationapi"
|
fedAPI "github.com/matrix-org/dendrite/federationapi"
|
||||||
fedInternal "github.com/matrix-org/dendrite/federationapi/internal"
|
|
||||||
"github.com/matrix-org/dendrite/federationapi/routing"
|
"github.com/matrix-org/dendrite/federationapi/routing"
|
||||||
"github.com/matrix-org/dendrite/internal/caching"
|
"github.com/matrix-org/dendrite/internal/caching"
|
||||||
"github.com/matrix-org/dendrite/internal/httputil"
|
"github.com/matrix-org/dendrite/internal/httputil"
|
||||||
|
|
@ -67,11 +66,8 @@ func TestHandleQueryProfile(t *testing.T) {
|
||||||
keyRing := serverKeyAPI.KeyRing()
|
keyRing := serverKeyAPI.KeyRing()
|
||||||
fedapi := fedAPI.NewInternalAPI(processCtx, cfg, cm, &natsInstance, &fedClient, nil, nil, keyRing, true)
|
fedapi := fedAPI.NewInternalAPI(processCtx, cfg, cm, &natsInstance, &fedClient, nil, nil, keyRing, true)
|
||||||
userapi := fakeUserAPI{}
|
userapi := fakeUserAPI{}
|
||||||
r, ok := fedapi.(*fedInternal.FederationInternalAPI)
|
|
||||||
if !ok {
|
routing.Setup(routers, cfg, nil, fedapi, keyRing, &fedClient, &userapi, &cfg.MSCs, nil, caching.DisableMetrics)
|
||||||
panic("This is a programming error.")
|
|
||||||
}
|
|
||||||
routing.Setup(routers, cfg, nil, r, keyRing, &fedClient, &userapi, &cfg.MSCs, nil, caching.DisableMetrics)
|
|
||||||
|
|
||||||
handler := fedMux.Get(routing.QueryProfileRouteName).GetHandler().ServeHTTP
|
handler := fedMux.Get(routing.QueryProfileRouteName).GetHandler().ServeHTTP
|
||||||
_, sk, _ := ed25519.GenerateKey(nil)
|
_, sk, _ := ed25519.GenerateKey(nil)
|
||||||
|
|
|
||||||
|
|
@ -25,7 +25,6 @@ import (
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
|
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
|
||||||
fedAPI "github.com/matrix-org/dendrite/federationapi"
|
fedAPI "github.com/matrix-org/dendrite/federationapi"
|
||||||
fedInternal "github.com/matrix-org/dendrite/federationapi/internal"
|
|
||||||
"github.com/matrix-org/dendrite/federationapi/routing"
|
"github.com/matrix-org/dendrite/federationapi/routing"
|
||||||
"github.com/matrix-org/dendrite/internal/caching"
|
"github.com/matrix-org/dendrite/internal/caching"
|
||||||
"github.com/matrix-org/dendrite/internal/httputil"
|
"github.com/matrix-org/dendrite/internal/httputil"
|
||||||
|
|
@ -65,11 +64,8 @@ func TestHandleQueryDirectory(t *testing.T) {
|
||||||
keyRing := serverKeyAPI.KeyRing()
|
keyRing := serverKeyAPI.KeyRing()
|
||||||
fedapi := fedAPI.NewInternalAPI(processCtx, cfg, cm, &natsInstance, &fedClient, nil, nil, keyRing, true)
|
fedapi := fedAPI.NewInternalAPI(processCtx, cfg, cm, &natsInstance, &fedClient, nil, nil, keyRing, true)
|
||||||
userapi := fakeUserAPI{}
|
userapi := fakeUserAPI{}
|
||||||
r, ok := fedapi.(*fedInternal.FederationInternalAPI)
|
|
||||||
if !ok {
|
routing.Setup(routers, cfg, nil, fedapi, keyRing, &fedClient, &userapi, &cfg.MSCs, nil, caching.DisableMetrics)
|
||||||
panic("This is a programming error.")
|
|
||||||
}
|
|
||||||
routing.Setup(routers, cfg, nil, r, keyRing, &fedClient, &userapi, &cfg.MSCs, nil, caching.DisableMetrics)
|
|
||||||
|
|
||||||
handler := fedMux.Get(routing.QueryDirectoryRouteName).GetHandler().ServeHTTP
|
handler := fedMux.Get(routing.QueryDirectoryRouteName).GetHandler().ServeHTTP
|
||||||
_, sk, _ := ed25519.GenerateKey(nil)
|
_, sk, _ := ed25519.GenerateKey(nil)
|
||||||
|
|
|
||||||
|
|
@ -23,7 +23,6 @@ import (
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
|
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
|
||||||
fedAPI "github.com/matrix-org/dendrite/federationapi"
|
fedAPI "github.com/matrix-org/dendrite/federationapi"
|
||||||
fedInternal "github.com/matrix-org/dendrite/federationapi/internal"
|
|
||||||
"github.com/matrix-org/dendrite/federationapi/routing"
|
"github.com/matrix-org/dendrite/federationapi/routing"
|
||||||
"github.com/matrix-org/dendrite/internal/caching"
|
"github.com/matrix-org/dendrite/internal/caching"
|
||||||
"github.com/matrix-org/dendrite/internal/httputil"
|
"github.com/matrix-org/dendrite/internal/httputil"
|
||||||
|
|
@ -62,11 +61,8 @@ func TestHandleSend(t *testing.T) {
|
||||||
fedapi := fedAPI.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, nil, nil, nil, true)
|
fedapi := fedAPI.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, nil, nil, nil, true)
|
||||||
serverKeyAPI := &signing.YggdrasilKeys{}
|
serverKeyAPI := &signing.YggdrasilKeys{}
|
||||||
keyRing := serverKeyAPI.KeyRing()
|
keyRing := serverKeyAPI.KeyRing()
|
||||||
r, ok := fedapi.(*fedInternal.FederationInternalAPI)
|
|
||||||
if !ok {
|
routing.Setup(routers, cfg, nil, fedapi, keyRing, nil, nil, &cfg.MSCs, nil, caching.DisableMetrics)
|
||||||
panic("This is a programming error.")
|
|
||||||
}
|
|
||||||
routing.Setup(routers, cfg, nil, r, keyRing, nil, nil, &cfg.MSCs, nil, caching.DisableMetrics)
|
|
||||||
|
|
||||||
handler := fedMux.Get(routing.SendRouteName).GetHandler().ServeHTTP
|
handler := fedMux.Get(routing.SendRouteName).GetHandler().ServeHTTP
|
||||||
_, sk, _ := ed25519.GenerateKey(nil)
|
_, sk, _ := ed25519.GenerateKey(nil)
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/federationapi/statistics"
|
||||||
"github.com/matrix-org/dendrite/internal/caching"
|
"github.com/matrix-org/dendrite/internal/caching"
|
||||||
"github.com/matrix-org/dendrite/internal/eventutil"
|
"github.com/matrix-org/dendrite/internal/eventutil"
|
||||||
"github.com/matrix-org/dendrite/internal/httputil"
|
"github.com/matrix-org/dendrite/internal/httputil"
|
||||||
|
|
@ -34,6 +35,10 @@ import (
|
||||||
"github.com/matrix-org/dendrite/test/testrig"
|
"github.com/matrix-org/dendrite/test/testrig"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var testIsBlacklistedOrBackingOff = func(s spec.ServerName) (*statistics.ServerStatistics, error) {
|
||||||
|
return &statistics.ServerStatistics{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
type FakeQuerier struct {
|
type FakeQuerier struct {
|
||||||
api.QuerySenderIDAPI
|
api.QuerySenderIDAPI
|
||||||
}
|
}
|
||||||
|
|
@ -58,7 +63,7 @@ func TestUsers(t *testing.T) {
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("kick users", func(t *testing.T) {
|
t.Run("kick users", func(t *testing.T) {
|
||||||
usrAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
|
usrAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
|
||||||
rsAPI.SetUserAPI(usrAPI)
|
rsAPI.SetUserAPI(usrAPI)
|
||||||
testKickUsers(t, rsAPI, usrAPI)
|
testKickUsers(t, rsAPI, usrAPI)
|
||||||
})
|
})
|
||||||
|
|
@ -258,7 +263,7 @@ func TestPurgeRoom(t *testing.T) {
|
||||||
fsAPI := federationapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, rsAPI, caches, nil, true)
|
fsAPI := federationapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, rsAPI, caches, nil, true)
|
||||||
rsAPI.SetFederationAPI(fsAPI, nil)
|
rsAPI.SetFederationAPI(fsAPI, nil)
|
||||||
|
|
||||||
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
|
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, fsAPI.IsBlacklistedOrBackingOff)
|
||||||
syncapi.AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, userAPI, rsAPI, caches, caching.DisableMetrics)
|
syncapi.AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, userAPI, rsAPI, caches, caching.DisableMetrics)
|
||||||
|
|
||||||
// Create the room
|
// Create the room
|
||||||
|
|
@ -1050,7 +1055,7 @@ func TestUpgrade(t *testing.T) {
|
||||||
|
|
||||||
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
||||||
rsAPI.SetFederationAPI(nil, nil)
|
rsAPI.SetFederationAPI(nil, nil)
|
||||||
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
|
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
|
||||||
rsAPI.SetUserAPI(userAPI)
|
rsAPI.SetUserAPI(userAPI)
|
||||||
|
|
||||||
for _, tc := range testCases {
|
for _, tc := range testCases {
|
||||||
|
|
|
||||||
|
|
@ -25,6 +25,7 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/federationapi/statistics"
|
||||||
rsapi "github.com/matrix-org/dendrite/roomserver/api"
|
rsapi "github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/gomatrixserverlib/fclient"
|
"github.com/matrix-org/gomatrixserverlib/fclient"
|
||||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||||
|
|
@ -108,6 +109,8 @@ type DeviceListUpdater struct {
|
||||||
userIDToChan map[string]chan bool
|
userIDToChan map[string]chan bool
|
||||||
userIDToChanMu *sync.Mutex
|
userIDToChanMu *sync.Mutex
|
||||||
rsAPI rsapi.KeyserverRoomserverAPI
|
rsAPI rsapi.KeyserverRoomserverAPI
|
||||||
|
|
||||||
|
isBlacklistedOrBackingOffFn func(s spec.ServerName) (*statistics.ServerStatistics, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeviceListUpdaterDatabase is the subset of functionality from storage.Database required for the updater.
|
// DeviceListUpdaterDatabase is the subset of functionality from storage.Database required for the updater.
|
||||||
|
|
@ -167,25 +170,28 @@ func NewDeviceListUpdater(
|
||||||
process *process.ProcessContext, db DeviceListUpdaterDatabase,
|
process *process.ProcessContext, db DeviceListUpdaterDatabase,
|
||||||
api DeviceListUpdaterAPI, producer KeyChangeProducer,
|
api DeviceListUpdaterAPI, producer KeyChangeProducer,
|
||||||
fedClient fedsenderapi.KeyserverFederationAPI, numWorkers int,
|
fedClient fedsenderapi.KeyserverFederationAPI, numWorkers int,
|
||||||
rsAPI rsapi.KeyserverRoomserverAPI, thisServer spec.ServerName,
|
rsAPI rsapi.KeyserverRoomserverAPI,
|
||||||
|
thisServer spec.ServerName,
|
||||||
enableMetrics bool,
|
enableMetrics bool,
|
||||||
|
isBlacklistedOrBackingOffFn func(s spec.ServerName) (*statistics.ServerStatistics, error),
|
||||||
) *DeviceListUpdater {
|
) *DeviceListUpdater {
|
||||||
if enableMetrics {
|
if enableMetrics {
|
||||||
prometheus.MustRegister(deviceListUpdaterBackpressure, deviceListUpdaterServersRetrying)
|
prometheus.MustRegister(deviceListUpdaterBackpressure, deviceListUpdaterServersRetrying)
|
||||||
}
|
}
|
||||||
return &DeviceListUpdater{
|
return &DeviceListUpdater{
|
||||||
process: process,
|
process: process,
|
||||||
userIDToMutex: make(map[string]*sync.Mutex),
|
userIDToMutex: make(map[string]*sync.Mutex),
|
||||||
mu: &sync.Mutex{},
|
mu: &sync.Mutex{},
|
||||||
db: db,
|
db: db,
|
||||||
api: api,
|
api: api,
|
||||||
producer: producer,
|
producer: producer,
|
||||||
fedClient: fedClient,
|
fedClient: fedClient,
|
||||||
thisServer: thisServer,
|
thisServer: thisServer,
|
||||||
workerChans: make([]chan spec.ServerName, numWorkers),
|
workerChans: make([]chan spec.ServerName, numWorkers),
|
||||||
userIDToChan: make(map[string]chan bool),
|
userIDToChan: make(map[string]chan bool),
|
||||||
userIDToChanMu: &sync.Mutex{},
|
userIDToChanMu: &sync.Mutex{},
|
||||||
rsAPI: rsAPI,
|
rsAPI: rsAPI,
|
||||||
|
isBlacklistedOrBackingOffFn: isBlacklistedOrBackingOffFn,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -362,15 +368,24 @@ func (u *DeviceListUpdater) notifyWorkers(userID string) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
_, err = u.isBlacklistedOrBackingOffFn(remoteServer)
|
||||||
|
var federationClientError *fedsenderapi.FederationClientError
|
||||||
|
if errors.As(err, &federationClientError) {
|
||||||
|
if federationClientError.Blacklisted {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
hash := fnv.New32a()
|
hash := fnv.New32a()
|
||||||
_, _ = hash.Write([]byte(remoteServer))
|
_, _ = hash.Write([]byte(remoteServer))
|
||||||
index := int(int64(hash.Sum32()) % int64(len(u.workerChans)))
|
index := int(int64(hash.Sum32()) % int64(len(u.workerChans)))
|
||||||
|
|
||||||
ch := u.assignChannel(userID)
|
ch := u.assignChannel(userID)
|
||||||
|
deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(index)}).Inc()
|
||||||
u.workerChans[index] <- remoteServer
|
u.workerChans[index] <- remoteServer
|
||||||
select {
|
select {
|
||||||
case <-ch:
|
case <-ch:
|
||||||
case <-time.After(1 * time.Second):
|
case <-time.After(10 * time.Second):
|
||||||
// we don't return an error in this case as it's not a failure condition.
|
// we don't return an error in this case as it's not a failure condition.
|
||||||
// we mainly block for the benefit of sytest anyway
|
// we mainly block for the benefit of sytest anyway
|
||||||
}
|
}
|
||||||
|
|
@ -403,27 +418,37 @@ func (u *DeviceListUpdater) worker(ch chan spec.ServerName, workerID int) {
|
||||||
go func() {
|
go func() {
|
||||||
var serversToRetry []spec.ServerName
|
var serversToRetry []spec.ServerName
|
||||||
for {
|
for {
|
||||||
|
deviceListUpdaterServersRetrying.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Set(float64(len(retries)))
|
||||||
serversToRetry = serversToRetry[:0] // reuse memory
|
serversToRetry = serversToRetry[:0] // reuse memory
|
||||||
time.Sleep(time.Second)
|
time.Sleep(time.Second * 10)
|
||||||
|
if len(ch) == cap(ch) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
maxServers := cap(ch) - len(ch)
|
||||||
|
|
||||||
retriesMu.Lock()
|
retriesMu.Lock()
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
for srv, retryAt := range retries {
|
for srv, retryAt := range retries {
|
||||||
if now.After(retryAt) {
|
if now.After(retryAt) {
|
||||||
serversToRetry = append(serversToRetry, srv)
|
serversToRetry = append(serversToRetry, srv)
|
||||||
|
if maxServers == len(serversToRetry) {
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, srv := range serversToRetry {
|
for _, srv := range serversToRetry {
|
||||||
delete(retries, srv)
|
delete(retries, srv)
|
||||||
}
|
}
|
||||||
deviceListUpdaterServersRetrying.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Set(float64(len(retries)))
|
|
||||||
retriesMu.Unlock()
|
retriesMu.Unlock()
|
||||||
|
|
||||||
for _, srv := range serversToRetry {
|
for _, srv := range serversToRetry {
|
||||||
|
deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Inc()
|
||||||
ch <- srv
|
ch <- srv
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
for serverName := range ch {
|
for serverName := range ch {
|
||||||
deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Inc()
|
|
||||||
retriesMu.Lock()
|
retriesMu.Lock()
|
||||||
_, exists := retries[serverName]
|
_, exists := retries[serverName]
|
||||||
retriesMu.Unlock()
|
retriesMu.Unlock()
|
||||||
|
|
|
||||||
|
|
@ -27,6 +27,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/federationapi/statistics"
|
||||||
"github.com/matrix-org/dendrite/internal/caching"
|
"github.com/matrix-org/dendrite/internal/caching"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
|
@ -129,6 +130,10 @@ type mockDeviceListUpdaterAPI struct {
|
||||||
func (d *mockDeviceListUpdaterAPI) PerformUploadDeviceKeys(ctx context.Context, req *api.PerformUploadDeviceKeysRequest, res *api.PerformUploadDeviceKeysResponse) {
|
func (d *mockDeviceListUpdaterAPI) PerformUploadDeviceKeys(ctx context.Context, req *api.PerformUploadDeviceKeysRequest, res *api.PerformUploadDeviceKeysResponse) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var testIsBlacklistedOrBackingOff = func(s spec.ServerName) (*statistics.ServerStatistics, error) {
|
||||||
|
return &statistics.ServerStatistics{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
type roundTripper struct {
|
type roundTripper struct {
|
||||||
fn func(*http.Request) (*http.Response, error)
|
fn func(*http.Request) (*http.Response, error)
|
||||||
}
|
}
|
||||||
|
|
@ -162,7 +167,7 @@ func TestUpdateHavePrevID(t *testing.T) {
|
||||||
}
|
}
|
||||||
ap := &mockDeviceListUpdaterAPI{}
|
ap := &mockDeviceListUpdaterAPI{}
|
||||||
producer := &mockKeyChangeProducer{}
|
producer := &mockKeyChangeProducer{}
|
||||||
updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, nil, 1, nil, "localhost", caching.DisableMetrics)
|
updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, nil, 1, nil, "localhost", caching.DisableMetrics, testIsBlacklistedOrBackingOff)
|
||||||
event := gomatrixserverlib.DeviceListUpdateEvent{
|
event := gomatrixserverlib.DeviceListUpdateEvent{
|
||||||
DeviceDisplayName: "Foo Bar",
|
DeviceDisplayName: "Foo Bar",
|
||||||
Deleted: false,
|
Deleted: false,
|
||||||
|
|
@ -234,7 +239,7 @@ func TestUpdateNoPrevID(t *testing.T) {
|
||||||
`)),
|
`)),
|
||||||
}, nil
|
}, nil
|
||||||
})
|
})
|
||||||
updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 2, nil, "example.test", caching.DisableMetrics)
|
updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 2, nil, "example.test", caching.DisableMetrics, testIsBlacklistedOrBackingOff)
|
||||||
if err := updater.Start(); err != nil {
|
if err := updater.Start(); err != nil {
|
||||||
t.Fatalf("failed to start updater: %s", err)
|
t.Fatalf("failed to start updater: %s", err)
|
||||||
}
|
}
|
||||||
|
|
@ -304,7 +309,7 @@ func TestDebounce(t *testing.T) {
|
||||||
close(incomingFedReq)
|
close(incomingFedReq)
|
||||||
return <-fedCh, nil
|
return <-fedCh, nil
|
||||||
})
|
})
|
||||||
updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 1, nil, "localhost", caching.DisableMetrics)
|
updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 1, nil, "localhost", caching.DisableMetrics, testIsBlacklistedOrBackingOff)
|
||||||
if err := updater.Start(); err != nil {
|
if err := updater.Start(); err != nil {
|
||||||
t.Fatalf("failed to start updater: %s", err)
|
t.Fatalf("failed to start updater: %s", err)
|
||||||
}
|
}
|
||||||
|
|
@ -407,7 +412,7 @@ func TestDeviceListUpdater_CleanUp(t *testing.T) {
|
||||||
|
|
||||||
updater := NewDeviceListUpdater(processCtx, db, nil,
|
updater := NewDeviceListUpdater(processCtx, db, nil,
|
||||||
nil, nil,
|
nil, nil,
|
||||||
0, rsAPI, "test", caching.DisableMetrics)
|
0, rsAPI, "test", caching.DisableMetrics, testIsBlacklistedOrBackingOff)
|
||||||
if err := updater.CleanUp(); err != nil {
|
if err := updater.CleanUp(); err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -18,10 +18,12 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
fedsenderapi "github.com/matrix-org/dendrite/federationapi/api"
|
fedsenderapi "github.com/matrix-org/dendrite/federationapi/api"
|
||||||
|
"github.com/matrix-org/dendrite/federationapi/statistics"
|
||||||
"github.com/matrix-org/dendrite/internal/pushgateway"
|
"github.com/matrix-org/dendrite/internal/pushgateway"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
"github.com/matrix-org/dendrite/setup/process"
|
"github.com/matrix-org/dendrite/setup/process"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
|
||||||
rsapi "github.com/matrix-org/dendrite/roomserver/api"
|
rsapi "github.com/matrix-org/dendrite/roomserver/api"
|
||||||
|
|
@ -47,6 +49,7 @@ func NewInternalAPI(
|
||||||
rsAPI rsapi.UserRoomserverAPI,
|
rsAPI rsapi.UserRoomserverAPI,
|
||||||
fedClient fedsenderapi.KeyserverFederationAPI,
|
fedClient fedsenderapi.KeyserverFederationAPI,
|
||||||
enableMetrics bool,
|
enableMetrics bool,
|
||||||
|
blacklistedOrBackingOffFn func(s spec.ServerName) (*statistics.ServerStatistics, error),
|
||||||
) *internal.UserInternalAPI {
|
) *internal.UserInternalAPI {
|
||||||
js, _ := natsInstance.Prepare(processContext, &dendriteCfg.Global.JetStream)
|
js, _ := natsInstance.Prepare(processContext, &dendriteCfg.Global.JetStream)
|
||||||
appServices := dendriteCfg.Derived.ApplicationServices
|
appServices := dendriteCfg.Derived.ApplicationServices
|
||||||
|
|
@ -100,7 +103,7 @@ func NewInternalAPI(
|
||||||
FedClient: fedClient,
|
FedClient: fedClient,
|
||||||
}
|
}
|
||||||
|
|
||||||
updater := internal.NewDeviceListUpdater(processContext, keyDB, userAPI, keyChangeProducer, fedClient, dendriteCfg.UserAPI.WorkerCount, rsAPI, dendriteCfg.Global.ServerName, enableMetrics)
|
updater := internal.NewDeviceListUpdater(processContext, keyDB, userAPI, keyChangeProducer, fedClient, dendriteCfg.UserAPI.WorkerCount, rsAPI, dendriteCfg.Global.ServerName, enableMetrics, blacklistedOrBackingOffFn)
|
||||||
userAPI.Updater = updater
|
userAPI.Updater = updater
|
||||||
// Remove users which we don't share a room with anymore
|
// Remove users which we don't share a room with anymore
|
||||||
if err := updater.CleanUp(); err != nil {
|
if err := updater.CleanUp(); err != nil {
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue