Merge branch 'main' into contributing-links

This commit is contained in:
kegsay 2023-11-09 10:03:19 +00:00 committed by GitHub
commit dc2bc5a380
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
72 changed files with 327 additions and 152 deletions

View file

@ -14,6 +14,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/matrix-org/dendrite/federationapi/statistics"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/appservice"
@ -32,6 +33,10 @@ import (
"github.com/matrix-org/dendrite/test/testrig" "github.com/matrix-org/dendrite/test/testrig"
) )
var testIsBlacklistedOrBackingOff = func(s spec.ServerName) (*statistics.ServerStatistics, error) {
return &statistics.ServerStatistics{}, nil
}
func TestAppserviceInternalAPI(t *testing.T) { func TestAppserviceInternalAPI(t *testing.T) {
// Set expected results // Set expected results
@ -144,7 +149,7 @@ func TestAppserviceInternalAPI(t *testing.T) {
cm := sqlutil.NewConnectionManager(ctx, cfg.Global.DatabaseOptions) cm := sqlutil.NewConnectionManager(ctx, cfg.Global.DatabaseOptions)
rsAPI := roomserver.NewInternalAPI(ctx, cfg, cm, &natsInstance, caches, caching.DisableMetrics) rsAPI := roomserver.NewInternalAPI(ctx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
usrAPI := userapi.NewInternalAPI(ctx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics) usrAPI := userapi.NewInternalAPI(ctx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
asAPI := appservice.NewInternalAPI(ctx, cfg, &natsInstance, usrAPI, rsAPI) asAPI := appservice.NewInternalAPI(ctx, cfg, &natsInstance, usrAPI, rsAPI)
runCases(t, asAPI) runCases(t, asAPI)
@ -239,7 +244,7 @@ func TestAppserviceInternalAPI_UnixSocket_Simple(t *testing.T) {
cm := sqlutil.NewConnectionManager(ctx, cfg.Global.DatabaseOptions) cm := sqlutil.NewConnectionManager(ctx, cfg.Global.DatabaseOptions)
rsAPI := roomserver.NewInternalAPI(ctx, cfg, cm, &natsInstance, caches, caching.DisableMetrics) rsAPI := roomserver.NewInternalAPI(ctx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
usrAPI := userapi.NewInternalAPI(ctx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics) usrAPI := userapi.NewInternalAPI(ctx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
asAPI := appservice.NewInternalAPI(ctx, cfg, &natsInstance, usrAPI, rsAPI) asAPI := appservice.NewInternalAPI(ctx, cfg, &natsInstance, usrAPI, rsAPI)
t.Run("UserIDExists", func(t *testing.T) { t.Run("UserIDExists", func(t *testing.T) {
@ -378,7 +383,7 @@ func TestRoomserverConsumerOneInvite(t *testing.T) {
// Create required internal APIs // Create required internal APIs
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, natsInstance, caches, caching.DisableMetrics) rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
usrAPI := userapi.NewInternalAPI(processCtx, cfg, cm, natsInstance, rsAPI, nil, caching.DisableMetrics) usrAPI := userapi.NewInternalAPI(processCtx, cfg, cm, natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
// start the consumer // start the consumer
appservice.NewInternalAPI(processCtx, cfg, natsInstance, usrAPI, rsAPI) appservice.NewInternalAPI(processCtx, cfg, natsInstance, usrAPI, rsAPI)

View file

@ -191,13 +191,13 @@ func startup() {
serverKeyAPI := &signing.YggdrasilKeys{} serverKeyAPI := &signing.YggdrasilKeys{}
keyRing := serverKeyAPI.KeyRing() keyRing := serverKeyAPI.KeyRing()
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, federation, caching.EnableMetrics) fedSenderAPI := federationapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, federation, rsAPI, caches, keyRing, true)
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, federation, caching.EnableMetrics, fedSenderAPI.IsBlacklistedOrBackingOff)
asQuery := appservice.NewInternalAPI( asQuery := appservice.NewInternalAPI(
processCtx, cfg, &natsInstance, userAPI, rsAPI, processCtx, cfg, &natsInstance, userAPI, rsAPI,
) )
rsAPI.SetAppserviceAPI(asQuery) rsAPI.SetAppserviceAPI(asQuery)
fedSenderAPI := federationapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, federation, rsAPI, caches, keyRing, true)
rsAPI.SetFederationAPI(fedSenderAPI, keyRing) rsAPI.SetFederationAPI(fedSenderAPI, keyRing)
monolith := setup.Monolith{ monolith := setup.Monolith{

View file

@ -216,7 +216,7 @@ func (m *DendriteMonolith) Start() {
processCtx, cfg, cm, &natsInstance, federation, rsAPI, caches, keyRing, true, processCtx, cfg, cm, &natsInstance, federation, rsAPI, caches, keyRing, true,
) )
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, federation, caching.EnableMetrics) userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, federation, caching.EnableMetrics, fsAPI.IsBlacklistedOrBackingOff)
asAPI := appservice.NewInternalAPI(processCtx, cfg, &natsInstance, userAPI, rsAPI) asAPI := appservice.NewInternalAPI(processCtx, cfg, &natsInstance, userAPI, rsAPI)
rsAPI.SetAppserviceAPI(asAPI) rsAPI.SetAppserviceAPI(asAPI)

View file

@ -45,7 +45,7 @@ func TestAdminCreateToken(t *testing.T) {
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics) caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics) rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics) userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics) AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
accessTokens := map[*test.User]userDevice{ accessTokens := map[*test.User]userDevice{
aliceAdmin: {}, aliceAdmin: {},
@ -196,7 +196,7 @@ func TestAdminListRegistrationTokens(t *testing.T) {
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics) caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics) rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics) userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics) AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
accessTokens := map[*test.User]userDevice{ accessTokens := map[*test.User]userDevice{
aliceAdmin: {}, aliceAdmin: {},
@ -314,7 +314,7 @@ func TestAdminGetRegistrationToken(t *testing.T) {
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics) caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics) rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics) userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics) AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
accessTokens := map[*test.User]userDevice{ accessTokens := map[*test.User]userDevice{
aliceAdmin: {}, aliceAdmin: {},
@ -415,7 +415,7 @@ func TestAdminDeleteRegistrationToken(t *testing.T) {
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics) caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics) rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics) userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics) AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
accessTokens := map[*test.User]userDevice{ accessTokens := map[*test.User]userDevice{
aliceAdmin: {}, aliceAdmin: {},
@ -509,7 +509,7 @@ func TestAdminUpdateRegistrationToken(t *testing.T) {
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics) caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics) rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics) userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics) AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
accessTokens := map[*test.User]userDevice{ accessTokens := map[*test.User]userDevice{
aliceAdmin: {}, aliceAdmin: {},
@ -693,7 +693,7 @@ func TestAdminResetPassword(t *testing.T) {
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics) rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
// Needed for changing the password/login // Needed for changing the password/login
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics) userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
// We mostly need the userAPI for this test, so nil for other APIs/caches etc. // We mostly need the userAPI for this test, so nil for other APIs/caches etc.
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics) AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
@ -791,7 +791,7 @@ func TestPurgeRoom(t *testing.T) {
fsAPI := federationapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, rsAPI, caches, nil, true) fsAPI := federationapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, rsAPI, caches, nil, true)
rsAPI.SetFederationAPI(fsAPI, nil) rsAPI.SetFederationAPI(fsAPI, nil)
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics) userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
syncapi.AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, userAPI, rsAPI, caches, caching.DisableMetrics) syncapi.AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, userAPI, rsAPI, caches, caching.DisableMetrics)
// Create the room // Create the room
@ -863,7 +863,7 @@ func TestAdminEvacuateRoom(t *testing.T) {
fsAPI := federationapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, rsAPI, caches, nil, true) fsAPI := federationapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, rsAPI, caches, nil, true)
rsAPI.SetFederationAPI(fsAPI, nil) rsAPI.SetFederationAPI(fsAPI, nil)
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics) userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
// Create the room // Create the room
if err := api.SendEvents(ctx, rsAPI, api.KindNew, room.Events(), "test", "test", api.DoNotSendToOtherServers, nil, false); err != nil { if err := api.SendEvents(ctx, rsAPI, api.KindNew, room.Events(), "test", "test", api.DoNotSendToOtherServers, nil, false); err != nil {
@ -964,7 +964,7 @@ func TestAdminEvacuateUser(t *testing.T) {
fsAPI := federationapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, basepkg.CreateFederationClient(cfg, nil), rsAPI, caches, nil, true) fsAPI := federationapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, basepkg.CreateFederationClient(cfg, nil), rsAPI, caches, nil, true)
rsAPI.SetFederationAPI(fsAPI, nil) rsAPI.SetFederationAPI(fsAPI, nil)
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics) userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
// Create the room // Create the room
if err := api.SendEvents(ctx, rsAPI, api.KindNew, room.Events(), "test", "test", api.DoNotSendToOtherServers, nil, false); err != nil { if err := api.SendEvents(ctx, rsAPI, api.KindNew, room.Events(), "test", "test", api.DoNotSendToOtherServers, nil, false); err != nil {
@ -1055,7 +1055,7 @@ func TestAdminMarkAsStale(t *testing.T) {
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions) cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics) rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics) userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
// We mostly need the rsAPI for this test, so nil for other APIs/caches etc. // We mostly need the rsAPI for this test, so nil for other APIs/caches etc.
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics) AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)

View file

@ -17,6 +17,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/routing" "github.com/matrix-org/dendrite/clientapi/routing"
"github.com/matrix-org/dendrite/clientapi/threepid" "github.com/matrix-org/dendrite/clientapi/threepid"
"github.com/matrix-org/dendrite/federationapi/statistics"
"github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/internal/pushrules" "github.com/matrix-org/dendrite/internal/pushrules"
@ -49,6 +50,10 @@ type userDevice struct {
password string password string
} }
var testIsBlacklistedOrBackingOff = func(s spec.ServerName) (*statistics.ServerStatistics, error) {
return &statistics.ServerStatistics{}, nil
}
func TestGetPutDevices(t *testing.T) { func TestGetPutDevices(t *testing.T) {
alice := test.NewUser(t) alice := test.NewUser(t)
bob := test.NewUser(t) bob := test.NewUser(t)
@ -121,7 +126,7 @@ func TestGetPutDevices(t *testing.T) {
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions) cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics) rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics) userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
// We mostly need the rsAPI for this test, so nil for other APIs/caches etc. // We mostly need the rsAPI for this test, so nil for other APIs/caches etc.
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics) AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
@ -170,7 +175,7 @@ func TestDeleteDevice(t *testing.T) {
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics) caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics) rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics) userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
// We mostly need the rsAPI/ for this test, so nil for other APIs/caches etc. // We mostly need the rsAPI/ for this test, so nil for other APIs/caches etc.
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics) AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
@ -275,7 +280,7 @@ func TestDeleteDevices(t *testing.T) {
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics) caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics) rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics) userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
// We mostly need the rsAPI/ for this test, so nil for other APIs/caches etc. // We mostly need the rsAPI/ for this test, so nil for other APIs/caches etc.
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics) AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
@ -442,7 +447,7 @@ func TestSetDisplayname(t *testing.T) {
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, natsInstance, caches, caching.DisableMetrics) rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, natsInstance, rsAPI, nil, caching.DisableMetrics) userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
asPI := appservice.NewInternalAPI(processCtx, cfg, natsInstance, userAPI, rsAPI) asPI := appservice.NewInternalAPI(processCtx, cfg, natsInstance, userAPI, rsAPI)
AddPublicRoutes(processCtx, routers, cfg, natsInstance, base.CreateFederationClient(cfg, nil), rsAPI, asPI, nil, nil, userAPI, nil, nil, caching.DisableMetrics) AddPublicRoutes(processCtx, routers, cfg, natsInstance, base.CreateFederationClient(cfg, nil), rsAPI, asPI, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
@ -554,7 +559,7 @@ func TestSetAvatarURL(t *testing.T) {
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, natsInstance, caches, caching.DisableMetrics) rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, natsInstance, rsAPI, nil, caching.DisableMetrics) userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
asPI := appservice.NewInternalAPI(processCtx, cfg, natsInstance, userAPI, rsAPI) asPI := appservice.NewInternalAPI(processCtx, cfg, natsInstance, userAPI, rsAPI)
AddPublicRoutes(processCtx, routers, cfg, natsInstance, base.CreateFederationClient(cfg, nil), rsAPI, asPI, nil, nil, userAPI, nil, nil, caching.DisableMetrics) AddPublicRoutes(processCtx, routers, cfg, natsInstance, base.CreateFederationClient(cfg, nil), rsAPI, asPI, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
@ -632,7 +637,7 @@ func TestTyping(t *testing.T) {
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics) rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
// Needed to create accounts // Needed to create accounts
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics) userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
// We mostly need the rsAPI/userAPI for this test, so nil for other APIs etc. // We mostly need the rsAPI/userAPI for this test, so nil for other APIs etc.
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics) AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
@ -716,7 +721,7 @@ func TestMembership(t *testing.T) {
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics) rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
// Needed to create accounts // Needed to create accounts
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics) userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
rsAPI.SetUserAPI(userAPI) rsAPI.SetUserAPI(userAPI)
// We mostly need the rsAPI/userAPI for this test, so nil for other APIs etc. // We mostly need the rsAPI/userAPI for this test, so nil for other APIs etc.
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics) AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
@ -955,7 +960,7 @@ func TestCapabilities(t *testing.T) {
// Needed to create accounts // Needed to create accounts
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, caching.DisableMetrics) rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics) userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
// We mostly need the rsAPI/userAPI for this test, so nil for other APIs etc. // We mostly need the rsAPI/userAPI for this test, so nil for other APIs etc.
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics) AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
@ -1002,7 +1007,7 @@ func TestTurnserver(t *testing.T) {
// Needed to create accounts // Needed to create accounts
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, caching.DisableMetrics) rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics) userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
//rsAPI.SetUserAPI(userAPI) //rsAPI.SetUserAPI(userAPI)
// We mostly need the rsAPI/userAPI for this test, so nil for other APIs etc. // We mostly need the rsAPI/userAPI for this test, so nil for other APIs etc.
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics) AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
@ -1100,7 +1105,7 @@ func Test3PID(t *testing.T) {
// Needed to create accounts // Needed to create accounts
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, caching.DisableMetrics) rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics) userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
// We mostly need the rsAPI/userAPI for this test, so nil for other APIs etc. // We mostly need the rsAPI/userAPI for this test, so nil for other APIs etc.
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics) AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
@ -1276,7 +1281,7 @@ func TestPushRules(t *testing.T) {
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions) cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics) rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics) userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
// We mostly need the rsAPI for this test, so nil for other APIs/caches etc. // We mostly need the rsAPI for this test, so nil for other APIs/caches etc.
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics) AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
@ -1663,7 +1668,7 @@ func TestKeys(t *testing.T) {
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions) cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics) rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics) userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
// We mostly need the rsAPI for this test, so nil for other APIs/caches etc. // We mostly need the rsAPI for this test, so nil for other APIs/caches etc.
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics) AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
@ -2125,7 +2130,7 @@ func TestKeyBackup(t *testing.T) {
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions) cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics) rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics) userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
// We mostly need the rsAPI for this test, so nil for other APIs/caches etc. // We mostly need the rsAPI for this test, so nil for other APIs/caches etc.
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics) AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)

View file

@ -7,6 +7,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/matrix-org/dendrite/federationapi/statistics"
"github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/jetstream"
@ -21,6 +22,10 @@ import (
uapi "github.com/matrix-org/dendrite/userapi/api" uapi "github.com/matrix-org/dendrite/userapi/api"
) )
var testIsBlacklistedOrBackingOff = func(s spec.ServerName) (*statistics.ServerStatistics, error) {
return &statistics.ServerStatistics{}, nil
}
func TestJoinRoomByIDOrAlias(t *testing.T) { func TestJoinRoomByIDOrAlias(t *testing.T) {
alice := test.NewUser(t) alice := test.NewUser(t)
bob := test.NewUser(t) bob := test.NewUser(t)
@ -36,7 +41,7 @@ func TestJoinRoomByIDOrAlias(t *testing.T) {
natsInstance := jetstream.NATSInstance{} natsInstance := jetstream.NATSInstance{}
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics) rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil) // creates the rs.Inputer etc rsAPI.SetFederationAPI(nil, nil) // creates the rs.Inputer etc
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics) userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
asAPI := appservice.NewInternalAPI(processCtx, cfg, &natsInstance, userAPI, rsAPI) asAPI := appservice.NewInternalAPI(processCtx, cfg, &natsInstance, userAPI, rsAPI)
// Create the users in the userapi // Create the users in the userapi

View file

@ -49,7 +49,7 @@ func TestLogin(t *testing.T) {
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics) rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
// Needed for /login // Needed for /login
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics) userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
// We mostly need the userAPI for this test, so nil for other APIs/caches etc. // We mostly need the userAPI for this test, so nil for other APIs/caches etc.
Setup(routers, cfg, nil, nil, userAPI, nil, nil, nil, nil, nil, nil, nil, caching.DisableMetrics) Setup(routers, cfg, nil, nil, userAPI, nil, nil, nil, nil, nil, nil, nil, caching.DisableMetrics)

View file

@ -416,7 +416,7 @@ func Test_register(t *testing.T) {
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions) cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics) rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics) userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
for _, tc := range testCases { for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
@ -596,7 +596,7 @@ func TestRegisterUserWithDisplayName(t *testing.T) {
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions) cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics) rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics) userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
deviceName, deviceID := "deviceName", "deviceID" deviceName, deviceID := "deviceName", "deviceID"
expectedDisplayName := "DisplayName" expectedDisplayName := "DisplayName"
response := completeRegistration( response := completeRegistration(
@ -637,7 +637,7 @@ func TestRegisterAdminUsingSharedSecret(t *testing.T) {
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics) caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics) rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics) userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
expectedDisplayName := "rabbit" expectedDisplayName := "rabbit"
jsonStr := []byte(`{"admin":true,"mac":"24dca3bba410e43fe64b9b5c28306693bf3baa9f","nonce":"759f047f312b99ff428b21d581256f8592b8976e58bc1b543972dc6147e529a79657605b52d7becd160ff5137f3de11975684319187e06901955f79e5a6c5a79","password":"wonderland","username":"alice","displayname":"rabbit"}`) jsonStr := []byte(`{"admin":true,"mac":"24dca3bba410e43fe64b9b5c28306693bf3baa9f","nonce":"759f047f312b99ff428b21d581256f8592b8976e58bc1b543972dc6147e529a79657605b52d7becd160ff5137f3de11975684319187e06901955f79e5a6c5a79","password":"wonderland","username":"alice","displayname":"rabbit"}`)

View file

@ -145,7 +145,7 @@ func (p *P2PMonolith) SetupDendrite(
) )
rsAPI.SetFederationAPI(fsAPI, keyRing) rsAPI.SetFederationAPI(fsAPI, keyRing)
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, federation, enableMetrics) userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, federation, enableMetrics, fsAPI.IsBlacklistedOrBackingOff)
asAPI := appservice.NewInternalAPI(processCtx, cfg, &natsInstance, userAPI, rsAPI) asAPI := appservice.NewInternalAPI(processCtx, cfg, &natsInstance, userAPI, rsAPI)

View file

@ -213,14 +213,15 @@ func main() {
natsInstance := jetstream.NATSInstance{} natsInstance := jetstream.NATSInstance{}
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.EnableMetrics) rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.EnableMetrics)
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, federation, caching.EnableMetrics)
asAPI := appservice.NewInternalAPI(processCtx, cfg, &natsInstance, userAPI, rsAPI)
rsAPI.SetAppserviceAPI(asAPI)
fsAPI := federationapi.NewInternalAPI( fsAPI := federationapi.NewInternalAPI(
processCtx, cfg, cm, &natsInstance, federation, rsAPI, caches, keyRing, true, processCtx, cfg, cm, &natsInstance, federation, rsAPI, caches, keyRing, true,
) )
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, federation, caching.EnableMetrics, fsAPI.IsBlacklistedOrBackingOff)
asAPI := appservice.NewInternalAPI(processCtx, cfg, &natsInstance, userAPI, rsAPI)
rsAPI.SetAppserviceAPI(asAPI)
rsAPI.SetFederationAPI(fsAPI, keyRing) rsAPI.SetFederationAPI(fsAPI, keyRing)
monolith := setup.Monolith{ monolith := setup.Monolith{

View file

@ -162,7 +162,7 @@ func main() {
// dependency. Other components also need updating after their dependencies are up. // dependency. Other components also need updating after their dependencies are up.
rsAPI.SetFederationAPI(fsAPI, keyRing) rsAPI.SetFederationAPI(fsAPI, keyRing)
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, federationClient, caching.EnableMetrics) userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, federationClient, caching.EnableMetrics, fsAPI.IsBlacklistedOrBackingOff)
asAPI := appservice.NewInternalAPI(processCtx, cfg, &natsInstance, userAPI, rsAPI) asAPI := appservice.NewInternalAPI(processCtx, cfg, &natsInstance, userAPI, rsAPI)
rsAPI.SetAppserviceAPI(asAPI) rsAPI.SetAppserviceAPI(asAPI)

View file

@ -24,7 +24,6 @@ import (
"github.com/matrix-org/gomatrixserverlib/fclient" "github.com/matrix-org/gomatrixserverlib/fclient"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/federationapi/api"
federationAPI "github.com/matrix-org/dendrite/federationapi/api" federationAPI "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/federationapi/consumers" "github.com/matrix-org/dendrite/federationapi/consumers"
"github.com/matrix-org/dendrite/federationapi/internal" "github.com/matrix-org/dendrite/federationapi/internal"
@ -102,7 +101,7 @@ func NewInternalAPI(
caches *caching.Caches, caches *caching.Caches,
keyRing *gomatrixserverlib.KeyRing, keyRing *gomatrixserverlib.KeyRing,
resetBlacklist bool, resetBlacklist bool,
) api.FederationInternalAPI { ) *internal.FederationInternalAPI {
cfg := &dendriteCfg.FederationAPI cfg := &dendriteCfg.FederationAPI
federationDB, err := storage.NewDatabase(processContext.Context(), cm, &cfg.Database, caches, dendriteCfg.Global.IsLocalServerName) federationDB, err := storage.NewDatabase(processContext.Context(), cm, &cfg.Database, caches, dendriteCfg.Global.IsLocalServerName)

View file

@ -112,7 +112,7 @@ func NewFederationInternalAPI(
} }
} }
func (a *FederationInternalAPI) isBlacklistedOrBackingOff(s spec.ServerName) (*statistics.ServerStatistics, error) { func (a *FederationInternalAPI) IsBlacklistedOrBackingOff(s spec.ServerName) (*statistics.ServerStatistics, error) {
stats := a.statistics.ForServer(s) stats := a.statistics.ForServer(s)
if stats.Blacklisted() { if stats.Blacklisted() {
return stats, &api.FederationClientError{ return stats, &api.FederationClientError{
@ -151,7 +151,7 @@ func failBlacklistableError(err error, stats *statistics.ServerStatistics) (unti
func (a *FederationInternalAPI) doRequestIfNotBackingOffOrBlacklisted( func (a *FederationInternalAPI) doRequestIfNotBackingOffOrBlacklisted(
s spec.ServerName, request func() (interface{}, error), s spec.ServerName, request func() (interface{}, error),
) (interface{}, error) { ) (interface{}, error) {
stats, err := a.isBlacklistedOrBackingOff(s) stats, err := a.IsBlacklistedOrBackingOff(s)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -26,7 +26,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
fedAPI "github.com/matrix-org/dendrite/federationapi" fedAPI "github.com/matrix-org/dendrite/federationapi"
fedInternal "github.com/matrix-org/dendrite/federationapi/internal"
"github.com/matrix-org/dendrite/federationapi/routing" "github.com/matrix-org/dendrite/federationapi/routing"
"github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/internal/httputil"
@ -67,11 +66,8 @@ func TestHandleQueryProfile(t *testing.T) {
keyRing := serverKeyAPI.KeyRing() keyRing := serverKeyAPI.KeyRing()
fedapi := fedAPI.NewInternalAPI(processCtx, cfg, cm, &natsInstance, &fedClient, nil, nil, keyRing, true) fedapi := fedAPI.NewInternalAPI(processCtx, cfg, cm, &natsInstance, &fedClient, nil, nil, keyRing, true)
userapi := fakeUserAPI{} userapi := fakeUserAPI{}
r, ok := fedapi.(*fedInternal.FederationInternalAPI)
if !ok { routing.Setup(routers, cfg, nil, fedapi, keyRing, &fedClient, &userapi, &cfg.MSCs, nil, caching.DisableMetrics)
panic("This is a programming error.")
}
routing.Setup(routers, cfg, nil, r, keyRing, &fedClient, &userapi, &cfg.MSCs, nil, caching.DisableMetrics)
handler := fedMux.Get(routing.QueryProfileRouteName).GetHandler().ServeHTTP handler := fedMux.Get(routing.QueryProfileRouteName).GetHandler().ServeHTTP
_, sk, _ := ed25519.GenerateKey(nil) _, sk, _ := ed25519.GenerateKey(nil)

View file

@ -25,7 +25,6 @@ import (
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
fedAPI "github.com/matrix-org/dendrite/federationapi" fedAPI "github.com/matrix-org/dendrite/federationapi"
fedInternal "github.com/matrix-org/dendrite/federationapi/internal"
"github.com/matrix-org/dendrite/federationapi/routing" "github.com/matrix-org/dendrite/federationapi/routing"
"github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/internal/httputil"
@ -65,11 +64,8 @@ func TestHandleQueryDirectory(t *testing.T) {
keyRing := serverKeyAPI.KeyRing() keyRing := serverKeyAPI.KeyRing()
fedapi := fedAPI.NewInternalAPI(processCtx, cfg, cm, &natsInstance, &fedClient, nil, nil, keyRing, true) fedapi := fedAPI.NewInternalAPI(processCtx, cfg, cm, &natsInstance, &fedClient, nil, nil, keyRing, true)
userapi := fakeUserAPI{} userapi := fakeUserAPI{}
r, ok := fedapi.(*fedInternal.FederationInternalAPI)
if !ok { routing.Setup(routers, cfg, nil, fedapi, keyRing, &fedClient, &userapi, &cfg.MSCs, nil, caching.DisableMetrics)
panic("This is a programming error.")
}
routing.Setup(routers, cfg, nil, r, keyRing, &fedClient, &userapi, &cfg.MSCs, nil, caching.DisableMetrics)
handler := fedMux.Get(routing.QueryDirectoryRouteName).GetHandler().ServeHTTP handler := fedMux.Get(routing.QueryDirectoryRouteName).GetHandler().ServeHTTP
_, sk, _ := ed25519.GenerateKey(nil) _, sk, _ := ed25519.GenerateKey(nil)

View file

@ -23,7 +23,6 @@ import (
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
fedAPI "github.com/matrix-org/dendrite/federationapi" fedAPI "github.com/matrix-org/dendrite/federationapi"
fedInternal "github.com/matrix-org/dendrite/federationapi/internal"
"github.com/matrix-org/dendrite/federationapi/routing" "github.com/matrix-org/dendrite/federationapi/routing"
"github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/internal/httputil"
@ -62,11 +61,8 @@ func TestHandleSend(t *testing.T) {
fedapi := fedAPI.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, nil, nil, nil, true) fedapi := fedAPI.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, nil, nil, nil, true)
serverKeyAPI := &signing.YggdrasilKeys{} serverKeyAPI := &signing.YggdrasilKeys{}
keyRing := serverKeyAPI.KeyRing() keyRing := serverKeyAPI.KeyRing()
r, ok := fedapi.(*fedInternal.FederationInternalAPI)
if !ok { routing.Setup(routers, cfg, nil, fedapi, keyRing, nil, nil, &cfg.MSCs, nil, caching.DisableMetrics)
panic("This is a programming error.")
}
routing.Setup(routers, cfg, nil, r, keyRing, nil, nil, &cfg.MSCs, nil, caching.DisableMetrics)
handler := fedMux.Get(routing.SendRouteName).GetHandler().ServeHTTP handler := fedMux.Get(routing.SendRouteName).GetHandler().ServeHTTP
_, sk, _ := ed25519.GenerateKey(nil) _, sk, _ := ed25519.GenerateKey(nil)

View file

@ -151,7 +151,7 @@ func (s *notaryServerKeysMetadataStatements) SelectKeys(ctx context.Context, txn
} }
results = append(results, sk) results = append(results, sk)
} }
return results, nil return results, rows.Err()
} }
func (s *notaryServerKeysMetadataStatements) DeleteOldJSONResponses(ctx context.Context, txn *sql.Tx) error { func (s *notaryServerKeysMetadataStatements) DeleteOldJSONResponses(ctx context.Context, txn *sql.Tx) error {

View file

@ -109,5 +109,5 @@ func (s *queueJSONStatements) SelectQueueJSON(
} }
blobs[nid] = blob blobs[nid] = blob
} }
return blobs, err return blobs, rows.Err()
} }

View file

@ -110,7 +110,7 @@ func (s *relayServersStatements) SelectRelayServers(
} }
result = append(result, spec.ServerName(relayServer)) result = append(result, spec.ServerName(relayServer))
} }
return result, nil return result, rows.Err()
} }
func (s *relayServersStatements) DeleteRelayServers( func (s *relayServersStatements) DeleteRelayServers(

View file

@ -216,5 +216,5 @@ func joinedHostsFromStmt(
}) })
} }
return result, nil return result, rows.Err()
} }

View file

@ -154,7 +154,7 @@ func (s *notaryServerKeysMetadataStatements) SelectKeys(ctx context.Context, txn
} }
results = append(results, sk) results = append(results, sk)
} }
return results, nil return results, rows.Err()
} }
func (s *notaryServerKeysMetadataStatements) DeleteOldJSONResponses(ctx context.Context, txn *sql.Tx) error { func (s *notaryServerKeysMetadataStatements) DeleteOldJSONResponses(ctx context.Context, txn *sql.Tx) error {

View file

@ -135,5 +135,5 @@ func (s *queueJSONStatements) SelectQueueJSON(
} }
blobs[nid] = blob blobs[nid] = blob
} }
return blobs, err return blobs, rows.Err()
} }

View file

@ -109,7 +109,7 @@ func (s *relayServersStatements) SelectRelayServers(
} }
result = append(result, spec.ServerName(relayServer)) result = append(result, spec.ServerName(relayServer))
} }
return result, nil return result, rows.Err()
} }
func (s *relayServersStatements) DeleteRelayServers( func (s *relayServersStatements) DeleteRelayServers(

View file

@ -109,5 +109,5 @@ func (s *relayQueueJSONStatements) SelectQueueJSON(
} }
blobs[nid] = blob blobs[nid] = blob
} }
return blobs, err return blobs, rows.Err()
} }

View file

@ -133,5 +133,5 @@ func (s *relayQueueJSONStatements) SelectQueueJSON(
} }
blobs[nid] = blob blobs[nid] = blob
} }
return blobs, err return blobs, rows.Err()
} }

View file

@ -17,6 +17,7 @@ package query
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"sort" "sort"
@ -56,6 +57,12 @@ func (querier *Queryer) QueryNextRoomHierarchyPage(ctx context.Context, walker r
break break
} }
// If the context is canceled, we might still have discovered rooms
// return them to the client and let the client know there _may_ be more rooms.
if errors.Is(ctx.Err(), context.Canceled) {
break
}
// pop the stack // pop the stack
queuedRoom := unvisited[len(unvisited)-1] queuedRoom := unvisited[len(unvisited)-1]
unvisited = unvisited[:len(unvisited)-1] unvisited = unvisited[:len(unvisited)-1]
@ -112,6 +119,11 @@ func (querier *Queryer) QueryNextRoomHierarchyPage(ctx context.Context, walker r
pubRoom := publicRoomsChunk(ctx, querier, queuedRoom.RoomID) pubRoom := publicRoomsChunk(ctx, querier, queuedRoom.RoomID)
if pubRoom == nil {
util.GetLogger(ctx).WithField("room_id", queuedRoom.RoomID).Debug("unable to get publicRoomsChunk")
continue
}
discoveredRooms = append(discoveredRooms, fclient.RoomHierarchyRoom{ discoveredRooms = append(discoveredRooms, fclient.RoomHierarchyRoom{
PublicRoom: *pubRoom, PublicRoom: *pubRoom,
RoomType: roomType, RoomType: roomType,

View file

@ -7,6 +7,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/matrix-org/dendrite/federationapi/statistics"
"github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/internal/httputil"
@ -34,6 +35,10 @@ import (
"github.com/matrix-org/dendrite/test/testrig" "github.com/matrix-org/dendrite/test/testrig"
) )
var testIsBlacklistedOrBackingOff = func(s spec.ServerName) (*statistics.ServerStatistics, error) {
return &statistics.ServerStatistics{}, nil
}
type FakeQuerier struct { type FakeQuerier struct {
api.QuerySenderIDAPI api.QuerySenderIDAPI
} }
@ -58,7 +63,7 @@ func TestUsers(t *testing.T) {
}) })
t.Run("kick users", func(t *testing.T) { t.Run("kick users", func(t *testing.T) {
usrAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics) usrAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
rsAPI.SetUserAPI(usrAPI) rsAPI.SetUserAPI(usrAPI)
testKickUsers(t, rsAPI, usrAPI) testKickUsers(t, rsAPI, usrAPI)
}) })
@ -258,7 +263,7 @@ func TestPurgeRoom(t *testing.T) {
fsAPI := federationapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, rsAPI, caches, nil, true) fsAPI := federationapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, rsAPI, caches, nil, true)
rsAPI.SetFederationAPI(fsAPI, nil) rsAPI.SetFederationAPI(fsAPI, nil)
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics) userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, fsAPI.IsBlacklistedOrBackingOff)
syncapi.AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, userAPI, rsAPI, caches, caching.DisableMetrics) syncapi.AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, userAPI, rsAPI, caches, caching.DisableMetrics)
// Create the room // Create the room
@ -1050,7 +1055,7 @@ func TestUpgrade(t *testing.T) {
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics) rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics) userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
rsAPI.SetUserAPI(userAPI) rsAPI.SetUserAPI(userAPI)
for _, tc := range testCases { for _, tc := range testCases {

View file

@ -249,6 +249,7 @@ func (s *eventStatements) BulkSelectSnapshotsFromEventIDs(
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer internal.CloseAndLogIfError(ctx, rows, "BulkSelectSnapshotsFromEventIDs: rows.close() failed")
var eventID string var eventID string
var stateNID types.StateSnapshotNID var stateNID types.StateSnapshotNID
@ -563,7 +564,7 @@ func (s *eventStatements) SelectRoomNIDsForEventNIDs(
} }
result[eventNID] = roomNID result[eventNID] = roomNID
} }
return result, nil return result, rows.Err()
} }
func eventNIDsAsArray(eventNIDs []types.EventNID) pq.Int64Array { func eventNIDsAsArray(eventNIDs []types.EventNID) pq.Int64Array {

View file

@ -363,7 +363,7 @@ func (s *membershipStatements) SelectRoomsWithMembership(
} }
roomNIDs = append(roomNIDs, roomNID) roomNIDs = append(roomNIDs, roomNID)
} }
return roomNIDs, nil return roomNIDs, rows.Err()
} }
func (s *membershipStatements) SelectJoinedUsersSetForRooms( func (s *membershipStatements) SelectJoinedUsersSetForRooms(

View file

@ -137,7 +137,7 @@ func (s *roomStatements) SelectRoomIDsWithEvents(ctx context.Context, txn *sql.T
} }
roomIDs = append(roomIDs, roomID) roomIDs = append(roomIDs, roomID)
} }
return roomIDs, nil return roomIDs, rows.Err()
} }
func (s *roomStatements) InsertRoomNID( func (s *roomStatements) InsertRoomNID(
ctx context.Context, txn *sql.Tx, ctx context.Context, txn *sql.Tx,
@ -255,7 +255,7 @@ func (s *roomStatements) SelectRoomVersionsForRoomNIDs(
} }
result[roomNID] = roomVersion result[roomNID] = roomVersion
} }
return result, nil return result, rows.Err()
} }
func (s *roomStatements) BulkSelectRoomIDs(ctx context.Context, txn *sql.Tx, roomNIDs []types.RoomNID) ([]string, error) { func (s *roomStatements) BulkSelectRoomIDs(ctx context.Context, txn *sql.Tx, roomNIDs []types.RoomNID) ([]string, error) {
@ -277,7 +277,7 @@ func (s *roomStatements) BulkSelectRoomIDs(ctx context.Context, txn *sql.Tx, roo
} }
roomIDs = append(roomIDs, roomID) roomIDs = append(roomIDs, roomID)
} }
return roomIDs, nil return roomIDs, rows.Err()
} }
func (s *roomStatements) BulkSelectRoomNIDs(ctx context.Context, txn *sql.Tx, roomIDs []string) ([]types.RoomNID, error) { func (s *roomStatements) BulkSelectRoomNIDs(ctx context.Context, txn *sql.Tx, roomIDs []string) ([]types.RoomNID, error) {
@ -299,7 +299,7 @@ func (s *roomStatements) BulkSelectRoomNIDs(ctx context.Context, txn *sql.Tx, ro
} }
roomNIDs = append(roomNIDs, roomNID) roomNIDs = append(roomNIDs, roomNID)
} }
return roomNIDs, nil return roomNIDs, rows.Err()
} }
func roomNIDsAsArray(roomNIDs []types.RoomNID) pq.Int64Array { func roomNIDsAsArray(roomNIDs []types.RoomNID) pq.Int64Array {

View file

@ -162,6 +162,7 @@ func (s *userRoomKeysStatements) SelectAllPublicKeysForUser(ctx context.Context,
if errors.Is(err, sql.ErrNoRows) { if errors.Is(err, sql.ErrNoRows) {
return nil, nil return nil, nil
} }
defer internal.CloseAndLogIfError(ctx, rows, "SelectAllPublicKeysForUser: failed to close rows")
resultMap := make(map[types.RoomNID]ed25519.PublicKey) resultMap := make(map[types.RoomNID]ed25519.PublicKey)
@ -173,5 +174,5 @@ func (s *userRoomKeysStatements) SelectAllPublicKeysForUser(ctx context.Context,
} }
resultMap[roomNID] = pubkey resultMap[roomNID] = pubkey
} }
return resultMap, err return resultMap, rows.Err()
} }

View file

@ -109,5 +109,5 @@ func (s *eventJSONStatements) BulkSelectEventJSON(
} }
result.EventNID = types.EventNID(eventNID) result.EventNID = types.EventNID(eventNID)
} }
return results[:i], nil return results[:i], rows.Err()
} }

View file

@ -136,7 +136,7 @@ func (s *eventStateKeyStatements) BulkSelectEventStateKeyNID(
} }
result[stateKey] = types.EventStateKeyNID(stateKeyNID) result[stateKey] = types.EventStateKeyNID(stateKeyNID)
} }
return result, nil return result, rows.Err()
} }
func (s *eventStateKeyStatements) BulkSelectEventStateKey( func (s *eventStateKeyStatements) BulkSelectEventStateKey(
@ -167,5 +167,5 @@ func (s *eventStateKeyStatements) BulkSelectEventStateKey(
} }
result[types.EventStateKeyNID(stateKeyNID)] = stateKey result[types.EventStateKeyNID(stateKeyNID)] = stateKey
} }
return result, nil return result, rows.Err()
} }

View file

@ -147,5 +147,5 @@ func (s *eventTypeStatements) BulkSelectEventTypeNID(
} }
result[eventType] = types.EventTypeNID(eventTypeNID) result[eventType] = types.EventTypeNID(eventTypeNID)
} }
return result, nil return result, rows.Err()
} }

View file

