mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-26 08:13:09 -06:00
Implement Cosmos DB for the AppService (#7)
* - Implement Cosmos for the devices_table - Use the ConnectionString in the YAML to include the Tenant - Revert all other non implemented tables back to use SQLLite3 * - Change the Config to use "test.criticicalarc.com" Container - Add generic function GetDocumentOrNil to standardize GetDocument - Add func to return CrossPartition queries for Aggregates - Add func GetNextSequence() as generic seq generator for AutoIncrement - Add cosmosdbutil.ErrNoRows to return (emulate) sql.ErrNoRows - Add a "fake" ExclusiveWriterFake - Add standard "getXX", "setXX" and "queryXX" to all TABLE class files - Add specific Table SEQ for the Events table - Add specific Table SEQ for the Rooms table - Add specific Table SEQ for the StateSnapshot table * - Use CosmosDB for the KeyServer - Replace the ConnString in the YAML to Cosmos - Update the 4 tables to use Cosmos * - Add SEQ for Event and Counters - Replace SQLLite with Cosmos in Config and Code * - Fix typo
This commit is contained in:
parent
b4382bd8b9
commit
af4219f38e
|
|
@ -19,45 +19,80 @@ import (
|
|||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/cosmosdbapi"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const appserviceEventsSchema = `
|
||||
-- Stores events to be sent to application services
|
||||
CREATE TABLE IF NOT EXISTS appservice_events (
|
||||
-- An auto-incrementing id unique to each event in the table
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
-- The ID of the application service the event will be sent to
|
||||
as_id TEXT NOT NULL,
|
||||
-- JSON representation of the event
|
||||
headered_event_json TEXT NOT NULL,
|
||||
-- The ID of the transaction that this event is a part of
|
||||
txn_id INTEGER NOT NULL
|
||||
);
|
||||
// const appserviceEventsSchema = `
|
||||
// -- Stores events to be sent to application services
|
||||
// CREATE TABLE IF NOT EXISTS appservice_events (
|
||||
// -- An auto-incrementing id unique to each event in the table
|
||||
// id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
// -- The ID of the application service the event will be sent to
|
||||
// as_id TEXT NOT NULL,
|
||||
// -- JSON representation of the event
|
||||
// headered_event_json TEXT NOT NULL,
|
||||
// -- The ID of the transaction that this event is a part of
|
||||
// txn_id INTEGER NOT NULL
|
||||
// );
|
||||
|
||||
CREATE INDEX IF NOT EXISTS appservice_events_as_id ON appservice_events(as_id);
|
||||
`
|
||||
// CREATE INDEX IF NOT EXISTS appservice_events_as_id ON appservice_events(as_id);
|
||||
// `
|
||||
|
||||
type EventCosmos struct {
|
||||
ID int64 `json:"id"`
|
||||
AppServiceID string `json:"as_id"`
|
||||
HeaderedEventJSON []byte `json:"headered_event_json"`
|
||||
TXNID int64 `json:"txn_id"`
|
||||
}
|
||||
|
||||
type EventNumberCosmosData struct {
|
||||
Number int `json:"number"`
|
||||
}
|
||||
|
||||
type EventCosmosData struct {
|
||||
Id string `json:"id"`
|
||||
Pk string `json:"_pk"`
|
||||
Cn string `json:"_cn"`
|
||||
ETag string `json:"_etag"`
|
||||
Timestamp int64 `json:"_ts"`
|
||||
Event EventCosmos `json:"mx_appservice_event"`
|
||||
}
|
||||
|
||||
// "SELECT id, headered_event_json, txn_id " +
|
||||
// "FROM appservice_events WHERE as_id = $1 ORDER BY txn_id DESC, id ASC"
|
||||
const selectEventsByApplicationServiceIDSQL = "" +
|
||||
"SELECT id, headered_event_json, txn_id " +
|
||||
"FROM appservice_events WHERE as_id = $1 ORDER BY txn_id DESC, id ASC"
|
||||
"select * from c where c._cn = @x1 " +
|
||||
"and c.mx_appservice_event.as_id = @x2 " +
|
||||
"order by c.mx_appservice_event.txn_id desc " +
|
||||
"c.mx_appservice_event.id asc"
|
||||
|
||||
// "SELECT COUNT(id) FROM appservice_events WHERE as_id = $1"
|
||||
const countEventsByApplicationServiceIDSQL = "" +
|
||||
"SELECT COUNT(id) FROM appservice_events WHERE as_id = $1"
|
||||
"select count(c._ts) as number from c where c._cn = @x1 " +
|
||||
"and c.mx_appservice_event.as_id = @x2 "
|
||||
|
||||
const insertEventSQL = "" +
|
||||
"INSERT INTO appservice_events(as_id, headered_event_json, txn_id) " +
|
||||
"VALUES ($1, $2, $3)"
|
||||
// const insertEventSQL = "" +
|
||||
// "INSERT INTO appservice_events(as_id, headered_event_json, txn_id) " +
|
||||
// "VALUES ($1, $2, $3)"
|
||||
|
||||
// "UPDATE appservice_events SET txn_id = $1 WHERE as_id = $2 AND id <= $3"
|
||||
const updateTxnIDForEventsSQL = "" +
|
||||
"UPDATE appservice_events SET txn_id = $1 WHERE as_id = $2 AND id <= $3"
|
||||
"select * from c where c._cn = @x1 " +
|
||||
"and c.mx_appservice_event.as_id = @x2 " +
|
||||
"and c.mx_appservice_event.id <= @x3 "
|
||||
|
||||
// "DELETE FROM appservice_events WHERE as_id = $1 AND id <= $2"
|
||||
const deleteEventsBeforeAndIncludingIDSQL = "" +
|
||||
"DELETE FROM appservice_events WHERE as_id = $1 AND id <= $2"
|
||||
"select * from c where c._cn = @x1 " +
|
||||
"and c.mx_appservice_event.as_id = @x2 " +
|
||||
"and c.mx_appservice_event.id <= @x3 "
|
||||
|
||||
const (
|
||||
// A transaction ID number that no transaction should ever have. Used for
|
||||
|
|
@ -66,42 +101,97 @@ const (
|
|||
)
|
||||
|
||||
type eventsStatements struct {
|
||||
db *sql.DB
|
||||
db *Database
|
||||
writer sqlutil.Writer
|
||||
selectEventsByApplicationServiceIDStmt *sql.Stmt
|
||||
countEventsByApplicationServiceIDStmt *sql.Stmt
|
||||
insertEventStmt *sql.Stmt
|
||||
updateTxnIDForEventsStmt *sql.Stmt
|
||||
deleteEventsBeforeAndIncludingIDStmt *sql.Stmt
|
||||
selectEventsByApplicationServiceIDStmt string
|
||||
countEventsByApplicationServiceIDStmt string
|
||||
// insertEventStmt *sql.Stmt
|
||||
updateTxnIDForEventsStmt string
|
||||
deleteEventsBeforeAndIncludingIDStmt string
|
||||
tableName string
|
||||
}
|
||||
|
||||
func (s *eventsStatements) prepare(db *sql.DB, writer sqlutil.Writer) (err error) {
|
||||
func (s *eventsStatements) prepare(db *Database, writer sqlutil.Writer) (err error) {
|
||||
s.db = db
|
||||
s.writer = writer
|
||||
_, err = db.Exec(appserviceEventsSchema)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if s.selectEventsByApplicationServiceIDStmt, err = db.Prepare(selectEventsByApplicationServiceIDSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.countEventsByApplicationServiceIDStmt, err = db.Prepare(countEventsByApplicationServiceIDSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.updateTxnIDForEventsStmt, err = db.Prepare(updateTxnIDForEventsSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.deleteEventsBeforeAndIncludingIDStmt, err = db.Prepare(deleteEventsBeforeAndIncludingIDSQL); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
s.selectEventsByApplicationServiceIDStmt = selectEventsByApplicationServiceIDSQL
|
||||
s.countEventsByApplicationServiceIDStmt = countEventsByApplicationServiceIDSQL
|
||||
s.updateTxnIDForEventsStmt = updateTxnIDForEventsSQL
|
||||
s.deleteEventsBeforeAndIncludingIDStmt = deleteEventsBeforeAndIncludingIDSQL
|
||||
s.tableName = "events"
|
||||
return
|
||||
}
|
||||
|
||||
func queryEvent(s *eventsStatements, ctx context.Context, qry string, params map[string]interface{}) ([]EventCosmosData, error) {
|
||||
var dbCollectionName = cosmosdbapi.GetCollectionName(s.db.databaseName, s.tableName)
|
||||
var pk = cosmosdbapi.GetPartitionKey(s.db.cosmosConfig.ContainerName, dbCollectionName)
|
||||
var response []EventCosmosData
|
||||
|
||||
var optionsQry = cosmosdbapi.GetQueryDocumentsOptions(pk)
|
||||
var query = cosmosdbapi.GetQuery(qry, params)
|
||||
_, err := cosmosdbapi.GetClient(s.db.connection).QueryDocuments(
|
||||
ctx,
|
||||
s.db.cosmosConfig.DatabaseName,
|
||||
s.db.cosmosConfig.ContainerName,
|
||||
query,
|
||||
&response,
|
||||
optionsQry)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return response, nil
|
||||
}
|
||||
|
||||
func queryEventEventNumber(s *eventsStatements, ctx context.Context, qry string, params map[string]interface{}) ([]EventNumberCosmosData, error) {
|
||||
var dbCollectionName = cosmosdbapi.GetCollectionName(s.db.databaseName, s.tableName)
|
||||
var pk = cosmosdbapi.GetPartitionKey(s.db.cosmosConfig.ContainerName, dbCollectionName)
|
||||
var response []EventNumberCosmosData
|
||||
|
||||
var optionsQry = cosmosdbapi.GetQueryDocumentsOptions(pk)
|
||||
var query = cosmosdbapi.GetQuery(qry, params)
|
||||
_, err := cosmosdbapi.GetClient(s.db.connection).QueryDocuments(
|
||||
ctx,
|
||||
s.db.cosmosConfig.DatabaseName,
|
||||
s.db.cosmosConfig.ContainerName,
|
||||
query,
|
||||
&response,
|
||||
optionsQry)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return response, nil
|
||||
}
|
||||
|
||||
func setEvent(s *eventsStatements, ctx context.Context, event EventCosmosData) (*EventCosmosData, error) {
|
||||
var optionsReplace = cosmosdbapi.GetReplaceDocumentOptions(event.Pk, event.ETag)
|
||||
var _, _, ex = cosmosdbapi.GetClient(s.db.connection).ReplaceDocument(
|
||||
ctx,
|
||||
s.db.cosmosConfig.DatabaseName,
|
||||
s.db.cosmosConfig.ContainerName,
|
||||
event.Id,
|
||||
&event,
|
||||
optionsReplace)
|
||||
return &event, ex
|
||||
}
|
||||
|
||||
func deleteEvent(s *eventsStatements, ctx context.Context, event EventCosmosData) error {
|
||||
var options = cosmosdbapi.GetDeleteDocumentOptions(event.Pk)
|
||||
var _, err = cosmosdbapi.GetClient(s.db.connection).DeleteDocument(
|
||||
ctx,
|
||||
s.db.cosmosConfig.DatabaseName,
|
||||
s.db.cosmosConfig.ContainerName,
|
||||
event.Id,
|
||||
options)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// selectEventsByApplicationServiceID takes in an application service ID and
|
||||
// returns a slice of events that need to be sent to that application service,
|
||||
// as well as an int later used to remove these same events from the database
|
||||
|
|
@ -116,19 +206,24 @@ func (s *eventsStatements) selectEventsByApplicationServiceID(
|
|||
eventsRemaining bool,
|
||||
err error,
|
||||
) {
|
||||
defer func() {
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"appservice": applicationServiceID,
|
||||
}).WithError(err).Fatalf("appservice unable to select new events to send")
|
||||
}
|
||||
}()
|
||||
// Retrieve events from the database. Unsuccessfully sent events first
|
||||
eventRows, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID)
|
||||
if err != nil {
|
||||
return
|
||||
|
||||
// "SELECT id, headered_event_json, txn_id " +
|
||||
// "FROM appservice_events WHERE as_id = $1 ORDER BY txn_id DESC, id ASC"
|
||||
|
||||
var dbCollectionName = cosmosdbapi.GetCollectionName(s.db.databaseName, s.tableName)
|
||||
params := map[string]interface{}{
|
||||
"@x1": dbCollectionName,
|
||||
"@x2": applicationServiceID,
|
||||
}
|
||||
defer checkNamedErr(eventRows.Close, &err)
|
||||
|
||||
eventRows, err := queryEvent(s, ctx, s.selectEventsByApplicationServiceIDStmt, params)
|
||||
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"appservice": applicationServiceID,
|
||||
}).WithError(err).Fatalf("appservice unable to select new events to send")
|
||||
}
|
||||
|
||||
events, maxID, txnID, eventsRemaining, err = retrieveEvents(eventRows, limit)
|
||||
if err != nil {
|
||||
return
|
||||
|
|
@ -144,7 +239,7 @@ func checkNamedErr(fn func() error, err *error) {
|
|||
}
|
||||
}
|
||||
|
||||
func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.HeaderedEvent, maxID, txnID int, eventsRemaining bool, err error) {
|
||||
func retrieveEvents(eventRows []EventCosmosData, limit int) (events []gomatrixserverlib.HeaderedEvent, maxID, txnID int, eventsRemaining bool, err error) {
|
||||
// Get current time for use in calculating event age
|
||||
nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
|
||||
|
||||
|
|
@ -152,15 +247,14 @@ func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.
|
|||
// If txn_id changes dramatically, we've switched from collecting old events to
|
||||
// new ones. Send back those events first.
|
||||
lastTxnID := invalidTxnID
|
||||
for eventsProcessed := 0; eventRows.Next(); {
|
||||
for eventsProcessed := 0; eventsProcessed < len(eventRows); {
|
||||
var event gomatrixserverlib.HeaderedEvent
|
||||
var eventJSON []byte
|
||||
var id int
|
||||
err = eventRows.Scan(
|
||||
&id,
|
||||
&eventJSON,
|
||||
&txnID,
|
||||
)
|
||||
item := eventRows[eventsProcessed]
|
||||
id = int(item.Event.ID)
|
||||
eventJSON = item.Event.HeaderedEventJSON
|
||||
txnID = int(item.Event.TXNID)
|
||||
if err != nil {
|
||||
return nil, 0, 0, false, err
|
||||
}
|
||||
|
|
@ -208,10 +302,21 @@ func (s *eventsStatements) countEventsByApplicationServiceID(
|
|||
appServiceID string,
|
||||
) (int, error) {
|
||||
var count int
|
||||
err := s.countEventsByApplicationServiceIDStmt.QueryRowContext(ctx, appServiceID).Scan(&count)
|
||||
|
||||
// "SELECT COUNT(id) FROM appservice_events WHERE as_id = $1"
|
||||
|
||||
var dbCollectionName = cosmosdbapi.GetCollectionName(s.db.databaseName, s.tableName)
|
||||
params := map[string]interface{}{
|
||||
"@x1": dbCollectionName,
|
||||
"@x2": appServiceID,
|
||||
}
|
||||
|
||||
response, err := queryEventEventNumber(s, ctx, s.countEventsByApplicationServiceIDStmt, params)
|
||||
|
||||
if err != nil && err != sql.ErrNoRows {
|
||||
return 0, err
|
||||
}
|
||||
count = response[0].Number
|
||||
|
||||
return count, nil
|
||||
}
|
||||
|
|
@ -229,15 +334,48 @@ func (s *eventsStatements) insertEvent(
|
|||
return err
|
||||
}
|
||||
|
||||
return s.writer.Do(s.db, nil, func(txn *sql.Tx) error {
|
||||
_, err := s.insertEventStmt.ExecContext(
|
||||
ctx,
|
||||
appServiceID,
|
||||
eventJSON,
|
||||
-1, // No transaction ID yet
|
||||
)
|
||||
return err
|
||||
})
|
||||
// "INSERT INTO appservice_events(as_id, headered_event_json, txn_id) " +
|
||||
// "VALUES ($1, $2, $3)"
|
||||
|
||||
// id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
idSeq, seqErr := GetNextEventID(s, ctx)
|
||||
if seqErr != nil {
|
||||
return seqErr
|
||||
}
|
||||
|
||||
var dbCollectionName = cosmosdbapi.GetCollectionName(s.db.databaseName, s.tableName)
|
||||
docId := fmt.Sprintf("%d", idSeq)
|
||||
cosmosDocId := cosmosdbapi.GetDocumentId(s.db.cosmosConfig.ContainerName, dbCollectionName, docId)
|
||||
pk := cosmosdbapi.GetPartitionKey(s.db.cosmosConfig.ContainerName, dbCollectionName)
|
||||
|
||||
// appServiceID,
|
||||
// eventJSON,
|
||||
// -1, // No transaction ID yet
|
||||
data := EventCosmos{
|
||||
AppServiceID: appServiceID,
|
||||
HeaderedEventJSON: eventJSON,
|
||||
ID: idSeq,
|
||||
TXNID: -1,
|
||||
}
|
||||
|
||||
dbData := &EventCosmosData{
|
||||
Id: cosmosDocId,
|
||||
Cn: dbCollectionName,
|
||||
Pk: pk,
|
||||
Timestamp: time.Now().Unix(),
|
||||
Event: data,
|
||||
}
|
||||
|
||||
var options = cosmosdbapi.GetUpsertDocumentOptions(dbData.Pk)
|
||||
_, _, err = cosmosdbapi.GetClient(s.db.connection).CreateDocument(
|
||||
ctx,
|
||||
s.db.cosmosConfig.DatabaseName,
|
||||
s.db.cosmosConfig.ContainerName,
|
||||
&dbData,
|
||||
options)
|
||||
|
||||
return err
|
||||
|
||||
}
|
||||
|
||||
// updateTxnIDForEvents sets the transactionID for a collection of events. Done
|
||||
|
|
@ -248,10 +386,27 @@ func (s *eventsStatements) updateTxnIDForEvents(
|
|||
appserviceID string,
|
||||
maxID, txnID int,
|
||||
) (err error) {
|
||||
return s.writer.Do(s.db, nil, func(txn *sql.Tx) error {
|
||||
_, err := s.updateTxnIDForEventsStmt.ExecContext(ctx, txnID, appserviceID, maxID)
|
||||
// "UPDATE appservice_events SET txn_id = $1 WHERE as_id = $2 AND id <= $3"
|
||||
|
||||
var dbCollectionName = cosmosdbapi.GetCollectionName(s.db.databaseName, s.tableName)
|
||||
params := map[string]interface{}{
|
||||
"@x1": dbCollectionName,
|
||||
"@x2": appserviceID,
|
||||
"@x3": maxID,
|
||||
}
|
||||
|
||||
response, err := queryEvent(s, ctx, s.updateTxnIDForEventsStmt, params)
|
||||
if err != nil {
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
for _, item := range response {
|
||||
item.Event.TXNID = int64(txnID)
|
||||
// _, err := s.updateTxnIDForEventsStmt.ExecContext(ctx, txnID, appserviceID, maxID)
|
||||
_, err = setEvent(s, ctx, item)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// deleteEventsBeforeAndIncludingID removes events matching given IDs from the database.
|
||||
|
|
@ -260,8 +415,23 @@ func (s *eventsStatements) deleteEventsBeforeAndIncludingID(
|
|||
appserviceID string,
|
||||
eventTableID int,
|
||||
) (err error) {
|
||||
return s.writer.Do(s.db, nil, func(txn *sql.Tx) error {
|
||||
_, err := s.deleteEventsBeforeAndIncludingIDStmt.ExecContext(ctx, appserviceID, eventTableID)
|
||||
// "DELETE FROM appservice_events WHERE as_id = $1 AND id <= $2"
|
||||
|
||||
var dbCollectionName = cosmosdbapi.GetCollectionName(s.db.databaseName, s.tableName)
|
||||
params := map[string]interface{}{
|
||||
"@x1": dbCollectionName,
|
||||
"@x2": appserviceID,
|
||||
"@x3": eventTableID,
|
||||
}
|
||||
|
||||
response, err := queryEvent(s, ctx, s.deleteEventsBeforeAndIncludingIDStmt, params)
|
||||
if err != nil {
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
for _, item := range response {
|
||||
// _, err := s.updateTxnIDForEventsStmt.ExecContext(ctx, txnID, appserviceID, maxID)
|
||||
err = deleteEvent(s, ctx, item)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,12 @@
|
|||
package cosmosdb
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/cosmosdbutil"
|
||||
)
|
||||
|
||||
func GetNextEventID(s *eventsStatements, ctx context.Context) (int64, error) {
|
||||
const docId = "id_seq"
|
||||
return cosmosdbutil.GetNextSequence(ctx, s.db.connection, s.db.cosmosConfig, s.db.databaseName, s.tableName, docId, 1)
|
||||
}
|
||||
|
|
@ -17,7 +17,10 @@ package cosmosdb
|
|||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/cosmosdbapi"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/cosmosdbutil"
|
||||
|
||||
// Import SQLite database driver
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
|
|
@ -29,35 +32,38 @@ import (
|
|||
// Database stores events intended to be later sent to application services
|
||||
type Database struct {
|
||||
sqlutil.PartitionOffsetStatements
|
||||
events eventsStatements
|
||||
txnID txnStatements
|
||||
db *sql.DB
|
||||
writer sqlutil.Writer
|
||||
events eventsStatements
|
||||
txnID txnStatements
|
||||
writer cosmosdbutil.Writer
|
||||
connection cosmosdbapi.CosmosConnection
|
||||
databaseName string
|
||||
cosmosConfig cosmosdbapi.CosmosConfig
|
||||
serverName gomatrixserverlib.ServerName
|
||||
}
|
||||
|
||||
// NewDatabase opens a new database
|
||||
func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
|
||||
var result Database
|
||||
var err error
|
||||
if result.db, err = sqlutil.Open(dbProperties); err != nil {
|
||||
return nil, err
|
||||
conn := cosmosdbutil.GetCosmosConnection(&dbProperties.ConnectionString)
|
||||
config := cosmosdbutil.GetCosmosConfig(&dbProperties.ConnectionString)
|
||||
result := &Database{
|
||||
databaseName: "appservice",
|
||||
connection: conn,
|
||||
cosmosConfig: config,
|
||||
}
|
||||
result.writer = sqlutil.NewExclusiveWriter()
|
||||
var err error
|
||||
result.writer = cosmosdbutil.NewExclusiveWriterFake()
|
||||
if err = result.prepare(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = result.PartitionOffsetStatements.Prepare(result.db, result.writer, "appservice"); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &result, nil
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (d *Database) prepare() error {
|
||||
if err := d.events.prepare(d.db, d.writer); err != nil {
|
||||
if err := d.events.prepare(nil, d.writer); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return d.txnID.prepare(d.db, d.writer)
|
||||
return d.txnID.prepare(nil, d.writer)
|
||||
}
|
||||
|
||||
// StoreEvent takes in a gomatrixserverlib.HeaderedEvent and stores it in the database
|
||||
|
|
|
|||
|
|
@ -22,38 +22,32 @@ import (
|
|||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
)
|
||||
|
||||
const txnIDSchema = `
|
||||
-- Keeps a count of the current transaction ID
|
||||
CREATE TABLE IF NOT EXISTS appservice_counters (
|
||||
name TEXT PRIMARY KEY NOT NULL,
|
||||
last_id INTEGER DEFAULT 1
|
||||
);
|
||||
INSERT OR IGNORE INTO appservice_counters (name, last_id) VALUES('txn_id', 1);
|
||||
`
|
||||
// const txnIDSchema = `
|
||||
// -- Keeps a count of the current transaction ID
|
||||
// CREATE TABLE IF NOT EXISTS appservice_counters (
|
||||
// name TEXT PRIMARY KEY NOT NULL,
|
||||
// last_id INTEGER DEFAULT 1
|
||||
// );
|
||||
// INSERT OR IGNORE INTO appservice_counters (name, last_id) VALUES('txn_id', 1);
|
||||
// `
|
||||
|
||||
const selectTxnIDSQL = `
|
||||
SELECT last_id FROM appservice_counters WHERE name='txn_id';
|
||||
UPDATE appservice_counters SET last_id=last_id+1 WHERE name='txn_id';
|
||||
`
|
||||
// const selectTxnIDSQL = `
|
||||
// SELECT last_id FROM appservice_counters WHERE name='txn_id';
|
||||
// UPDATE appservice_counters SET last_id=last_id+1 WHERE name='txn_id';
|
||||
// `
|
||||
|
||||
type txnStatements struct {
|
||||
db *sql.DB
|
||||
db *Database
|
||||
writer sqlutil.Writer
|
||||
selectTxnIDStmt *sql.Stmt
|
||||
tableName string
|
||||
}
|
||||
|
||||
func (s *txnStatements) prepare(db *sql.DB, writer sqlutil.Writer) (err error) {
|
||||
func (s *txnStatements) prepare(db *Database, writer sqlutil.Writer) (err error) {
|
||||
s.db = db
|
||||
s.writer = writer
|
||||
_, err = db.Exec(txnIDSchema)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if s.selectTxnIDStmt, err = db.Prepare(selectTxnIDSQL); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
//Only used for the seq generation
|
||||
s.tableName = "counters"
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -61,9 +55,5 @@ func (s *txnStatements) prepare(db *sql.DB, writer sqlutil.Writer) (err error) {
|
|||
func (s *txnStatements) selectTxnID(
|
||||
ctx context.Context,
|
||||
) (txnID int, err error) {
|
||||
err = s.writer.Do(s.db, nil, func(txn *sql.Tx) error {
|
||||
err := s.selectTxnIDStmt.QueryRowContext(ctx).Scan(&txnID)
|
||||
return err
|
||||
})
|
||||
return
|
||||
return GetNextCounterTXNID(s, ctx)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,16 @@
|
|||
package cosmosdb
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/cosmosdbutil"
|
||||
)
|
||||
|
||||
func GetNextCounterTXNID(s *txnStatements, ctx context.Context) (int, error) {
|
||||
const docId = "txn_id_seq"
|
||||
result, err := cosmosdbutil.GetNextSequence(ctx, s.db.connection, s.db.cosmosConfig, s.db.databaseName, s.tableName, docId, 1)
|
||||
if(err != nil) {
|
||||
return -1, err
|
||||
}
|
||||
return int(result), err
|
||||
}
|
||||
|
|
@ -122,7 +122,7 @@ app_service_api:
|
|||
listen: http://localhost:7777
|
||||
connect: http://localhost:7777
|
||||
database:
|
||||
connection_string: file:appservice.db
|
||||
connection_string: "cosmosdb:AccountEndpoint=https://localhost:8081/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==;DatabaseName=safezone_local;ContainerName=test.criticalarc.com;"
|
||||
max_open_conns: 10
|
||||
max_idle_conns: 2
|
||||
conn_max_lifetime: -1
|
||||
|
|
|
|||
Loading…
Reference in a new issue