From 5781716af709d83352c7d75ca559d9ae98e1171b Mon Sep 17 00:00:00 2001 From: Till Faelligen <2353100+S7evinK@users.noreply.github.com> Date: Tue, 9 Aug 2022 09:27:28 +0200 Subject: [PATCH] Never expire m.direct_to_device --- .../deltas/2022042812473400_addexpiresat.go | 2 +- .../storage/postgres/queue_edus_table.go | 7 +++--- federationapi/storage/shared/storage_edus.go | 5 ++++ .../deltas/2022042812473400_addexpiresat.go | 2 +- .../storage/sqlite3/queue_edus_table.go | 7 +++--- federationapi/storage/storage_test.go | 24 ++++++++++++++++--- 6 files changed, 36 insertions(+), 11 deletions(-) diff --git a/federationapi/storage/postgres/deltas/2022042812473400_addexpiresat.go b/federationapi/storage/postgres/deltas/2022042812473400_addexpiresat.go index bf295bf63..53a7a025e 100644 --- a/federationapi/storage/postgres/deltas/2022042812473400_addexpiresat.go +++ b/federationapi/storage/postgres/deltas/2022042812473400_addexpiresat.go @@ -28,7 +28,7 @@ func UpAddexpiresat(ctx context.Context, tx *sql.Tx) error { if err != nil { return fmt.Errorf("failed to execute upgrade: %w", err) } - _, err = tx.ExecContext(ctx, "UPDATE federationsender_queue_edus SET expires_at = $1", gomatrixserverlib.AsTimestamp(time.Now().Add(time.Hour*24))) + _, err = tx.ExecContext(ctx, "UPDATE federationsender_queue_edus SET expires_at = $1 WHERE edu_type != 'm.direct_to_device'", gomatrixserverlib.AsTimestamp(time.Now().Add(time.Hour*24))) if err != nil { return fmt.Errorf("failed to update queue_edus: %w", err) } diff --git a/federationapi/storage/postgres/queue_edus_table.go b/federationapi/storage/postgres/queue_edus_table.go index 23a764d02..d6507e13b 100644 --- a/federationapi/storage/postgres/queue_edus_table.go +++ b/federationapi/storage/postgres/queue_edus_table.go @@ -19,10 +19,11 @@ import ( "database/sql" "github.com/lib/pq" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/dendrite/federationapi/storage/postgres/deltas" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" - "github.com/matrix-org/gomatrixserverlib" ) const queueEDUsSchema = ` @@ -69,10 +70,10 @@ const selectQueueServerNamesSQL = "" + "SELECT DISTINCT server_name FROM federationsender_queue_edus" const selectExpiredEDUsSQL = "" + - "SELECT DISTINCT json_nid FROM federationsender_queue_edus WHERE expires_at IS NOT NULL AND expires_at <= $1" + "SELECT DISTINCT json_nid FROM federationsender_queue_edus WHERE expires_at > 0 AND expires_at <= $1" const deleteExpiredEDUsSQL = "" + - "DELETE FROM federationsender_queue_edus WHERE expires_at IS NOT NULL AND expires_at <= $1" + "DELETE FROM federationsender_queue_edus WHERE expires_at > 0 AND expires_at <= $1" type queueEDUsStatements struct { db *sql.DB diff --git a/federationapi/storage/shared/storage_edus.go b/federationapi/storage/shared/storage_edus.go index 6583bdfdc..b62e5d9c5 100644 --- a/federationapi/storage/shared/storage_edus.go +++ b/federationapi/storage/shared/storage_edus.go @@ -53,6 +53,11 @@ func (d *Database) AssociateEDUWithDestination( // Keep EDUs for at least x minutes before deleting them expiresAt = gomatrixserverlib.AsTimestamp(time.Now().Add(duration)) } + // We forcibly set m.direct_to_device events to 0, as we always want them + // to be delivered. (required for E2EE) + if eduType == gomatrixserverlib.MDirectToDevice { + expiresAt = 0 + } return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { if err := d.FederationQueueEDUs.InsertQueueEDU( ctx, // context diff --git a/federationapi/storage/sqlite3/deltas/2022042812473400_addexpiresat.go b/federationapi/storage/sqlite3/deltas/2022042812473400_addexpiresat.go index 40eb35715..c5030163b 100644 --- a/federationapi/storage/sqlite3/deltas/2022042812473400_addexpiresat.go +++ b/federationapi/storage/sqlite3/deltas/2022042812473400_addexpiresat.go @@ -52,7 +52,7 @@ INSERT if err != nil { return fmt.Errorf("failed to update queue_edus: %w", err) } - _, err = tx.ExecContext(ctx, "UPDATE federationsender_queue_edus SET expires_at = $1", gomatrixserverlib.AsTimestamp(time.Now().Add(time.Hour*24))) + _, err = tx.ExecContext(ctx, "UPDATE federationsender_queue_edus SET expires_at = $1 WHERE edu_type != 'm.direct_to_device'", gomatrixserverlib.AsTimestamp(time.Now().Add(time.Hour*24))) if err != nil { return fmt.Errorf("failed to update queue_edus: %w", err) } diff --git a/federationapi/storage/sqlite3/queue_edus_table.go b/federationapi/storage/sqlite3/queue_edus_table.go index be3027a59..8e7e7901f 100644 --- a/federationapi/storage/sqlite3/queue_edus_table.go +++ b/federationapi/storage/sqlite3/queue_edus_table.go @@ -20,10 +20,11 @@ import ( "fmt" "strings" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/dendrite/federationapi/storage/sqlite3/deltas" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" - "github.com/matrix-org/gomatrixserverlib" ) const queueEDUsSchema = ` @@ -70,10 +71,10 @@ const selectQueueServerNamesSQL = "" + "SELECT DISTINCT server_name FROM federationsender_queue_edus" const selectExpiredEDUsSQL = "" + - "SELECT DISTINCT json_nid FROM federationsender_queue_edus WHERE expires_at IS NOT NULL AND expires_at <= $1" + "SELECT DISTINCT json_nid FROM federationsender_queue_edus WHERE expires_at > 0 AND expires_at <= $1" const deleteExpiredEDUsSQL = "" + - "DELETE FROM federationsender_queue_edus WHERE expires_at IS NOT NULL AND expires_at <= $1" + "DELETE FROM federationsender_queue_edus WHERE expires_at > 0 AND expires_at <= $1" type queueEDUsStatements struct { db *sql.DB diff --git a/federationapi/storage/storage_test.go b/federationapi/storage/storage_test.go index 7b3235524..7eba2cbee 100644 --- a/federationapi/storage/storage_test.go +++ b/federationapi/storage/storage_test.go @@ -5,11 +5,13 @@ import ( "testing" "time" + "github.com/matrix-org/gomatrixserverlib" + "github.com/stretchr/testify/assert" + "github.com/matrix-org/dendrite/federationapi/storage" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/test" "github.com/matrix-org/dendrite/test/testrig" - "github.com/stretchr/testify/assert" ) func mustCreateFederationDatabase(t *testing.T, dbType test.DBType) (storage.Database, func()) { @@ -29,7 +31,7 @@ func mustCreateFederationDatabase(t *testing.T, dbType test.DBType) (storage.Dat func TestExpireEDUs(t *testing.T) { var expireEDUTypes = map[string]time.Duration{ - "m.receipt": time.Millisecond, + gomatrixserverlib.MReceipt: time.Millisecond, } ctx := context.Background() @@ -41,13 +43,14 @@ func TestExpireEDUs(t *testing.T) { receipt, err := db.StoreJSON(ctx, "{}") assert.NoError(t, err) - err = db.AssociateEDUWithDestination(ctx, "localhost", receipt, "m.receipt", expireEDUTypes) + err = db.AssociateEDUWithDestination(ctx, "localhost", receipt, gomatrixserverlib.MReceipt, expireEDUTypes) assert.NoError(t, err) } // add data without expiry receipt, err := db.StoreJSON(ctx, "{}") assert.NoError(t, err) + // m.read_marker gets the default expiry of 24h, so won't be deleted further down in this test err = db.AssociateEDUWithDestination(ctx, "localhost", receipt, "m.read_marker", expireEDUTypes) assert.NoError(t, err) @@ -59,5 +62,20 @@ func TestExpireEDUs(t *testing.T) { data, err := db.GetPendingEDUs(ctx, "localhost", 100) assert.NoError(t, err) assert.Equal(t, 1, len(data)) + + // check that m.direct_to_device is never expired + receipt, err = db.StoreJSON(ctx, "{}") + assert.NoError(t, err) + + err = db.AssociateEDUWithDestination(ctx, "localhost", receipt, gomatrixserverlib.MDirectToDevice, expireEDUTypes) + assert.NoError(t, err) + + err = db.DeleteExpiredEDUs(ctx) + assert.NoError(t, err) + + // We should get two EDUs, the m.read_marker and the m.direct_to_device + data, err = db.GetPendingEDUs(ctx, "localhost", 100) + assert.NoError(t, err) + assert.Equal(t, 2, len(data)) }) }