mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-12 17:33:09 -06:00
Switch to using a sequence for transaction IDs
This commit is contained in:
parent
801b58c927
commit
54e953f149
|
|
@ -102,22 +102,9 @@ func (d *Database) RemoveEventsBeforeAndIncludingID(
|
||||||
return d.events.deleteEventsBeforeAndIncludingID(ctx, appserviceID, eventTableID)
|
return d.events.deleteEventsBeforeAndIncludingID(ctx, appserviceID, eventTableID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetTxnIDWithAppServiceID takes in an application service ID and returns the
|
// GetLatestTxnID returns the latest available transaction id
|
||||||
// last used transaction ID associated with it.
|
func (d *Database) GetLatestTxnID(
|
||||||
func (d *Database) GetTxnIDWithAppServiceID(
|
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
appServiceID string,
|
|
||||||
) (int, error) {
|
) (int, error) {
|
||||||
return d.txnID.selectTxnID(ctx, appServiceID)
|
return d.txnID.selectTxnID(ctx)
|
||||||
}
|
|
||||||
|
|
||||||
// UpsertTxnIDWithAppServiceID takes in an application service ID and a
|
|
||||||
// transaction ID and stores them in the DB, unless the pair already exists, in
|
|
||||||
// which case it updates them.
|
|
||||||
func (d *Database) UpsertTxnIDWithAppServiceID(
|
|
||||||
ctx context.Context,
|
|
||||||
appServiceID string,
|
|
||||||
txnID int,
|
|
||||||
) error {
|
|
||||||
return d.txnID.upsertTxnID(ctx, appServiceID, txnID)
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -20,26 +20,14 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const txnIDSchema = `
|
const txnIDSchema = `
|
||||||
-- Keeps a count of the current transaction ID per application service
|
-- Keeps a count of the current transaction ID
|
||||||
CREATE TABLE IF NOT EXISTS txn_id_counter (
|
CREATE SEQUENCE IF NOT EXISTS txn_id_counter START 1;
|
||||||
-- The ID of the application service the this count belongs to
|
|
||||||
as_id TEXT NOT NULL PRIMARY KEY,
|
|
||||||
-- The last-used transaction ID
|
|
||||||
txn_id INTEGER NOT NULL
|
|
||||||
);
|
|
||||||
`
|
`
|
||||||
|
|
||||||
const selectTxnIDSQL = "" +
|
const selectTxnIDSQL = "SELECT nextval('txn_id_counter')"
|
||||||
"SELECT txn_id FROM txn_id_counter WHERE as_id = $1"
|
|
||||||
|
|
||||||
const upsertTxnIDSQL = "" +
|
|
||||||
"INSERT INTO txn_id_counter(as_id, txn_id) VALUES ($1, $2) " +
|
|
||||||
"ON CONFLICT (as_id) DO UPDATE " +
|
|
||||||
"SET txn_id = $2"
|
|
||||||
|
|
||||||
type txnStatements struct {
|
type txnStatements struct {
|
||||||
selectTxnIDStmt *sql.Stmt
|
selectTxnIDStmt *sql.Stmt
|
||||||
upsertTxnIDStmt *sql.Stmt
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *txnStatements) prepare(db *sql.DB) (err error) {
|
func (s *txnStatements) prepare(db *sql.DB) (err error) {
|
||||||
|
|
@ -51,30 +39,14 @@ func (s *txnStatements) prepare(db *sql.DB) (err error) {
|
||||||
if s.selectTxnIDStmt, err = db.Prepare(selectTxnIDSQL); err != nil {
|
if s.selectTxnIDStmt, err = db.Prepare(selectTxnIDSQL); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if s.upsertTxnIDStmt, err = db.Prepare(upsertTxnIDSQL); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// selectTxnID inserts a new transactionID mapped to its corresponding
|
// selectTxnID selects the latest ascending transaction ID
|
||||||
// application service ID into the db.
|
|
||||||
func (s *txnStatements) selectTxnID(
|
func (s *txnStatements) selectTxnID(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
appServiceID string,
|
|
||||||
) (txnID int, err error) {
|
) (txnID int, err error) {
|
||||||
err = s.selectTxnIDStmt.QueryRowContext(ctx, appServiceID).Scan(&txnID)
|
err = s.selectTxnIDStmt.QueryRowContext(ctx).Scan(&txnID)
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// upsertTxnID inserts or updates on existing rows a new transactionID mapped to
|
|
||||||
// its corresponding application service ID into the db.
|
|
||||||
func (s *txnStatements) upsertTxnID(
|
|
||||||
ctx context.Context,
|
|
||||||
appServiceID string,
|
|
||||||
txnID int,
|
|
||||||
) (err error) {
|
|
||||||
_, err = s.upsertTxnIDStmt.ExecContext(ctx, appServiceID, txnID)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -17,7 +17,6 @@ package workers
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
|
|
@ -36,8 +35,6 @@ var (
|
||||||
transactionBatchSize = 50
|
transactionBatchSize = 50
|
||||||
// Timeout for sending a single transaction to an application service.
|
// Timeout for sending a single transaction to an application service.
|
||||||
transactionTimeout = time.Second * 60
|
transactionTimeout = time.Second * 60
|
||||||
// The current transaction ID. Increments after every successful transaction.
|
|
||||||
currentTransactionID = 0
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// SetupTransactionWorkers spawns a separate goroutine for each application
|
// SetupTransactionWorkers spawns a separate goroutine for each application
|
||||||
|
|
@ -67,16 +64,6 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) {
|
||||||
}).Info("starting application service")
|
}).Info("starting application service")
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
// Initialize transaction ID counter
|
|
||||||
var err error
|
|
||||||
currentTransactionID, err = db.GetTxnIDWithAppServiceID(ctx, ws.AppService.ID)
|
|
||||||
if err != nil && err != sql.ErrNoRows {
|
|
||||||
log.WithFields(log.Fields{
|
|
||||||
"appservice": ws.AppService.ID,
|
|
||||||
}).WithError(err).Fatal("appservice worker unable to get latest transaction ID from DB")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Grab the HTTP client for sending requests to app services
|
// Grab the HTTP client for sending requests to app services
|
||||||
client := &http.Client{
|
client := &http.Client{
|
||||||
Timeout: transactionTimeout,
|
Timeout: transactionTimeout,
|
||||||
|
|
@ -133,15 +120,6 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) {
|
||||||
}).WithError(err).Fatal("unable to remove appservice events from the database")
|
}).WithError(err).Fatal("unable to remove appservice events from the database")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update transactionID
|
|
||||||
currentTransactionID++
|
|
||||||
if err = db.UpsertTxnIDWithAppServiceID(ctx, ws.AppService.ID, currentTransactionID); err != nil {
|
|
||||||
log.WithFields(log.Fields{
|
|
||||||
"appservice": ws.AppService.ID,
|
|
||||||
}).WithError(err).Fatal("unable to update transaction ID")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -186,14 +164,18 @@ func createTransaction(
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if these events already have a transaction ID
|
// Check if these events do not already have a transaction ID
|
||||||
if txnID == -1 {
|
if txnID == -1 {
|
||||||
txnID = currentTransactionID
|
// If not, grab next available ID from the DB
|
||||||
// Mark new events with current transactionID
|
txnID, err = db.GetLatestTxnID(ctx)
|
||||||
err := db.UpdateTxnIDForEvents(ctx, appserviceID, maxID, currentTransactionID)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, 0, 0, nil, err
|
return 0, 0, 0, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Mark new events with current transactionID
|
||||||
|
if err = db.UpdateTxnIDForEvents(ctx, appserviceID, maxID, txnID); err != nil {
|
||||||
|
return 0, 0, 0, nil, err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a transaction and store the events inside
|
// Create a transaction and store the events inside
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue