From 2c3540d82b8e8fee6d31deacf22b8e027d7e4e62 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 30 Jun 2020 13:31:56 +0100 Subject: [PATCH] Duplicate postgres code into sqlite for now just to stop build errors, will fix SQLite soon --- .../storage/postgres/queue_pdus_table.go | 32 ++-- federationsender/storage/postgres/storage.go | 4 +- .../storage/sqlite3/queue_json_table.go | 113 ++++++++++++ .../storage/sqlite3/queue_pdus_table.go | 141 +++++++++++++++ .../storage/sqlite3/queue_retry_table.go | 167 ------------------ federationsender/storage/sqlite3/storage.go | 103 ++++++++--- 6 files changed, 348 insertions(+), 212 deletions(-) create mode 100644 federationsender/storage/sqlite3/queue_json_table.go create mode 100644 federationsender/storage/sqlite3/queue_pdus_table.go delete mode 100644 federationsender/storage/sqlite3/queue_retry_table.go diff --git a/federationsender/storage/postgres/queue_pdus_table.go b/federationsender/storage/postgres/queue_pdus_table.go index 609fd554c..84251529d 100644 --- a/federationsender/storage/postgres/queue_pdus_table.go +++ b/federationsender/storage/postgres/queue_pdus_table.go @@ -24,7 +24,7 @@ import ( "github.com/matrix-org/gomatrixserverlib" ) -const queueSchema = ` +const queuePDUsSchema = ` CREATE TABLE IF NOT EXISTS federationsender_queue_pdus ( -- The transaction ID that was generated before persisting the event. transaction_id TEXT NOT NULL, @@ -38,11 +38,11 @@ CREATE UNIQUE INDEX IF NOT EXISTS federationsender_queue_pdus_event_id_idx ON federationsender_queue (event_id, server_name); ` -const insertQueueSQL = "" + +const insertQueuePDUSQL = "" + "INSERT INTO federationsender_queue (transaction_id, server_name, json_nid)" + " VALUES ($1, $2, $3)" -const deleteQueueTransactionSQL = "" + +const deleteQueueTransactionPDUsSQL = "" + "DELETE FROM federationsender_queue WHERE server_name = $1 AND transaction_id = $2" const selectQueueNextTransactionIDSQL = "" + @@ -56,22 +56,22 @@ const selectQueuePDUsByTransactionSQL = "" + " WHERE server_name = $1 AND transaction_id = $2" + " LIMIT 50" -type queueStatements struct { - insertQueueStmt *sql.Stmt - deleteQueueTransactionStmt *sql.Stmt +type queuePDUsStatements struct { + insertQueuePDUStmt *sql.Stmt + deleteQueueTransactionPDUsStmt *sql.Stmt selectQueueNextTransactionIDStmt *sql.Stmt selectQueuePDUsByTransactionStmt *sql.Stmt } -func (s *queueStatements) prepare(db *sql.DB) (err error) { - _, err = db.Exec(queueSchema) +func (s *queuePDUsStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(queuePDUsSchema) if err != nil { return } - if s.insertQueueStmt, err = db.Prepare(insertQueueSQL); err != nil { + if s.insertQueuePDUStmt, err = db.Prepare(insertQueuePDUSQL); err != nil { return } - if s.deleteQueueTransactionStmt, err = db.Prepare(deleteQueueTransactionSQL); err != nil { + if s.deleteQueueTransactionPDUsStmt, err = db.Prepare(deleteQueueTransactionPDUsSQL); err != nil { return } if s.selectQueueNextTransactionIDStmt, err = db.Prepare(selectQueueNextTransactionIDSQL); err != nil { @@ -83,14 +83,14 @@ func (s *queueStatements) prepare(db *sql.DB) (err error) { return } -func (s *queueStatements) insertQueuePDU( +func (s *queuePDUsStatements) insertQueuePDU( ctx context.Context, txn *sql.Tx, transactionID gomatrixserverlib.TransactionID, serverName gomatrixserverlib.ServerName, nid int64, ) error { - stmt := sqlutil.TxStmt(txn, s.insertQueueStmt) + stmt := sqlutil.TxStmt(txn, s.insertQueuePDUStmt) _, err := stmt.ExecContext( ctx, transactionID, // the transaction ID that we initially attempted @@ -100,17 +100,17 @@ func (s *queueStatements) insertQueuePDU( return err } -func (s *queueStatements) deleteQueueTransaction( +func (s *queuePDUsStatements) deleteQueueTransaction( ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, transactionID gomatrixserverlib.TransactionID, ) error { - stmt := sqlutil.TxStmt(txn, s.deleteQueueTransactionStmt) + stmt := sqlutil.TxStmt(txn, s.deleteQueueTransactionPDUsStmt) _, err := stmt.ExecContext(ctx, serverName, transactionID) return err } -func (s *queueStatements) selectQueueNextTransactionID( +func (s *queuePDUsStatements) selectQueueNextTransactionID( ctx context.Context, txn *sql.Tx, serverName, sendType string, ) (string, error) { var transactionID string @@ -119,7 +119,7 @@ func (s *queueStatements) selectQueueNextTransactionID( return transactionID, err } -func (s *queueStatements) selectQueuePDUs( +func (s *queuePDUsStatements) selectQueuePDUs( ctx context.Context, txn *sql.Tx, serverName string, transactionID string, limit int, ) ([]int64, error) { stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsByTransactionStmt) diff --git a/federationsender/storage/postgres/storage.go b/federationsender/storage/postgres/storage.go index 7a3d5384c..d4719c7aa 100644 --- a/federationsender/storage/postgres/storage.go +++ b/federationsender/storage/postgres/storage.go @@ -30,7 +30,7 @@ import ( type Database struct { joinedHostsStatements roomStatements - queueStatements + queuePDUsStatements queueJSONStatements sqlutil.PartitionOffsetStatements db *sql.DB @@ -60,7 +60,7 @@ func (d *Database) prepare() error { return err } - if err = d.queueStatements.prepare(d.db); err != nil { + if err = d.queuePDUsStatements.prepare(d.db); err != nil { return err } diff --git a/federationsender/storage/sqlite3/queue_json_table.go b/federationsender/storage/sqlite3/queue_json_table.go new file mode 100644 index 000000000..1d5d811a7 --- /dev/null +++ b/federationsender/storage/sqlite3/queue_json_table.go @@ -0,0 +1,113 @@ +// Copyright 2017-2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "context" + "database/sql" + + "github.com/lib/pq" + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/internal/sqlutil" +) + +const queueJSONSchema = ` +-- The queue_retry_json table contains event contents that +-- we failed to send. +CREATE TABLE IF NOT EXISTS federationsender_queue_retry_json ( + -- The JSON NID. This allows the federationsender_queue_retry table to + -- cross-reference to find the JSON blob. + json_nid BIGSERIAL, + -- The JSON body. Text so that we preserve UTF-8. + json_body TEXT NOT NULL +); +` + +const insertJSONSQL = "" + + "INSERT INTO federationsender_queue_retry_json (json_body)" + + " VALUES ($1)" + + " ON CONFLICT DO NOTHING" + +const deleteJSONSQL = "" + + "DELETE FROM federationsender_queue_retry_json WHERE json_nid = ANY($1)" + +const selectJSONSQL = "" + + "SELECT json_nid, json_body FROM federationsender_queue_retry_json" + + " WHERE json_nid = ANY($1)" + +type queueJSONStatements struct { + insertJSONStmt *sql.Stmt + deleteJSONStmt *sql.Stmt + selectJSONStmt *sql.Stmt +} + +func (s *queueJSONStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(queueJSONSchema) + if err != nil { + return + } + if s.insertJSONStmt, err = db.Prepare(insertJSONSQL); err != nil { + return + } + if s.deleteJSONStmt, err = db.Prepare(deleteJSONSQL); err != nil { + return + } + if s.selectJSONStmt, err = db.Prepare(selectJSONSQL); err != nil { + return + } + return +} + +func (s *queueJSONStatements) insertQueueJSON( + ctx context.Context, txn *sql.Tx, json string, +) (int64, error) { + stmt := sqlutil.TxStmt(txn, s.insertJSONStmt) + res, err := stmt.ExecContext(ctx, json) + if err != nil { + return 0, err + } + lastid, err := res.LastInsertId() + return lastid, err +} + +func (s *queueJSONStatements) deleteQueueJSON( + ctx context.Context, txn *sql.Tx, eventIDs []string, +) error { + stmt := sqlutil.TxStmt(txn, s.deleteJSONStmt) + _, err := stmt.ExecContext(ctx, pq.StringArray(eventIDs)) + return err +} + +func (s *queueJSONStatements) selectJSON( + ctx context.Context, txn *sql.Tx, jsonNIDs []int64, +) (map[int64][]byte, error) { + blobs := map[int64][]byte{} + stmt := sqlutil.TxStmt(txn, s.selectJSONStmt) + rows, err := stmt.QueryContext(ctx, pq.Int64Array(jsonNIDs)) + if err != nil { + return nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "selectJSON: rows.close() failed") + for rows.Next() { + var nid int64 + var blob []byte + if err = rows.Scan(&nid, &blob); err != nil { + return nil, err + } + blobs[nid] = blob + } + return blobs, err +} diff --git a/federationsender/storage/sqlite3/queue_pdus_table.go b/federationsender/storage/sqlite3/queue_pdus_table.go new file mode 100644 index 000000000..b1ab7b2bc --- /dev/null +++ b/federationsender/storage/sqlite3/queue_pdus_table.go @@ -0,0 +1,141 @@ +// Copyright 2017-2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "context" + "database/sql" + + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/gomatrixserverlib" +) + +const queuePDUsSchema = ` +CREATE TABLE IF NOT EXISTS federationsender_queue_pdus ( + -- The transaction ID that was generated before persisting the event. + transaction_id TEXT NOT NULL, + -- The domain part of the user ID the m.room.member event is for. + server_name TEXT NOT NULL, + -- The JSON NID from the federationsender_queue_json table. + json_nid BIGINT NOT NULL +); + +CREATE UNIQUE INDEX IF NOT EXISTS federationsender_queue_pdus_event_id_idx + ON federationsender_queue (event_id, server_name); +` + +const insertQueuePDUSQL = "" + + "INSERT INTO federationsender_queue (transaction_id, server_name, json_nid)" + + " VALUES ($1, $2, $3)" + +const deleteQueueTransactionPDUsSQL = "" + + "DELETE FROM federationsender_queue WHERE server_name = $1 AND transaction_id = $2" + +const selectQueueNextTransactionIDSQL = "" + + "SELECT transaction_id FROM federationsender_queue" + + " WHERE server_name = $1" + + " ORDER BY transaction_id ASC" + + " LIMIT 1" + +const selectQueuePDUsByTransactionSQL = "" + + "SELECT json_nid FROM federationsender_queue" + + " WHERE server_name = $1 AND transaction_id = $2" + + " LIMIT 50" + +type queuePDUsStatements struct { + insertQueuePDUStmt *sql.Stmt + deleteQueueTransactionPDUsStmt *sql.Stmt + selectQueueNextTransactionIDStmt *sql.Stmt + selectQueuePDUsByTransactionStmt *sql.Stmt +} + +func (s *queuePDUsStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(queuePDUsSchema) + if err != nil { + return + } + if s.insertQueuePDUStmt, err = db.Prepare(insertQueuePDUSQL); err != nil { + return + } + if s.deleteQueueTransactionPDUsStmt, err = db.Prepare(deleteQueueTransactionPDUsSQL); err != nil { + return + } + if s.selectQueueNextTransactionIDStmt, err = db.Prepare(selectQueueNextTransactionIDSQL); err != nil { + return + } + if s.selectQueuePDUsByTransactionStmt, err = db.Prepare(selectQueuePDUsByTransactionSQL); err != nil { + return + } + return +} + +func (s *queuePDUsStatements) insertQueuePDU( + ctx context.Context, + txn *sql.Tx, + transactionID gomatrixserverlib.TransactionID, + serverName gomatrixserverlib.ServerName, + nid int64, +) error { + stmt := sqlutil.TxStmt(txn, s.insertQueuePDUStmt) + _, err := stmt.ExecContext( + ctx, + transactionID, // the transaction ID that we initially attempted + serverName, // destination server name + nid, // JSON blob NID + ) + return err +} + +func (s *queuePDUsStatements) deleteQueueTransaction( + ctx context.Context, txn *sql.Tx, + serverName gomatrixserverlib.ServerName, + transactionID gomatrixserverlib.TransactionID, +) error { + stmt := sqlutil.TxStmt(txn, s.deleteQueueTransactionPDUsStmt) + _, err := stmt.ExecContext(ctx, serverName, transactionID) + return err +} + +func (s *queuePDUsStatements) selectQueueNextTransactionID( + ctx context.Context, txn *sql.Tx, serverName, sendType string, +) (string, error) { + var transactionID string + stmt := sqlutil.TxStmt(txn, s.selectQueueNextTransactionIDStmt) + err := stmt.QueryRowContext(ctx, serverName).Scan(&transactionID) + return transactionID, err +} + +func (s *queuePDUsStatements) selectQueuePDUs( + ctx context.Context, txn *sql.Tx, serverName string, transactionID string, limit int, +) ([]int64, error) { + stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsByTransactionStmt) + rows, err := stmt.QueryContext(ctx, serverName, transactionID) + if err != nil { + return nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "queueFromStmt: rows.close() failed") + var result []int64 + for rows.Next() { + var nid int64 + if err = rows.Scan(&nid); err != nil { + return nil, err + } + result = append(result, nid) + } + + return result, rows.Err() +} diff --git a/federationsender/storage/sqlite3/queue_retry_table.go b/federationsender/storage/sqlite3/queue_retry_table.go deleted file mode 100644 index 0596a7193..000000000 --- a/federationsender/storage/sqlite3/queue_retry_table.go +++ /dev/null @@ -1,167 +0,0 @@ -// Copyright 2017-2018 New Vector Ltd -// Copyright 2019-2020 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package sqlite3 - -import ( - "context" - "database/sql" - "encoding/json" - "fmt" - - "github.com/lib/pq" - "github.com/matrix-org/dendrite/federationsender/types" - "github.com/matrix-org/dendrite/internal" - "github.com/matrix-org/dendrite/internal/sqlutil" - "github.com/matrix-org/gomatrixserverlib" -) - -const queueRetrySchema = ` --- The queue_retry table contains events that we failed to --- send to a destination host, such that we can try them again --- later. -CREATE TABLE IF NOT EXISTS federationsender_queue_retry ( - -- The string ID of the room. - transaction_id TEXT NOT NULL, - -- The event type: "pdu", "invite", "send_to_device". - send_type TEXT NOT NULL, - -- The event ID of the m.room.member join event. - event_id TEXT NOT NULL, - -- The origin server TS of the event. - origin_server_ts BIGINT NOT NULL, - -- The domain part of the user ID the m.room.member event is for. - server_name TEXT NOT NULL, - -- The JSON body. - json_body BYTEA NOT NULL -); - -CREATE UNIQUE INDEX IF NOT EXISTS federationsender_queue_retry_event_id_idx - ON federationsender_queue_retry (event_id); -` - -const insertRetrySQL = "" + - "INSERT INTO federationsender_queue_retry (transaction_id, send_type, event_id, origin_server_ts, server_name, json_body)" + - " VALUES ($1, $2, $3, $4, $5, $6)" - -const deleteRetrySQL = "" + - "DELETE FROM federationsender_queue_retry WHERE event_id = ANY($1)" - -const selectRetryNextTransactionIDSQL = "" + - "SELECT transaction_id FROM federationsender_queue_retry" + - " WHERE server_name = $1 AND send_type = $2" + - " ORDER BY transaction_id ASC" + - " LIMIT 1" - -const selectRetryPDUsByTransactionSQL = "" + - "SELECT event_id, server_name, origin_server_ts, json_body FROM federationsender_queue_retry" + - " WHERE server_name = $1 AND send_type = $2 AND transaction_id = $3" + - " LIMIT 50" - -type queueRetryStatements struct { - insertRetryStmt *sql.Stmt - deleteRetryStmt *sql.Stmt - selectRetryNextTransactionIDStmt *sql.Stmt - selectRetryPDUsByTransactionStmt *sql.Stmt -} - -func (s *queueRetryStatements) prepare(db *sql.DB) (err error) { - _, err = db.Exec(queueRetrySchema) - if err != nil { - return - } - if s.insertRetryStmt, err = db.Prepare(insertRetrySQL); err != nil { - return - } - if s.deleteRetryStmt, err = db.Prepare(deleteRetrySQL); err != nil { - return - } - if s.selectRetryNextTransactionIDStmt, err = db.Prepare(selectRetryNextTransactionIDSQL); err != nil { - return - } - if s.selectRetryPDUsByTransactionStmt, err = db.Prepare(selectRetryPDUsByTransactionSQL); err != nil { - return - } - return -} - -func (s *queueRetryStatements) insertQueueRetry( - ctx context.Context, - txn *sql.Tx, - transactionID string, - sendtype string, - event gomatrixserverlib.Event, - serverName gomatrixserverlib.ServerName, -) error { - stmt := sqlutil.TxStmt(txn, s.insertRetryStmt) - _, err := stmt.ExecContext( - ctx, - transactionID, // the transaction ID that we initially attempted - sendtype, // either "pdu", "invite", "send_to_device" - event.EventID(), // the event ID - event.OriginServerTS(), // the event origin server TS - serverName, // destination server name - event.JSON(), // JSON body - ) - return err -} - -func (s *queueRetryStatements) deleteQueueRetry( - ctx context.Context, txn *sql.Tx, eventIDs []string, -) error { - stmt := sqlutil.TxStmt(txn, s.deleteRetryStmt) - _, err := stmt.ExecContext(ctx, pq.StringArray(eventIDs)) - return err -} - -func (s *queueRetryStatements) selectRetryNextTransactionID( - ctx context.Context, txn *sql.Tx, serverName, sendType string, -) (string, error) { - var transactionID string - stmt := sqlutil.TxStmt(txn, s.selectRetryNextTransactionIDStmt) - err := stmt.QueryRowContext(ctx, serverName, types.FailedEventTypePDU).Scan(&transactionID) - return transactionID, err -} - -func (s *queueRetryStatements) selectQueueRetryPDUs( - ctx context.Context, txn *sql.Tx, serverName string, transactionID string, -) ([]*gomatrixserverlib.HeaderedEvent, error) { - - stmt := sqlutil.TxStmt(txn, s.selectRetryPDUsByTransactionStmt) - rows, err := stmt.QueryContext(ctx, serverName, types.FailedEventTypePDU, transactionID) - if err != nil { - return nil, err - } - defer internal.CloseAndLogIfError(ctx, rows, "queueRetryFromStmt: rows.close() failed") - - var result []*gomatrixserverlib.HeaderedEvent - for rows.Next() { - var transactionID, eventID string - var originServerTS int64 - var jsonBody []byte - if err = rows.Scan(&transactionID, &eventID, &originServerTS, &jsonBody); err != nil { - return nil, err - } - var event gomatrixserverlib.HeaderedEvent - if err = json.Unmarshal(jsonBody, &event); err != nil { - return nil, fmt.Errorf("json.Unmarshal: %w", err) - } - if event.EventID() != eventID { - return nil, fmt.Errorf("event ID %q doesn't match expected %q", event.EventID(), eventID) - } - result = append(result, &event) - } - - return result, rows.Err() -} diff --git a/federationsender/storage/sqlite3/storage.go b/federationsender/storage/sqlite3/storage.go index 2373a85bd..b0fe039e3 100644 --- a/federationsender/storage/sqlite3/storage.go +++ b/federationsender/storage/sqlite3/storage.go @@ -18,6 +18,7 @@ package sqlite3 import ( "context" "database/sql" + "encoding/json" "fmt" _ "github.com/mattn/go-sqlite3" @@ -31,7 +32,8 @@ import ( type Database struct { joinedHostsStatements roomStatements - queueRetryStatements + queuePDUsStatements + queueJSONStatements sqlutil.PartitionOffsetStatements db *sql.DB } @@ -64,7 +66,11 @@ func (d *Database) prepare() error { return err } - if err = d.queueRetryStatements.prepare(d.db); err != nil { + if err = d.queuePDUsStatements.prepare(d.db); err != nil { + return err + } + + if err = d.queueJSONStatements.prepare(d.db); err != nil { return err } @@ -134,44 +140,87 @@ func (d *Database) GetJoinedHosts( return d.selectJoinedHosts(ctx, roomID) } -// GetFailedPDUs retrieves PDUs that we have failed to send on -// a specific destination queue. -func (d *Database) GetFailedPDUs( - ctx context.Context, - serverName gomatrixserverlib.ServerName, -) ([]*gomatrixserverlib.HeaderedEvent, error) { - transactionID, err := d.selectRetryNextTransactionID(ctx, nil, string(serverName), types.FailedEventTypePDU) +// StoreJSON adds a JSON blob into the queue JSON table and returns +// a NID. The NID will then be used when inserting the per-destination +// metadata entries. +func (d *Database) StoreJSON( + ctx context.Context, js []byte, +) (int64, error) { + res, err := d.insertJSONStmt.ExecContext(ctx, js) if err != nil { - return nil, fmt.Errorf("d.selectRetryNextTransactionID: %w", err) + return 0, fmt.Errorf("d.insertRetryJSONStmt: %w", err) } - - events, err := d.selectQueueRetryPDUs(ctx, nil, string(serverName), transactionID) + nid, err := res.LastInsertId() if err != nil { - return nil, fmt.Errorf("d.selectQueueRetryPDUs: %w", err) + return 0, fmt.Errorf("res.LastInsertID: %w", err) } - return events, nil + return nid, nil } -// StoreFailedPDUs stores PDUs that we have failed to send on -// a specific destination queue. -func (d *Database) StoreFailedPDUs( +// AssociatePDUWithDestination creates an association that the +// destination queues will use to determine which JSON blobs to send +// to which servers. +func (d *Database) AssociatePDUWithDestination( ctx context.Context, transactionID gomatrixserverlib.TransactionID, serverName gomatrixserverlib.ServerName, - pdus []*gomatrixserverlib.HeaderedEvent, + nids []int64, ) error { - for _, pdu := range pdus { - if _, err := d.insertRetryStmt.ExecContext( - ctx, - string(transactionID), // transaction ID - types.FailedEventTypePDU, // type of event that was queued - pdu.EventID(), // event ID - pdu.OriginServerTS(), // event origin server TS - string(serverName), // destination server name - pdu.JSON(), // JSON body + for _, nid := range nids { + if err := d.insertQueuePDU( + ctx, // context + nil, // SQL transaction + transactionID, // transaction ID + serverName, // destination server name + nid, // NID from the federationsender_queue_json table ); err != nil { return fmt.Errorf("d.insertQueueRetryStmt.ExecContext: %w", err) } } return nil } + +// GetNextTransactionPDUs retrieves events from the database for +// the next pending transaction, up to the limit specified. +func (d *Database) GetNextTransactionPDUs( + ctx context.Context, + serverName gomatrixserverlib.ServerName, + limit int, +) (gomatrixserverlib.TransactionID, []*gomatrixserverlib.HeaderedEvent, error) { + transactionID, err := d.selectQueueNextTransactionID(ctx, nil, string(serverName), types.FailedEventTypePDU) + if err != nil { + return "", nil, fmt.Errorf("d.selectRetryNextTransactionID: %w", err) + } + + nids, err := d.selectQueuePDUs(ctx, nil, string(serverName), transactionID, limit) + if err != nil { + return "", nil, fmt.Errorf("d.selectQueueRetryPDUs: %w", err) + } + + blobs, err := d.selectJSON(ctx, nil, nids) + if err != nil { + return "", nil, fmt.Errorf("d.selectJSON: %w", err) + } + + var events []*gomatrixserverlib.HeaderedEvent + for _, blob := range blobs { + var event gomatrixserverlib.HeaderedEvent + if err := json.Unmarshal(blob, &event); err != nil { + return "", nil, fmt.Errorf("json.Unmarshal: %w", err) + } + events = append(events, &event) + } + + return gomatrixserverlib.TransactionID(transactionID), events, nil +} + +// CleanTransactionPDUs cleans up all associated events for a +// given transaction. This is done when the transaction was sent +// successfully. +func (d *Database) CleanTransactionPDUs( + ctx context.Context, + serverName gomatrixserverlib.ServerName, + transactionID gomatrixserverlib.TransactionID, +) error { + return d.deleteQueueTransaction(ctx, nil, serverName, transactionID) +}