@ -310,6 +310,9 @@ func (s *eventStatements) BulkSelectStateEventByID(
} }
results = append(results, result) results = append(results, result)
} }
if err = rows.Err(); err != nil {
return nil, err
}
if !excludeRejected && i != len(eventIDs) { if !excludeRejected && i != len(eventIDs) {
// If there are fewer rows returned than IDs then we were asked to lookup event IDs we don't have. // If there are fewer rows returned than IDs then we were asked to lookup event IDs we don't have.
// We don't know which ones were missing because we don't return the string IDs in the query. // We don't know which ones were missing because we don't return the string IDs in the query.
@ -377,7 +380,7 @@ func (s *eventStatements) BulkSelectStateEventByNID(
return nil, err return nil, err
} }
} }
return results[:i], err return results[:i], rows.Err()
} }
// bulkSelectStateAtEventByID lookups the state at a list of events by event ID. // bulkSelectStateAtEventByID lookups the state at a list of events by event ID.
@ -425,6 +428,9 @@ func (s *eventStatements) BulkSelectStateAtEventByID(
) )
} }
} }
if err = rows.Err(); err != nil {
return nil, err
}
if i != len(eventIDs) { if i != len(eventIDs) {
return nil, types.MissingEventError( return nil, types.MissingEventError(
fmt.Sprintf("storage: event IDs missing from the database (%d != %d)", i, len(eventIDs)), fmt.Sprintf("storage: event IDs missing from the database (%d != %d)", i, len(eventIDs)),
@ -507,6 +513,9 @@ func (s *eventStatements) BulkSelectStateAtEventAndReference(
result.BeforeStateSnapshotNID = types.StateSnapshotNID(stateSnapshotNID) result.BeforeStateSnapshotNID = types.StateSnapshotNID(stateSnapshotNID)
result.EventID = eventID result.EventID = eventID
} }
if err = rows.Err(); err != nil {
return nil, err
}
if i != len(eventNIDs) { if i != len(eventNIDs) {
return nil, fmt.Errorf("storage: event NIDs missing from the database (%d != %d)", i, len(eventNIDs)) return nil, fmt.Errorf("storage: event NIDs missing from the database (%d != %d)", i, len(eventNIDs))
} }
@ -544,6 +553,9 @@ func (s *eventStatements) BulkSelectEventID(ctx context.Context, txn *sql.Tx, ev
} }
results[types.EventNID(eventNID)] = eventID results[types.EventNID(eventNID)] = eventID
} }
if err = rows.Err(); err != nil {
return nil, err
}
if i != len(eventNIDs) { if i != len(eventNIDs) {
return nil, fmt.Errorf("storage: event NIDs missing from the database (%d != %d)", i, len(eventNIDs)) return nil, fmt.Errorf("storage: event NIDs missing from the database (%d != %d)", i, len(eventNIDs))
} }
@ -602,7 +614,7 @@ func (s *eventStatements) bulkSelectEventNID(ctx context.Context, txn *sql.Tx, e
RoomNID: types.RoomNID(roomNID), RoomNID: types.RoomNID(roomNID),
} }
} }
return results, nil return results, rows.Err()
} }
func (s *eventStatements) SelectMaxEventDepth(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID) (int64, error) { func (s *eventStatements) SelectMaxEventDepth(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID) (int64, error) {
@ -652,7 +664,7 @@ func (s *eventStatements) SelectRoomNIDsForEventNIDs(
} }
result[eventNID] = roomNID result[eventNID] = roomNID
} }
return result, nil return result, rows.Err()
} }
func eventNIDsAsArray(eventNIDs []types.EventNID) string { func eventNIDsAsArray(eventNIDs []types.EventNID) string {

View file

@ -126,6 +126,9 @@ func (s *inviteStatements) UpdateInviteRetired(
} }
eventIDs = append(eventIDs, inviteEventID) eventIDs = append(eventIDs, inviteEventID)
} }
if err = rows.Err(); err != nil {
return
}
// now retire the invites // now retire the invites
stmt = sqlutil.TxStmt(txn, s.updateInviteRetiredStmt) stmt = sqlutil.TxStmt(txn, s.updateInviteRetiredStmt)
_, err = stmt.ExecContext(ctx, roomNID, targetUserNID) _, err = stmt.ExecContext(ctx, roomNID, targetUserNID)
@ -157,5 +160,5 @@ func (s *inviteStatements) SelectInviteActiveForUserInRoom(
result = append(result, types.EventStateKeyNID(senderUserNID)) result = append(result, types.EventStateKeyNID(senderUserNID))
eventIDs = append(eventIDs, eventID) eventIDs = append(eventIDs, eventID)
} }
return result, eventIDs, eventJSON, nil return result, eventIDs, eventJSON, rows.Err()
} }

View file

@ -250,6 +250,7 @@ func (s *membershipStatements) SelectMembershipsFromRoom(
} }
eventNIDs = append(eventNIDs, eNID) eventNIDs = append(eventNIDs, eNID)
} }
err = rows.Err()
return return
} }
@ -277,6 +278,7 @@ func (s *membershipStatements) SelectMembershipsFromRoomAndMembership(
} }
eventNIDs = append(eventNIDs, eNID) eventNIDs = append(eventNIDs, eNID)
} }
err = rows.Err()
return return
} }
@ -313,7 +315,7 @@ func (s *membershipStatements) SelectRoomsWithMembership(
} }
roomNIDs = append(roomNIDs, roomNID) roomNIDs = append(roomNIDs, roomNID)
} }
return roomNIDs, nil return roomNIDs, rows.Err()
} }
func (s *membershipStatements) SelectJoinedUsersSetForRooms(ctx context.Context, txn *sql.Tx, roomNIDs []types.RoomNID, userNIDs []types.EventStateKeyNID, localOnly bool) (map[types.EventStateKeyNID]int, error) { func (s *membershipStatements) SelectJoinedUsersSetForRooms(ctx context.Context, txn *sql.Tx, roomNIDs []types.RoomNID, userNIDs []types.EventStateKeyNID, localOnly bool) (map[types.EventStateKeyNID]int, error) {

View file

@ -121,7 +121,7 @@ func (s *roomAliasesStatements) SelectAliasesFromRoomID(
aliases = append(aliases, alias) aliases = append(aliases, alias)
} }
err = rows.Err()
return return
} }

View file

@ -128,7 +128,7 @@ func (s *roomStatements) SelectRoomIDsWithEvents(ctx context.Context, txn *sql.T
} }
roomIDs = append(roomIDs, roomID) roomIDs = append(roomIDs, roomID)
} }
return roomIDs, nil return roomIDs, rows.Err()
} }
func (s *roomStatements) SelectRoomInfo(ctx context.Context, txn *sql.Tx, roomID string) (*types.RoomInfo, error) { func (s *roomStatements) SelectRoomInfo(ctx context.Context, txn *sql.Tx, roomID string) (*types.RoomInfo, error) {
@ -265,7 +265,7 @@ func (s *roomStatements) SelectRoomVersionsForRoomNIDs(
} }
result[roomNID] = roomVersion result[roomNID] = roomVersion
} }
return result, nil return result, rows.Err()
} }
func (s *roomStatements) BulkSelectRoomIDs(ctx context.Context, txn *sql.Tx, roomNIDs []types.RoomNID) ([]string, error) { func (s *roomStatements) BulkSelectRoomIDs(ctx context.Context, txn *sql.Tx, roomNIDs []types.RoomNID) ([]string, error) {
@ -293,7 +293,7 @@ func (s *roomStatements) BulkSelectRoomIDs(ctx context.Context, txn *sql.Tx, roo
} }
roomIDs = append(roomIDs, roomID) roomIDs = append(roomIDs, roomID)
} }
return roomIDs, nil return roomIDs, rows.Err()
} }
func (s *roomStatements) BulkSelectRoomNIDs(ctx context.Context, txn *sql.Tx, roomIDs []string) ([]types.RoomNID, error) { func (s *roomStatements) BulkSelectRoomNIDs(ctx context.Context, txn *sql.Tx, roomIDs []string) ([]types.RoomNID, error) {
@ -321,5 +321,5 @@ func (s *roomStatements) BulkSelectRoomNIDs(ctx context.Context, txn *sql.Tx, ro
} }
roomNIDs = append(roomNIDs, roomNID) roomNIDs = append(roomNIDs, roomNID)
} }
return roomNIDs, nil return roomNIDs, rows.Err()
} }

View file

@ -133,13 +133,16 @@ func (s *stateSnapshotStatements) BulkSelectStateBlockNIDs(
var stateBlockNIDsJSON string var stateBlockNIDsJSON string
for ; rows.Next(); i++ { for ; rows.Next(); i++ {
result := &results[i] result := &results[i]
if err := rows.Scan(&result.StateSnapshotNID, &stateBlockNIDsJSON); err != nil { if err = rows.Scan(&result.StateSnapshotNID, &stateBlockNIDsJSON); err != nil {
return nil, err return nil, err
} }
if err := json.Unmarshal([]byte(stateBlockNIDsJSON), &result.StateBlockNIDs); err != nil { if err = json.Unmarshal([]byte(stateBlockNIDsJSON), &result.StateBlockNIDs); err != nil {
return nil, err return nil, err
} }
} }
if err = rows.Err(); err != nil {
return nil, err
}
if i != len(stateNIDs) { if i != len(stateNIDs) {
return nil, types.MissingStateError(fmt.Sprintf("storage: state NIDs missing from the database (%d != %d)", i, len(stateNIDs))) return nil, types.MissingStateError(fmt.Sprintf("storage: state NIDs missing from the database (%d != %d)", i, len(stateNIDs)))
} }

View file

@ -177,6 +177,7 @@ func (s *userRoomKeysStatements) SelectAllPublicKeysForUser(ctx context.Context,
if errors.Is(err, sql.ErrNoRows) { if errors.Is(err, sql.ErrNoRows) {
return nil, nil return nil, nil
} }
defer internal.CloseAndLogIfError(ctx, rows, "SelectAllPublicKeysForUser: failed to close rows")
resultMap := make(map[types.RoomNID]ed25519.PublicKey) resultMap := make(map[types.RoomNID]ed25519.PublicKey)
@ -188,5 +189,5 @@ func (s *userRoomKeysStatements) SelectAllPublicKeysForUser(ctx context.Context,
} }
resultMap[roomNID] = pubkey resultMap[roomNID] = pubkey
} }
return resultMap, err return resultMap, rows.Err()
} }

View file

