From e7997c1e97be6fe4a6bb75da1fdca7dc6609c591 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 29 Jun 2020 15:29:28 +0100 Subject: [PATCH] Initial work on persistent queues --- federationsender/federationsender.go | 3 +- federationsender/queue/destinationqueue.go | 82 ++++++--- federationsender/queue/queue.go | 5 + federationsender/storage/interface.go | 3 + .../storage/postgres/queue_retry_table.go | 167 ++++++++++++++++++ federationsender/storage/postgres/storage.go | 49 +++++ .../storage/sqlite3/queue_retry_table.go | 167 ++++++++++++++++++ federationsender/storage/sqlite3/storage.go | 49 +++++ federationsender/types/types.go | 6 + 9 files changed, 503 insertions(+), 28 deletions(-) create mode 100644 federationsender/storage/postgres/queue_retry_table.go create mode 100644 federationsender/storage/sqlite3/queue_retry_table.go diff --git a/federationsender/federationsender.go b/federationsender/federationsender.go index 10ac51c8a..acf524146 100644 --- a/federationsender/federationsender.go +++ b/federationsender/federationsender.go @@ -50,7 +50,8 @@ func NewInternalAPI( statistics := &types.Statistics{} queues := queue.NewOutgoingQueues( - base.Cfg.Matrix.ServerName, federation, rsAPI, statistics, &queue.SigningInfo{ + federationSenderDB, base.Cfg.Matrix.ServerName, federation, rsAPI, statistics, + &queue.SigningInfo{ KeyID: base.Cfg.Matrix.KeyID, PrivateKey: base.Cfg.Matrix.PrivateKey, ServerName: base.Cfg.Matrix.ServerName, diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index 4449f9e63..d5144d562 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -20,6 +20,7 @@ import ( "fmt" "time" + "github.com/matrix-org/dendrite/federationsender/storage" "github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrix" @@ -34,22 +35,23 @@ import ( // ensures that only one request is in 
flight to a given destination // at a time. type destinationQueue struct { - signing *SigningInfo - rsAPI api.RoomserverInternalAPI - client *gomatrixserverlib.FederationClient // federation client - origin gomatrixserverlib.ServerName // origin of requests - destination gomatrixserverlib.ServerName // destination of requests - running atomic.Bool // is the queue worker running? - backingOff atomic.Bool // true if we're backing off - statistics *types.ServerStatistics // statistics about this remote server - incomingPDUs chan *gomatrixserverlib.HeaderedEvent // PDUs to send - incomingEDUs chan *gomatrixserverlib.EDU // EDUs to send - incomingInvites chan *gomatrixserverlib.InviteV2Request // invites to send - lastTransactionIDs []gomatrixserverlib.TransactionID // last transaction ID - pendingPDUs []*gomatrixserverlib.HeaderedEvent // owned by backgroundSend - pendingEDUs []*gomatrixserverlib.EDU // owned by backgroundSend - pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend - retryServerCh chan bool // interrupts backoff + db storage.Database + signing *SigningInfo + rsAPI api.RoomserverInternalAPI + client *gomatrixserverlib.FederationClient // federation client + origin gomatrixserverlib.ServerName // origin of requests + destination gomatrixserverlib.ServerName // destination of requests + running atomic.Bool // is the queue worker running? 
+ backingOff atomic.Bool // true if we're backing off + statistics *types.ServerStatistics // statistics about this remote server + incomingPDUs chan *gomatrixserverlib.HeaderedEvent // PDUs to send + incomingEDUs chan *gomatrixserverlib.EDU // EDUs to send + incomingInvites chan *gomatrixserverlib.InviteV2Request // invites to send + transactionID gomatrixserverlib.TransactionID // last transaction ID + pendingPDUs []*gomatrixserverlib.HeaderedEvent // owned by backgroundSend + pendingEDUs []*gomatrixserverlib.EDU // owned by backgroundSend + pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend + retryServerCh chan bool // interrupts backoff } // retry will clear the blacklist state and attempt to send built up events to the server, @@ -200,19 +202,49 @@ func (oq *destinationQueue) backgroundSend() { // If we have pending PDUs or EDUs then construct a transaction. if numPDUs > 0 || numEDUs > 0 { + // Generate a transaction ID. + if oq.transactionID == "" { + now := gomatrixserverlib.AsTimestamp(time.Now()) + oq.transactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, oq.statistics.SuccessCount())) + } + // Try sending the next transaction and see what happens. - transaction, terr := oq.nextTransaction(oq.pendingPDUs, oq.pendingEDUs, oq.statistics.SuccessCount()) + transaction, terr := oq.nextTransaction(oq.transactionID, oq.pendingPDUs, oq.pendingEDUs, oq.statistics.SuccessCount()) if terr != nil { // We failed to send the transaction. - if giveUp := oq.statistics.Failure(); giveUp { + giveUp := oq.statistics.Failure() + // TODO: commit the transaction to the database + if terr = oq.db.StoreFailedPDUs( + context.TODO(), + oq.transactionID, + oq.destination, + oq.pendingPDUs, + ); terr != nil { + // We failed to persist the events to the database for some + // reason, so we'll keep them in memory for now. Hopefully + // it's a temporary condition but log it. 
+ logrus.WithError(terr).Errorf("Failed to persist failed sends for server %q to database", oq.destination) + } else { + // Reallocate so that the underlying arrays can be GC'd, as + // opposed to growing forever. + for i := 0; i < numPDUs; i++ { + oq.pendingPDUs[i] = nil + } + oq.pendingPDUs = append( + []*gomatrixserverlib.HeaderedEvent{}, + oq.pendingPDUs[numPDUs:]..., + ) + } + if giveUp { // It's been suggested that we should give up because // the backoff has exceeded a maximum allowable value. return } } else if transaction { // If we successfully sent the transaction then clear out - // the pending events and EDUs. + // the pending events and EDUs, and wipe our transaction ID. oq.statistics.Success() + oq.transactionID = "" // Reallocate so that the underlying arrays can be GC'd, as // opposed to growing forever. for i := 0; i < numPDUs; i++ { @@ -262,6 +294,7 @@ func (oq *destinationQueue) backgroundSend() { // queue and sends it. Returns true if a transaction was sent or // false otherwise. 
func (oq *destinationQueue) nextTransaction( + transactionID gomatrixserverlib.TransactionID, pendingPDUs []*gomatrixserverlib.HeaderedEvent, pendingEDUs []*gomatrixserverlib.EDU, sentCounter uint32, @@ -270,17 +303,12 @@ func (oq *destinationQueue) nextTransaction( PDUs: []json.RawMessage{}, EDUs: []gomatrixserverlib.EDU{}, } - now := gomatrixserverlib.AsTimestamp(time.Now()) - t.TransactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, sentCounter)) + t.TransactionID = transactionID t.Origin = oq.origin t.Destination = oq.destination - t.OriginServerTS = now - t.PreviousIDs = oq.lastTransactionIDs - if t.PreviousIDs == nil { - t.PreviousIDs = []gomatrixserverlib.TransactionID{} - } + t.OriginServerTS = gomatrixserverlib.AsTimestamp(time.Now()) - oq.lastTransactionIDs = []gomatrixserverlib.TransactionID{t.TransactionID} + oq.transactionID = t.TransactionID for _, pdu := range pendingPDUs { // Append the JSON of the event, since this is a json.RawMessage type in the diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go index 240343559..bb1ed2258 100644 --- a/federationsender/queue/queue.go +++ b/federationsender/queue/queue.go @@ -19,6 +19,7 @@ import ( "fmt" "sync" + "github.com/matrix-org/dendrite/federationsender/storage" "github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" @@ -29,6 +30,7 @@ import ( // OutgoingQueues is a collection of queues for sending transactions to other // matrix servers type OutgoingQueues struct { + db storage.Database rsAPI api.RoomserverInternalAPI origin gomatrixserverlib.ServerName client *gomatrixserverlib.FederationClient @@ -40,6 +42,7 @@ type OutgoingQueues struct { // NewOutgoingQueues makes a new OutgoingQueues func NewOutgoingQueues( + db storage.Database, origin gomatrixserverlib.ServerName, client *gomatrixserverlib.FederationClient, rsAPI api.RoomserverInternalAPI, @@ -47,6 +50,7 @@ func 
NewOutgoingQueues( signing *SigningInfo, ) *OutgoingQueues { return &OutgoingQueues{ + db: db, rsAPI: rsAPI, origin: origin, client: client, @@ -76,6 +80,7 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d oq := oqs.queues[destination] if oq == nil { oq = &destinationQueue{ + db: oqs.db, rsAPI: oqs.rsAPI, origin: oqs.origin, destination: destination, diff --git a/federationsender/storage/interface.go b/federationsender/storage/interface.go index be195382b..d063f3c53 100644 --- a/federationsender/storage/interface.go +++ b/federationsender/storage/interface.go @@ -19,10 +19,13 @@ import ( "github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/gomatrixserverlib" ) type Database interface { internal.PartitionStorer UpdateRoom(ctx context.Context, roomID, oldEventID, newEventID string, addHosts []types.JoinedHost, removeHosts []string) (joinedHosts []types.JoinedHost, err error) GetJoinedHosts(ctx context.Context, roomID string) ([]types.JoinedHost, error) + GetFailedPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName) ([]*gomatrixserverlib.HeaderedEvent, error) + StoreFailedPDUs(ctx context.Context, transactionID gomatrixserverlib.TransactionID, serverName gomatrixserverlib.ServerName, pdus []*gomatrixserverlib.HeaderedEvent) error } diff --git a/federationsender/storage/postgres/queue_retry_table.go b/federationsender/storage/postgres/queue_retry_table.go new file mode 100644 index 000000000..c9b46f6fd --- /dev/null +++ b/federationsender/storage/postgres/queue_retry_table.go @@ -0,0 +1,167 @@ +// Copyright 2017-2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgres + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + + "github.com/lib/pq" + "github.com/matrix-org/dendrite/federationsender/types" + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/gomatrixserverlib" +) + +const queueRetrySchema = ` +-- The queue_retry table contains events that we failed to +-- send to a destination host, such that we can try them again +-- later. +CREATE TABLE IF NOT EXISTS federationsender_queue_retry ( + -- The ID of the transaction that we initially attempted to send the event in. + transaction_id TEXT NOT NULL, + -- The event type: "pdu", "invite", "send_to_device". + send_type TEXT NOT NULL, + -- The event ID of the event that we failed to send. + event_id TEXT NOT NULL, + -- The origin server TS of the event. + origin_server_ts BIGINT NOT NULL, + -- The name of the destination server that the event is queued for. + server_name TEXT NOT NULL, + -- The JSON body.
+ json_body BYTEA NOT NULL +); + +CREATE UNIQUE INDEX IF NOT EXISTS federationsender_queue_retry_event_id_idx + ON federationsender_queue_retry (event_id); +` + +const insertRetrySQL = "" + + "INSERT INTO federationsender_queue_retry (transaction_id, send_type, event_id, origin_server_ts, server_name, json_body)" + + " VALUES ($1, $2, $3, $4, $5, $6)" + +const deleteRetrySQL = "" + + "DELETE FROM federationsender_queue_retry WHERE event_id = ANY($1)" + +const selectRetryNextTransactionIDSQL = "" + + "SELECT transaction_id FROM federationsender_queue_retry" + + " WHERE server_name = $1 AND send_type = $2" + + " ORDER BY transaction_id ASC" + + " LIMIT 1" + +const selectRetryPDUsByTransactionSQL = "" + + "SELECT event_id, server_name, origin_server_ts, json_body FROM federationsender_queue_retry" + + " WHERE server_name = $1 AND send_type = $2 AND transaction_id = $3" + + " LIMIT 50" + +type queueRetryStatements struct { + insertRetryStmt *sql.Stmt + deleteRetryStmt *sql.Stmt + selectRetryNextTransactionIDStmt *sql.Stmt + selectRetryPDUsByTransactionStmt *sql.Stmt +} + +func (s *queueRetryStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(queueRetrySchema) + if err != nil { + return + } + if s.insertRetryStmt, err = db.Prepare(insertRetrySQL); err != nil { + return + } + if s.deleteRetryStmt, err = db.Prepare(deleteRetrySQL); err != nil { + return + } + if s.selectRetryNextTransactionIDStmt, err = db.Prepare(selectRetryNextTransactionIDSQL); err != nil { + return + } + if s.selectRetryPDUsByTransactionStmt, err = db.Prepare(selectRetryPDUsByTransactionSQL); err != nil { + return + } + return +} + +func (s *queueRetryStatements) insertQueueRetry( + ctx context.Context, + txn *sql.Tx, + transactionID string, + sendtype string, + event gomatrixserverlib.Event, + serverName gomatrixserverlib.ServerName, +) error { + stmt := sqlutil.TxStmt(txn, s.insertRetryStmt) + _, err := stmt.ExecContext( + ctx, + transactionID, // the transaction ID that we initially 
attempted + sendtype, // either "pdu", "invite", "send_to_device" + event.EventID(), // the event ID + event.OriginServerTS(), // the event origin server TS + serverName, // destination server name + event.JSON(), // JSON body + ) + return err +} + +func (s *queueRetryStatements) deleteQueueRetry( + ctx context.Context, txn *sql.Tx, eventIDs []string, +) error { + stmt := sqlutil.TxStmt(txn, s.deleteRetryStmt) + _, err := stmt.ExecContext(ctx, pq.StringArray(eventIDs)) + return err +} + +func (s *queueRetryStatements) selectRetryNextTransactionID( + ctx context.Context, txn *sql.Tx, serverName, sendType string, +) (string, error) { + var transactionID string + stmt := sqlutil.TxStmt(txn, s.selectRetryNextTransactionIDStmt) + err := stmt.QueryRowContext(ctx, serverName, types.FailedEventTypePDU).Scan(&transactionID) + return transactionID, err +} + +func (s *queueRetryStatements) selectQueueRetryPDUs( + ctx context.Context, txn *sql.Tx, serverName string, transactionID string, +) ([]*gomatrixserverlib.HeaderedEvent, error) { + + stmt := sqlutil.TxStmt(txn, s.selectRetryPDUsByTransactionStmt) + rows, err := stmt.QueryContext(ctx, serverName, types.FailedEventTypePDU, transactionID) + if err != nil { + return nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "queueRetryFromStmt: rows.close() failed") + + var result []*gomatrixserverlib.HeaderedEvent + for rows.Next() { + var transactionID, eventID string + var originServerTS int64 + var jsonBody []byte + if err = rows.Scan(&transactionID, &eventID, &originServerTS, &jsonBody); err != nil { + return nil, err + } + var event gomatrixserverlib.HeaderedEvent + if err = json.Unmarshal(jsonBody, &event); err != nil { + return nil, fmt.Errorf("json.Unmarshal: %w", err) + } + if event.EventID() != eventID { + return nil, fmt.Errorf("event ID %q doesn't match expected %q", event.EventID(), eventID) + } + result = append(result, &event) + } + + return result, rows.Err() +} diff --git 
a/federationsender/storage/postgres/storage.go b/federationsender/storage/postgres/storage.go index 8fd4c11a3..602ba862c 100644 --- a/federationsender/storage/postgres/storage.go +++ b/federationsender/storage/postgres/storage.go @@ -18,15 +18,18 @@ package postgres import ( "context" "database/sql" + "fmt" "github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/gomatrixserverlib" ) // Database stores information needed by the federation sender type Database struct { joinedHostsStatements roomStatements + queueRetryStatements sqlutil.PartitionOffsetStatements db *sql.DB } @@ -55,6 +58,10 @@ func (d *Database) prepare() error { return err } + if err = d.queueRetryStatements.prepare(d.db); err != nil { + return err + } + return d.PartitionOffsetStatements.Prepare(d.db, "federationsender") } @@ -120,3 +127,45 @@ func (d *Database) GetJoinedHosts( ) ([]types.JoinedHost, error) { return d.selectJoinedHosts(ctx, roomID) } + +// GetFailedPDUs retrieves PDUs that we have failed to send on +// a specific destination queue. +func (d *Database) GetFailedPDUs( + ctx context.Context, + serverName gomatrixserverlib.ServerName, +) ([]*gomatrixserverlib.HeaderedEvent, error) { + transactionID, err := d.selectRetryNextTransactionID(ctx, nil, string(serverName), types.FailedEventTypePDU) + if err != nil { + return nil, fmt.Errorf("d.selectRetryNextTransactionID: %w", err) + } + + events, err := d.selectQueueRetryPDUs(ctx, nil, string(serverName), transactionID) + if err != nil { + return nil, fmt.Errorf("d.selectQueueRetryPDUs: %w", err) + } + return events, nil +} + +// StoreFailedPDUs stores PDUs that we have failed to send on +// a specific destination queue. 
+func (d *Database) StoreFailedPDUs( + ctx context.Context, + transactionID gomatrixserverlib.TransactionID, + serverName gomatrixserverlib.ServerName, + pdus []*gomatrixserverlib.HeaderedEvent, +) error { + for _, pdu := range pdus { + if _, err := d.insertRetryStmt.ExecContext( + ctx, + string(transactionID), // transaction ID + types.FailedEventTypePDU, // type of event that was queued + pdu.EventID(), // event ID + pdu.OriginServerTS(), // event origin server TS + string(serverName), // destination server name + pdu.JSON(), // JSON body + ); err != nil { + return fmt.Errorf("d.insertQueueRetryStmt.ExecContext: %w", err) + } + } + return nil +} diff --git a/federationsender/storage/sqlite3/queue_retry_table.go b/federationsender/storage/sqlite3/queue_retry_table.go new file mode 100644 index 000000000..0596a7193 --- /dev/null +++ b/federationsender/storage/sqlite3/queue_retry_table.go @@ -0,0 +1,167 @@ +// Copyright 2017-2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package sqlite3 + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + + "github.com/lib/pq" + "github.com/matrix-org/dendrite/federationsender/types" + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/gomatrixserverlib" +) + +const queueRetrySchema = ` +-- The queue_retry table contains events that we failed to +-- send to a destination host, such that we can try them again +-- later. +CREATE TABLE IF NOT EXISTS federationsender_queue_retry ( + -- The ID of the transaction that we initially attempted to send the event in. + transaction_id TEXT NOT NULL, + -- The event type: "pdu", "invite", "send_to_device". + send_type TEXT NOT NULL, + -- The event ID of the event that we failed to send. + event_id TEXT NOT NULL, + -- The origin server TS of the event. + origin_server_ts BIGINT NOT NULL, + -- The name of the destination server that the event is queued for. + server_name TEXT NOT NULL, + -- The JSON body. + json_body BYTEA NOT NULL +); + +CREATE UNIQUE INDEX IF NOT EXISTS federationsender_queue_retry_event_id_idx + ON federationsender_queue_retry (event_id); +` + +const insertRetrySQL = "" + + "INSERT INTO federationsender_queue_retry (transaction_id, send_type, event_id, origin_server_ts, server_name, json_body)" + + " VALUES ($1, $2, $3, $4, $5, $6)" + +const deleteRetrySQL = "" + + "DELETE FROM federationsender_queue_retry WHERE event_id = ANY($1)" + +const selectRetryNextTransactionIDSQL = "" + + "SELECT transaction_id FROM federationsender_queue_retry" + + " WHERE server_name = $1 AND send_type = $2" + + " ORDER BY transaction_id ASC" + + " LIMIT 1" + +const selectRetryPDUsByTransactionSQL = "" + + "SELECT event_id, server_name, origin_server_ts, json_body FROM federationsender_queue_retry" + + " WHERE server_name = $1 AND send_type = $2 AND transaction_id = $3" + + " LIMIT 50" + +type queueRetryStatements struct { + insertRetryStmt *sql.Stmt + deleteRetryStmt *sql.Stmt + selectRetryNextTransactionIDStmt *sql.Stmt +
selectRetryPDUsByTransactionStmt *sql.Stmt +} + +func (s *queueRetryStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(queueRetrySchema) + if err != nil { + return + } + if s.insertRetryStmt, err = db.Prepare(insertRetrySQL); err != nil { + return + } + if s.deleteRetryStmt, err = db.Prepare(deleteRetrySQL); err != nil { + return + } + if s.selectRetryNextTransactionIDStmt, err = db.Prepare(selectRetryNextTransactionIDSQL); err != nil { + return + } + if s.selectRetryPDUsByTransactionStmt, err = db.Prepare(selectRetryPDUsByTransactionSQL); err != nil { + return + } + return +} + +func (s *queueRetryStatements) insertQueueRetry( + ctx context.Context, + txn *sql.Tx, + transactionID string, + sendtype string, + event gomatrixserverlib.Event, + serverName gomatrixserverlib.ServerName, +) error { + stmt := sqlutil.TxStmt(txn, s.insertRetryStmt) + _, err := stmt.ExecContext( + ctx, + transactionID, // the transaction ID that we initially attempted + sendtype, // either "pdu", "invite", "send_to_device" + event.EventID(), // the event ID + event.OriginServerTS(), // the event origin server TS + serverName, // destination server name + event.JSON(), // JSON body + ) + return err +} + +func (s *queueRetryStatements) deleteQueueRetry( + ctx context.Context, txn *sql.Tx, eventIDs []string, +) error { + stmt := sqlutil.TxStmt(txn, s.deleteRetryStmt) + _, err := stmt.ExecContext(ctx, pq.StringArray(eventIDs)) + return err +} + +func (s *queueRetryStatements) selectRetryNextTransactionID( + ctx context.Context, txn *sql.Tx, serverName, sendType string, +) (string, error) { + var transactionID string + stmt := sqlutil.TxStmt(txn, s.selectRetryNextTransactionIDStmt) + err := stmt.QueryRowContext(ctx, serverName, types.FailedEventTypePDU).Scan(&transactionID) + return transactionID, err +} + +func (s *queueRetryStatements) selectQueueRetryPDUs( + ctx context.Context, txn *sql.Tx, serverName string, transactionID string, +) ([]*gomatrixserverlib.HeaderedEvent, error) 
{ + + stmt := sqlutil.TxStmt(txn, s.selectRetryPDUsByTransactionStmt) + rows, err := stmt.QueryContext(ctx, serverName, types.FailedEventTypePDU, transactionID) + if err != nil { + return nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "queueRetryFromStmt: rows.close() failed") + + var result []*gomatrixserverlib.HeaderedEvent + for rows.Next() { + var transactionID, eventID string + var originServerTS int64 + var jsonBody []byte + if err = rows.Scan(&transactionID, &eventID, &originServerTS, &jsonBody); err != nil { + return nil, err + } + var event gomatrixserverlib.HeaderedEvent + if err = json.Unmarshal(jsonBody, &event); err != nil { + return nil, fmt.Errorf("json.Unmarshal: %w", err) + } + if event.EventID() != eventID { + return nil, fmt.Errorf("event ID %q doesn't match expected %q", event.EventID(), eventID) + } + result = append(result, &event) + } + + return result, rows.Err() +} diff --git a/federationsender/storage/sqlite3/storage.go b/federationsender/storage/sqlite3/storage.go index ac303f646..2373a85bd 100644 --- a/federationsender/storage/sqlite3/storage.go +++ b/federationsender/storage/sqlite3/storage.go @@ -18,17 +18,20 @@ package sqlite3 import ( "context" "database/sql" + "fmt" _ "github.com/mattn/go-sqlite3" "github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/gomatrixserverlib" ) // Database stores information needed by the federation sender type Database struct { joinedHostsStatements roomStatements + queueRetryStatements sqlutil.PartitionOffsetStatements db *sql.DB } @@ -61,6 +64,10 @@ func (d *Database) prepare() error { return err } + if err = d.queueRetryStatements.prepare(d.db); err != nil { + return err + } + return d.PartitionOffsetStatements.Prepare(d.db, "federationsender") } @@ -126,3 +133,45 @@ func (d *Database) GetJoinedHosts( ) ([]types.JoinedHost, error) { return d.selectJoinedHosts(ctx, roomID) } + +// GetFailedPDUs retrieves PDUs that 
we have failed to send on +// a specific destination queue. +func (d *Database) GetFailedPDUs( + ctx context.Context, + serverName gomatrixserverlib.ServerName, +) ([]*gomatrixserverlib.HeaderedEvent, error) { + transactionID, err := d.selectRetryNextTransactionID(ctx, nil, string(serverName), types.FailedEventTypePDU) + if err != nil { + return nil, fmt.Errorf("d.selectRetryNextTransactionID: %w", err) + } + + events, err := d.selectQueueRetryPDUs(ctx, nil, string(serverName), transactionID) + if err != nil { + return nil, fmt.Errorf("d.selectQueueRetryPDUs: %w", err) + } + return events, nil +} + +// StoreFailedPDUs stores PDUs that we have failed to send on +// a specific destination queue. +func (d *Database) StoreFailedPDUs( + ctx context.Context, + transactionID gomatrixserverlib.TransactionID, + serverName gomatrixserverlib.ServerName, + pdus []*gomatrixserverlib.HeaderedEvent, +) error { + for _, pdu := range pdus { + if _, err := d.insertRetryStmt.ExecContext( + ctx, + string(transactionID), // transaction ID + types.FailedEventTypePDU, // type of event that was queued + pdu.EventID(), // event ID + pdu.OriginServerTS(), // event origin server TS + string(serverName), // destination server name + pdu.JSON(), // JSON body + ); err != nil { + return fmt.Errorf("d.insertQueueRetryStmt.ExecContext: %w", err) + } + } + return nil +} diff --git a/federationsender/types/types.go b/federationsender/types/types.go index 398d32677..db9dbf34e 100644 --- a/federationsender/types/types.go +++ b/federationsender/types/types.go @@ -20,6 +20,12 @@ import ( "github.com/matrix-org/gomatrixserverlib" ) +const ( + FailedEventTypePDU = "pdu" + FailedEventTypeInvite = "invite" + FailedEventTypeSendToDevice = "send_to_device" +) + // A JoinedHost is a server that is joined to a matrix room. type JoinedHost struct { // The MemberEventID of a m.room.member join event.