diff --git a/federationapi/storage/interface.go b/federationapi/storage/interface.go index 9f2b992f7..c032ad0ae 100644 --- a/federationapi/storage/interface.go +++ b/federationapi/storage/interface.go @@ -16,6 +16,7 @@ package storage import ( "context" + "time" "github.com/matrix-org/dendrite/federationapi/storage/shared" "github.com/matrix-org/dendrite/federationapi/types" @@ -39,7 +40,7 @@ type Database interface { GetPendingEDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, limit int) (edus map[*shared.Receipt]*gomatrixserverlib.EDU, err error) AssociatePDUWithDestination(ctx context.Context, transactionID gomatrixserverlib.TransactionID, serverName gomatrixserverlib.ServerName, receipt *shared.Receipt) error - AssociateEDUWithDestination(ctx context.Context, serverName gomatrixserverlib.ServerName, receipt *shared.Receipt, eduType string) error + AssociateEDUWithDestination(ctx context.Context, serverName gomatrixserverlib.ServerName, receipt *shared.Receipt, eduType string, expireEDUTypes map[string]time.Duration) error CleanPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, receipts []*shared.Receipt) error CleanEDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, receipts []*shared.Receipt) error diff --git a/federationapi/storage/postgres/queue_edus_table.go b/federationapi/storage/postgres/queue_edus_table.go index 787a35cbb..cc6cc9e91 100644 --- a/federationapi/storage/postgres/queue_edus_table.go +++ b/federationapi/storage/postgres/queue_edus_table.go @@ -33,7 +33,7 @@ CREATE TABLE IF NOT EXISTS federationsender_queue_edus ( -- The JSON NID from the federationsender_queue_edus_json table. json_nid BIGINT NOT NULL, -- The expiry time of this edu, if any. - expires_at BIGINT + expires_at BIGINT NOT NULL DEFAULT 0 ); CREATE UNIQUE INDEX IF NOT EXISTS federationsender_queue_edus_json_nid_idx @@ -108,7 +108,7 @@ func (s *queueEDUsStatements) InsertQueueEDU( eduType string, serverName gomatrixserverlib.ServerName, nid int64, - expiresAt *gomatrixserverlib.Timestamp, + expiresAt gomatrixserverlib.Timestamp, ) error { stmt := sqlutil.TxStmt(txn, s.insertQueueEDUStmt) _, err := stmt.ExecContext( diff --git a/federationapi/storage/shared/storage_edus.go b/federationapi/storage/shared/storage_edus.go index 8bb43a2d1..6583bdfdc 100644 --- a/federationapi/storage/shared/storage_edus.go +++ b/federationapi/storage/shared/storage_edus.go @@ -25,9 +25,12 @@ import ( "github.com/matrix-org/gomatrixserverlib" ) -// expireEDUTypes contains EDUs which can/should be expired after a given time +// defaultExpiry for EDUs if not listed below +var defaultExpiry = time.Hour * 24 + +// defaultExpireEDUTypes contains EDUs which can/should be expired after a given time // if the target server isn't reachable for some reason. -var expireEDUTypes = map[string]time.Duration{ +var defaultExpireEDUTypes = map[string]time.Duration{ gomatrixserverlib.MTyping: time.Minute, gomatrixserverlib.MPresence: time.Minute * 10, } @@ -40,12 +43,15 @@ func (d *Database) AssociateEDUWithDestination( serverName gomatrixserverlib.ServerName, receipt *Receipt, eduType string, + expireEDUTypes map[string]time.Duration, ) error { - var expiresAt *gomatrixserverlib.Timestamp + if expireEDUTypes == nil { + expireEDUTypes = defaultExpireEDUTypes + } + expiresAt := gomatrixserverlib.AsTimestamp(time.Now().Add(defaultExpiry)) if duration, ok := expireEDUTypes[eduType]; ok { // Keep EDUs for at least x minutes before deleting them - ts := gomatrixserverlib.AsTimestamp(time.Now().Add(duration)) - expiresAt = &ts + expiresAt = gomatrixserverlib.AsTimestamp(time.Now().Add(duration)) } return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { if err := d.FederationQueueEDUs.InsertQueueEDU( diff --git a/federationapi/storage/sqlite3/queue_edus_table.go b/federationapi/storage/sqlite3/queue_edus_table.go index 013f73923..f790f6ab4 100644 --- a/federationapi/storage/sqlite3/queue_edus_table.go +++ b/federationapi/storage/sqlite3/queue_edus_table.go @@ -34,7 +34,7 @@ CREATE TABLE IF NOT EXISTS federationsender_queue_edus ( -- The JSON NID from the federationsender_queue_edus_json table. json_nid BIGINT NOT NULL, -- The expiry time of this edu, if any. - expires_at BIGINT + expires_at BIGINT NOT NULL DEFAULT 0 ); CREATE UNIQUE INDEX IF NOT EXISTS federationsender_queue_edus_json_nid_idx @@ -108,7 +108,7 @@ func (s *queueEDUsStatements) InsertQueueEDU( eduType string, serverName gomatrixserverlib.ServerName, nid int64, - expiresAt *gomatrixserverlib.Timestamp, + expiresAt gomatrixserverlib.Timestamp, ) error { stmt := sqlutil.TxStmt(txn, s.insertQueueEDUStmt) _, err := stmt.ExecContext( diff --git a/federationapi/storage/tables/interface.go b/federationapi/storage/tables/interface.go index ffb70a47e..3c116a1d0 100644 --- a/federationapi/storage/tables/interface.go +++ b/federationapi/storage/tables/interface.go @@ -34,7 +34,7 @@ type FederationQueuePDUs interface { } type FederationQueueEDUs interface { - InsertQueueEDU(ctx context.Context, txn *sql.Tx, eduType string, serverName gomatrixserverlib.ServerName, nid int64, expiresAt *gomatrixserverlib.Timestamp) error + InsertQueueEDU(ctx context.Context, txn *sql.Tx, eduType string, serverName gomatrixserverlib.ServerName, nid int64, expiresAt gomatrixserverlib.Timestamp) error DeleteQueueEDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, jsonNIDs []int64) error SelectQueueEDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, limit int) ([]int64, error) SelectQueueEDUReferenceJSONCount(ctx context.Context, txn *sql.Tx, jsonNID int64) (int64, error)