@ -301,7 +301,7 @@ func (p *DB) ChildrenForParent(ctx context.Context, eventID, relType string, rec
} }
children = append(children, evInfo) children = append(children, evInfo)
} }
return children, nil return children, rows.Err()
} }
func (p *DB) ParentForChild(ctx context.Context, eventID, relType string) (*eventInfo, error) { func (p *DB) ParentForChild(ctx context.Context, eventID, relType string) (*eventInfo, error) {

View file

@ -44,7 +44,7 @@ func GetEvent(
rsAPI api.SyncRoomserverAPI, rsAPI api.SyncRoomserverAPI,
) util.JSONResponse { ) util.JSONResponse {
ctx := req.Context() ctx := req.Context()
db, err := syncDB.NewDatabaseTransaction(ctx) db, err := syncDB.NewDatabaseSnapshot(ctx)
logger := util.GetLogger(ctx).WithFields(logrus.Fields{ logger := util.GetLogger(ctx).WithFields(logrus.Fields{
"event_id": eventID, "event_id": eventID,
"room_id": rawRoomID, "room_id": rawRoomID,
@ -56,6 +56,7 @@ func GetEvent(
JSON: spec.InternalServerError{}, JSON: spec.InternalServerError{},
} }
} }
defer db.Rollback() // nolint: errcheck
roomID, err := spec.NewRoomID(rawRoomID) roomID, err := spec.NewRoomID(rawRoomID)
if err != nil { if err != nil {

View file

@ -392,7 +392,7 @@ func currentRoomStateRowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, er
}) })
} }
return events, nil return events, rows.Err()
} }
func rowsToEvents(rows *sql.Rows) ([]*rstypes.HeaderedEvent, error) { func rowsToEvents(rows *sql.Rows) ([]*rstypes.HeaderedEvent, error) {

View file

@ -19,6 +19,7 @@ import (
"database/sql" "database/sql"
"fmt" "fmt"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
rstypes "github.com/matrix-org/dendrite/roomserver/types" rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/storage/tables"
@ -160,6 +161,7 @@ func (s *membershipsStatements) SelectMemberships(
if err != nil { if err != nil {
return return
} }
defer internal.CloseAndLogIfError(ctx, rows, "SelectMemberships: failed to close rows")
var ( var (
eventID string eventID string
) )

View file

@ -164,7 +164,7 @@ func (s *peekStatements) SelectPeekingDevices(
devices = append(devices, types.PeekingDevice{UserID: userID, DeviceID: deviceID}) devices = append(devices, types.PeekingDevice{UserID: userID, DeviceID: deviceID})
result[roomID] = devices result[roomID] = devices
} }
return result, nil return result, rows.Err()
} }
func (s *peekStatements) SelectMaxPeekID( func (s *peekStatements) SelectMaxPeekID(

View file

@ -144,7 +144,7 @@ func (p *presenceStatements) GetPresenceForUsers(
presence.ClientFields.Presence = presence.Presence.String() presence.ClientFields.Presence = presence.Presence.String()
result = append(result, presence) result = append(result, presence)
} }
return result, err return result, rows.Err()
} }
func (p *presenceStatements) GetMaxPresenceID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) { func (p *presenceStatements) GetMaxPresenceID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) {

View file

@ -177,7 +177,7 @@ func (s *currentRoomStateStatements) SelectJoinedUsers(
users = append(users, userID) users = append(users, userID)
result[roomID] = users result[roomID] = users
} }
return result, nil return result, rows.Err()
} }
// SelectJoinedUsersInRoom returns a map of room ID to a list of joined user IDs for a given room. // SelectJoinedUsersInRoom returns a map of room ID to a list of joined user IDs for a given room.
@ -236,7 +236,7 @@ func (s *currentRoomStateStatements) SelectRoomIDsWithMembership(
} }
result = append(result, roomID) result = append(result, roomID)
} }
return result, nil return result, rows.Err()
} }
// SelectRoomIDsWithAnyMembership returns a map of all memberships for the given user. // SelectRoomIDsWithAnyMembership returns a map of all memberships for the given user.
@ -419,7 +419,7 @@ func currentRoomStateRowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, er
}) })
} }
return events, nil return events, rows.Err()
} }
func rowsToEvents(rows *sql.Rows) ([]*rstypes.HeaderedEvent, error) { func rowsToEvents(rows *sql.Rows) ([]*rstypes.HeaderedEvent, error) {

View file

@ -176,7 +176,7 @@ func (s *inviteEventsStatements) SelectInviteEventsInRange(
if lastPos == 0 { if lastPos == 0 {
lastPos = r.To lastPos = r.To
} }
return result, retired, lastPos, nil return result, retired, lastPos, rows.Err()
} }
func (s *inviteEventsStatements) SelectMaxInviteID( func (s *inviteEventsStatements) SelectMaxInviteID(

View file

@ -19,6 +19,7 @@ import (
"database/sql" "database/sql"
"fmt" "fmt"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
rstypes "github.com/matrix-org/dendrite/roomserver/types" rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/storage/tables"
@ -163,6 +164,7 @@ func (s *membershipsStatements) SelectMemberships(
if err != nil { if err != nil {
return return
} }
defer internal.CloseAndLogIfError(ctx, rows, "SelectMemberships: failed to close rows")
var eventID string var eventID string
for rows.Next() { for rows.Next() {
if err = rows.Scan(&eventID); err != nil { if err = rows.Scan(&eventID); err != nil {

View file

@ -274,7 +274,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
} }
} }
return stateNeeded, eventIDToEvent, nil return stateNeeded, eventIDToEvent, rows.Err()
} }
// MaxID returns the ID of the last inserted event in this table. 'txn' is optional. If it is not supplied, // MaxID returns the ID of the last inserted event in this table. 'txn' is optional. If it is not supplied,
@ -520,7 +520,7 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
ExcludeFromSync: excludeFromSync, ExcludeFromSync: excludeFromSync,
}) })
} }
return result, nil return result, rows.Err()
} }
func (s *outputRoomEventsStatements) SelectContextEvent( func (s *outputRoomEventsStatements) SelectContextEvent(
ctx context.Context, txn *sql.Tx, roomID, eventID string, ctx context.Context, txn *sql.Tx, roomID, eventID string,

View file

@ -18,6 +18,7 @@ import (
"context" "context"
"database/sql" "database/sql"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
rstypes "github.com/matrix-org/dendrite/roomserver/types" rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/storage/tables"
@ -137,6 +138,7 @@ func (s *outputRoomEventsTopologyStatements) SelectEventIDsInRange(
} else if err != nil { } else if err != nil {
return return
} }
defer internal.CloseAndLogIfError(ctx, rows, "SelectEventIDsInRange: failed to close rows")
// Return the IDs. // Return the IDs.
var eventID string var eventID string
@ -155,7 +157,7 @@ func (s *outputRoomEventsTopologyStatements) SelectEventIDsInRange(
start = tokens[0] start = tokens[0]
end = tokens[len(tokens)-1] end = tokens[len(tokens)-1]
} }
err = rows.Err()
return return
} }

View file

@ -184,7 +184,7 @@ func (s *peekStatements) SelectPeekingDevices(
devices = append(devices, types.PeekingDevice{UserID: userID, DeviceID: deviceID}) devices = append(devices, types.PeekingDevice{UserID: userID, DeviceID: deviceID})
result[roomID] = devices result[roomID] = devices
} }
return result, nil return result, rows.Err()
} }
func (s *peekStatements) SelectMaxPeekID( func (s *peekStatements) SelectMaxPeekID(

View file

@ -169,7 +169,7 @@ func (p *presenceStatements) GetPresenceForUsers(
presence.ClientFields.Presence = presence.Presence.String() presence.ClientFields.Presence = presence.Presence.String()
result = append(result, presence) result = append(result, presence)
} }
return result, err return result, rows.Err()
} }
func (p *presenceStatements) GetMaxPresenceID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) { func (p *presenceStatements) GetMaxPresenceID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) {

View file

@ -25,6 +25,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/matrix-org/dendrite/federationapi/statistics"
rsapi "github.com/matrix-org/dendrite/roomserver/api" rsapi "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib/fclient" "github.com/matrix-org/gomatrixserverlib/fclient"
"github.com/matrix-org/gomatrixserverlib/spec" "github.com/matrix-org/gomatrixserverlib/spec"
@ -108,6 +109,8 @@ type DeviceListUpdater struct {
userIDToChan map[string]chan bool userIDToChan map[string]chan bool
userIDToChanMu *sync.Mutex userIDToChanMu *sync.Mutex
rsAPI rsapi.KeyserverRoomserverAPI rsAPI rsapi.KeyserverRoomserverAPI
isBlacklistedOrBackingOffFn func(s spec.ServerName) (*statistics.ServerStatistics, error)
} }
// DeviceListUpdaterDatabase is the subset of functionality from storage.Database required for the updater. // DeviceListUpdaterDatabase is the subset of functionality from storage.Database required for the updater.
@ -167,8 +170,10 @@ func NewDeviceListUpdater(
process *process.ProcessContext, db DeviceListUpdaterDatabase, process *process.ProcessContext, db DeviceListUpdaterDatabase,
api DeviceListUpdaterAPI, producer KeyChangeProducer, api DeviceListUpdaterAPI, producer KeyChangeProducer,
fedClient fedsenderapi.KeyserverFederationAPI, numWorkers int, fedClient fedsenderapi.KeyserverFederationAPI, numWorkers int,
rsAPI rsapi.KeyserverRoomserverAPI, thisServer spec.ServerName, rsAPI rsapi.KeyserverRoomserverAPI,
thisServer spec.ServerName,
enableMetrics bool, enableMetrics bool,
isBlacklistedOrBackingOffFn func(s spec.ServerName) (*statistics.ServerStatistics, error),
) *DeviceListUpdater { ) *DeviceListUpdater {
if enableMetrics { if enableMetrics {
prometheus.MustRegister(deviceListUpdaterBackpressure, deviceListUpdaterServersRetrying) prometheus.MustRegister(deviceListUpdaterBackpressure, deviceListUpdaterServersRetrying)
@ -186,6 +191,7 @@ func NewDeviceListUpdater(
userIDToChan: make(map[string]chan bool), userIDToChan: make(map[string]chan bool),
userIDToChanMu: &sync.Mutex{}, userIDToChanMu: &sync.Mutex{},
rsAPI: rsAPI, rsAPI: rsAPI,
isBlacklistedOrBackingOffFn: isBlacklistedOrBackingOffFn,
} }
} }
@ -362,13 +368,22 @@ func (u *DeviceListUpdater) notifyWorkers(userID string) {
if err != nil { if err != nil {
return return
} }
_, err = u.isBlacklistedOrBackingOffFn(remoteServer)
var federationClientError *fedsenderapi.FederationClientError
if errors.As(err, &federationClientError) {
if federationClientError.Blacklisted {
return
}
}
hash := fnv.New32a() hash := fnv.New32a()
_, _ = hash.Write([]byte(remoteServer)) _, _ = hash.Write([]byte(remoteServer))
index := int(int64(hash.Sum32()) % int64(len(u.workerChans))) index := int(int64(hash.Sum32()) % int64(len(u.workerChans)))
ch := u.assignChannel(userID) ch := u.assignChannel(userID)
// Since workerChans are buffered, we only increment here and let the worker
// decrement it once it is done processing.
deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(index)}).Inc() deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(index)}).Inc()
defer deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(index)}).Dec()
u.workerChans[index] <- remoteServer u.workerChans[index] <- remoteServer
select { select {
case <-ch: case <-ch:
@ -405,24 +420,38 @@ func (u *DeviceListUpdater) worker(ch chan spec.ServerName, workerID int) {
go func() { go func() {
var serversToRetry []spec.ServerName var serversToRetry []spec.ServerName
for { for {
serversToRetry = serversToRetry[:0] // reuse memory // nuke serversToRetry by re-slicing it to be "empty".
time.Sleep(time.Second) // The capacity of the slice is unchanged, which ensures we can reuse the memory.
serversToRetry = serversToRetry[:0]
deviceListUpdaterServersRetrying.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Set(float64(len(retries)))
time.Sleep(time.Second * 2)
// -2, so we have space for incoming device list updates over federation
maxServers := (cap(ch) - len(ch)) - 2
if maxServers <= 0 {
continue
}
retriesMu.Lock() retriesMu.Lock()
now := time.Now() now := time.Now()
for srv, retryAt := range retries { for srv, retryAt := range retries {
if now.After(retryAt) { if now.After(retryAt) {
serversToRetry = append(serversToRetry, srv) serversToRetry = append(serversToRetry, srv)
if maxServers == len(serversToRetry) {
break
} }
} }
}
for _, srv := range serversToRetry { for _, srv := range serversToRetry {
delete(retries, srv) delete(retries, srv)
} }
deviceListUpdaterServersRetrying.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Set(float64(len(retries)))
retriesMu.Unlock() retriesMu.Unlock()
for _, srv := range serversToRetry { for _, srv := range serversToRetry {
deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Inc() deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Inc()
ch <- srv ch <- srv
deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Dec()
} }
} }
}() }()
@ -430,8 +459,18 @@ func (u *DeviceListUpdater) worker(ch chan spec.ServerName, workerID int) {
retriesMu.Lock() retriesMu.Lock()
_, exists := retries[serverName] _, exists := retries[serverName]
retriesMu.Unlock() retriesMu.Unlock()
if exists {
// Don't retry a server that we're already waiting for. // If the serverName is coming from retries, maybe it was
// blacklisted in the meantime.
_, err := u.isBlacklistedOrBackingOffFn(serverName)
var federationClientError *fedsenderapi.FederationClientError
// unwrap errors and check for FederationClientError, if found, federationClientError will be not nil
errors.As(err, &federationClientError)
isBlacklisted := federationClientError != nil && federationClientError.Blacklisted
// Don't retry a server that we're already waiting for or is blacklisted by now.
if exists || isBlacklisted {
deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Dec()
continue continue
} }
waitTime, shouldRetry := u.processServer(serverName) waitTime, shouldRetry := u.processServer(serverName)
@ -442,6 +481,7 @@ func (u *DeviceListUpdater) worker(ch chan spec.ServerName, workerID int) {
} }
retriesMu.Unlock() retriesMu.Unlock()
} }
deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Dec()
} }
} }

