Merge branch 'main' into neilalexander/jassuko

This commit is contained in:
Neil Alexander 2022-09-22 14:57:40 +01:00
commit 1055206194
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
13 changed files with 129 additions and 45 deletions

View file

@ -1,5 +1,26 @@
# Changelog # Changelog
## Dendrite 0.9.9 (2022-09-22)
### Features
* Dendrite will now try to keep HTTP connections open to remote federated servers for a few minutes after a request and attempt to reuse those connections where possible
* This should reduce the amount of time spent on TLS handshakes and often speed up requests to remote servers
* This new behaviour can be disabled with the `federation_api.disable_http_keepalives` option if needed
* A number of dependencies have been updated
### Fixes
* A bug where the roomserver did not correctly propagate rewritten room state to downstream components (like the federation API and sync API) has been fixed, which could cause issues when performing a federated join to a previously left room
* Event auth now correctly parses the `join_authorised_via_users_server` field in the membership event content
* Database migrations should no longer produce unique constraint errors at Dendrite startup
* The `origin` of device list updates should now be populated correctly
* Send-to-device messages will no longer be dropped if we fail to publish them to specific devices
* The roomserver query to find state after events will now always resolve state if there are multiple prev events
* The roomserver will now return no memberships if querying history visibility for an event which has no state snapshot
* The device list updater will now mark a device list as stale if a requesting device ID is not known
* Transactions sent to appservices should no longer have accidental duplicated transaction IDs (contributed by [tak-hntlabs](https://github.com/tak-hntlabs))
## Dendrite 0.9.8 (2022-09-12) ## Dendrite 0.9.8 (2022-09-12)
### Important ### Important

View file

@ -212,6 +212,13 @@ federation_api:
# enable this option in production as it presents a security risk! # enable this option in production as it presents a security risk!
disable_tls_validation: false disable_tls_validation: false
# Disable HTTP keepalives, which also prevents connection reuse. Dendrite will typically
# keep HTTP connections open to remote hosts for 5 minutes as they can be reused much
# more quickly than opening new connections each time. Disabling keepalives will close
# HTTP connections immediately after a successful request but may result in more CPU and
# memory being used on TLS handshakes for each new connection instead.
disable_http_keepalives: false
# Perspective keyservers to use as a backup when direct key fetches fail. This may # Perspective keyservers to use as a backup when direct key fetches fail. This may
# be required to satisfy key requests for servers that are no longer online when # be required to satisfy key requests for servers that are no longer online when
# joining some rooms. # joining some rooms.

View file

@ -219,6 +219,13 @@ federation_api:
# enable this option in production as it presents a security risk! # enable this option in production as it presents a security risk!
disable_tls_validation: false disable_tls_validation: false
# Disable HTTP keepalives, which also prevents connection reuse. Dendrite will typically
# keep HTTP connections open to remote hosts for 5 minutes as they can be reused much
# more quickly than opening new connections each time. Disabling keepalives will close
# HTTP connections immediately after a successful request but may result in more CPU and
# memory being used on TLS handshakes for each new connection instead.
disable_http_keepalives: false
# Perspective keyservers to use as a backup when direct key fetches fail. This may # Perspective keyservers to use as a backup when direct key fetches fail. This may
# be required to satisfy key requests for servers that are no longer online when # be required to satisfy key requests for servers that are no longer online when
# joining some rooms. # joining some rooms.

View file

@ -14,7 +14,7 @@ GEM
execjs execjs
coffee-script-source (1.11.1) coffee-script-source (1.11.1)
colorator (1.1.0) colorator (1.1.0)
commonmarker (0.23.4) commonmarker (0.23.6)
concurrent-ruby (1.1.10) concurrent-ruby (1.1.10)
dnsruby (1.61.9) dnsruby (1.61.9)
simpleidn (~> 0.1) simpleidn (~> 0.1)

View file

@ -12,12 +12,13 @@ import (
"testing" "testing"
"time" "time"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/federationapi/routing" "github.com/matrix-org/dendrite/federationapi/routing"
"github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/gomatrixserverlib"
) )
type server struct { type server struct {
@ -86,7 +87,12 @@ func TestMain(m *testing.M) {
cfg.Global.JetStream.StoragePath = config.Path(d) cfg.Global.JetStream.StoragePath = config.Path(d)
cfg.Global.KeyID = serverKeyID cfg.Global.KeyID = serverKeyID
cfg.Global.KeyValidityPeriod = s.validity cfg.Global.KeyValidityPeriod = s.validity
cfg.FederationAPI.Database.ConnectionString = config.DataSource("file::memory:") f, err := os.CreateTemp(d, "federation_keys_test*.db")
if err != nil {
return -1
}
defer f.Close()
cfg.FederationAPI.Database.ConnectionString = config.DataSource("file:" + f.Name())
s.config = &cfg.FederationAPI s.config = &cfg.FederationAPI
// Create a transport which redirects federation requests to // Create a transport which redirects federation requests to

View file

@ -10,6 +10,10 @@ import (
"testing" "testing"
"time" "time"
"github.com/matrix-org/gomatrix"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
"github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/federationapi"
"github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/federationapi/internal" "github.com/matrix-org/dendrite/federationapi/internal"
@ -20,9 +24,6 @@ import (
"github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/test" "github.com/matrix-org/dendrite/test"
"github.com/matrix-org/dendrite/test/testrig" "github.com/matrix-org/dendrite/test/testrig"
"github.com/matrix-org/gomatrix"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
) )
type fedRoomserverAPI struct { type fedRoomserverAPI struct {
@ -271,7 +272,6 @@ func TestRoomsV3URLEscapeDoNot404(t *testing.T) {
cfg.Global.ServerName = gomatrixserverlib.ServerName("localhost") cfg.Global.ServerName = gomatrixserverlib.ServerName("localhost")
cfg.Global.PrivateKey = privKey cfg.Global.PrivateKey = privKey
cfg.Global.JetStream.InMemory = true cfg.Global.JetStream.InMemory = true
cfg.FederationAPI.Database.ConnectionString = config.DataSource("file::memory:")
base := base.NewBaseDendrite(cfg, "Monolith") base := base.NewBaseDendrite(cfg, "Monolith")
keyRing := &test.NopJSONVerifier{} keyRing := &test.NopJSONVerifier{}
// TODO: This is pretty fragile, as if anything calls anything on these nils this test will break. // TODO: This is pretty fragile, as if anything calls anything on these nils this test will break.

2
go.mod
View file

@ -26,7 +26,7 @@ require (
github.com/matrix-org/pinecone v0.0.0-20220915154206-df85cb5026fc github.com/matrix-org/pinecone v0.0.0-20220915154206-df85cb5026fc
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
github.com/mattn/go-sqlite3 v1.14.15 github.com/mattn/go-sqlite3 v1.14.15
github.com/nats-io/nats-server/v2 v2.9.0 github.com/nats-io/nats-server/v2 v2.9.1-0.20220920152220-52d7b481c4b5
github.com/nats-io/nats.go v1.17.0 github.com/nats-io/nats.go v1.17.0
github.com/neilalexander/utp v0.1.1-0.20210727203401-54ae7b1cd5f9 github.com/neilalexander/utp v0.1.1-0.20210727203401-54ae7b1cd5f9
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646

4
go.sum
View file

@ -422,8 +422,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI= github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI=
github.com/nats-io/jwt/v2 v2.3.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= github.com/nats-io/jwt/v2 v2.3.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats-server/v2 v2.9.0 h1:DLWu+7/VgGOoChcDKytnUZPAmudpv7o/MhKmNrnH1RE= github.com/nats-io/nats-server/v2 v2.9.1-0.20220920152220-52d7b481c4b5 h1:G/YGSXcJ2bUofD8Ts49it4VNezaJLQldI6fZR+wIUts=
github.com/nats-io/nats-server/v2 v2.9.0/go.mod h1:BWKY6217RvhI+FDoOLZ2BH+hOC37xeKRBlQ1Lz7teKI= github.com/nats-io/nats-server/v2 v2.9.1-0.20220920152220-52d7b481c4b5/go.mod h1:BWKY6217RvhI+FDoOLZ2BH+hOC37xeKRBlQ1Lz7teKI=
github.com/nats-io/nats.go v1.17.0 h1:1jp5BThsdGlN91hW0k3YEfJbfACjiOYtUiLXG0RL4IE= github.com/nats-io/nats.go v1.17.0 h1:1jp5BThsdGlN91hW0k3YEfJbfACjiOYtUiLXG0RL4IE=
github.com/nats-io/nats.go v1.17.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nats.go v1.17.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=

View file

@ -49,12 +49,13 @@ type Migration struct {
Down func(ctx context.Context, txn *sql.Tx) error Down func(ctx context.Context, txn *sql.Tx) error
} }
// Migrator // Migrator contains fields required to run migrations.
type Migrator struct { type Migrator struct {
db *sql.DB db *sql.DB
migrations []Migration migrations []Migration
knownMigrations map[string]struct{} knownMigrations map[string]struct{}
mutex *sync.Mutex mutex *sync.Mutex
insertStmt *sql.Stmt
} }
// NewMigrator creates a new DB migrator. // NewMigrator creates a new DB migrator.
@ -82,35 +83,26 @@ func (m *Migrator) AddMigrations(migrations ...Migration) {
// Up executes all migrations in order they were added. // Up executes all migrations in order they were added.
func (m *Migrator) Up(ctx context.Context) error { func (m *Migrator) Up(ctx context.Context) error {
var (
err error
dendriteVersion = internal.VersionString()
)
// ensure there is a table for known migrations // ensure there is a table for known migrations
executedMigrations, err := m.ExecutedMigrations(ctx) executedMigrations, err := m.ExecutedMigrations(ctx)
if err != nil { if err != nil {
return fmt.Errorf("unable to create/get migrations: %w", err) return fmt.Errorf("unable to create/get migrations: %w", err)
} }
// ensure we close the insert statement, as it's not needed anymore
defer m.close()
return WithTransaction(m.db, func(txn *sql.Tx) error { return WithTransaction(m.db, func(txn *sql.Tx) error {
for i := range m.migrations { for i := range m.migrations {
now := time.Now().UTC().Format(time.RFC3339)
migration := m.migrations[i] migration := m.migrations[i]
// Skip migration if it was already executed // Skip migration if it was already executed
if _, ok := executedMigrations[migration.Version]; ok { if _, ok := executedMigrations[migration.Version]; ok {
continue continue
} }
logrus.Debugf("Executing database migration '%s'", migration.Version) logrus.Debugf("Executing database migration '%s'", migration.Version)
err = migration.Up(ctx, txn)
if err != nil { if err = migration.Up(ctx, txn); err != nil {
return fmt.Errorf("unable to execute migration '%s': %w", migration.Version, err) return fmt.Errorf("unable to execute migration '%s': %w", migration.Version, err)
} }
_, err = txn.ExecContext(ctx, insertVersionSQL, if err = m.insertMigration(ctx, txn, migration.Version); err != nil {
migration.Version,
now,
dendriteVersion,
)
if err != nil {
return fmt.Errorf("unable to insert executed migrations: %w", err) return fmt.Errorf("unable to insert executed migrations: %w", err)
} }
} }
@ -118,6 +110,23 @@ func (m *Migrator) Up(ctx context.Context) error {
}) })
} }
func (m *Migrator) insertMigration(ctx context.Context, txn *sql.Tx, migrationName string) error {
if m.insertStmt == nil {
stmt, err := m.db.Prepare(insertVersionSQL)
if err != nil {
return fmt.Errorf("unable to prepare insert statement: %w", err)
}
m.insertStmt = stmt
}
stmt := TxStmtContext(ctx, txn, m.insertStmt)
_, err := stmt.ExecContext(ctx,
migrationName,
time.Now().Format(time.RFC3339),
internal.VersionString(),
)
return err
}
// ExecutedMigrations returns a map with already executed migrations in addition to creating the // ExecutedMigrations returns a map with already executed migrations in addition to creating the
// migrations table, if it doesn't exist. // migrations table, if it doesn't exist.
func (m *Migrator) ExecutedMigrations(ctx context.Context) (map[string]struct{}, error) { func (m *Migrator) ExecutedMigrations(ctx context.Context) (map[string]struct{}, error) {
@ -146,19 +155,20 @@ func (m *Migrator) ExecutedMigrations(ctx context.Context) (map[string]struct{},
// inserts a migration given their name to the database. // inserts a migration given their name to the database.
// This should only be used when manually inserting migrations. // This should only be used when manually inserting migrations.
func InsertMigration(ctx context.Context, db *sql.DB, migrationName string) error { func InsertMigration(ctx context.Context, db *sql.DB, migrationName string) error {
_, err := db.ExecContext(ctx, createDBMigrationsSQL) m := NewMigrator(db)
defer m.close()
existingMigrations, err := m.ExecutedMigrations(ctx)
if err != nil { if err != nil {
return fmt.Errorf("unable to create db_migrations: %w", err) return err
} }
_, err = db.ExecContext(ctx, insertVersionSQL, if _, ok := existingMigrations[migrationName]; ok {
migrationName,
time.Now().Format(time.RFC3339),
internal.VersionString(),
)
// If the migration was already executed, we'll get a unique constraint error,
// return nil instead, to avoid unnecessary logging.
if IsUniqueConstraintViolationErr(err) {
return nil return nil
} }
return err return m.insertMigration(ctx, nil, migrationName)
}
func (m *Migrator) close() {
if m.insertStmt != nil {
internal.CloseAndLogIfError(context.Background(), m.insertStmt, "unable to close insert statement")
}
} }

View file

@ -7,9 +7,10 @@ import (
"reflect" "reflect"
"testing" "testing"
_ "github.com/mattn/go-sqlite3"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/test" "github.com/matrix-org/dendrite/test"
_ "github.com/mattn/go-sqlite3"
) )
var dummyMigrations = []sqlutil.Migration{ var dummyMigrations = []sqlutil.Migration{
@ -81,11 +82,12 @@ func Test_migrations_Up(t *testing.T) {
} }
ctx := context.Background() ctx := context.Background()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
conStr, close := test.PrepareDBConnectionString(t, dbType) conStr, close := test.PrepareDBConnectionString(t, dbType)
defer close() defer close()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
driverName := "sqlite3" driverName := "sqlite3"
if dbType == test.DBTypePostgres { if dbType == test.DBTypePostgres {
driverName = "postgres" driverName = "postgres"
@ -107,6 +109,30 @@ func Test_migrations_Up(t *testing.T) {
t.Errorf("expected: %+v, got %v", tt.wantResult, result) t.Errorf("expected: %+v, got %v", tt.wantResult, result)
} }
}) })
})
} }
})
}
func Test_insertMigration(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
conStr, close := test.PrepareDBConnectionString(t, dbType)
defer close()
driverName := "sqlite3"
if dbType == test.DBTypePostgres {
driverName = "postgres"
}
db, err := sql.Open(driverName, conStr)
if err != nil {
t.Errorf("unable to open database: %v", err)
}
if err := sqlutil.InsertMigration(context.Background(), db, "testing"); err != nil {
t.Fatalf("unable to insert migration: %s", err)
}
// Second insert should not return an error, as it was already executed.
if err := sqlutil.InsertMigration(context.Background(), db, "testing"); err != nil {
t.Fatalf("unable to insert migration: %s", err)
}
})
} }

