Don't error on now rows

This commit is contained in:
Neil Alexander 2020-06-30 14:19:49 +01:00
parent 2caaaf7a68
commit 5082179ce3
4 changed files with 26 additions and 12 deletions

View file

@ -54,7 +54,7 @@ const selectQueueNextTransactionIDSQL = "" +
const selectQueuePDUsByTransactionSQL = "" + const selectQueuePDUsByTransactionSQL = "" +
"SELECT json_nid FROM federationsender_queue_pdus" + "SELECT json_nid FROM federationsender_queue_pdus" +
" WHERE server_name = $1 AND transaction_id = $2" + " WHERE server_name = $1 AND transaction_id = $2" +
" LIMIT 50" " LIMIT $3"
type queuePDUsStatements struct { type queuePDUsStatements struct {
insertQueuePDUStmt *sql.Stmt insertQueuePDUStmt *sql.Stmt
@ -111,11 +111,14 @@ func (s *queuePDUsStatements) deleteQueueTransaction(
} }
func (s *queuePDUsStatements) selectQueueNextTransactionID( func (s *queuePDUsStatements) selectQueueNextTransactionID(
ctx context.Context, txn *sql.Tx, serverName, sendType string, ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName,
) (string, error) { ) (string, error) {
var transactionID string var transactionID string
stmt := sqlutil.TxStmt(txn, s.selectQueueNextTransactionIDStmt) stmt := sqlutil.TxStmt(txn, s.selectQueueNextTransactionIDStmt)
err := stmt.QueryRowContext(ctx, serverName).Scan(&transactionID) err := stmt.QueryRowContext(ctx, serverName).Scan(&transactionID)
if err == sql.ErrNoRows {
return "", nil
}
return transactionID, err return transactionID, err
} }
@ -123,7 +126,7 @@ func (s *queuePDUsStatements) selectQueuePDUs(
ctx context.Context, txn *sql.Tx, serverName string, transactionID string, limit int, ctx context.Context, txn *sql.Tx, serverName string, transactionID string, limit int,
) ([]int64, error) { ) ([]int64, error) {
stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsByTransactionStmt) stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsByTransactionStmt)
rows, err := stmt.QueryContext(ctx, serverName, transactionID) rows, err := stmt.QueryContext(ctx, serverName, transactionID, limit)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -177,14 +177,18 @@ func (d *Database) GetNextTransactionPDUs(
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
limit int, limit int,
) (gomatrixserverlib.TransactionID, []*gomatrixserverlib.HeaderedEvent, error) { ) (gomatrixserverlib.TransactionID, []*gomatrixserverlib.HeaderedEvent, error) {
transactionID, err := d.selectQueueNextTransactionID(ctx, nil, string(serverName), types.FailedEventTypePDU) transactionID, err := d.selectQueueNextTransactionID(ctx, nil, serverName)
if err != nil { if err != nil {
return "", nil, fmt.Errorf("d.selectRetryNextTransactionID: %w", err) return "", nil, fmt.Errorf("d.selectQueueNextTransactionID: %w", err)
}
if transactionID == "" {
return "", nil, nil
} }
nids, err := d.selectQueuePDUs(ctx, nil, string(serverName), transactionID, limit) nids, err := d.selectQueuePDUs(ctx, nil, string(serverName), transactionID, limit)
if err != nil { if err != nil {
return "", nil, fmt.Errorf("d.selectQueueRetryPDUs: %w", err) return "", nil, fmt.Errorf("d.selectQueuePDUs: %w", err)
} }
blobs, err := d.selectJSON(ctx, nil, nids) blobs, err := d.selectJSON(ctx, nil, nids)

View file

@ -54,7 +54,7 @@ const selectQueueNextTransactionIDSQL = "" +
const selectQueuePDUsByTransactionSQL = "" + const selectQueuePDUsByTransactionSQL = "" +
"SELECT json_nid FROM federationsender_queue_pdus" + "SELECT json_nid FROM federationsender_queue_pdus" +
" WHERE server_name = $1 AND transaction_id = $2" + " WHERE server_name = $1 AND transaction_id = $2" +
" LIMIT 50" " LIMIT $3"
type queuePDUsStatements struct { type queuePDUsStatements struct {
insertQueuePDUStmt *sql.Stmt insertQueuePDUStmt *sql.Stmt
@ -111,11 +111,14 @@ func (s *queuePDUsStatements) deleteQueueTransaction(
} }
func (s *queuePDUsStatements) selectQueueNextTransactionID( func (s *queuePDUsStatements) selectQueueNextTransactionID(
ctx context.Context, txn *sql.Tx, serverName, sendType string, ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName,
) (string, error) { ) (string, error) {
var transactionID string var transactionID string
stmt := sqlutil.TxStmt(txn, s.selectQueueNextTransactionIDStmt) stmt := sqlutil.TxStmt(txn, s.selectQueueNextTransactionIDStmt)
err := stmt.QueryRowContext(ctx, serverName).Scan(&transactionID) err := stmt.QueryRowContext(ctx, serverName).Scan(&transactionID)
if err == sql.ErrNoRows {
return "", nil
}
return transactionID, err return transactionID, err
} }
@ -123,7 +126,7 @@ func (s *queuePDUsStatements) selectQueuePDUs(
ctx context.Context, txn *sql.Tx, serverName string, transactionID string, limit int, ctx context.Context, txn *sql.Tx, serverName string, transactionID string, limit int,
) ([]int64, error) { ) ([]int64, error) {
stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsByTransactionStmt) stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsByTransactionStmt)
rows, err := stmt.QueryContext(ctx, serverName, transactionID) rows, err := stmt.QueryContext(ctx, serverName, transactionID, limit)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -183,14 +183,18 @@ func (d *Database) GetNextTransactionPDUs(
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
limit int, limit int,
) (gomatrixserverlib.TransactionID, []*gomatrixserverlib.HeaderedEvent, error) { ) (gomatrixserverlib.TransactionID, []*gomatrixserverlib.HeaderedEvent, error) {
transactionID, err := d.selectQueueNextTransactionID(ctx, nil, string(serverName), types.FailedEventTypePDU) transactionID, err := d.selectQueueNextTransactionID(ctx, nil, serverName)
if err != nil { if err != nil {
return "", nil, fmt.Errorf("d.selectRetryNextTransactionID: %w", err) return "", nil, fmt.Errorf("d.selectQueueNextTransactionID: %w", err)
}
if transactionID == "" {
return "", nil, nil
} }
nids, err := d.selectQueuePDUs(ctx, nil, string(serverName), transactionID, limit) nids, err := d.selectQueuePDUs(ctx, nil, string(serverName), transactionID, limit)
if err != nil { if err != nil {
return "", nil, fmt.Errorf("d.selectQueueRetryPDUs: %w", err) return "", nil, fmt.Errorf("d.selectQueuePDUs: %w", err)
} }
blobs, err := d.selectJSON(ctx, nil, nids) blobs, err := d.selectJSON(ctx, nil, nids)