View file

@ -27,6 +27,8 @@ import (
"testing" "testing"
"time" "time"
api2 "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/federationapi/statistics"
"github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -129,6 +131,10 @@ type mockDeviceListUpdaterAPI struct {
func (d *mockDeviceListUpdaterAPI) PerformUploadDeviceKeys(ctx context.Context, req *api.PerformUploadDeviceKeysRequest, res *api.PerformUploadDeviceKeysResponse) { func (d *mockDeviceListUpdaterAPI) PerformUploadDeviceKeys(ctx context.Context, req *api.PerformUploadDeviceKeysRequest, res *api.PerformUploadDeviceKeysResponse) {
} }
var testIsBlacklistedOrBackingOff = func(s spec.ServerName) (*statistics.ServerStatistics, error) {
return &statistics.ServerStatistics{}, nil
}
type roundTripper struct { type roundTripper struct {
fn func(*http.Request) (*http.Response, error) fn func(*http.Request) (*http.Response, error)
} }
@ -162,7 +168,7 @@ func TestUpdateHavePrevID(t *testing.T) {
} }
ap := &mockDeviceListUpdaterAPI{} ap := &mockDeviceListUpdaterAPI{}
producer := &mockKeyChangeProducer{} producer := &mockKeyChangeProducer{}
updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, nil, 1, nil, "localhost", caching.DisableMetrics) updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, nil, 1, nil, "localhost", caching.DisableMetrics, testIsBlacklistedOrBackingOff)
event := gomatrixserverlib.DeviceListUpdateEvent{ event := gomatrixserverlib.DeviceListUpdateEvent{
DeviceDisplayName: "Foo Bar", DeviceDisplayName: "Foo Bar",
Deleted: false, Deleted: false,
@ -234,7 +240,7 @@ func TestUpdateNoPrevID(t *testing.T) {
`)), `)),
}, nil }, nil
}) })
updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 2, nil, "example.test", caching.DisableMetrics) updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 2, nil, "example.test", caching.DisableMetrics, testIsBlacklistedOrBackingOff)
if err := updater.Start(); err != nil { if err := updater.Start(); err != nil {
t.Fatalf("failed to start updater: %s", err) t.Fatalf("failed to start updater: %s", err)
} }
@ -304,7 +310,7 @@ func TestDebounce(t *testing.T) {
close(incomingFedReq) close(incomingFedReq)
return <-fedCh, nil return <-fedCh, nil
}) })
updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 1, nil, "localhost", caching.DisableMetrics) updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 1, nil, "localhost", caching.DisableMetrics, testIsBlacklistedOrBackingOff)
if err := updater.Start(); err != nil { if err := updater.Start(); err != nil {
t.Fatalf("failed to start updater: %s", err) t.Fatalf("failed to start updater: %s", err)
} }
@ -407,7 +413,7 @@ func TestDeviceListUpdater_CleanUp(t *testing.T) {
updater := NewDeviceListUpdater(processCtx, db, nil, updater := NewDeviceListUpdater(processCtx, db, nil,
nil, nil, nil, nil,
0, rsAPI, "test", caching.DisableMetrics) 0, rsAPI, "test", caching.DisableMetrics, testIsBlacklistedOrBackingOff)
if err := updater.CleanUp(); err != nil { if err := updater.CleanUp(); err != nil {
t.Error(err) t.Error(err)
} }
@ -475,3 +481,68 @@ func Test_dedupeStateList(t *testing.T) {
}) })
} }
} }
func TestDeviceListUpdaterIgnoreBlacklisted(t *testing.T) {
unreachableServer := spec.ServerName("notlocalhost")
updater := DeviceListUpdater{
workerChans: make([]chan spec.ServerName, 1),
isBlacklistedOrBackingOffFn: func(s spec.ServerName) (*statistics.ServerStatistics, error) {
switch s {
case unreachableServer:
return nil, &api2.FederationClientError{Blacklisted: true}
}
return nil, nil
},
mu: &sync.Mutex{},
userIDToChanMu: &sync.Mutex{},
userIDToChan: make(map[string]chan bool),
userIDToMutex: make(map[string]*sync.Mutex),
}
workerCh := make(chan spec.ServerName)
defer close(workerCh)
updater.workerChans[0] = workerCh
// happy case
alice := "@alice:localhost"
aliceCh := updater.assignChannel(alice)
defer updater.clearChannel(alice)
// failing case
bob := "@bob:" + unreachableServer
bobCh := updater.assignChannel(string(bob))
defer updater.clearChannel(string(bob))
expectedServers := map[spec.ServerName]struct{}{
"localhost": {},
}
unexpectedServers := make(map[spec.ServerName]struct{})
go func() {
for serverName := range workerCh {
switch serverName {
case "localhost":
delete(expectedServers, serverName)
aliceCh <- true // unblock notifyWorkers
case unreachableServer: // this should not happen as it is "filtered" away by the blacklist
unexpectedServers[serverName] = struct{}{}
bobCh <- true
default:
unexpectedServers[serverName] = struct{}{}
}
}
}()
// alice is not blacklisted
updater.notifyWorkers(alice)
// bob is blacklisted
updater.notifyWorkers(string(bob))
for server := range expectedServers {
t.Errorf("Server still in expectedServers map: %s", server)
}
for server := range unexpectedServers {
t.Errorf("unexpected server in result: %s", server)
}
}

View file

@ -77,7 +77,7 @@ func (s *crossSigningKeysStatements) SelectCrossSigningKeysForUser(
for rows.Next() { for rows.Next() {
var keyTypeInt int16 var keyTypeInt int16
var keyData spec.Base64Bytes var keyData spec.Base64Bytes
if err := rows.Scan(&keyTypeInt, &keyData); err != nil { if err = rows.Scan(&keyTypeInt, &keyData); err != nil {
return nil, err return nil, err
} }
keyType, ok := types.KeyTypeIntToPurpose[keyTypeInt] keyType, ok := types.KeyTypeIntToPurpose[keyTypeInt]
@ -86,6 +86,7 @@ func (s *crossSigningKeysStatements) SelectCrossSigningKeysForUser(
} }
r[keyType] = keyData r[keyType] = keyData
} }
err = rows.Err()
return return
} }

View file

@ -98,7 +98,7 @@ func (s *crossSigningSigsStatements) SelectCrossSigningSigsForTarget(
var userID string var userID string
var keyID gomatrixserverlib.KeyID var keyID gomatrixserverlib.KeyID
var signature spec.Base64Bytes var signature spec.Base64Bytes
if err := rows.Scan(&userID, &keyID, &signature); err != nil { if err = rows.Scan(&userID, &keyID, &signature); err != nil {
return nil, err return nil, err
} }
if _, ok := r[userID]; !ok { if _, ok := r[userID]; !ok {
@ -106,6 +106,7 @@ func (s *crossSigningSigsStatements) SelectCrossSigningSigsForTarget(
} }
r[userID][keyID] = signature r[userID][keyID] = signature
} }
err = rows.Err()
return return
} }

