From 54e953f14955761583604a595ac132e334ee3f6c Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 27 Jun 2018 17:05:41 +0100 Subject: [PATCH] Switch to using a sequence for transaction IDs --- .../dendrite/appservice/storage/storage.go | 19 ++-------- .../storage/txn_id_counter_table.go | 38 +++---------------- .../workers/transaction_scheduler.go | 34 ++++------------- 3 files changed, 16 insertions(+), 75 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/appservice/storage/storage.go b/src/github.com/matrix-org/dendrite/appservice/storage/storage.go index 16f6f5ad6..19a7ce6b9 100644 --- a/src/github.com/matrix-org/dendrite/appservice/storage/storage.go +++ b/src/github.com/matrix-org/dendrite/appservice/storage/storage.go @@ -102,22 +102,9 @@ func (d *Database) RemoveEventsBeforeAndIncludingID( return d.events.deleteEventsBeforeAndIncludingID(ctx, appserviceID, eventTableID) } -// GetTxnIDWithAppServiceID takes in an application service ID and returns the -// last used transaction ID associated with it. -func (d *Database) GetTxnIDWithAppServiceID( +// GetLatestTxnID returns the latest available transaction id +func (d *Database) GetLatestTxnID( ctx context.Context, - appServiceID string, ) (int, error) { - return d.txnID.selectTxnID(ctx, appServiceID) -} - -// UpsertTxnIDWithAppServiceID takes in an application service ID and a -// transaction ID and stores them in the DB, unless the pair already exists, in -// which case it updates them. -func (d *Database) UpsertTxnIDWithAppServiceID( - ctx context.Context, - appServiceID string, - txnID int, -) error { - return d.txnID.upsertTxnID(ctx, appServiceID, txnID) + return d.txnID.selectTxnID(ctx) } diff --git a/src/github.com/matrix-org/dendrite/appservice/storage/txn_id_counter_table.go b/src/github.com/matrix-org/dendrite/appservice/storage/txn_id_counter_table.go index ac0fad2b6..7b0fa3786 100644 --- a/src/github.com/matrix-org/dendrite/appservice/storage/txn_id_counter_table.go +++ b/src/github.com/matrix-org/dendrite/appservice/storage/txn_id_counter_table.go @@ -20,26 +20,14 @@ import ( ) const txnIDSchema = ` --- Keeps a count of the current transaction ID per application service -CREATE TABLE IF NOT EXISTS txn_id_counter ( - -- The ID of the application service the this count belongs to - as_id TEXT NOT NULL PRIMARY KEY, - -- The last-used transaction ID - txn_id INTEGER NOT NULL -); +-- Keeps a count of the current transaction ID +CREATE SEQUENCE IF NOT EXISTS txn_id_counter START 1; ` -const selectTxnIDSQL = "" + - "SELECT txn_id FROM txn_id_counter WHERE as_id = $1" - -const upsertTxnIDSQL = "" + - "INSERT INTO txn_id_counter(as_id, txn_id) VALUES ($1, $2) " + - "ON CONFLICT (as_id) DO UPDATE " + - "SET txn_id = $2" +const selectTxnIDSQL = "SELECT nextval('txn_id_counter')" type txnStatements struct { selectTxnIDStmt *sql.Stmt - upsertTxnIDStmt *sql.Stmt } func (s *txnStatements) prepare(db *sql.DB) (err error) { @@ -51,30 +39,14 @@ func (s *txnStatements) prepare(db *sql.DB) (err error) { if s.selectTxnIDStmt, err = db.Prepare(selectTxnIDSQL); err != nil { return } - if s.upsertTxnIDStmt, err = db.Prepare(upsertTxnIDSQL); err != nil { - return - } return } -// selectTxnID inserts a new transactionID mapped to its corresponding -// application service ID into the db. +// selectTxnID selects the latest ascending transaction ID func (s *txnStatements) selectTxnID( ctx context.Context, - appServiceID string, ) (txnID int, err error) { - err = s.selectTxnIDStmt.QueryRowContext(ctx, appServiceID).Scan(&txnID) - return -} - -// upsertTxnID inserts or updates on existing rows a new transactionID mapped to -// its corresponding application service ID into the db. -func (s *txnStatements) upsertTxnID( - ctx context.Context, - appServiceID string, - txnID int, -) (err error) { - _, err = s.upsertTxnIDStmt.ExecContext(ctx, appServiceID, txnID) + err = s.selectTxnIDStmt.QueryRowContext(ctx).Scan(&txnID) return } diff --git a/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go b/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go index 04291a699..ba6277e58 100644 --- a/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go +++ b/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go @@ -17,7 +17,6 @@ package workers import ( "bytes" "context" - "database/sql" "encoding/json" "fmt" "math" @@ -36,8 +35,6 @@ var ( transactionBatchSize = 50 // Timeout for sending a single transaction to an application service. transactionTimeout = time.Second * 60 - // The current transaction ID. Increments after every successful transaction. - currentTransactionID = 0 ) // SetupTransactionWorkers spawns a separate goroutine for each application @@ -67,16 +64,6 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) { }).Info("starting application service") ctx := context.Background() - // Initialize transaction ID counter - var err error - currentTransactionID, err = db.GetTxnIDWithAppServiceID(ctx, ws.AppService.ID) - if err != nil && err != sql.ErrNoRows { - log.WithFields(log.Fields{ - "appservice": ws.AppService.ID, - }).WithError(err).Fatal("appservice worker unable to get latest transaction ID from DB") - return - } - // Grab the HTTP client for sending requests to app services client := &http.Client{ Timeout: transactionTimeout, @@ -133,15 +120,6 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) { }).WithError(err).Fatal("unable to remove appservice events from the database") return } - - // Update transactionID - currentTransactionID++ - if err = db.UpsertTxnIDWithAppServiceID(ctx, ws.AppService.ID, currentTransactionID); err != nil { - log.WithFields(log.Fields{ - "appservice": ws.AppService.ID, - }).WithError(err).Fatal("unable to update transaction ID") - return - } } } @@ -186,14 +164,18 @@ func createTransaction( return } - // Check if these events already have a transaction ID + // Check if these events do not already have a transaction ID if txnID == -1 { - txnID = currentTransactionID - // Mark new events with current transactionID - err := db.UpdateTxnIDForEvents(ctx, appserviceID, maxID, currentTransactionID) + // If not, grab next available ID from the DB + txnID, err = db.GetLatestTxnID(ctx) if err != nil { return 0, 0, 0, nil, err } + + // Mark new events with current transactionID + if err = db.UpdateTxnIDForEvents(ctx, appserviceID, maxID, txnID); err != nil { + return 0, 0, 0, nil, err + } } // Create a transaction and store the events inside