View file

@ -17,7 +17,7 @@ var build string
const ( const (
VersionMajor = 0 VersionMajor = 0
VersionMinor = 9 VersionMinor = 9
VersionPatch = 8 VersionPatch = 9
VersionTag = "" // example: "rc1" VersionTag = "" // example: "rc1"
) )

View file

@ -373,6 +373,7 @@ func (b *BaseDendrite) CreateFederationClient() *gomatrixserverlib.FederationCli
opts := []gomatrixserverlib.ClientOption{ opts := []gomatrixserverlib.ClientOption{
gomatrixserverlib.WithTimeout(time.Minute * 5), gomatrixserverlib.WithTimeout(time.Minute * 5),
gomatrixserverlib.WithSkipVerify(b.Cfg.FederationAPI.DisableTLSValidation), gomatrixserverlib.WithSkipVerify(b.Cfg.FederationAPI.DisableTLSValidation),
gomatrixserverlib.WithKeepAlives(!b.Cfg.FederationAPI.DisableHTTPKeepalives),
} }
if b.Cfg.Global.DNSCache.Enabled { if b.Cfg.Global.DNSCache.Enabled {
opts = append(opts, gomatrixserverlib.WithDNSCache(b.DNSCache)) opts = append(opts, gomatrixserverlib.WithDNSCache(b.DNSCache))

View file

@ -22,6 +22,11 @@ type FederationAPI struct {
// on remote federation endpoints. This is not recommended in production! // on remote federation endpoints. This is not recommended in production!
DisableTLSValidation bool `yaml:"disable_tls_validation"` DisableTLSValidation bool `yaml:"disable_tls_validation"`
// DisableHTTPKeepalives prevents Dendrite from keeping HTTP connections
// open for reuse for future requests. Connections will be closed quicker
// but we may spend more time on TLS handshakes instead.
DisableHTTPKeepalives bool `yaml:"disable_http_keepalives"`
// Perspective keyservers, to use as a backup when direct key fetch // Perspective keyservers, to use as a backup when direct key fetch
// requests don't succeed // requests don't succeed
KeyPerspectives KeyPerspectives `yaml:"key_perspectives"` KeyPerspectives KeyPerspectives `yaml:"key_perspectives"`
@ -39,6 +44,7 @@ func (c *FederationAPI) Defaults(opts DefaultOpts) {
} }
c.FederationMaxRetries = 16 c.FederationMaxRetries = 16
c.DisableTLSValidation = false c.DisableTLSValidation = false
c.DisableHTTPKeepalives = false
if opts.Generate { if opts.Generate {
c.KeyPerspectives = KeyPerspectives{ c.KeyPerspectives = KeyPerspectives{
{ {