diff --git a/appservice/storage/cosmosdb/appservice_events_table.go b/appservice/storage/cosmosdb/appservice_events_table.go index f69940870..846fb75b6 100644 --- a/appservice/storage/cosmosdb/appservice_events_table.go +++ b/appservice/storage/cosmosdb/appservice_events_table.go @@ -19,45 +19,80 @@ import ( "context" "database/sql" "encoding/json" + "fmt" "time" + "github.com/matrix-org/dendrite/internal/cosmosdbapi" + "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/gomatrixserverlib" log "github.com/sirupsen/logrus" ) -const appserviceEventsSchema = ` --- Stores events to be sent to application services -CREATE TABLE IF NOT EXISTS appservice_events ( - -- An auto-incrementing id unique to each event in the table - id INTEGER PRIMARY KEY AUTOINCREMENT, - -- The ID of the application service the event will be sent to - as_id TEXT NOT NULL, - -- JSON representation of the event - headered_event_json TEXT NOT NULL, - -- The ID of the transaction that this event is a part of - txn_id INTEGER NOT NULL -); +// const appserviceEventsSchema = ` +// -- Stores events to be sent to application services +// CREATE TABLE IF NOT EXISTS appservice_events ( +// -- An auto-incrementing id unique to each event in the table +// id INTEGER PRIMARY KEY AUTOINCREMENT, +// -- The ID of the application service the event will be sent to +// as_id TEXT NOT NULL, +// -- JSON representation of the event +// headered_event_json TEXT NOT NULL, +// -- The ID of the transaction that this event is a part of +// txn_id INTEGER NOT NULL +// ); -CREATE INDEX IF NOT EXISTS appservice_events_as_id ON appservice_events(as_id); -` +// CREATE INDEX IF NOT EXISTS appservice_events_as_id ON appservice_events(as_id); +// ` +type EventCosmos struct { + ID int64 `json:"id"` + AppServiceID string `json:"as_id"` + HeaderedEventJSON []byte `json:"headered_event_json"` + TXNID int64 `json:"txn_id"` +} + +type EventNumberCosmosData struct { + Number int `json:"number"` +} + +type EventCosmosData struct { + Id string `json:"id"` + Pk string `json:"_pk"` + Cn string `json:"_cn"` + ETag string `json:"_etag"` + Timestamp int64 `json:"_ts"` + Event EventCosmos `json:"mx_appservice_event"` +} + +// "SELECT id, headered_event_json, txn_id " + +// "FROM appservice_events WHERE as_id = $1 ORDER BY txn_id DESC, id ASC" const selectEventsByApplicationServiceIDSQL = "" + - "SELECT id, headered_event_json, txn_id " + - "FROM appservice_events WHERE as_id = $1 ORDER BY txn_id DESC, id ASC" + "select * from c where c._cn = @x1 " + + "and c.mx_appservice_event.as_id = @x2 " + + "order by c.mx_appservice_event.txn_id desc " + + "c.mx_appservice_event.id asc" +// "SELECT COUNT(id) FROM appservice_events WHERE as_id = $1" const countEventsByApplicationServiceIDSQL = "" + - "SELECT COUNT(id) FROM appservice_events WHERE as_id = $1" + "select count(c._ts) as number from c where c._cn = @x1 " + + "and c.mx_appservice_event.as_id = @x2 " -const insertEventSQL = "" + - "INSERT INTO appservice_events(as_id, headered_event_json, txn_id) " + - "VALUES ($1, $2, $3)" +// const insertEventSQL = "" + +// "INSERT INTO appservice_events(as_id, headered_event_json, txn_id) " + +// "VALUES ($1, $2, $3)" +// "UPDATE appservice_events SET txn_id = $1 WHERE as_id = $2 AND id <= $3" const updateTxnIDForEventsSQL = "" + - "UPDATE appservice_events SET txn_id = $1 WHERE as_id = $2 AND id <= $3" + "select * from c where c._cn = @x1 " + + "and c.mx_appservice_event.as_id = @x2 " + + "and c.mx_appservice_event.id <= @x3 " +// "DELETE FROM appservice_events WHERE as_id = $1 AND id <= $2" const deleteEventsBeforeAndIncludingIDSQL = "" + - "DELETE FROM appservice_events WHERE as_id = $1 AND id <= $2" + "select * from c where c._cn = @x1 " + + "and c.mx_appservice_event.as_id = @x2 " + + "and c.mx_appservice_event.id <= @x3 " const ( // A transaction ID number that no transaction should ever have. Used for @@ -66,42 +101,97 @@ const ( ) type eventsStatements struct { - db *sql.DB + db *Database writer sqlutil.Writer - selectEventsByApplicationServiceIDStmt *sql.Stmt - countEventsByApplicationServiceIDStmt *sql.Stmt - insertEventStmt *sql.Stmt - updateTxnIDForEventsStmt *sql.Stmt - deleteEventsBeforeAndIncludingIDStmt *sql.Stmt + selectEventsByApplicationServiceIDStmt string + countEventsByApplicationServiceIDStmt string + // insertEventStmt *sql.Stmt + updateTxnIDForEventsStmt string + deleteEventsBeforeAndIncludingIDStmt string + tableName string } -func (s *eventsStatements) prepare(db *sql.DB, writer sqlutil.Writer) (err error) { +func (s *eventsStatements) prepare(db *Database, writer sqlutil.Writer) (err error) { s.db = db s.writer = writer - _, err = db.Exec(appserviceEventsSchema) - if err != nil { - return - } - - if s.selectEventsByApplicationServiceIDStmt, err = db.Prepare(selectEventsByApplicationServiceIDSQL); err != nil { - return - } - if s.countEventsByApplicationServiceIDStmt, err = db.Prepare(countEventsByApplicationServiceIDSQL); err != nil { - return - } - if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil { - return - } - if s.updateTxnIDForEventsStmt, err = db.Prepare(updateTxnIDForEventsSQL); err != nil { - return - } - if s.deleteEventsBeforeAndIncludingIDStmt, err = db.Prepare(deleteEventsBeforeAndIncludingIDSQL); err != nil { - return - } + s.selectEventsByApplicationServiceIDStmt = selectEventsByApplicationServiceIDSQL + s.countEventsByApplicationServiceIDStmt = countEventsByApplicationServiceIDSQL + s.updateTxnIDForEventsStmt = updateTxnIDForEventsSQL + s.deleteEventsBeforeAndIncludingIDStmt = deleteEventsBeforeAndIncludingIDSQL + s.tableName = "events" return } +func queryEvent(s *eventsStatements, ctx context.Context, qry string, params map[string]interface{}) ([]EventCosmosData, error) { + var dbCollectionName = cosmosdbapi.GetCollectionName(s.db.databaseName, s.tableName) + var pk = cosmosdbapi.GetPartitionKey(s.db.cosmosConfig.ContainerName, dbCollectionName) + var response []EventCosmosData + + var optionsQry = cosmosdbapi.GetQueryDocumentsOptions(pk) + var query = cosmosdbapi.GetQuery(qry, params) + _, err := cosmosdbapi.GetClient(s.db.connection).QueryDocuments( + ctx, + s.db.cosmosConfig.DatabaseName, + s.db.cosmosConfig.ContainerName, + query, + &response, + optionsQry) + + if err != nil { + return nil, err + } + return response, nil +} + +func queryEventEventNumber(s *eventsStatements, ctx context.Context, qry string, params map[string]interface{}) ([]EventNumberCosmosData, error) { + var dbCollectionName = cosmosdbapi.GetCollectionName(s.db.databaseName, s.tableName) + var pk = cosmosdbapi.GetPartitionKey(s.db.cosmosConfig.ContainerName, dbCollectionName) + var response []EventNumberCosmosData + + var optionsQry = cosmosdbapi.GetQueryDocumentsOptions(pk) + var query = cosmosdbapi.GetQuery(qry, params) + _, err := cosmosdbapi.GetClient(s.db.connection).QueryDocuments( + ctx, + s.db.cosmosConfig.DatabaseName, + s.db.cosmosConfig.ContainerName, + query, + &response, + optionsQry) + + if err != nil { + return nil, err + } + return response, nil +} + +func setEvent(s *eventsStatements, ctx context.Context, event EventCosmosData) (*EventCosmosData, error) { + var optionsReplace = cosmosdbapi.GetReplaceDocumentOptions(event.Pk, event.ETag) + var _, _, ex = cosmosdbapi.GetClient(s.db.connection).ReplaceDocument( + ctx, + s.db.cosmosConfig.DatabaseName, + s.db.cosmosConfig.ContainerName, + event.Id, + &event, + optionsReplace) + return &event, ex +} + +func deleteEvent(s *eventsStatements, ctx context.Context, event EventCosmosData) error { + var options = cosmosdbapi.GetDeleteDocumentOptions(event.Pk) + var _, err = cosmosdbapi.GetClient(s.db.connection).DeleteDocument( + ctx, + s.db.cosmosConfig.DatabaseName, + s.db.cosmosConfig.ContainerName, + event.Id, + options) + + if err != nil { + return err + } + return err +} + // selectEventsByApplicationServiceID takes in an application service ID and // returns a slice of events that need to be sent to that application service, // as well as an int later used to remove these same events from the database @@ -116,19 +206,24 @@ func (s *eventsStatements) selectEventsByApplicationServiceID( eventsRemaining bool, err error, ) { - defer func() { - if err != nil { - log.WithFields(log.Fields{ - "appservice": applicationServiceID, - }).WithError(err).Fatalf("appservice unable to select new events to send") - } - }() - // Retrieve events from the database. Unsuccessfully sent events first - eventRows, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID) - if err != nil { - return + + // "SELECT id, headered_event_json, txn_id " + + // "FROM appservice_events WHERE as_id = $1 ORDER BY txn_id DESC, id ASC" + + var dbCollectionName = cosmosdbapi.GetCollectionName(s.db.databaseName, s.tableName) + params := map[string]interface{}{ + "@x1": dbCollectionName, + "@x2": applicationServiceID, } - defer checkNamedErr(eventRows.Close, &err) + + eventRows, err := queryEvent(s, ctx, s.selectEventsByApplicationServiceIDStmt, params) + + if err != nil { + log.WithFields(log.Fields{ + "appservice": applicationServiceID, + }).WithError(err).Fatalf("appservice unable to select new events to send") + } + events, maxID, txnID, eventsRemaining, err = retrieveEvents(eventRows, limit) if err != nil { return @@ -144,7 +239,7 @@ func checkNamedErr(fn func() error, err *error) { } } -func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.HeaderedEvent, maxID, txnID int, eventsRemaining bool, err error) { +func retrieveEvents(eventRows []EventCosmosData, limit int) (events []gomatrixserverlib.HeaderedEvent, maxID, txnID int, eventsRemaining bool, err error) { // Get current time for use in calculating event age nowMilli := time.Now().UnixNano() / int64(time.Millisecond) @@ -152,15 +247,14 @@ func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib. // If txn_id changes dramatically, we've switched from collecting old events to // new ones. Send back those events first. lastTxnID := invalidTxnID - for eventsProcessed := 0; eventRows.Next(); { + for eventsProcessed := 0; eventsProcessed < len(eventRows); { var event gomatrixserverlib.HeaderedEvent var eventJSON []byte var id int - err = eventRows.Scan( - &id, - &eventJSON, - &txnID, - ) + item := eventRows[eventsProcessed] + id = int(item.Event.ID) + eventJSON = item.Event.HeaderedEventJSON + txnID = int(item.Event.TXNID) if err != nil { return nil, 0, 0, false, err } @@ -208,10 +302,21 @@ func (s *eventsStatements) countEventsByApplicationServiceID( appServiceID string, ) (int, error) { var count int - err := s.countEventsByApplicationServiceIDStmt.QueryRowContext(ctx, appServiceID).Scan(&count) + + // "SELECT COUNT(id) FROM appservice_events WHERE as_id = $1" + + var dbCollectionName = cosmosdbapi.GetCollectionName(s.db.databaseName, s.tableName) + params := map[string]interface{}{ + "@x1": dbCollectionName, + "@x2": appServiceID, + } + + response, err := queryEventEventNumber(s, ctx, s.countEventsByApplicationServiceIDStmt, params) + if err != nil && err != sql.ErrNoRows { return 0, err } + count = response[0].Number return count, nil } @@ -229,15 +334,48 @@ func (s *eventsStatements) insertEvent( return err } - return s.writer.Do(s.db, nil, func(txn *sql.Tx) error { - _, err := s.insertEventStmt.ExecContext( - ctx, - appServiceID, - eventJSON, - -1, // No transaction ID yet - ) - return err - }) + // "INSERT INTO appservice_events(as_id, headered_event_json, txn_id) " + + // "VALUES ($1, $2, $3)" + + // id INTEGER PRIMARY KEY AUTOINCREMENT, + idSeq, seqErr := GetNextEventID(s, ctx) + if seqErr != nil { + return seqErr + } + + var dbCollectionName = cosmosdbapi.GetCollectionName(s.db.databaseName, s.tableName) + docId := fmt.Sprintf("%d", idSeq) + cosmosDocId := cosmosdbapi.GetDocumentId(s.db.cosmosConfig.ContainerName, dbCollectionName, docId) + pk := cosmosdbapi.GetPartitionKey(s.db.cosmosConfig.ContainerName, dbCollectionName) + + // appServiceID, + // eventJSON, + // -1, // No transaction ID yet + data := EventCosmos{ + AppServiceID: appServiceID, + HeaderedEventJSON: eventJSON, + ID: idSeq, + TXNID: -1, + } + + dbData := &EventCosmosData{ + Id: cosmosDocId, + Cn: dbCollectionName, + Pk: pk, + Timestamp: time.Now().Unix(), + Event: data, + } + + var options = cosmosdbapi.GetUpsertDocumentOptions(dbData.Pk) + _, _, err = cosmosdbapi.GetClient(s.db.connection).CreateDocument( + ctx, + s.db.cosmosConfig.DatabaseName, + s.db.cosmosConfig.ContainerName, + &dbData, + options) + + return err + } // updateTxnIDForEvents sets the transactionID for a collection of events. Done @@ -248,10 +386,27 @@ func (s *eventsStatements) updateTxnIDForEvents( appserviceID string, maxID, txnID int, ) (err error) { - return s.writer.Do(s.db, nil, func(txn *sql.Tx) error { - _, err := s.updateTxnIDForEventsStmt.ExecContext(ctx, txnID, appserviceID, maxID) + // "UPDATE appservice_events SET txn_id = $1 WHERE as_id = $2 AND id <= $3" + + var dbCollectionName = cosmosdbapi.GetCollectionName(s.db.databaseName, s.tableName) + params := map[string]interface{}{ + "@x1": dbCollectionName, + "@x2": appserviceID, + "@x3": maxID, + } + + response, err := queryEvent(s, ctx, s.updateTxnIDForEventsStmt, params) + if err != nil { return err - }) + } + + for _, item := range response { + item.Event.TXNID = int64(txnID) + // _, err := s.updateTxnIDForEventsStmt.ExecContext(ctx, txnID, appserviceID, maxID) + _, err = setEvent(s, ctx, item) + } + + return err } // deleteEventsBeforeAndIncludingID removes events matching given IDs from the database. @@ -260,8 +415,23 @@ func (s *eventsStatements) deleteEventsBeforeAndIncludingID( appserviceID string, eventTableID int, ) (err error) { - return s.writer.Do(s.db, nil, func(txn *sql.Tx) error { - _, err := s.deleteEventsBeforeAndIncludingIDStmt.ExecContext(ctx, appserviceID, eventTableID) + // "DELETE FROM appservice_events WHERE as_id = $1 AND id <= $2" + + var dbCollectionName = cosmosdbapi.GetCollectionName(s.db.databaseName, s.tableName) + params := map[string]interface{}{ + "@x1": dbCollectionName, + "@x2": appserviceID, + "@x3": eventTableID, + } + + response, err := queryEvent(s, ctx, s.deleteEventsBeforeAndIncludingIDStmt, params) + if err != nil { return err - }) + } + + for _, item := range response { + // _, err := s.updateTxnIDForEventsStmt.ExecContext(ctx, txnID, appserviceID, maxID) + err = deleteEvent(s, ctx, item) + } + return err } diff --git a/appservice/storage/cosmosdb/appservice_events_table_id_seq.go b/appservice/storage/cosmosdb/appservice_events_table_id_seq.go new file mode 100644 index 000000000..4b87878bd --- /dev/null +++ b/appservice/storage/cosmosdb/appservice_events_table_id_seq.go @@ -0,0 +1,12 @@ +package cosmosdb + +import ( + "context" + + "github.com/matrix-org/dendrite/internal/cosmosdbutil" +) + +func GetNextEventID(s *eventsStatements, ctx context.Context) (int64, error) { + const docId = "id_seq" + return cosmosdbutil.GetNextSequence(ctx, s.db.connection, s.db.cosmosConfig, s.db.databaseName, s.tableName, docId, 1) +} diff --git a/appservice/storage/cosmosdb/storage.go b/appservice/storage/cosmosdb/storage.go index 3639010e1..bd5cdae93 100644 --- a/appservice/storage/cosmosdb/storage.go +++ b/appservice/storage/cosmosdb/storage.go @@ -17,7 +17,10 @@ package cosmosdb import ( "context" - "database/sql" + + "github.com/matrix-org/dendrite/internal/cosmosdbapi" + + "github.com/matrix-org/dendrite/internal/cosmosdbutil" // Import SQLite database driver "github.com/matrix-org/dendrite/internal/sqlutil" @@ -29,35 +32,38 @@ import ( // Database stores events intended to be later sent to application services type Database struct { sqlutil.PartitionOffsetStatements - events eventsStatements - txnID txnStatements - db *sql.DB - writer sqlutil.Writer + events eventsStatements + txnID txnStatements + writer cosmosdbutil.Writer + connection cosmosdbapi.CosmosConnection + databaseName string + cosmosConfig cosmosdbapi.CosmosConfig + serverName gomatrixserverlib.ServerName } // NewDatabase opens a new database func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) { - var result Database - var err error - if result.db, err = sqlutil.Open(dbProperties); err != nil { - return nil, err + conn := cosmosdbutil.GetCosmosConnection(&dbProperties.ConnectionString) + config := cosmosdbutil.GetCosmosConfig(&dbProperties.ConnectionString) + result := &Database{ + databaseName: "appservice", + connection: conn, + cosmosConfig: config, } - result.writer = sqlutil.NewExclusiveWriter() + var err error + result.writer = cosmosdbutil.NewExclusiveWriterFake() if err = result.prepare(); err != nil { return nil, err } - if err = result.PartitionOffsetStatements.Prepare(result.db, result.writer, "appservice"); err != nil { - return nil, err - } - return &result, nil + return result, nil } func (d *Database) prepare() error { - if err := d.events.prepare(d.db, d.writer); err != nil { + if err := d.events.prepare(nil, d.writer); err != nil { return err } - return d.txnID.prepare(d.db, d.writer) + return d.txnID.prepare(nil, d.writer) } // StoreEvent takes in a gomatrixserverlib.HeaderedEvent and stores it in the database diff --git a/appservice/storage/cosmosdb/txn_id_counter_table.go b/appservice/storage/cosmosdb/txn_id_counter_table.go index 73b13f3db..192452239 100644 --- a/appservice/storage/cosmosdb/txn_id_counter_table.go +++ b/appservice/storage/cosmosdb/txn_id_counter_table.go @@ -22,38 +22,32 @@ import ( "github.com/matrix-org/dendrite/internal/sqlutil" ) -const txnIDSchema = ` --- Keeps a count of the current transaction ID -CREATE TABLE IF NOT EXISTS appservice_counters ( - name TEXT PRIMARY KEY NOT NULL, - last_id INTEGER DEFAULT 1 -); -INSERT OR IGNORE INTO appservice_counters (name, last_id) VALUES('txn_id', 1); -` +// const txnIDSchema = ` +// -- Keeps a count of the current transaction ID +// CREATE TABLE IF NOT EXISTS appservice_counters ( +// name TEXT PRIMARY KEY NOT NULL, +// last_id INTEGER DEFAULT 1 +// ); +// INSERT OR IGNORE INTO appservice_counters (name, last_id) VALUES('txn_id', 1); +// ` -const selectTxnIDSQL = ` - SELECT last_id FROM appservice_counters WHERE name='txn_id'; - UPDATE appservice_counters SET last_id=last_id+1 WHERE name='txn_id'; -` +// const selectTxnIDSQL = ` +// SELECT last_id FROM appservice_counters WHERE name='txn_id'; +// UPDATE appservice_counters SET last_id=last_id+1 WHERE name='txn_id'; +// ` type txnStatements struct { - db *sql.DB + db *Database writer sqlutil.Writer selectTxnIDStmt *sql.Stmt + tableName string } -func (s *txnStatements) prepare(db *sql.DB, writer sqlutil.Writer) (err error) { +func (s *txnStatements) prepare(db *Database, writer sqlutil.Writer) (err error) { s.db = db s.writer = writer - _, err = db.Exec(txnIDSchema) - if err != nil { - return - } - - if s.selectTxnIDStmt, err = db.Prepare(selectTxnIDSQL); err != nil { - return - } - + //Only used for the seq generation + s.tableName = "counters" return } @@ -61,9 +55,5 @@ func (s *txnStatements) prepare(db *sql.DB, writer sqlutil.Writer) (err error) { func (s *txnStatements) selectTxnID( ctx context.Context, ) (txnID int, err error) { - err = s.writer.Do(s.db, nil, func(txn *sql.Tx) error { - err := s.selectTxnIDStmt.QueryRowContext(ctx).Scan(&txnID) - return err - }) - return + return GetNextCounterTXNID(s, ctx) } diff --git a/appservice/storage/cosmosdb/txn_id_counter_table_txn_id_seq.go b/appservice/storage/cosmosdb/txn_id_counter_table_txn_id_seq.go new file mode 100644 index 000000000..1fa528c15 --- /dev/null +++ b/appservice/storage/cosmosdb/txn_id_counter_table_txn_id_seq.go @@ -0,0 +1,16 @@ +package cosmosdb + +import ( + "context" + + "github.com/matrix-org/dendrite/internal/cosmosdbutil" +) + +func GetNextCounterTXNID(s *txnStatements, ctx context.Context) (int, error) { + const docId = "txn_id_seq" + result, err := cosmosdbutil.GetNextSequence(ctx, s.db.connection, s.db.cosmosConfig, s.db.databaseName, s.tableName, docId, 1) + if(err != nil) { + return -1, err + } + return int(result), err +} diff --git a/dendrite-config-cosmosdb.yaml b/dendrite-config-cosmosdb.yaml index 9dd980b1b..03d4983f7 100644 --- a/dendrite-config-cosmosdb.yaml +++ b/dendrite-config-cosmosdb.yaml @@ -122,7 +122,7 @@ app_service_api: listen: http://localhost:7777 connect: http://localhost:7777 database: - connection_string: file:appservice.db + connection_string: "cosmosdb:AccountEndpoint=https://localhost:8081/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==;DatabaseName=safezone_local;ContainerName=test.criticalarc.com;" max_open_conns: 10 max_idle_conns: 2 conn_max_lifetime: -1