From 5082179ce39de71df9e8ecd924dd9104f60195aa Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 30 Jun 2020 14:19:49 +0100 Subject: [PATCH] Don't error on now rows --- federationsender/storage/postgres/queue_pdus_table.go | 9 ++++++--- federationsender/storage/postgres/storage.go | 10 +++++++--- federationsender/storage/sqlite3/queue_pdus_table.go | 9 ++++++--- federationsender/storage/sqlite3/storage.go | 10 +++++++--- 4 files changed, 26 insertions(+), 12 deletions(-) diff --git a/federationsender/storage/postgres/queue_pdus_table.go b/federationsender/storage/postgres/queue_pdus_table.go index a0a14a6ef..4ba22bc20 100644 --- a/federationsender/storage/postgres/queue_pdus_table.go +++ b/federationsender/storage/postgres/queue_pdus_table.go @@ -54,7 +54,7 @@ const selectQueueNextTransactionIDSQL = "" + const selectQueuePDUsByTransactionSQL = "" + "SELECT json_nid FROM federationsender_queue_pdus" + " WHERE server_name = $1 AND transaction_id = $2" + - " LIMIT 50" + " LIMIT $3" type queuePDUsStatements struct { insertQueuePDUStmt *sql.Stmt @@ -111,11 +111,14 @@ func (s *queuePDUsStatements) deleteQueueTransaction( } func (s *queuePDUsStatements) selectQueueNextTransactionID( - ctx context.Context, txn *sql.Tx, serverName, sendType string, + ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, ) (string, error) { var transactionID string stmt := sqlutil.TxStmt(txn, s.selectQueueNextTransactionIDStmt) err := stmt.QueryRowContext(ctx, serverName).Scan(&transactionID) + if err == sql.ErrNoRows { + return "", nil + } return transactionID, err } @@ -123,7 +126,7 @@ func (s *queuePDUsStatements) selectQueuePDUs( ctx context.Context, txn *sql.Tx, serverName string, transactionID string, limit int, ) ([]int64, error) { stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsByTransactionStmt) - rows, err := stmt.QueryContext(ctx, serverName, transactionID) + rows, err := stmt.QueryContext(ctx, serverName, transactionID, limit) if err != nil { return nil, err } diff --git a/federationsender/storage/postgres/storage.go b/federationsender/storage/postgres/storage.go index 7a64d4573..fb69a2f0f 100644 --- a/federationsender/storage/postgres/storage.go +++ b/federationsender/storage/postgres/storage.go @@ -177,14 +177,18 @@ func (d *Database) GetNextTransactionPDUs( serverName gomatrixserverlib.ServerName, limit int, ) (gomatrixserverlib.TransactionID, []*gomatrixserverlib.HeaderedEvent, error) { - transactionID, err := d.selectQueueNextTransactionID(ctx, nil, string(serverName), types.FailedEventTypePDU) + transactionID, err := d.selectQueueNextTransactionID(ctx, nil, serverName) if err != nil { - return "", nil, fmt.Errorf("d.selectRetryNextTransactionID: %w", err) + return "", nil, fmt.Errorf("d.selectQueueNextTransactionID: %w", err) + } + + if transactionID == "" { + return "", nil, nil } nids, err := d.selectQueuePDUs(ctx, nil, string(serverName), transactionID, limit) if err != nil { - return "", nil, fmt.Errorf("d.selectQueueRetryPDUs: %w", err) + return "", nil, fmt.Errorf("d.selectQueuePDUs: %w", err) } blobs, err := d.selectJSON(ctx, nil, nids) diff --git a/federationsender/storage/sqlite3/queue_pdus_table.go b/federationsender/storage/sqlite3/queue_pdus_table.go index 1742ec073..5bfa528e1 100644 --- a/federationsender/storage/sqlite3/queue_pdus_table.go +++ b/federationsender/storage/sqlite3/queue_pdus_table.go @@ -54,7 +54,7 @@ const selectQueueNextTransactionIDSQL = "" + const selectQueuePDUsByTransactionSQL = "" + "SELECT json_nid FROM federationsender_queue_pdus" + " WHERE server_name = $1 AND transaction_id = $2" + - " LIMIT 50" + " LIMIT $3" type queuePDUsStatements struct { insertQueuePDUStmt *sql.Stmt @@ -111,11 +111,14 @@ func (s *queuePDUsStatements) deleteQueueTransaction( } func (s *queuePDUsStatements) selectQueueNextTransactionID( - ctx context.Context, txn *sql.Tx, serverName, sendType string, + ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, ) (string, error) { var transactionID string stmt := sqlutil.TxStmt(txn, s.selectQueueNextTransactionIDStmt) err := stmt.QueryRowContext(ctx, serverName).Scan(&transactionID) + if err == sql.ErrNoRows { + return "", nil + } return transactionID, err } @@ -123,7 +126,7 @@ func (s *queuePDUsStatements) selectQueuePDUs( ctx context.Context, txn *sql.Tx, serverName string, transactionID string, limit int, ) ([]int64, error) { stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsByTransactionStmt) - rows, err := stmt.QueryContext(ctx, serverName, transactionID) + rows, err := stmt.QueryContext(ctx, serverName, transactionID, limit) if err != nil { return nil, err } diff --git a/federationsender/storage/sqlite3/storage.go b/federationsender/storage/sqlite3/storage.go index 7aab83fba..f5adaa10b 100644 --- a/federationsender/storage/sqlite3/storage.go +++ b/federationsender/storage/sqlite3/storage.go @@ -183,14 +183,18 @@ func (d *Database) GetNextTransactionPDUs( serverName gomatrixserverlib.ServerName, limit int, ) (gomatrixserverlib.TransactionID, []*gomatrixserverlib.HeaderedEvent, error) { - transactionID, err := d.selectQueueNextTransactionID(ctx, nil, string(serverName), types.FailedEventTypePDU) + transactionID, err := d.selectQueueNextTransactionID(ctx, nil, serverName) if err != nil { - return "", nil, fmt.Errorf("d.selectRetryNextTransactionID: %w", err) + return "", nil, fmt.Errorf("d.selectQueueNextTransactionID: %w", err) + } + + if transactionID == "" { + return "", nil, nil } nids, err := d.selectQueuePDUs(ctx, nil, string(serverName), transactionID, limit) if err != nil { - return "", nil, fmt.Errorf("d.selectQueueRetryPDUs: %w", err) + return "", nil, fmt.Errorf("d.selectQueuePDUs: %w", err) } blobs, err := d.selectJSON(ctx, nil, nids)