View file

@ -162,5 +162,5 @@ func unpackKeys(ctx context.Context, rows *sql.Rows) (map[string]map[string]api.
roomData[key.SessionID] = key.KeyBackupSession roomData[key.SessionID] = key.KeyBackupSession
result[key.RoomID] = roomData result[key.RoomID] = roomData
} }
return result, nil return result, rows.Err()
} }

View file

@ -115,7 +115,7 @@ func (s *keyChangesStatements) SelectKeyChanges(
for rows.Next() { for rows.Next() {
var userID string var userID string
var offset int64 var offset int64
if err := rows.Scan(&userID, &offset); err != nil { if err = rows.Scan(&userID, &offset); err != nil {
return nil, 0, err return nil, 0, err
} }
if offset > latestOffset { if offset > latestOffset {
@ -123,5 +123,6 @@ func (s *keyChangesStatements) SelectKeyChanges(
} }
userIDs = append(userIDs, userID) userIDs = append(userIDs, userID)
} }
err = rows.Err()
return return
} }

View file

@ -134,7 +134,7 @@ func (s *oneTimeKeysStatements) CountOneTimeKeys(ctx context.Context, userID, de
} }
counts.KeyCount[algorithm] = count counts.KeyCount[algorithm] = count
} }
return counts, nil return counts, rows.Err()
} }
func (s *oneTimeKeysStatements) InsertOneTimeKeys(ctx context.Context, txn *sql.Tx, keys api.OneTimeKeys) (*api.OneTimeKeysCount, error) { func (s *oneTimeKeysStatements) InsertOneTimeKeys(ctx context.Context, txn *sql.Tx, keys api.OneTimeKeys) (*api.OneTimeKeysCount, error) {

View file

@ -165,5 +165,5 @@ func (s *profilesStatements) SelectProfilesBySearch(
profiles = append(profiles, profile) profiles = append(profiles, profile)
} }
} }
return profiles, nil return profiles, rows.Err()
} }

View file

@ -18,6 +18,7 @@ import (
"context" "context"
"database/sql" "database/sql"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/userapi/storage/tables" "github.com/matrix-org/dendrite/userapi/storage/tables"
"github.com/matrix-org/gomatrixserverlib/spec" "github.com/matrix-org/gomatrixserverlib/spec"
@ -94,6 +95,7 @@ func (s *threepidStatements) SelectThreePIDsForLocalpart(
if err != nil { if err != nil {
return return
} }
defer internal.CloseAndLogIfError(ctx, rows, "SelectThreePIDsForLocalpart: failed to close rows")
threepids = []authtypes.ThreePID{} threepids = []authtypes.ThreePID{}
for rows.Next() { for rows.Next() {
@ -107,7 +109,7 @@ func (s *threepidStatements) SelectThreePIDsForLocalpart(
Medium: medium, Medium: medium,
}) })
} }
err = rows.Err()
return return
} }

View file

@ -19,6 +19,7 @@ import (
"database/sql" "database/sql"
"encoding/json" "encoding/json"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/userapi/storage/tables" "github.com/matrix-org/dendrite/userapi/storage/tables"
"github.com/matrix-org/gomatrixserverlib/spec" "github.com/matrix-org/gomatrixserverlib/spec"
@ -95,6 +96,7 @@ func (s *accountDataStatements) SelectAccountData(
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
defer internal.CloseAndLogIfError(ctx, rows, "SelectAccountData: failed to close rows")
global := map[string]json.RawMessage{} global := map[string]json.RawMessage{}
rooms := map[string]map[string]json.RawMessage{} rooms := map[string]map[string]json.RawMessage{}
@ -118,7 +120,7 @@ func (s *accountDataStatements) SelectAccountData(
} }
} }
return global, rooms, nil return global, rooms, rows.Err()
} }
func (s *accountDataStatements) SelectAccountDataByType( func (s *accountDataStatements) SelectAccountDataByType(

View file

@ -76,7 +76,7 @@ func (s *crossSigningKeysStatements) SelectCrossSigningKeysForUser(
for rows.Next() { for rows.Next() {
var keyTypeInt int16 var keyTypeInt int16
var keyData spec.Base64Bytes var keyData spec.Base64Bytes
if err := rows.Scan(&keyTypeInt, &keyData); err != nil { if err = rows.Scan(&keyTypeInt, &keyData); err != nil {
return nil, err return nil, err
} }
keyType, ok := types.KeyTypeIntToPurpose[keyTypeInt] keyType, ok := types.KeyTypeIntToPurpose[keyTypeInt]
@ -85,6 +85,7 @@ func (s *crossSigningKeysStatements) SelectCrossSigningKeysForUser(
} }
r[keyType] = keyData r[keyType] = keyData
} }
err = rows.Err()
return return
} }

View file

@ -96,7 +96,7 @@ func (s *crossSigningSigsStatements) SelectCrossSigningSigsForTarget(
var userID string var userID string
var keyID gomatrixserverlib.KeyID var keyID gomatrixserverlib.KeyID
var signature spec.Base64Bytes var signature spec.Base64Bytes
if err := rows.Scan(&userID, &keyID, &signature); err != nil { if err = rows.Scan(&userID, &keyID, &signature); err != nil {
return nil, err return nil, err
} }
if _, ok := r[userID]; !ok { if _, ok := r[userID]; !ok {
@ -104,6 +104,7 @@ func (s *crossSigningSigsStatements) SelectCrossSigningSigsForTarget(
} }
r[userID][keyID] = signature r[userID][keyID] = signature
} }
err = rows.Err()
return return
} }

View file

@ -296,6 +296,7 @@ func (s *devicesStatements) SelectDevicesByLocalpart(
if err != nil { if err != nil {
return devices, err return devices, err
} }
defer internal.CloseAndLogIfError(ctx, rows, "SelectDevicesByLocalpart: failed to close rows")
var dev api.Device var dev api.Device
var lastseents sql.NullInt64 var lastseents sql.NullInt64
@ -325,7 +326,7 @@ func (s *devicesStatements) SelectDevicesByLocalpart(
devices = append(devices, dev) devices = append(devices, dev)
} }
return devices, nil return devices, rows.Err()
} }
func (s *devicesStatements) SelectDevicesByID(ctx context.Context, deviceIDs []string) ([]api.Device, error) { func (s *devicesStatements) SelectDevicesByID(ctx context.Context, deviceIDs []string) ([]api.Device, error) {

View file

@ -162,5 +162,5 @@ func unpackKeys(ctx context.Context, rows *sql.Rows) (map[string]map[string]api.
roomData[key.SessionID] = key.KeyBackupSession roomData[key.SessionID] = key.KeyBackupSession
result[key.RoomID] = roomData result[key.RoomID] = roomData
} }
return result, nil return result, rows.Err()
} }

View file

@ -113,7 +113,7 @@ func (s *keyChangesStatements) SelectKeyChanges(
for rows.Next() { for rows.Next() {
var userID string var userID string
var offset int64 var offset int64
if err := rows.Scan(&userID, &offset); err != nil { if err = rows.Scan(&userID, &offset); err != nil {
return nil, 0, err return nil, 0, err
} }
if offset > latestOffset { if offset > latestOffset {
@ -121,5 +121,6 @@ func (s *keyChangesStatements) SelectKeyChanges(
} }
userIDs = append(userIDs, userID) userIDs = append(userIDs, userID)
} }
err = rows.Err()
return return
} }

View file

@ -140,7 +140,7 @@ func (s *oneTimeKeysStatements) CountOneTimeKeys(ctx context.Context, userID, de
} }
counts.KeyCount[algorithm] = count counts.KeyCount[algorithm] = count
} }
return counts, nil return counts, rows.Err()
} }
func (s *oneTimeKeysStatements) InsertOneTimeKeys( func (s *oneTimeKeysStatements) InsertOneTimeKeys(

View file

@ -173,5 +173,5 @@ func (s *profilesStatements) SelectProfilesBySearch(
profiles = append(profiles, profile) profiles = append(profiles, profile)
} }
} }
return profiles, nil return profiles, rows.Err()
} }

View file

@ -18,10 +18,12 @@ import (
"time" "time"
fedsenderapi "github.com/matrix-org/dendrite/federationapi/api" fedsenderapi "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/federationapi/statistics"
"github.com/matrix-org/dendrite/internal/pushgateway" "github.com/matrix-org/dendrite/internal/pushgateway"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib/spec"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
rsapi "github.com/matrix-org/dendrite/roomserver/api" rsapi "github.com/matrix-org/dendrite/roomserver/api"
@ -47,6 +49,7 @@ func NewInternalAPI(
rsAPI rsapi.UserRoomserverAPI, rsAPI rsapi.UserRoomserverAPI,
fedClient fedsenderapi.KeyserverFederationAPI, fedClient fedsenderapi.KeyserverFederationAPI,
enableMetrics bool, enableMetrics bool,
blacklistedOrBackingOffFn func(s spec.ServerName) (*statistics.ServerStatistics, error),
) *internal.UserInternalAPI { ) *internal.UserInternalAPI {
js, _ := natsInstance.Prepare(processContext, &dendriteCfg.Global.JetStream) js, _ := natsInstance.Prepare(processContext, &dendriteCfg.Global.JetStream)
appServices := dendriteCfg.Derived.ApplicationServices appServices := dendriteCfg.Derived.ApplicationServices
@ -100,7 +103,7 @@ func NewInternalAPI(
FedClient: fedClient, FedClient: fedClient,
} }
updater := internal.NewDeviceListUpdater(processContext, keyDB, userAPI, keyChangeProducer, fedClient, dendriteCfg.UserAPI.WorkerCount, rsAPI, dendriteCfg.Global.ServerName, enableMetrics) updater := internal.NewDeviceListUpdater(processContext, keyDB, userAPI, keyChangeProducer, fedClient, dendriteCfg.UserAPI.WorkerCount, rsAPI, dendriteCfg.Global.ServerName, enableMetrics, blacklistedOrBackingOffFn)
userAPI.Updater = updater userAPI.Updater = updater
// Remove users which we don't share a room with anymore // Remove users which we don't share a room with anymore
if err := updater.CleanUp(); err != nil { if err := updater.CleanUp(); err != nil {