diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index b9a567954..6d3ea808f 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -34,7 +34,7 @@ import ( type OutputRoomEventConsumer struct { roomServerConsumer *common.ContinualConsumer db accounts.Database - asDB *storage.Database + asDB storage.Database query api.RoomserverQueryAPI alias api.RoomserverAliasAPI serverName string @@ -47,7 +47,7 @@ func NewOutputRoomEventConsumer( cfg *config.Dendrite, kafkaConsumer sarama.Consumer, store accounts.Database, - appserviceDB *storage.Database, + appserviceDB storage.Database, queryAPI api.RoomserverQueryAPI, aliasAPI api.RoomserverAliasAPI, workerStates []types.ApplicationServiceWorkerState, diff --git a/appservice/storage/appservice_events_table.go b/appservice/storage/postgres/appservice_events_table.go similarity index 99% rename from appservice/storage/appservice_events_table.go rename to appservice/storage/postgres/appservice_events_table.go index 285bbf483..d72faeeae 100644 --- a/appservice/storage/appservice_events_table.go +++ b/appservice/storage/postgres/appservice_events_table.go @@ -1,4 +1,5 @@ // Copyright 2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,7 +13,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package storage +package postgres import ( "context" diff --git a/appservice/storage/postgres/storage.go b/appservice/storage/postgres/storage.go new file mode 100644 index 000000000..c47564689 --- /dev/null +++ b/appservice/storage/postgres/storage.go @@ -0,0 +1,111 @@ +// Copyright 2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgres + +import ( + "context" + "database/sql" + + // Import postgres database driver + _ "github.com/lib/pq" + "github.com/matrix-org/gomatrixserverlib" +) + +// Database stores events intended to be later sent to application services +type Database struct { + events eventsStatements + txnID txnStatements + db *sql.DB +} + +// NewDatabase opens a new database +func NewDatabase(dataSourceName string) (*Database, error) { + var result Database + var err error + if result.db, err = sql.Open("postgres", dataSourceName); err != nil { + return nil, err + } + if err = result.prepare(); err != nil { + return nil, err + } + return &result, nil +} + +func (d *Database) prepare() error { + if err := d.events.prepare(d.db); err != nil { + return err + } + + return d.txnID.prepare(d.db) +} + +// StoreEvent takes in a gomatrixserverlib.Event and stores it in the database +// for a transaction worker to pull and later send to an application service. +func (d *Database) StoreEvent( + ctx context.Context, + appServiceID string, + event *gomatrixserverlib.Event, +) error { + return d.events.insertEvent(ctx, appServiceID, event) +} + +// GetEventsWithAppServiceID returns a slice of events and their IDs intended to +// be sent to an application service given its ID. +func (d *Database) GetEventsWithAppServiceID( + ctx context.Context, + appServiceID string, + limit int, +) (int, int, []gomatrixserverlib.Event, bool, error) { + return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit) +} + +// CountEventsWithAppServiceID returns the number of events destined for an +// application service given its ID. +func (d *Database) CountEventsWithAppServiceID( + ctx context.Context, + appServiceID string, +) (int, error) { + return d.events.countEventsByApplicationServiceID(ctx, appServiceID) +} + +// UpdateTxnIDForEvents takes in an application service ID and a +// and stores them in the DB, unless the pair already exists, in +// which case it updates them. +func (d *Database) UpdateTxnIDForEvents( + ctx context.Context, + appserviceID string, + maxID, txnID int, +) error { + return d.events.updateTxnIDForEvents(ctx, appserviceID, maxID, txnID) +} + +// RemoveEventsBeforeAndIncludingID removes all events from the database that +// are less than or equal to a given maximum ID. IDs here are implemented as a +// serial, thus this should always delete events in chronological order. +func (d *Database) RemoveEventsBeforeAndIncludingID( + ctx context.Context, + appserviceID string, + eventTableID int, +) error { + return d.events.deleteEventsBeforeAndIncludingID(ctx, appserviceID, eventTableID) +} + +// GetLatestTxnID returns the latest available transaction id +func (d *Database) GetLatestTxnID( + ctx context.Context, +) (int, error) { + return d.txnID.selectTxnID(ctx) +} diff --git a/appservice/storage/txn_id_counter_table.go b/appservice/storage/postgres/txn_id_counter_table.go similarity index 94% rename from appservice/storage/txn_id_counter_table.go rename to appservice/storage/postgres/txn_id_counter_table.go index 7b0fa3786..a96a0e360 100644 --- a/appservice/storage/txn_id_counter_table.go +++ b/appservice/storage/postgres/txn_id_counter_table.go @@ -1,4 +1,5 @@ // Copyright 2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,7 +13,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package storage +package postgres import ( "context" diff --git a/appservice/storage/sqlite3/appservice_events_table.go b/appservice/storage/sqlite3/appservice_events_table.go new file mode 100644 index 000000000..846f09f7f --- /dev/null +++ b/appservice/storage/sqlite3/appservice_events_table.go @@ -0,0 +1,249 @@ +// Copyright 2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "context" + "database/sql" + "encoding/json" + "time" + + "github.com/matrix-org/gomatrixserverlib" + log "github.com/sirupsen/logrus" +) + +const appserviceEventsSchema = ` +-- Stores events to be sent to application services +CREATE TABLE IF NOT EXISTS appservice_events ( + -- An auto-incrementing id unique to each event in the table + id INTEGER PRIMARY KEY AUTOINCREMENT, + -- The ID of the application service the event will be sent to + as_id TEXT NOT NULL, + -- JSON representation of the event + event_json TEXT NOT NULL, + -- The ID of the transaction that this event is a part of + txn_id INTEGER NOT NULL +); + +CREATE INDEX IF NOT EXISTS appservice_events_as_id ON appservice_events(as_id); +` + +const selectEventsByApplicationServiceIDSQL = "" + + "SELECT id, event_json, txn_id " + + "FROM appservice_events WHERE as_id = $1 ORDER BY txn_id DESC, id ASC" + +const countEventsByApplicationServiceIDSQL = "" + + "SELECT COUNT(id) FROM appservice_events WHERE as_id = $1" + +const insertEventSQL = "" + + "INSERT INTO appservice_events(as_id, event_json, txn_id) " + + "VALUES ($1, $2, $3)" + +const updateTxnIDForEventsSQL = "" + + "UPDATE appservice_events SET txn_id = $1 WHERE as_id = $2 AND id <= $3" + +const deleteEventsBeforeAndIncludingIDSQL = "" + + "DELETE FROM appservice_events WHERE as_id = $1 AND id <= $2" + +const ( + // A transaction ID number that no transaction should ever have. Used for + // checking again the default value. + invalidTxnID = -2 +) + +type eventsStatements struct { + selectEventsByApplicationServiceIDStmt *sql.Stmt + countEventsByApplicationServiceIDStmt *sql.Stmt + insertEventStmt *sql.Stmt + updateTxnIDForEventsStmt *sql.Stmt + deleteEventsBeforeAndIncludingIDStmt *sql.Stmt +} + +func (s *eventsStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(appserviceEventsSchema) + if err != nil { + return + } + + if s.selectEventsByApplicationServiceIDStmt, err = db.Prepare(selectEventsByApplicationServiceIDSQL); err != nil { + return + } + if s.countEventsByApplicationServiceIDStmt, err = db.Prepare(countEventsByApplicationServiceIDSQL); err != nil { + return + } + if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil { + return + } + if s.updateTxnIDForEventsStmt, err = db.Prepare(updateTxnIDForEventsSQL); err != nil { + return + } + if s.deleteEventsBeforeAndIncludingIDStmt, err = db.Prepare(deleteEventsBeforeAndIncludingIDSQL); err != nil { + return + } + + return +} + +// selectEventsByApplicationServiceID takes in an application service ID and +// returns a slice of events that need to be sent to that application service, +// as well as an int later used to remove these same events from the database +// once successfully sent to an application service. +func (s *eventsStatements) selectEventsByApplicationServiceID( + ctx context.Context, + applicationServiceID string, + limit int, +) ( + txnID, maxID int, + events []gomatrixserverlib.Event, + eventsRemaining bool, + err error, +) { + // Retrieve events from the database. Unsuccessfully sent events first + eventRows, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID) + if err != nil { + return + } + defer func() { + err = eventRows.Close() + if err != nil { + log.WithFields(log.Fields{ + "appservice": applicationServiceID, + }).WithError(err).Fatalf("appservice unable to select new events to send") + } + }() + events, maxID, txnID, eventsRemaining, err = retrieveEvents(eventRows, limit) + if err != nil { + return + } + + return +} + +func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.Event, maxID, txnID int, eventsRemaining bool, err error) { + // Get current time for use in calculating event age + nowMilli := time.Now().UnixNano() / int64(time.Millisecond) + + // Iterate through each row and store event contents + // If txn_id changes dramatically, we've switched from collecting old events to + // new ones. Send back those events first. + lastTxnID := invalidTxnID + for eventsProcessed := 0; eventRows.Next(); { + var event gomatrixserverlib.Event + var eventJSON []byte + var id int + err = eventRows.Scan( + &id, + &eventJSON, + &txnID, + ) + if err != nil { + return nil, 0, 0, false, err + } + + // Unmarshal eventJSON + if err = json.Unmarshal(eventJSON, &event); err != nil { + return nil, 0, 0, false, err + } + + // If txnID has changed on this event from the previous event, then we've + // reached the end of a transaction's events. Return only those events. + if lastTxnID > invalidTxnID && lastTxnID != txnID { + return events, maxID, lastTxnID, true, nil + } + lastTxnID = txnID + + // Limit events that aren't part of an old transaction + if txnID == -1 { + // Return if we've hit the limit + if eventsProcessed++; eventsProcessed > limit { + return events, maxID, lastTxnID, true, nil + } + } + + if id > maxID { + maxID = id + } + + // Portion of the event that is unsigned due to rapid change + // TODO: Consider removing age as not many app services use it + if err = event.SetUnsignedField("age", nowMilli-int64(event.OriginServerTS())); err != nil { + return nil, 0, 0, false, err + } + + events = append(events, event) + } + + return +} + +// countEventsByApplicationServiceID inserts an event mapped to its corresponding application service +// IDs into the db. +func (s *eventsStatements) countEventsByApplicationServiceID( + ctx context.Context, + appServiceID string, +) (int, error) { + var count int + err := s.countEventsByApplicationServiceIDStmt.QueryRowContext(ctx, appServiceID).Scan(&count) + if err != nil && err != sql.ErrNoRows { + return 0, err + } + + return count, nil +} + +// insertEvent inserts an event mapped to its corresponding application service +// IDs into the db. +func (s *eventsStatements) insertEvent( + ctx context.Context, + appServiceID string, + event *gomatrixserverlib.Event, +) (err error) { + // Convert event to JSON before inserting + eventJSON, err := json.Marshal(event) + if err != nil { + return err + } + + _, err = s.insertEventStmt.ExecContext( + ctx, + appServiceID, + eventJSON, + -1, // No transaction ID yet + ) + return +} + +// updateTxnIDForEvents sets the transactionID for a collection of events. Done +// before sending them to an AppService. Referenced before sending to make sure +// we aren't constructing multiple transactions with the same events. +func (s *eventsStatements) updateTxnIDForEvents( + ctx context.Context, + appserviceID string, + maxID, txnID int, +) (err error) { + _, err = s.updateTxnIDForEventsStmt.ExecContext(ctx, txnID, appserviceID, maxID) + return +} + +// deleteEventsBeforeAndIncludingID removes events matching given IDs from the database. +func (s *eventsStatements) deleteEventsBeforeAndIncludingID( + ctx context.Context, + appserviceID string, + eventTableID int, +) (err error) { + _, err = s.deleteEventsBeforeAndIncludingIDStmt.ExecContext(ctx, appserviceID, eventTableID) + return +} diff --git a/appservice/storage/sqlite3/storage.go b/appservice/storage/sqlite3/storage.go new file mode 100644 index 000000000..56ab55f2f --- /dev/null +++ b/appservice/storage/sqlite3/storage.go @@ -0,0 +1,111 @@ +// Copyright 2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "context" + "database/sql" + + // Import SQLite database driver + "github.com/matrix-org/gomatrixserverlib" + _ "github.com/mattn/go-sqlite3" +) + +// Database stores events intended to be later sent to application services +type Database struct { + events eventsStatements + txnID txnStatements + db *sql.DB +} + +// NewDatabase opens a new database +func NewDatabase(dataSourceName string) (*Database, error) { + var result Database + var err error + if result.db, err = sql.Open("sqlite3", dataSourceName); err != nil { + return nil, err + } + if err = result.prepare(); err != nil { + return nil, err + } + return &result, nil +} + +func (d *Database) prepare() error { + if err := d.events.prepare(d.db); err != nil { + return err + } + + return d.txnID.prepare(d.db) +} + +// StoreEvent takes in a gomatrixserverlib.Event and stores it in the database +// for a transaction worker to pull and later send to an application service. +func (d *Database) StoreEvent( + ctx context.Context, + appServiceID string, + event *gomatrixserverlib.Event, +) error { + return d.events.insertEvent(ctx, appServiceID, event) +} + +// GetEventsWithAppServiceID returns a slice of events and their IDs intended to +// be sent to an application service given its ID. +func (d *Database) GetEventsWithAppServiceID( + ctx context.Context, + appServiceID string, + limit int, +) (int, int, []gomatrixserverlib.Event, bool, error) { + return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit) +} + +// CountEventsWithAppServiceID returns the number of events destined for an +// application service given its ID. +func (d *Database) CountEventsWithAppServiceID( + ctx context.Context, + appServiceID string, +) (int, error) { + return d.events.countEventsByApplicationServiceID(ctx, appServiceID) +} + +// UpdateTxnIDForEvents takes in an application service ID and a +// and stores them in the DB, unless the pair already exists, in +// which case it updates them. +func (d *Database) UpdateTxnIDForEvents( + ctx context.Context, + appserviceID string, + maxID, txnID int, +) error { + return d.events.updateTxnIDForEvents(ctx, appserviceID, maxID, txnID) +} + +// RemoveEventsBeforeAndIncludingID removes all events from the database that +// are less than or equal to a given maximum ID. IDs here are implemented as a +// serial, thus this should always delete events in chronological order. +func (d *Database) RemoveEventsBeforeAndIncludingID( + ctx context.Context, + appserviceID string, + eventTableID int, +) error { + return d.events.deleteEventsBeforeAndIncludingID(ctx, appserviceID, eventTableID) +} + +// GetLatestTxnID returns the latest available transaction id +func (d *Database) GetLatestTxnID( + ctx context.Context, +) (int, error) { + return d.txnID.selectTxnID(ctx) +} diff --git a/appservice/storage/sqlite3/txn_id_counter_table.go b/appservice/storage/sqlite3/txn_id_counter_table.go new file mode 100644 index 000000000..b1ee60766 --- /dev/null +++ b/appservice/storage/sqlite3/txn_id_counter_table.go @@ -0,0 +1,60 @@ +// Copyright 2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "context" + "database/sql" +) + +const txnIDSchema = ` +-- Keeps a count of the current transaction ID +CREATE TABLE IF NOT EXISTS appservice_counters ( + name TEXT PRIMARY KEY NOT NULL, + last_id INTEGER DEFAULT 1 +); +INSERT OR IGNORE INTO appservice_counters (name, last_id) VALUES('txn_id', 1); +` + +const selectTxnIDSQL = ` + SELECT last_id FROM appservice_counters WHERE name='txn_id'; + UPDATE appservice_counters SET last_id=last_id+1 WHERE name='txn_id'; +` + +type txnStatements struct { + selectTxnIDStmt *sql.Stmt +} + +func (s *txnStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(txnIDSchema) + if err != nil { + return + } + + if s.selectTxnIDStmt, err = db.Prepare(selectTxnIDSQL); err != nil { + return + } + + return +} + +// selectTxnID selects the latest ascending transaction ID +func (s *txnStatements) selectTxnID( + ctx context.Context, +) (txnID int, err error) { + err = s.selectTxnIDStmt.QueryRowContext(ctx).Scan(&txnID) + return +} diff --git a/appservice/storage/storage.go b/appservice/storage/storage.go index b68989fb1..ce3776bc3 100644 --- a/appservice/storage/storage.go +++ b/appservice/storage/storage.go @@ -1,4 +1,4 @@ -// Copyright 2018 New Vector Ltd +// Copyright 2020 The Matrix.org Foundation C.I.C. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -16,95 +16,33 @@ package storage import ( "context" - "database/sql" + "net/url" - // Import postgres database driver - _ "github.com/lib/pq" + "github.com/matrix-org/dendrite/appservice/storage/postgres" + "github.com/matrix-org/dendrite/appservice/storage/sqlite3" "github.com/matrix-org/gomatrixserverlib" ) -// Database stores events intended to be later sent to application services -type Database struct { - events eventsStatements - txnID txnStatements - db *sql.DB +type Database interface { + StoreEvent(ctx context.Context, appServiceID string, event *gomatrixserverlib.Event) error + GetEventsWithAppServiceID(ctx context.Context, appServiceID string, limit int) (int, int, []gomatrixserverlib.Event, bool, error) + CountEventsWithAppServiceID(ctx context.Context, appServiceID string) (int, error) + UpdateTxnIDForEvents(ctx context.Context, appserviceID string, maxID, txnID int) error + RemoveEventsBeforeAndIncludingID(ctx context.Context, appserviceID string, eventTableID int) error + GetLatestTxnID(ctx context.Context) (int, error) } -// NewDatabase opens a new database -func NewDatabase(dataSourceName string) (*Database, error) { - var result Database - var err error - if result.db, err = sql.Open("postgres", dataSourceName); err != nil { - return nil, err +func NewDatabase(dataSourceName string) (Database, error) { + uri, err := url.Parse(dataSourceName) + if err != nil { + return postgres.NewDatabase(dataSourceName) } - if err = result.prepare(); err != nil { - return nil, err + switch uri.Scheme { + case "postgres": + return postgres.NewDatabase(dataSourceName) + case "file": + return sqlite3.NewDatabase(dataSourceName) + default: + return postgres.NewDatabase(dataSourceName) } - return &result, nil -} - -func (d *Database) prepare() error { - if err := d.events.prepare(d.db); err != nil { - return err - } - - return d.txnID.prepare(d.db) -} - -// StoreEvent takes in a gomatrixserverlib.Event and stores it in the database -// for a transaction worker to pull and later send to an application service. -func (d *Database) StoreEvent( - ctx context.Context, - appServiceID string, - event *gomatrixserverlib.Event, -) error { - return d.events.insertEvent(ctx, appServiceID, event) -} - -// GetEventsWithAppServiceID returns a slice of events and their IDs intended to -// be sent to an application service given its ID. -func (d *Database) GetEventsWithAppServiceID( - ctx context.Context, - appServiceID string, - limit int, -) (int, int, []gomatrixserverlib.Event, bool, error) { - return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit) -} - -// CountEventsWithAppServiceID returns the number of events destined for an -// application service given its ID. -func (d *Database) CountEventsWithAppServiceID( - ctx context.Context, - appServiceID string, -) (int, error) { - return d.events.countEventsByApplicationServiceID(ctx, appServiceID) -} - -// UpdateTxnIDForEvents takes in an application service ID and a -// and stores them in the DB, unless the pair already exists, in -// which case it updates them. -func (d *Database) UpdateTxnIDForEvents( - ctx context.Context, - appserviceID string, - maxID, txnID int, -) error { - return d.events.updateTxnIDForEvents(ctx, appserviceID, maxID, txnID) -} - -// RemoveEventsBeforeAndIncludingID removes all events from the database that -// are less than or equal to a given maximum ID. IDs here are implemented as a -// serial, thus this should always delete events in chronological order. -func (d *Database) RemoveEventsBeforeAndIncludingID( - ctx context.Context, - appserviceID string, - eventTableID int, -) error { - return d.events.deleteEventsBeforeAndIncludingID(ctx, appserviceID, eventTableID) -} - -// GetLatestTxnID returns the latest available transaction id -func (d *Database) GetLatestTxnID( - ctx context.Context, -) (int, error) { - return d.txnID.selectTxnID(ctx) } diff --git a/appservice/workers/transaction_scheduler.go b/appservice/workers/transaction_scheduler.go index 0330eb9ea..faa1e4a96 100644 --- a/appservice/workers/transaction_scheduler.go +++ b/appservice/workers/transaction_scheduler.go @@ -43,7 +43,7 @@ var ( // size), then send that off to the AS's /transactions/{txnID} endpoint. It also // handles exponentially backing off in case the AS isn't currently available. func SetupTransactionWorkers( - appserviceDB *storage.Database, + appserviceDB storage.Database, workerStates []types.ApplicationServiceWorkerState, ) error { // Create a worker that handles transmitting events to a single homeserver @@ -58,7 +58,7 @@ func SetupTransactionWorkers( // worker is a goroutine that sends any queued events to the application service // it is given. -func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) { +func worker(db storage.Database, ws types.ApplicationServiceWorkerState) { log.WithFields(log.Fields{ "appservice": ws.AppService.ID, }).Info("starting application service") @@ -149,7 +149,7 @@ func backoff(ws *types.ApplicationServiceWorkerState, err error) { // transaction, and JSON-encodes the results. func createTransaction( ctx context.Context, - db *storage.Database, + db storage.Database, appserviceID string, ) ( transactionJSON []byte, diff --git a/clientapi/auth/storage/accounts/sqlite3/filter_table.go b/clientapi/auth/storage/accounts/sqlite3/filter_table.go index 691ead775..7f1a0c249 100644 --- a/clientapi/auth/storage/accounts/sqlite3/filter_table.go +++ b/clientapi/auth/storage/accounts/sqlite3/filter_table.go @@ -18,6 +18,7 @@ import ( "context" "database/sql" "encoding/json" + "fmt" "github.com/matrix-org/gomatrixserverlib" ) @@ -47,14 +48,10 @@ const selectFilterIDByContentSQL = "" + const insertFilterSQL = "" + "INSERT INTO account_filter (filter, localpart) VALUES ($1, $2)" -const selectLastInsertedFilterIDSQL = "" + - "SELECT id FROM account_filter WHERE rowid = last_insert_rowid()" - type filterStatements struct { - selectFilterStmt *sql.Stmt - selectLastInsertedFilterIDStmt *sql.Stmt - selectFilterIDByContentStmt *sql.Stmt - insertFilterStmt *sql.Stmt + selectFilterStmt *sql.Stmt + selectFilterIDByContentStmt *sql.Stmt + insertFilterStmt *sql.Stmt } func (s *filterStatements) prepare(db *sql.DB) (err error) { @@ -65,9 +62,6 @@ func (s *filterStatements) prepare(db *sql.DB) (err error) { if s.selectFilterStmt, err = db.Prepare(selectFilterSQL); err != nil { return } - if s.selectLastInsertedFilterIDStmt, err = db.Prepare(selectLastInsertedFilterIDSQL); err != nil { - return - } if s.selectFilterIDByContentStmt, err = db.Prepare(selectFilterIDByContentSQL); err != nil { return } @@ -128,12 +122,14 @@ func (s *filterStatements) insertFilter( } // Otherwise insert the filter and return the new ID - if _, err = s.insertFilterStmt.ExecContext(ctx, filterJSON, localpart); err != nil { + res, err := s.insertFilterStmt.ExecContext(ctx, filterJSON, localpart) + if err != nil { return "", err } - row := s.selectLastInsertedFilterIDStmt.QueryRowContext(ctx) - if err := row.Scan(&filterID); err != nil { + rowid, err := res.LastInsertId() + if err != nil { return "", err } + filterID = fmt.Sprintf("%d", rowid) return } diff --git a/docker/Dockerfile b/docker/Dockerfile index 29b27dde2..5810825a4 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -1,8 +1,4 @@ -<<<<<<< HEAD FROM docker.io/golang:1.13.7-alpine3.11 -======= -FROM docker.io/golang:1.13.6-alpine ->>>>>>> master RUN mkdir /build diff --git a/mediaapi/storage/sqlite3/media_repository_table.go b/mediaapi/storage/sqlite3/media_repository_table.go new file mode 100644 index 000000000..8e2e6236a --- /dev/null +++ b/mediaapi/storage/sqlite3/media_repository_table.go @@ -0,0 +1,115 @@ +// Copyright 2017-2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "context" + "database/sql" + "time" + + "github.com/matrix-org/dendrite/mediaapi/types" + "github.com/matrix-org/gomatrixserverlib" +) + +const mediaSchema = ` +-- The media_repository table holds metadata for each media file stored and accessible to the local server, +-- the actual file is stored separately. +CREATE TABLE IF NOT EXISTS mediaapi_media_repository ( + -- The id used to refer to the media. + -- For uploads to this server this is a base64-encoded sha256 hash of the file data + -- For media from remote servers, this can be any unique identifier string + media_id TEXT NOT NULL, + -- The origin of the media as requested by the client. Should be a homeserver domain. + media_origin TEXT NOT NULL, + -- The MIME-type of the media file as specified when uploading. + content_type TEXT NOT NULL, + -- Size of the media file in bytes. + file_size_bytes INTEGER NOT NULL, + -- When the content was uploaded in UNIX epoch ms. + creation_ts INTEGER NOT NULL, + -- The file name with which the media was uploaded. + upload_name TEXT NOT NULL, + -- Alternate RFC 4648 unpadded base64 encoding string representation of a SHA-256 hash sum of the file data. + base64hash TEXT NOT NULL, + -- The user who uploaded the file. Should be a Matrix user ID. + user_id TEXT NOT NULL +); +CREATE UNIQUE INDEX IF NOT EXISTS mediaapi_media_repository_index ON mediaapi_media_repository (media_id, media_origin); +` + +const insertMediaSQL = ` +INSERT INTO mediaapi_media_repository (media_id, media_origin, content_type, file_size_bytes, creation_ts, upload_name, base64hash, user_id) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) +` + +const selectMediaSQL = ` +SELECT content_type, file_size_bytes, creation_ts, upload_name, base64hash, user_id FROM mediaapi_media_repository WHERE media_id = $1 AND media_origin = $2 +` + +type mediaStatements struct { + insertMediaStmt *sql.Stmt + selectMediaStmt *sql.Stmt +} + +func (s *mediaStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(mediaSchema) + if err != nil { + return + } + + return statementList{ + {&s.insertMediaStmt, insertMediaSQL}, + {&s.selectMediaStmt, selectMediaSQL}, + }.prepare(db) +} + +func (s *mediaStatements) insertMedia( + ctx context.Context, mediaMetadata *types.MediaMetadata, +) error { + mediaMetadata.CreationTimestamp = types.UnixMs(time.Now().UnixNano() / 1000000) + _, err := s.insertMediaStmt.ExecContext( + ctx, + mediaMetadata.MediaID, + mediaMetadata.Origin, + mediaMetadata.ContentType, + mediaMetadata.FileSizeBytes, + mediaMetadata.CreationTimestamp, + mediaMetadata.UploadName, + mediaMetadata.Base64Hash, + mediaMetadata.UserID, + ) + return err +} + +func (s *mediaStatements) selectMedia( + ctx context.Context, mediaID types.MediaID, mediaOrigin gomatrixserverlib.ServerName, +) (*types.MediaMetadata, error) { + mediaMetadata := types.MediaMetadata{ + MediaID: mediaID, + Origin: mediaOrigin, + } + err := s.selectMediaStmt.QueryRowContext( + ctx, mediaMetadata.MediaID, mediaMetadata.Origin, + ).Scan( + &mediaMetadata.ContentType, + &mediaMetadata.FileSizeBytes, + &mediaMetadata.CreationTimestamp, + &mediaMetadata.UploadName, + &mediaMetadata.Base64Hash, + &mediaMetadata.UserID, + ) + return &mediaMetadata, err +} diff --git a/mediaapi/storage/sqlite3/prepare.go b/mediaapi/storage/sqlite3/prepare.go new file mode 100644 index 000000000..a6bc24c98 --- /dev/null +++ b/mediaapi/storage/sqlite3/prepare.go @@ -0,0 +1,38 @@ +// Copyright 2017-2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// FIXME: This should be made common! + +package sqlite3 + +import ( + "database/sql" +) + +// a statementList is a list of SQL statements to prepare and a pointer to where to store the resulting prepared statement. +type statementList []struct { + statement **sql.Stmt + sql string +} + +// prepare the SQL for each statement in the list and assign the result to the prepared statement. +func (s statementList) prepare(db *sql.DB) (err error) { + for _, statement := range s { + if *statement.statement, err = db.Prepare(statement.sql); err != nil { + return + } + } + return +} diff --git a/mediaapi/storage/sqlite3/sql.go b/mediaapi/storage/sqlite3/sql.go new file mode 100644 index 000000000..9cd78b8ee --- /dev/null +++ b/mediaapi/storage/sqlite3/sql.go @@ -0,0 +1,36 @@ +// Copyright 2017-2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "database/sql" +) + +type statements struct { + media mediaStatements + thumbnail thumbnailStatements +} + +func (s *statements) prepare(db *sql.DB) (err error) { + if err = s.media.prepare(db); err != nil { + return + } + if err = s.thumbnail.prepare(db); err != nil { + return + } + + return +} diff --git a/mediaapi/storage/sqlite3/storage.go b/mediaapi/storage/sqlite3/storage.go new file mode 100644 index 000000000..6477f8305 --- /dev/null +++ b/mediaapi/storage/sqlite3/storage.go @@ -0,0 +1,106 @@ +// Copyright 2017-2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "context" + "database/sql" + + // Import the postgres database driver. + "github.com/matrix-org/dendrite/mediaapi/types" + "github.com/matrix-org/gomatrixserverlib" + _ "github.com/mattn/go-sqlite3" +) + +// Database is used to store metadata about a repository of media files. +type Database struct { + statements statements + db *sql.DB +} + +// Open opens a postgres database. +func Open(dataSourceName string) (*Database, error) { + var d Database + var err error + if d.db, err = sql.Open("sqlite3", dataSourceName); err != nil { + return nil, err + } + if err = d.statements.prepare(d.db); err != nil { + return nil, err + } + return &d, nil +} + +// StoreMediaMetadata inserts the metadata about the uploaded media into the database. +// Returns an error if the combination of MediaID and Origin are not unique in the table. +func (d *Database) StoreMediaMetadata( + ctx context.Context, mediaMetadata *types.MediaMetadata, +) error { + return d.statements.media.insertMedia(ctx, mediaMetadata) +} + +// GetMediaMetadata returns metadata about media stored on this server. +// The media could have been uploaded to this server or fetched from another server and cached here. +// Returns nil metadata if there is no metadata associated with this media. +func (d *Database) GetMediaMetadata( + ctx context.Context, mediaID types.MediaID, mediaOrigin gomatrixserverlib.ServerName, +) (*types.MediaMetadata, error) { + mediaMetadata, err := d.statements.media.selectMedia(ctx, mediaID, mediaOrigin) + if err != nil && err == sql.ErrNoRows { + return nil, nil + } + return mediaMetadata, err +} + +// StoreThumbnail inserts the metadata about the thumbnail into the database. +// Returns an error if the combination of MediaID and Origin are not unique in the table. +func (d *Database) StoreThumbnail( + ctx context.Context, thumbnailMetadata *types.ThumbnailMetadata, +) error { + return d.statements.thumbnail.insertThumbnail(ctx, thumbnailMetadata) +} + +// GetThumbnail returns metadata about a specific thumbnail. +// The media could have been uploaded to this server or fetched from another server and cached here. +// Returns nil metadata if there is no metadata associated with this thumbnail. +func (d *Database) GetThumbnail( + ctx context.Context, + mediaID types.MediaID, + mediaOrigin gomatrixserverlib.ServerName, + width, height int, + resizeMethod string, +) (*types.ThumbnailMetadata, error) { + thumbnailMetadata, err := d.statements.thumbnail.selectThumbnail( + ctx, mediaID, mediaOrigin, width, height, resizeMethod, + ) + if err != nil && err == sql.ErrNoRows { + return nil, nil + } + return thumbnailMetadata, err +} + +// GetThumbnails returns metadata about all thumbnails for a specific media stored on this server. +// The media could have been uploaded to this server or fetched from another server and cached here. +// Returns nil metadata if there are no thumbnails associated with this media. +func (d *Database) GetThumbnails( + ctx context.Context, mediaID types.MediaID, mediaOrigin gomatrixserverlib.ServerName, +) ([]*types.ThumbnailMetadata, error) { + thumbnails, err := d.statements.thumbnail.selectThumbnails(ctx, mediaID, mediaOrigin) + if err != nil && err == sql.ErrNoRows { + return nil, nil + } + return thumbnails, err +} diff --git a/mediaapi/storage/sqlite3/thumbnail_table.go b/mediaapi/storage/sqlite3/thumbnail_table.go new file mode 100644 index 000000000..95332c9d4 --- /dev/null +++ b/mediaapi/storage/sqlite3/thumbnail_table.go @@ -0,0 +1,162 @@ +// Copyright 2017-2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "context" + "database/sql" + "time" + + "github.com/matrix-org/dendrite/mediaapi/types" + "github.com/matrix-org/gomatrixserverlib" +) + +const thumbnailSchema = ` +-- The mediaapi_thumbnail table holds metadata for each thumbnail file stored and accessible to the local server, +-- the actual file is stored separately. +CREATE TABLE IF NOT EXISTS mediaapi_thumbnail ( + media_id TEXT NOT NULL, + media_origin TEXT NOT NULL, + content_type TEXT NOT NULL, + file_size_bytes INTEGER NOT NULL, + creation_ts INTEGER NOT NULL, + width INTEGER NOT NULL, + height INTEGER NOT NULL, + resize_method TEXT NOT NULL +); +CREATE UNIQUE INDEX IF NOT EXISTS mediaapi_thumbnail_index ON mediaapi_thumbnail (media_id, media_origin, width, height, resize_method); +` + +const insertThumbnailSQL = ` +INSERT INTO mediaapi_thumbnail (media_id, media_origin, content_type, file_size_bytes, creation_ts, width, height, resize_method) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) +` + +// Note: this selects one specific thumbnail +const selectThumbnailSQL = ` +SELECT content_type, file_size_bytes, creation_ts FROM mediaapi_thumbnail WHERE media_id = $1 AND media_origin = $2 AND width = $3 AND height = $4 AND resize_method = $5 +` + +// Note: this selects all thumbnails for a media_origin and media_id +const selectThumbnailsSQL = ` +SELECT content_type, file_size_bytes, creation_ts, width, height, resize_method FROM mediaapi_thumbnail WHERE media_id = $1 AND media_origin = $2 +` + +type thumbnailStatements struct { + insertThumbnailStmt *sql.Stmt + selectThumbnailStmt *sql.Stmt + selectThumbnailsStmt *sql.Stmt +} + +func (s *thumbnailStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(thumbnailSchema) + if err != nil { + return + } + + return statementList{ + {&s.insertThumbnailStmt, insertThumbnailSQL}, + {&s.selectThumbnailStmt, selectThumbnailSQL}, + {&s.selectThumbnailsStmt, selectThumbnailsSQL}, + }.prepare(db) +} + +func (s *thumbnailStatements) insertThumbnail( + ctx context.Context, thumbnailMetadata *types.ThumbnailMetadata, +) error { + thumbnailMetadata.MediaMetadata.CreationTimestamp = types.UnixMs(time.Now().UnixNano() / 1000000) + _, err := s.insertThumbnailStmt.ExecContext( + ctx, + thumbnailMetadata.MediaMetadata.MediaID, + thumbnailMetadata.MediaMetadata.Origin, + thumbnailMetadata.MediaMetadata.ContentType, + thumbnailMetadata.MediaMetadata.FileSizeBytes, + thumbnailMetadata.MediaMetadata.CreationTimestamp, + thumbnailMetadata.ThumbnailSize.Width, + thumbnailMetadata.ThumbnailSize.Height, + thumbnailMetadata.ThumbnailSize.ResizeMethod, + ) + return err +} + +func (s *thumbnailStatements) selectThumbnail( + ctx context.Context, + mediaID types.MediaID, + mediaOrigin gomatrixserverlib.ServerName, + width, height int, + resizeMethod string, +) (*types.ThumbnailMetadata, error) { + thumbnailMetadata := types.ThumbnailMetadata{ + MediaMetadata: &types.MediaMetadata{ + MediaID: mediaID, + Origin: mediaOrigin, + }, + ThumbnailSize: types.ThumbnailSize{ + Width: width, + Height: height, + ResizeMethod: resizeMethod, + }, + } + err := s.selectThumbnailStmt.QueryRowContext( + ctx, + thumbnailMetadata.MediaMetadata.MediaID, + thumbnailMetadata.MediaMetadata.Origin, + thumbnailMetadata.ThumbnailSize.Width, + thumbnailMetadata.ThumbnailSize.Height, + thumbnailMetadata.ThumbnailSize.ResizeMethod, + ).Scan( + &thumbnailMetadata.MediaMetadata.ContentType, + &thumbnailMetadata.MediaMetadata.FileSizeBytes, + &thumbnailMetadata.MediaMetadata.CreationTimestamp, + ) + return &thumbnailMetadata, err +} + +func (s *thumbnailStatements) selectThumbnails( + ctx context.Context, mediaID types.MediaID, mediaOrigin gomatrixserverlib.ServerName, +) ([]*types.ThumbnailMetadata, error) { + rows, err := s.selectThumbnailsStmt.QueryContext( + ctx, mediaID, mediaOrigin, + ) + if err != nil { + return nil, err + } + defer rows.Close() // nolint: errcheck + + var thumbnails []*types.ThumbnailMetadata + for rows.Next() { + thumbnailMetadata := types.ThumbnailMetadata{ + MediaMetadata: &types.MediaMetadata{ + MediaID: mediaID, + Origin: mediaOrigin, + }, + } + err = rows.Scan( + &thumbnailMetadata.MediaMetadata.ContentType, + &thumbnailMetadata.MediaMetadata.FileSizeBytes, + &thumbnailMetadata.MediaMetadata.CreationTimestamp, + &thumbnailMetadata.ThumbnailSize.Width, + &thumbnailMetadata.ThumbnailSize.Height, + &thumbnailMetadata.ThumbnailSize.ResizeMethod, + ) + if err != nil { + return nil, err + } + thumbnails = append(thumbnails, &thumbnailMetadata) + } + + return thumbnails, rows.Err() +} diff --git a/mediaapi/storage/storage.go b/mediaapi/storage/storage.go index 2c7f937dd..1d35a95ed 100644 --- a/mediaapi/storage/storage.go +++ b/mediaapi/storage/storage.go @@ -19,6 +19,7 @@ import ( "net/url" "github.com/matrix-org/dendrite/mediaapi/storage/postgres" + "github.com/matrix-org/dendrite/mediaapi/storage/sqlite3" "github.com/matrix-org/dendrite/mediaapi/types" "github.com/matrix-org/gomatrixserverlib" ) @@ -40,6 +41,8 @@ func Open(dataSourceName string) (Database, error) { switch uri.Scheme { case "postgres": return postgres.Open(dataSourceName) + case "file": + return sqlite3.Open(dataSourceName) default: return postgres.Open(dataSourceName) } diff --git a/publicroomsapi/storage/sqlite3/public_rooms_table.go b/publicroomsapi/storage/sqlite3/public_rooms_table.go index 06c74a331..3fe3d3cab 100644 --- a/publicroomsapi/storage/sqlite3/public_rooms_table.go +++ b/publicroomsapi/storage/sqlite3/public_rooms_table.go @@ -39,26 +39,15 @@ var editableAttributes = []string{ const publicRoomsSchema = ` -- Stores all of the rooms with data needed to create the server's room directory CREATE TABLE IF NOT EXISTS publicroomsapi_public_rooms( - -- The room's ID room_id TEXT NOT NULL PRIMARY KEY, - -- Number of joined members in the room joined_members INTEGER NOT NULL DEFAULT 0, - -- Aliases of the room (empty array if none) - aliases TEXT[] NOT NULL DEFAULT '{}'::TEXT[], - -- Canonical alias of the room (empty string if none) + aliases TEXT NOT NULL DEFAULT '', canonical_alias TEXT NOT NULL DEFAULT '', - -- Name of the room (empty string if none) name TEXT NOT NULL DEFAULT '', - -- Topic of the room (empty string if none) topic TEXT NOT NULL DEFAULT '', - -- Is the room world readable? world_readable BOOLEAN NOT NULL DEFAULT false, - -- Can guest join the room? guest_can_join BOOLEAN NOT NULL DEFAULT false, - -- URL of the room avatar (empty string if none) avatar_url TEXT NOT NULL DEFAULT '', - -- Visibility of the room: true means the room is publicly visible, false - -- means the room is private visibility BOOLEAN NOT NULL DEFAULT false ); ` @@ -71,13 +60,13 @@ const selectPublicRoomsSQL = "" + "SELECT room_id, joined_members, aliases, canonical_alias, name, topic, world_readable, guest_can_join, avatar_url" + " FROM publicroomsapi_public_rooms WHERE visibility = true" + " ORDER BY joined_members DESC" + - " OFFSET $1" + " LIMIT 30 OFFSET $1" const selectPublicRoomsWithLimitSQL = "" + "SELECT room_id, joined_members, aliases, canonical_alias, name, topic, world_readable, guest_can_join, avatar_url" + " FROM publicroomsapi_public_rooms WHERE visibility = true" + " ORDER BY joined_members DESC" + - " OFFSET $1 LIMIT $2" + " LIMIT $2 OFFSET $1" const selectPublicRoomsWithFilterSQL = "" + "SELECT room_id, joined_members, aliases, canonical_alias, name, topic, world_readable, guest_can_join, avatar_url" + @@ -85,9 +74,9 @@ const selectPublicRoomsWithFilterSQL = "" + " WHERE visibility = true" + " AND (LOWER(name) LIKE LOWER($1)" + " OR LOWER(topic) LIKE LOWER($1)" + - " OR LOWER(ARRAY_TO_STRING(aliases, ',')) LIKE LOWER($1))" + + " OR LOWER(aliases) LIKE LOWER($1))" + // TODO: Is there a better way to search aliases? " ORDER BY joined_members DESC" + - " OFFSET $2" + " LIMIT 30 OFFSET $2" const selectPublicRoomsWithLimitAndFilterSQL = "" + "SELECT room_id, joined_members, aliases, canonical_alias, name, topic, world_readable, guest_can_join, avatar_url" + @@ -95,9 +84,9 @@ const selectPublicRoomsWithLimitAndFilterSQL = "" + " WHERE visibility = true" + " AND (LOWER(name) LIKE LOWER($1)" + " OR LOWER(topic) LIKE LOWER($1)" + - " OR LOWER(ARRAY_TO_STRING(aliases, ',')) LIKE LOWER($1))" + + " OR LOWER(aliases) LIKE LOWER($1))" + // TODO: Is there a better way to search aliases? " ORDER BY joined_members DESC" + - " OFFSET $2 LIMIT $3" + " LIMIT $3 OFFSET $2" const selectRoomVisibilitySQL = "" + "SELECT visibility FROM publicroomsapi_public_rooms" + @@ -187,7 +176,7 @@ func (s *publicRoomsStatements) selectPublicRooms( ) } else { rows, err = s.selectPublicRoomsWithLimitAndFilterStmt.QueryContext( - ctx, pattern, offset, limit, + ctx, pattern, limit, offset, ) } } else { @@ -195,7 +184,7 @@ func (s *publicRoomsStatements) selectPublicRooms( rows, err = s.selectPublicRoomsStmt.QueryContext(ctx, offset) } else { rows, err = s.selectPublicRoomsWithLimitStmt.QueryContext( - ctx, offset, limit, + ctx, limit, offset, ) } } diff --git a/publicroomsapi/storage/storage.go b/publicroomsapi/storage/storage.go index a6e18fbcb..29a6619fa 100644 --- a/publicroomsapi/storage/storage.go +++ b/publicroomsapi/storage/storage.go @@ -20,6 +20,7 @@ import ( "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/publicroomsapi/storage/postgres" + "github.com/matrix-org/dendrite/publicroomsapi/storage/sqlite3" "github.com/matrix-org/dendrite/publicroomsapi/types" "github.com/matrix-org/gomatrixserverlib" ) @@ -43,6 +44,8 @@ func NewPublicRoomsServerDatabase(dataSourceName string) (Database, error) { switch uri.Scheme { case "postgres": return postgres.NewPublicRoomsServerDatabase(dataSourceName) + case "file": + return sqlite3.NewPublicRoomsServerDatabase(dataSourceName) default: return postgres.NewPublicRoomsServerDatabase(dataSourceName) } diff --git a/roomserver/storage/sqlite3/invite_table.go b/roomserver/storage/sqlite3/invite_table.go index 5a0f0bf79..641f80156 100644 --- a/roomserver/storage/sqlite3/invite_table.go +++ b/roomserver/storage/sqlite3/invite_table.go @@ -52,16 +52,18 @@ const selectInviteActiveForUserInRoomSQL = "" + // However the matrix protocol doesn't give us a way to reliably identify the // invites that were retired, so we are forced to retire all of them. const updateInviteRetiredSQL = ` - UPDATE roomserver_invites SET retired = TRUE - WHERE room_nid = $1 AND target_nid = $2 AND NOT retired; - SELECT invite_event_id FROM roomserver_invites - WHERE rowid = last_insert_rowid(); + UPDATE roomserver_invites SET retired = TRUE WHERE room_nid = $1 AND target_nid = $2 AND NOT retired +` + +const selectInvitesAboutToRetireSQL = ` +SELECT invite_event_id FROM roomserver_invites WHERE room_nid = $1 AND target_nid = $2 AND NOT retired ` type inviteStatements struct { insertInviteEventStmt *sql.Stmt selectInviteActiveForUserInRoomStmt *sql.Stmt updateInviteRetiredStmt *sql.Stmt + selectInvitesAboutToRetireStmt *sql.Stmt } func (s *inviteStatements) prepare(db *sql.DB) (err error) { @@ -74,6 +76,7 @@ func (s *inviteStatements) prepare(db *sql.DB) (err error) { {&s.insertInviteEventStmt, insertInviteEventSQL}, {&s.selectInviteActiveForUserInRoomStmt, selectInviteActiveForUserInRoomSQL}, {&s.updateInviteRetiredStmt, updateInviteRetiredSQL}, + {&s.selectInvitesAboutToRetireStmt, selectInvitesAboutToRetireSQL}, }.prepare(db) } @@ -102,7 +105,8 @@ func (s *inviteStatements) updateInviteRetired( ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID, ) (eventIDs []string, err error) { - stmt := common.TxStmt(txn, s.updateInviteRetiredStmt) + // gather all the event IDs we will retire + stmt := txn.Stmt(s.selectInvitesAboutToRetireStmt) rows, err := stmt.QueryContext(ctx, roomNID, targetUserNID) if err != nil { return nil, err @@ -110,11 +114,15 @@ func (s *inviteStatements) updateInviteRetired( defer (func() { err = rows.Close() })() for rows.Next() { var inviteEventID string - if err := rows.Scan(&inviteEventID); err != nil { + if err = rows.Scan(&inviteEventID); err != nil { return nil, err } eventIDs = append(eventIDs, inviteEventID) } + + // now retire the invites + stmt = txn.Stmt(s.updateInviteRetiredStmt) + _, err = stmt.ExecContext(ctx, roomNID, targetUserNID) return } diff --git a/roomserver/storage/sqlite3/room_aliases_table.go b/roomserver/storage/sqlite3/room_aliases_table.go index a5fd5449a..71238b0e4 100644 --- a/roomserver/storage/sqlite3/room_aliases_table.go +++ b/roomserver/storage/sqlite3/room_aliases_table.go @@ -103,6 +103,8 @@ func (s *roomAliasesStatements) selectAliasesFromRoomID( return } + defer rows.Close() // nolint: errcheck + for rows.Next() { var alias string if err = rows.Scan(&alias); err != nil { diff --git a/roomserver/storage/sqlite3/state_block_table.go b/roomserver/storage/sqlite3/state_block_table.go index ac593546a..d75abceec 100644 --- a/roomserver/storage/sqlite3/state_block_table.go +++ b/roomserver/storage/sqlite3/state_block_table.go @@ -30,7 +30,7 @@ import ( const stateDataSchema = ` CREATE TABLE IF NOT EXISTS roomserver_state_block ( - state_block_nid INTEGER PRIMARY KEY AUTOINCREMENT, + state_block_nid INTEGER NOT NULL, event_type_nid INTEGER NOT NULL, event_state_key_nid INTEGER NOT NULL, event_nid INTEGER NOT NULL, @@ -43,10 +43,7 @@ const insertStateDataSQL = "" + " VALUES ($1, $2, $3, $4)" const selectNextStateBlockNIDSQL = ` - SELECT COALESCE(( - SELECT seq+1 AS state_block_nid FROM sqlite_sequence - WHERE name = 'roomserver_state_block'), 1 - ) AS state_block_nid +SELECT IFNULL(MAX(state_block_nid), 0) + 1 FROM roomserver_state_block ` // Bulk state lookup by numeric state block ID. @@ -98,11 +95,19 @@ func (s *stateBlockStatements) prepare(db *sql.DB) (err error) { func (s *stateBlockStatements) bulkInsertStateData( ctx context.Context, txn *sql.Tx, - stateBlockNID types.StateBlockNID, entries []types.StateEntry, -) error { +) (types.StateBlockNID, error) { + if len(entries) == 0 { + return 0, nil + } + var stateBlockNID types.StateBlockNID + err := txn.Stmt(s.selectNextStateBlockNIDStmt).QueryRowContext(ctx).Scan(&stateBlockNID) + if err != nil { + return 0, err + } + for _, entry := range entries { - _, err := common.TxStmt(txn, s.insertStateDataStmt).ExecContext( + _, err := txn.Stmt(s.insertStateDataStmt).ExecContext( ctx, int64(stateBlockNID), int64(entry.EventTypeNID), @@ -110,20 +115,10 @@ func (s *stateBlockStatements) bulkInsertStateData( int64(entry.EventNID), ) if err != nil { - return err + return 0, err } } - return nil -} - -func (s *stateBlockStatements) selectNextStateBlockNID( - ctx context.Context, - txn *sql.Tx, -) (types.StateBlockNID, error) { - var stateBlockNID int64 - selectStmt := common.TxStmt(txn, s.selectNextStateBlockNIDStmt) - err := selectStmt.QueryRowContext(ctx).Scan(&stateBlockNID) - return types.StateBlockNID(stateBlockNID), err + return stateBlockNID, nil } func (s *stateBlockStatements) bulkSelectStateBlockEntries( diff --git a/roomserver/storage/sqlite3/storage.go b/roomserver/storage/sqlite3/storage.go index e20e8aed7..aebb308c4 100644 --- a/roomserver/storage/sqlite3/storage.go +++ b/roomserver/storage/sqlite3/storage.go @@ -54,7 +54,12 @@ func Open(dataSourceName string) (*Database, error) { } //d.db.Exec("PRAGMA journal_mode=WAL;") //d.db.Exec("PRAGMA read_uncommitted = true;") - d.db.SetMaxOpenConns(2) + + // FIXME: We are leaking connections somewhere. Setting this to 2 will eventually + // cause the roomserver to be unresponsive to new events because something will + // acquire the global mutex and never unlock it because it is waiting for a connection + // which it will never obtain. + d.db.SetMaxOpenConns(20) if err = d.statements.prepare(d.db); err != nil { return nil, err } @@ -253,12 +258,13 @@ func (d *Database) Events( ) ([]types.Event, error) { var eventJSONs []eventJSONPair var err error - results := make([]types.Event, len(eventNIDs)) + var results []types.Event err = common.WithTransaction(d.db, func(txn *sql.Tx) error { eventJSONs, err = d.statements.bulkSelectEventJSON(ctx, txn, eventNIDs) if err != nil || len(eventJSONs) == 0 { return nil } + results = make([]types.Event, len(eventJSONs)) for i, eventJSON := range eventJSONs { result := &results[i] result.EventNID = eventJSON.EventNID @@ -286,13 +292,10 @@ func (d *Database) AddState( err = common.WithTransaction(d.db, func(txn *sql.Tx) error { if len(state) > 0 { var stateBlockNID types.StateBlockNID - stateBlockNID, err = d.statements.selectNextStateBlockNID(ctx, txn) + stateBlockNID, err = d.statements.bulkInsertStateData(ctx, txn, state) if err != nil { return err } - if err = d.statements.bulkInsertStateData(ctx, txn, stateBlockNID, state); err != nil { - return err - } stateBlockNIDs = append(stateBlockNIDs[:len(stateBlockNIDs):len(stateBlockNIDs)], stateBlockNID) } stateNID, err = d.statements.insertState(ctx, txn, roomNID, stateBlockNIDs) @@ -602,8 +605,9 @@ func (d *Database) StateEntriesForTuples( // MembershipUpdater implements input.RoomEventDatabase func (d *Database) MembershipUpdater( ctx context.Context, roomID, targetUserID string, -) (types.MembershipUpdater, error) { - txn, err := d.db.Begin() +) (updater types.MembershipUpdater, err error) { + var txn *sql.Tx + txn, err = d.db.Begin() if err != nil { return nil, err } @@ -611,6 +615,18 @@ func (d *Database) MembershipUpdater( defer func() { if !succeeded { txn.Rollback() // nolint: errcheck + } else { + // TODO: We should be holding open this transaction but we cannot have + // multiple write transactions on sqlite. The code will perform additional + // write transactions independent of this one which will consistently cause + // 'database is locked' errors. For now, we'll break up the transaction and + // hope we don't race too catastrophically. Long term, we should be able to + // thread in txn objects where appropriate (either at the interface level or + // bring matrix business logic into the storage layer). + txerr := txn.Commit() + if err == nil && txerr != nil { + err = txerr + } } }() @@ -624,7 +640,7 @@ func (d *Database) MembershipUpdater( return nil, err } - updater, err := d.membershipUpdaterTxn(ctx, txn, roomNID, targetUserNID) + updater, err = d.membershipUpdaterTxn(ctx, txn, roomNID, targetUserNID) if err != nil { return nil, err } @@ -658,7 +674,8 @@ func (d *Database) membershipUpdaterTxn( } return &membershipUpdater{ - transaction{ctx, txn}, d, roomNID, targetUserNID, membership, + // purposefully set the txn to nil so if we try to use it we panic and fail fast + transaction{ctx, nil}, d, roomNID, targetUserNID, membership, }, nil } diff --git a/show-expected-fail-tests.sh b/show-expected-fail-tests.sh index d3872ad59..9cd51b007 100755 --- a/show-expected-fail-tests.sh +++ b/show-expected-fail-tests.sh @@ -76,14 +76,30 @@ while read -r test_name; do fi done <<< "${passed_but_expected_fail}" +# TODO: Check that the same test doesn't exist in both the whitelist and blacklist +# TODO: Check that the same test doesn't appear twice in the whitelist|blacklist + +# Trim test output strings +tests_to_add=$(echo -e $tests_to_add | xargs) +already_in_whitelist=$(echo -e $already_in_whitelist | xargs) + +# Format output with markdown for buildkite annotation rendering purposes +if [ -n "${tests_to_add}" ] && [ -n "${already_in_whitelist}" ]; then + echo "### 📜 SyTest Whitelist Maintenance" +fi + if [ -n "${tests_to_add}" ]; then - echo "ERROR: The following passed tests are not present in $2. Please append them to the file:" + echo "**ERROR**: The following tests passed but are not present in \`$2\`. Please append them to the file:" + echo "\`\`\`" echo -e "${tests_to_add}" + echo "\`\`\`" fi if [ -n "${already_in_whitelist}" ]; then - echo "WARN: Tests in the whitelist still marked as expected fail:" + echo "**WARN**: Tests in the whitelist still marked as **expected fail**:" + echo "\`\`\`" echo -e "${already_in_whitelist}" + echo "\`\`\`" fi exit ${fail_build} diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index ba1b7dc59..5dbef4b7d 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -158,6 +158,7 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent( // panic rather than continue with an inconsistent database log.WithFields(log.Fields{ "event": string(msg.Event.JSON()), + "pdupos": pduPos, log.ErrorKey: err, }).Panicf("roomserver output log: write invite failure") return nil diff --git a/syncapi/storage/sqlite3/account_data_table.go b/syncapi/storage/sqlite3/account_data_table.go index 3274e66ea..71105d0c3 100644 --- a/syncapi/storage/sqlite3/account_data_table.go +++ b/syncapi/storage/sqlite3/account_data_table.go @@ -19,15 +19,13 @@ import ( "context" "database/sql" - "github.com/lib/pq" - "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" ) const accountDataSchema = ` CREATE TABLE IF NOT EXISTS syncapi_account_data_type ( - id INTEGER PRIMARY KEY AUTOINCREMENT, + id INTEGER PRIMARY KEY, user_id TEXT NOT NULL, room_id TEXT NOT NULL, type TEXT NOT NULL, @@ -43,9 +41,7 @@ const insertAccountDataSQL = "" + const selectAccountDataInRangeSQL = "" + "SELECT room_id, type FROM syncapi_account_data_type" + " WHERE user_id = $1 AND id > $2 AND id <= $3" + - " AND ( $4 IS NULL OR type IN ($4) )" + - " AND ( $5 IS NULL OR NOT(type IN ($5)) )" + - " ORDER BY id ASC LIMIT $6" + " ORDER BY id ASC" const selectMaxAccountDataIDSQL = "" + "SELECT MAX(id) FROM syncapi_account_data_type" @@ -53,8 +49,8 @@ const selectMaxAccountDataIDSQL = "" + type accountDataStatements struct { streamIDStatements *streamIDStatements insertAccountDataStmt *sql.Stmt - selectAccountDataInRangeStmt *sql.Stmt selectMaxAccountDataIDStmt *sql.Stmt + selectAccountDataInRangeStmt *sql.Stmt } func (s *accountDataStatements) prepare(db *sql.DB, streamID *streamIDStatements) (err error) { @@ -66,10 +62,10 @@ func (s *accountDataStatements) prepare(db *sql.DB, streamID *streamIDStatements if s.insertAccountDataStmt, err = db.Prepare(insertAccountDataSQL); err != nil { return } - if s.selectAccountDataInRangeStmt, err = db.Prepare(selectAccountDataInRangeSQL); err != nil { + if s.selectMaxAccountDataIDStmt, err = db.Prepare(selectMaxAccountDataIDSQL); err != nil { return } - if s.selectMaxAccountDataIDStmt, err = db.Prepare(selectMaxAccountDataIDSQL); err != nil { + if s.selectAccountDataInRangeStmt, err = db.Prepare(selectAccountDataInRangeSQL); err != nil { return } return @@ -83,8 +79,7 @@ func (s *accountDataStatements) insertAccountData( if err != nil { return } - insertStmt := common.TxStmt(txn, s.insertAccountDataStmt) - _, err = insertStmt.ExecContext(ctx, pos, userID, roomID, dataType) + _, err = txn.Stmt(s.insertAccountDataStmt).ExecContext(ctx, pos, userID, roomID, dataType) return } @@ -103,14 +98,13 @@ func (s *accountDataStatements) selectAccountDataInRange( oldPos-- } - rows, err := s.selectAccountDataInRangeStmt.QueryContext(ctx, userID, oldPos, newPos, - pq.StringArray(filterConvertTypeWildcardToSQL(accountDataFilterPart.Types)), - pq.StringArray(filterConvertTypeWildcardToSQL(accountDataFilterPart.NotTypes)), - accountDataFilterPart.Limit, - ) + rows, err := s.selectAccountDataInRangeStmt.QueryContext(ctx, userID, oldPos, newPos) if err != nil { return } + defer rows.Close() // nolint: errcheck + + var entries int for rows.Next() { var dataType string @@ -120,22 +114,41 @@ func (s *accountDataStatements) selectAccountDataInRange( return } + // check if we should add this by looking at the filter. + // It would be nice if we could do this in SQL-land, but the mix of variadic + // and positional parameters makes the query annoyingly hard to do, it's easier + // and clearer to do it in Go-land. If there are no filters for [not]types then + // this gets skipped. + for _, includeType := range accountDataFilterPart.Types { + if includeType != dataType { // TODO: wildcard support + continue + } + } + for _, excludeType := range accountDataFilterPart.NotTypes { + if excludeType == dataType { // TODO: wildcard support + continue + } + } + if len(data[roomID]) > 0 { data[roomID] = append(data[roomID], dataType) } else { data[roomID] = []string{dataType} } + entries++ + if entries >= accountDataFilterPart.Limit { + break + } } - return + return data, nil } func (s *accountDataStatements) selectMaxAccountDataID( ctx context.Context, txn *sql.Tx, ) (id int64, err error) { var nullableID sql.NullInt64 - stmt := common.TxStmt(txn, s.selectMaxAccountDataIDStmt) - err = stmt.QueryRowContext(ctx).Scan(&nullableID) + err = txn.Stmt(s.selectMaxAccountDataIDStmt).QueryRowContext(ctx).Scan(&nullableID) if nullableID.Valid { id = nullableID.Int64 } diff --git a/syncapi/storage/sqlite3/current_room_state_table.go b/syncapi/storage/sqlite3/current_room_state_table.go index 4ce946667..eb969c956 100644 --- a/syncapi/storage/sqlite3/current_room_state_table.go +++ b/syncapi/storage/sqlite3/current_room_state_table.go @@ -19,6 +19,7 @@ import ( "context" "database/sql" "encoding/json" + "strings" "github.com/lib/pq" "github.com/matrix-org/dendrite/common" @@ -88,7 +89,6 @@ type currentRoomStateStatements struct { selectRoomIDsWithMembershipStmt *sql.Stmt selectCurrentStateStmt *sql.Stmt selectJoinedUsersStmt *sql.Stmt - selectEventsWithEventIDsStmt *sql.Stmt selectStateEventStmt *sql.Stmt } @@ -113,9 +113,6 @@ func (s *currentRoomStateStatements) prepare(db *sql.DB, streamID *streamIDState if s.selectJoinedUsersStmt, err = db.Prepare(selectJoinedUsersSQL); err != nil { return } - if s.selectEventsWithEventIDsStmt, err = db.Prepare(selectEventsWithEventIDsSQL); err != nil { - return - } if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil { return } @@ -233,8 +230,12 @@ func (s *currentRoomStateStatements) upsertRoomState( func (s *currentRoomStateStatements) selectEventsWithEventIDs( ctx context.Context, txn *sql.Tx, eventIDs []string, ) ([]types.StreamEvent, error) { - stmt := common.TxStmt(txn, s.selectEventsWithEventIDsStmt) - rows, err := stmt.QueryContext(ctx, pq.StringArray(eventIDs)) + iEventIDs := make([]interface{}, len(eventIDs)) + for k, v := range eventIDs { + iEventIDs[k] = v + } + query := strings.Replace(selectEventsWithEventIDsSQL, "($1)", common.QueryVariadic(len(iEventIDs)), 1) + rows, err := txn.QueryContext(ctx, query, iEventIDs...) if err != nil { return nil, err } diff --git a/syncapi/storage/sqlite3/invites_table.go b/syncapi/storage/sqlite3/invites_table.go index 74dba245b..baf8871bd 100644 --- a/syncapi/storage/sqlite3/invites_table.go +++ b/syncapi/storage/sqlite3/invites_table.go @@ -26,7 +26,7 @@ import ( const inviteEventsSchema = ` CREATE TABLE IF NOT EXISTS syncapi_invite_events ( - id INTEGER PRIMARY KEY AUTOINCREMENT, + id INTEGER PRIMARY KEY, event_id TEXT NOT NULL, room_id TEXT NOT NULL, target_user_id TEXT NOT NULL, @@ -39,11 +39,8 @@ CREATE INDEX IF NOT EXISTS syncapi_invites_event_id_idx ON syncapi_invite_events const insertInviteEventSQL = "" + "INSERT INTO syncapi_invite_events" + - " (room_id, event_id, target_user_id, event_json)" + - " VALUES ($1, $2, $3, $4)" - -const selectLastInsertedInviteEventSQL = "" + - "SELECT id FROM syncapi_invite_events WHERE rowid = last_insert_rowid()" + " (id, room_id, event_id, target_user_id, event_json)" + + " VALUES ($1, $2, $3, $4, $5)" const deleteInviteEventSQL = "" + "DELETE FROM syncapi_invite_events WHERE event_id = $1" @@ -57,12 +54,11 @@ const selectMaxInviteIDSQL = "" + "SELECT MAX(id) FROM syncapi_invite_events" type inviteEventsStatements struct { - streamIDStatements *streamIDStatements - insertInviteEventStmt *sql.Stmt - selectLastInsertedInviteEventStmt *sql.Stmt - selectInviteEventsInRangeStmt *sql.Stmt - deleteInviteEventStmt *sql.Stmt - selectMaxInviteIDStmt *sql.Stmt + streamIDStatements *streamIDStatements + insertInviteEventStmt *sql.Stmt + selectInviteEventsInRangeStmt *sql.Stmt + deleteInviteEventStmt *sql.Stmt + selectMaxInviteIDStmt *sql.Stmt } func (s *inviteEventsStatements) prepare(db *sql.DB, streamID *streamIDStatements) (err error) { @@ -74,9 +70,6 @@ func (s *inviteEventsStatements) prepare(db *sql.DB, streamID *streamIDStatement if s.insertInviteEventStmt, err = db.Prepare(insertInviteEventSQL); err != nil { return } - if s.selectLastInsertedInviteEventStmt, err = db.Prepare(selectLastInsertedInviteEventSQL); err != nil { - return - } if s.selectInviteEventsInRangeStmt, err = db.Prepare(selectInviteEventsInRangeSQL); err != nil { return } @@ -90,19 +83,16 @@ func (s *inviteEventsStatements) prepare(db *sql.DB, streamID *streamIDStatement } func (s *inviteEventsStatements) insertInviteEvent( - ctx context.Context, inviteEvent gomatrixserverlib.Event, -) (streamPos types.StreamPosition, err error) { - _, err = s.insertInviteEventStmt.ExecContext( + ctx context.Context, txn *sql.Tx, inviteEvent gomatrixserverlib.Event, streamPos types.StreamPosition, +) (err error) { + _, err = txn.Stmt(s.insertInviteEventStmt).ExecContext( ctx, + streamPos, inviteEvent.RoomID(), inviteEvent.EventID(), *inviteEvent.StateKey(), inviteEvent.JSON(), ) - if err != nil { - return - } - err = s.selectLastInsertedInviteEventStmt.QueryRowContext(ctx).Scan(&streamPos) return } diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go index 8c01f2ced..4535688df 100644 --- a/syncapi/storage/sqlite3/output_room_events_table.go +++ b/syncapi/storage/sqlite3/output_room_events_table.go @@ -54,9 +54,6 @@ const insertEventSQL = "" + ") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) " + "ON CONFLICT (event_id) DO UPDATE SET exclude_from_sync = $11" -const selectLastInsertedEventSQL = "" + - "SELECT id FROM syncapi_output_room_events WHERE rowid = last_insert_rowid()" - const selectEventsSQL = "" + "SELECT id, event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events WHERE event_id = $1" @@ -105,7 +102,6 @@ const selectStateInRangeSQL = "" + type outputRoomEventsStatements struct { streamIDStatements *streamIDStatements insertEventStmt *sql.Stmt - selectLastInsertedEventStmt *sql.Stmt selectEventsStmt *sql.Stmt selectMaxEventIDStmt *sql.Stmt selectRecentEventsStmt *sql.Stmt @@ -123,9 +119,6 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB, streamID *streamIDState if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil { return } - if s.selectLastInsertedEventStmt, err = db.Prepare(selectLastInsertedEventSQL); err != nil { - return - } if s.selectEventsStmt, err = db.Prepare(selectEventsSQL); err != nil { return } @@ -270,7 +263,6 @@ func (s *outputRoomEventsStatements) insertEvent( } insertStmt := common.TxStmt(txn, s.insertEventStmt) - selectStmt := common.TxStmt(txn, s.selectLastInsertedEventStmt) _, err = insertStmt.ExecContext( ctx, streamPos, @@ -286,10 +278,6 @@ func (s *outputRoomEventsStatements) insertEvent( txnID, excludeFromSync, ) - if err != nil { - return - } - err = selectStmt.QueryRowContext(ctx).Scan(&streamPos) return } diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index 8cfc1884f..6ad3419c7 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -193,24 +193,20 @@ func (d *SyncServerDatasource) WriteEvent( ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID, excludeFromSync, ) if err != nil { - fmt.Println("d.events.insertEvent:", err) return err } pduPosition = pos if err = d.topology.insertEventInTopology(ctx, txn, ev); err != nil { - fmt.Println("d.topology.insertEventInTopology:", err) return err } if err = d.handleBackwardExtremities(ctx, txn, ev); err != nil { - fmt.Println("d.handleBackwardExtremities:", err) return err } if len(addStateEvents) == 0 && len(removeStateEventIDs) == 0 { // Nothing to do, the event may have just been a message event. - fmt.Println("nothing to do") return nil } @@ -340,8 +336,12 @@ func (d *SyncServerDatasource) GetEventsInRange( } // SyncPosition returns the latest positions for syncing. -func (d *SyncServerDatasource) SyncPosition(ctx context.Context) (types.PaginationToken, error) { - return d.syncPositionTx(ctx, nil) +func (d *SyncServerDatasource) SyncPosition(ctx context.Context) (tok types.PaginationToken, err error) { + err = common.WithTransaction(d.db, func(txn *sql.Tx) error { + tok, err = d.syncPositionTx(ctx, txn) + return err + }) + return } // BackwardExtremitiesForRoom returns the event IDs of all of the backward @@ -380,8 +380,12 @@ func (d *SyncServerDatasource) EventPositionInTopology( } // SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet. -func (d *SyncServerDatasource) SyncStreamPosition(ctx context.Context) (types.StreamPosition, error) { - return d.syncStreamPositionTx(ctx, nil) +func (d *SyncServerDatasource) SyncStreamPosition(ctx context.Context) (pos types.StreamPosition, err error) { + err = common.WithTransaction(d.db, func(txn *sql.Tx) error { + pos, err = d.syncStreamPositionTx(ctx, txn) + return err + }) + return } func (d *SyncServerDatasource) syncStreamPositionTx( @@ -625,18 +629,15 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync( if err != nil { return } - fmt.Println("Joined rooms:", joinedRoomIDs) stateFilterPart := gomatrixserverlib.DefaultStateFilter() // TODO: use filter provided in request // Build up a /sync response. Add joined rooms. for _, roomID := range joinedRoomIDs { - fmt.Println("WE'RE ON", roomID) var stateEvents []gomatrixserverlib.Event stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, &stateFilterPart) if err != nil { - fmt.Println("d.roomstate.selectCurrentState:", err) return } //fmt.Println("State events:", stateEvents) @@ -648,7 +649,6 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync( numRecentEventsPerRoom, true, true, ) if err != nil { - fmt.Println("d.events.selectRecentEvents:", err) return } //fmt.Println("Recent stream events:", recentStreamEvents) @@ -658,10 +658,9 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync( var backwardTopologyPos types.StreamPosition backwardTopologyPos, err = d.topology.selectPositionInTopology(ctx, txn, recentStreamEvents[0].EventID()) if err != nil { - fmt.Println("d.topology.selectPositionInTopology:", err) return nil, types.PaginationToken{}, []string{}, err } - fmt.Println("Backward topology position:", backwardTopologyPos) + if backwardTopologyPos-1 <= 0 { backwardTopologyPos = types.StreamPosition(1) } else { @@ -683,7 +682,6 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync( } if err = d.addInvitesToResponse(ctx, txn, userID, 0, toPos.PDUPosition, res); err != nil { - fmt.Println("d.addInvitesToResponse:", err) return } @@ -744,18 +742,10 @@ func (d *SyncServerDatasource) GetAccountDataInRange( func (d *SyncServerDatasource) UpsertAccountData( ctx context.Context, userID, roomID, dataType string, ) (sp types.StreamPosition, err error) { - txn, err := d.db.BeginTx(ctx, nil) - if err != nil { - return types.StreamPosition(0), err - } - var succeeded bool - defer func() { - txerr := common.EndTransaction(txn, &succeeded) - if err == nil && txerr != nil { - err = txerr - } - }() - sp, err = d.accountData.insertAccountData(ctx, txn, userID, roomID, dataType) + err = common.WithTransaction(d.db, func(txn *sql.Tx) error { + sp, err = d.accountData.insertAccountData(ctx, txn, userID, roomID, dataType) + return err + }) return } @@ -764,8 +754,15 @@ func (d *SyncServerDatasource) UpsertAccountData( // Returns an error if there was a problem communicating with the database. func (d *SyncServerDatasource) AddInviteEvent( ctx context.Context, inviteEvent gomatrixserverlib.Event, -) (types.StreamPosition, error) { - return d.invites.insertInviteEvent(ctx, inviteEvent) +) (streamPos types.StreamPosition, err error) { + err = common.WithTransaction(d.db, func(txn *sql.Tx) error { + streamPos, err = d.streamID.nextStreamID(ctx, txn) + if err != nil { + return err + } + return d.invites.insertInviteEvent(ctx, txn, inviteEvent, streamPos) + }) + return } // RetireInviteEvent removes an old invite event from the database. diff --git a/sytest-whitelist b/sytest-whitelist index 47fd58286..eb91b3f0c 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -183,7 +183,6 @@ GET /directory/room/:room_alias yields room ID PUT /directory/room/:room_alias creates alias Room aliases can contain Unicode Creators can delete alias -Alias creators can delete canonical alias with no ops Regular users cannot create room aliases within the AS namespace Deleting a non-existent alias should return a 404 Users can't delete other's aliases