diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index 2a558c483..d8ce9727f 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -121,20 +121,8 @@ func (r *Inputer) processRoomEvent( } } - // If we don't have a transaction ID then get one. - if input.TransactionID != nil { - tdID := input.TransactionID - eventID, err = r.DB.GetTransactionEventID( - ctx, tdID.TransactionID, tdID.SessionID, event.Sender(), - ) - // On error OR event with the transaction already processed/processesing - if err != nil || eventID != "" { - return - } - } - // Store the event. - _, stateAtEvent, redactionEvent, redactedEventID, err := r.DB.StoreEvent(ctx, event, input.TransactionID, authEventNIDs, isRejected) + _, stateAtEvent, redactionEvent, redactedEventID, err := r.DB.StoreEvent(ctx, event, authEventNIDs, isRejected) if err != nil { return "", fmt.Errorf("r.DB.StoreEvent: %w", err) } diff --git a/roomserver/internal/perform/perform_backfill.go b/roomserver/internal/perform/perform_backfill.go index d9d720f26..23b1f7370 100644 --- a/roomserver/internal/perform/perform_backfill.go +++ b/roomserver/internal/perform/perform_backfill.go @@ -562,7 +562,7 @@ func persistEvents(ctx context.Context, db storage.Database, events []*gomatrixs var stateAtEvent types.StateAtEvent var redactedEventID string var redactionEvent *gomatrixserverlib.Event - roomNID, stateAtEvent, redactionEvent, redactedEventID, err = db.StoreEvent(ctx, ev.Unwrap(), nil, authNids, false) + roomNID, stateAtEvent, redactionEvent, redactedEventID, err = db.StoreEvent(ctx, ev.Unwrap(), authNids, false) if err != nil { logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to persist event") continue diff --git a/roomserver/storage/interface.go b/roomserver/storage/interface.go index 62aa73ad4..7f6b98557 100644 --- a/roomserver/storage/interface.go +++ b/roomserver/storage/interface.go @@ -17,7 +17,6 @@ package storage import ( "context" - "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/storage/shared" "github.com/matrix-org/dendrite/roomserver/storage/tables" "github.com/matrix-org/dendrite/roomserver/types" @@ -69,7 +68,7 @@ type Database interface { SnapshotNIDFromEventID(ctx context.Context, eventID string) (types.StateSnapshotNID, error) // Stores a matrix room event in the database. Returns the room NID, the state snapshot and the redacted event ID if any, or an error. StoreEvent( - ctx context.Context, event *gomatrixserverlib.Event, txnAndSessionID *api.TransactionID, authEventNIDs []types.EventNID, + ctx context.Context, event *gomatrixserverlib.Event, authEventNIDs []types.EventNID, isRejected bool, ) (types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) // Look up the state entries for a list of string event IDs @@ -92,10 +91,6 @@ type Database interface { // Returns the latest events in the room and the last eventID sent to the log along with an updater. // If this returns an error then no further action is required. GetLatestEventsForUpdate(ctx context.Context, roomInfo types.RoomInfo) (*shared.LatestEventsUpdater, error) - // Look up event ID by transaction's info. - // This is used to determine if the room event is processed/processing already. - // Returns an empty string if no such event exists. - GetTransactionEventID(ctx context.Context, transactionID string, sessionID int64, userID string) (string, error) // Look up event references for the latest events in the room and the current state snapshot. // Returns the latest events, the current state and the maximum depth of the latest events plus 1. // Returns an error if there was a problem talking to the database. diff --git a/roomserver/storage/postgres/storage.go b/roomserver/storage/postgres/storage.go index 863a15939..b5e05c982 100644 --- a/roomserver/storage/postgres/storage.go +++ b/roomserver/storage/postgres/storage.go @@ -82,9 +82,6 @@ func (d *Database) create(db *sql.DB) error { if err := createRoomsTable(db); err != nil { return err } - if err := createTransactionsTable(db); err != nil { - return err - } if err := createStateBlockTable(db); err != nil { return err } @@ -134,10 +131,6 @@ func (d *Database) prepare(db *sql.DB, cache caching.RoomServerCaches) error { if err != nil { return err } - transactions, err := prepareTransactionsTable(db) - if err != nil { - return err - } stateBlock, err := prepareStateBlockTable(db) if err != nil { return err @@ -179,7 +172,6 @@ func (d *Database) prepare(db *sql.DB, cache caching.RoomServerCaches) error { EventJSONTable: eventJSON, EventsTable: events, RoomsTable: rooms, - TransactionsTable: transactions, StateBlockTable: stateBlock, StateSnapshotTable: stateSnapshot, PrevEventsTable: prevEvents, diff --git a/roomserver/storage/postgres/transactions_table.go b/roomserver/storage/postgres/transactions_table.go deleted file mode 100644 index af023b740..000000000 --- a/roomserver/storage/postgres/transactions_table.go +++ /dev/null @@ -1,94 +0,0 @@ -// Copyright 2017-2018 New Vector Ltd -// Copyright 2019-2020 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package postgres - -import ( - "context" - "database/sql" - - "github.com/matrix-org/dendrite/internal/sqlutil" - "github.com/matrix-org/dendrite/roomserver/storage/tables" -) - -const transactionsSchema = ` --- The transactions table holds transaction IDs with sender's info and event ID it belongs to. --- This table is used by roomserver to prevent reprocessing of events. -CREATE TABLE IF NOT EXISTS roomserver_transactions ( - -- The transaction ID of the event. - transaction_id TEXT NOT NULL, - -- The session ID of the originating transaction. - session_id BIGINT NOT NULL, - -- User ID of the sender who authored the event - user_id TEXT NOT NULL, - -- Event ID corresponding to the transaction - -- Required to return event ID to client on a duplicate request. - event_id TEXT NOT NULL, - -- A transaction ID is unique for a user and device - -- This automatically creates an index. - PRIMARY KEY (transaction_id, session_id, user_id) -); -` -const insertTransactionSQL = "" + - "INSERT INTO roomserver_transactions (transaction_id, session_id, user_id, event_id)" + - " VALUES ($1, $2, $3, $4)" - -const selectTransactionEventIDSQL = "" + - "SELECT event_id FROM roomserver_transactions" + - " WHERE transaction_id = $1 AND session_id = $2 AND user_id = $3" - -type transactionStatements struct { - insertTransactionStmt *sql.Stmt - selectTransactionEventIDStmt *sql.Stmt -} - -func createTransactionsTable(db *sql.DB) error { - _, err := db.Exec(transactionsSchema) - return err -} - -func prepareTransactionsTable(db *sql.DB) (tables.Transactions, error) { - s := &transactionStatements{} - - return s, sqlutil.StatementList{ - {&s.insertTransactionStmt, insertTransactionSQL}, - {&s.selectTransactionEventIDStmt, selectTransactionEventIDSQL}, - }.Prepare(db) -} - -func (s *transactionStatements) InsertTransaction( - ctx context.Context, txn *sql.Tx, - transactionID string, - sessionID int64, - userID string, - eventID string, -) (err error) { - _, err = s.insertTransactionStmt.ExecContext( - ctx, transactionID, sessionID, userID, eventID, - ) - return -} - -func (s *transactionStatements) SelectTransactionEventID( - ctx context.Context, - transactionID string, - sessionID int64, - userID string, -) (eventID string, err error) { - err = s.selectTransactionEventIDStmt.QueryRowContext( - ctx, transactionID, sessionID, userID, - ).Scan(&eventID) - return -} diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go index 3685332fd..dbf706e5d 100644 --- a/roomserver/storage/shared/storage.go +++ b/roomserver/storage/shared/storage.go @@ -9,7 +9,6 @@ import ( "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/sqlutil" - "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/storage/tables" "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/gomatrixserverlib" @@ -35,7 +34,6 @@ type Database struct { EventTypesTable tables.EventTypes EventStateKeysTable tables.EventStateKeys RoomsTable tables.Rooms - TransactionsTable tables.Transactions StateSnapshotTable tables.StateSnapshot StateBlockTable tables.StateBlock RoomAliasesTable tables.RoomAliases @@ -426,17 +424,6 @@ func (d *Database) Events( return results, nil } -func (d *Database) GetTransactionEventID( - ctx context.Context, transactionID string, - sessionID int64, userID string, -) (string, error) { - eventID, err := d.TransactionsTable.SelectTransactionEventID(ctx, transactionID, sessionID, userID) - if err == sql.ErrNoRows { - return "", nil - } - return eventID, err -} - func (d *Database) MembershipUpdater( ctx context.Context, roomID, targetUserID string, targetLocal bool, roomVersion gomatrixserverlib.RoomVersion, @@ -473,7 +460,7 @@ func (d *Database) GetLatestEventsForUpdate( func (d *Database) StoreEvent( ctx context.Context, event *gomatrixserverlib.Event, - txnAndSessionID *api.TransactionID, authEventNIDs []types.EventNID, isRejected bool, + authEventNIDs []types.EventNID, isRejected bool, ) (types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) { var ( roomNID types.RoomNID @@ -487,15 +474,6 @@ func (d *Database) StoreEvent( ) err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { - if txnAndSessionID != nil { - if err = d.TransactionsTable.InsertTransaction( - ctx, txn, txnAndSessionID.TransactionID, - txnAndSessionID.SessionID, event.Sender(), event.EventID(), - ); err != nil { - return fmt.Errorf("d.TransactionsTable.InsertTransaction: %w", err) - } - } - // TODO: Here we should aim to have two different code paths for new rooms // vs existing ones. diff --git a/roomserver/storage/sqlite3/storage.go b/roomserver/storage/sqlite3/storage.go index e081acdbd..1fcc7989d 100644 --- a/roomserver/storage/sqlite3/storage.go +++ b/roomserver/storage/sqlite3/storage.go @@ -90,9 +90,6 @@ func (d *Database) create(db *sql.DB) error { if err := createRoomsTable(db); err != nil { return err } - if err := createTransactionsTable(db); err != nil { - return err - } if err := createStateBlockTable(db); err != nil { return err } @@ -142,10 +139,6 @@ func (d *Database) prepare(db *sql.DB, cache caching.RoomServerCaches) error { if err != nil { return err } - transactions, err := prepareTransactionsTable(db) - if err != nil { - return err - } stateBlock, err := prepareStateBlockTable(db) if err != nil { return err @@ -187,7 +180,6 @@ func (d *Database) prepare(db *sql.DB, cache caching.RoomServerCaches) error { EventStateKeysTable: eventStateKeys, EventJSONTable: eventJSON, RoomsTable: rooms, - TransactionsTable: transactions, StateBlockTable: stateBlock, StateSnapshotTable: stateSnapshot, PrevEventsTable: prevEvents, diff --git a/roomserver/storage/sqlite3/transactions_table.go b/roomserver/storage/sqlite3/transactions_table.go deleted file mode 100644 index 1fb0a831d..000000000 --- a/roomserver/storage/sqlite3/transactions_table.go +++ /dev/null @@ -1,91 +0,0 @@ -// Copyright 2017-2018 New Vector Ltd -// Copyright 2019-2020 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package sqlite3 - -import ( - "context" - "database/sql" - - "github.com/matrix-org/dendrite/internal/sqlutil" - "github.com/matrix-org/dendrite/roomserver/storage/tables" -) - -const transactionsSchema = ` - CREATE TABLE IF NOT EXISTS roomserver_transactions ( - transaction_id TEXT NOT NULL, - session_id INTEGER NOT NULL, - user_id TEXT NOT NULL, - event_id TEXT NOT NULL, - PRIMARY KEY (transaction_id, session_id, user_id) - ); -` -const insertTransactionSQL = ` - INSERT INTO roomserver_transactions (transaction_id, session_id, user_id, event_id) - VALUES ($1, $2, $3, $4) -` - -const selectTransactionEventIDSQL = ` - SELECT event_id FROM roomserver_transactions - WHERE transaction_id = $1 AND session_id = $2 AND user_id = $3 -` - -type transactionStatements struct { - db *sql.DB - insertTransactionStmt *sql.Stmt - selectTransactionEventIDStmt *sql.Stmt -} - -func createTransactionsTable(db *sql.DB) error { - _, err := db.Exec(transactionsSchema) - return err -} - -func prepareTransactionsTable(db *sql.DB) (tables.Transactions, error) { - s := &transactionStatements{ - db: db, - } - - return s, sqlutil.StatementList{ - {&s.insertTransactionStmt, insertTransactionSQL}, - {&s.selectTransactionEventIDStmt, selectTransactionEventIDSQL}, - }.Prepare(db) -} - -func (s *transactionStatements) InsertTransaction( - ctx context.Context, txn *sql.Tx, - transactionID string, - sessionID int64, - userID string, - eventID string, -) error { - stmt := sqlutil.TxStmt(txn, s.insertTransactionStmt) - _, err := stmt.ExecContext( - ctx, transactionID, sessionID, userID, eventID, - ) - return err -} - -func (s *transactionStatements) SelectTransactionEventID( - ctx context.Context, - transactionID string, - sessionID int64, - userID string, -) (eventID string, err error) { - err = s.selectTransactionEventIDStmt.QueryRowContext( - ctx, transactionID, sessionID, userID, - ).Scan(&eventID) - return -} diff --git a/roomserver/storage/tables/interface.go b/roomserver/storage/tables/interface.go index 8720d4007..6ad7ed2e8 100644 --- a/roomserver/storage/tables/interface.go +++ b/roomserver/storage/tables/interface.go @@ -76,11 +76,6 @@ type Rooms interface { BulkSelectRoomNIDs(ctx context.Context, roomIDs []string) ([]types.RoomNID, error) } -type Transactions interface { - InsertTransaction(ctx context.Context, txn *sql.Tx, transactionID string, sessionID int64, userID string, eventID string) error - SelectTransactionEventID(ctx context.Context, transactionID string, sessionID int64, userID string) (eventID string, err error) -} - type StateSnapshot interface { InsertState(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, stateBlockNIDs types.StateBlockNIDs) (stateNID types.StateSnapshotNID, err error) BulkSelectStateBlockNIDs(ctx context.Context, stateNIDs []types.StateSnapshotNID) ([]types.StateBlockNIDList, error)