mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-10 15:43:09 -06:00
Never expire m.direct_to_device
This commit is contained in:
parent
ae4a45f7e9
commit
5781716af7
|
|
@ -28,7 +28,7 @@ func UpAddexpiresat(ctx context.Context, tx *sql.Tx) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to execute upgrade: %w", err)
|
return fmt.Errorf("failed to execute upgrade: %w", err)
|
||||||
}
|
}
|
||||||
_, err = tx.ExecContext(ctx, "UPDATE federationsender_queue_edus SET expires_at = $1", gomatrixserverlib.AsTimestamp(time.Now().Add(time.Hour*24)))
|
_, err = tx.ExecContext(ctx, "UPDATE federationsender_queue_edus SET expires_at = $1 WHERE edu_type != 'm.direct_to_device'", gomatrixserverlib.AsTimestamp(time.Now().Add(time.Hour*24)))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to update queue_edus: %w", err)
|
return fmt.Errorf("failed to update queue_edus: %w", err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -19,10 +19,11 @@ import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
|
||||||
"github.com/lib/pq"
|
"github.com/lib/pq"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/federationapi/storage/postgres/deltas"
|
"github.com/matrix-org/dendrite/federationapi/storage/postgres/deltas"
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const queueEDUsSchema = `
|
const queueEDUsSchema = `
|
||||||
|
|
@ -69,10 +70,10 @@ const selectQueueServerNamesSQL = "" +
|
||||||
"SELECT DISTINCT server_name FROM federationsender_queue_edus"
|
"SELECT DISTINCT server_name FROM federationsender_queue_edus"
|
||||||
|
|
||||||
const selectExpiredEDUsSQL = "" +
|
const selectExpiredEDUsSQL = "" +
|
||||||
"SELECT DISTINCT json_nid FROM federationsender_queue_edus WHERE expires_at IS NOT NULL AND expires_at <= $1"
|
"SELECT DISTINCT json_nid FROM federationsender_queue_edus WHERE expires_at > 0 AND expires_at <= $1"
|
||||||
|
|
||||||
const deleteExpiredEDUsSQL = "" +
|
const deleteExpiredEDUsSQL = "" +
|
||||||
"DELETE FROM federationsender_queue_edus WHERE expires_at IS NOT NULL AND expires_at <= $1"
|
"DELETE FROM federationsender_queue_edus WHERE expires_at > 0 AND expires_at <= $1"
|
||||||
|
|
||||||
type queueEDUsStatements struct {
|
type queueEDUsStatements struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
|
|
|
||||||
|
|
@ -53,6 +53,11 @@ func (d *Database) AssociateEDUWithDestination(
|
||||||
// Keep EDUs for at least x minutes before deleting them
|
// Keep EDUs for at least x minutes before deleting them
|
||||||
expiresAt = gomatrixserverlib.AsTimestamp(time.Now().Add(duration))
|
expiresAt = gomatrixserverlib.AsTimestamp(time.Now().Add(duration))
|
||||||
}
|
}
|
||||||
|
// We forcibly set m.direct_to_device events to 0, as we always want them
|
||||||
|
// to be delivered. (required for E2EE)
|
||||||
|
if eduType == gomatrixserverlib.MDirectToDevice {
|
||||||
|
expiresAt = 0
|
||||||
|
}
|
||||||
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||||
if err := d.FederationQueueEDUs.InsertQueueEDU(
|
if err := d.FederationQueueEDUs.InsertQueueEDU(
|
||||||
ctx, // context
|
ctx, // context
|
||||||
|
|
|
||||||
|
|
@ -52,7 +52,7 @@ INSERT
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to update queue_edus: %w", err)
|
return fmt.Errorf("failed to update queue_edus: %w", err)
|
||||||
}
|
}
|
||||||
_, err = tx.ExecContext(ctx, "UPDATE federationsender_queue_edus SET expires_at = $1", gomatrixserverlib.AsTimestamp(time.Now().Add(time.Hour*24)))
|
_, err = tx.ExecContext(ctx, "UPDATE federationsender_queue_edus SET expires_at = $1 WHERE edu_type != 'm.direct_to_device'", gomatrixserverlib.AsTimestamp(time.Now().Add(time.Hour*24)))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to update queue_edus: %w", err)
|
return fmt.Errorf("failed to update queue_edus: %w", err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -20,10 +20,11 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/federationapi/storage/sqlite3/deltas"
|
"github.com/matrix-org/dendrite/federationapi/storage/sqlite3/deltas"
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const queueEDUsSchema = `
|
const queueEDUsSchema = `
|
||||||
|
|
@ -70,10 +71,10 @@ const selectQueueServerNamesSQL = "" +
|
||||||
"SELECT DISTINCT server_name FROM federationsender_queue_edus"
|
"SELECT DISTINCT server_name FROM federationsender_queue_edus"
|
||||||
|
|
||||||
const selectExpiredEDUsSQL = "" +
|
const selectExpiredEDUsSQL = "" +
|
||||||
"SELECT DISTINCT json_nid FROM federationsender_queue_edus WHERE expires_at IS NOT NULL AND expires_at <= $1"
|
"SELECT DISTINCT json_nid FROM federationsender_queue_edus WHERE expires_at > 0 AND expires_at <= $1"
|
||||||
|
|
||||||
const deleteExpiredEDUsSQL = "" +
|
const deleteExpiredEDUsSQL = "" +
|
||||||
"DELETE FROM federationsender_queue_edus WHERE expires_at IS NOT NULL AND expires_at <= $1"
|
"DELETE FROM federationsender_queue_edus WHERE expires_at > 0 AND expires_at <= $1"
|
||||||
|
|
||||||
type queueEDUsStatements struct {
|
type queueEDUsStatements struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
|
|
|
||||||
|
|
@ -5,11 +5,13 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/federationapi/storage"
|
"github.com/matrix-org/dendrite/federationapi/storage"
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
"github.com/matrix-org/dendrite/test"
|
"github.com/matrix-org/dendrite/test"
|
||||||
"github.com/matrix-org/dendrite/test/testrig"
|
"github.com/matrix-org/dendrite/test/testrig"
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func mustCreateFederationDatabase(t *testing.T, dbType test.DBType) (storage.Database, func()) {
|
func mustCreateFederationDatabase(t *testing.T, dbType test.DBType) (storage.Database, func()) {
|
||||||
|
|
@ -29,7 +31,7 @@ func mustCreateFederationDatabase(t *testing.T, dbType test.DBType) (storage.Dat
|
||||||
|
|
||||||
func TestExpireEDUs(t *testing.T) {
|
func TestExpireEDUs(t *testing.T) {
|
||||||
var expireEDUTypes = map[string]time.Duration{
|
var expireEDUTypes = map[string]time.Duration{
|
||||||
"m.receipt": time.Millisecond,
|
gomatrixserverlib.MReceipt: time.Millisecond,
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
@ -41,13 +43,14 @@ func TestExpireEDUs(t *testing.T) {
|
||||||
receipt, err := db.StoreJSON(ctx, "{}")
|
receipt, err := db.StoreJSON(ctx, "{}")
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
err = db.AssociateEDUWithDestination(ctx, "localhost", receipt, "m.receipt", expireEDUTypes)
|
err = db.AssociateEDUWithDestination(ctx, "localhost", receipt, gomatrixserverlib.MReceipt, expireEDUTypes)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
}
|
}
|
||||||
// add data without expiry
|
// add data without expiry
|
||||||
receipt, err := db.StoreJSON(ctx, "{}")
|
receipt, err := db.StoreJSON(ctx, "{}")
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
// m.read_marker gets the default expiry of 24h, so won't be deleted further down in this test
|
||||||
err = db.AssociateEDUWithDestination(ctx, "localhost", receipt, "m.read_marker", expireEDUTypes)
|
err = db.AssociateEDUWithDestination(ctx, "localhost", receipt, "m.read_marker", expireEDUTypes)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
|
@ -59,5 +62,20 @@ func TestExpireEDUs(t *testing.T) {
|
||||||
data, err := db.GetPendingEDUs(ctx, "localhost", 100)
|
data, err := db.GetPendingEDUs(ctx, "localhost", 100)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.Equal(t, 1, len(data))
|
assert.Equal(t, 1, len(data))
|
||||||
|
|
||||||
|
// check that m.direct_to_device is never expired
|
||||||
|
receipt, err = db.StoreJSON(ctx, "{}")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
err = db.AssociateEDUWithDestination(ctx, "localhost", receipt, gomatrixserverlib.MDirectToDevice, expireEDUTypes)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
err = db.DeleteExpiredEDUs(ctx)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
// We should get two EDUs, the m.read_marker and the m.direct_to_device
|
||||||
|
data, err = db.GetPendingEDUs(ctx, "localhost", 100)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, 2, len(data))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue