diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/events.go b/src/github.com/matrix-org/dendrite/roomserver/input/events.go index 5fd45ccca..ed7c1f486 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/events.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/events.go @@ -32,6 +32,7 @@ type RoomEventDatabase interface { StoreEvent( ctx context.Context, event gomatrixserverlib.Event, + txnAndDeviceID *api.TransactionID, authEventNIDs []types.EventNID, ) (types.RoomNID, types.StateAtEvent, error) // Look up the state entries for a list of string event IDs @@ -61,6 +62,13 @@ type RoomEventDatabase interface { MembershipUpdater( ctx context.Context, roomID, targerUserID string, ) (types.MembershipUpdater, error) + // Look up event ID by transaction's info. + // This is used to determine if the room event is processed/processing already. + // Returns an empty string if no such event exists. + GetTransactionEventID( + ctx context.Context, transactionID string, + deviceID string, userID string, + ) (string, error) } // OutputRoomEventWriter has the APIs needed to write an event to the output logs. @@ -89,8 +97,23 @@ func processRoomEvent( return err } + if input.TransactionID != nil { + var eventID string + tdID := input.TransactionID + eventID, err = db.GetTransactionEventID( + ctx, tdID.TransactionID, tdID.DeviceID, input.Event.Sender(), + ) + if err != nil { + return err + } + + if eventID != "" { + return nil + } + } + // Store the event - roomNID, stateAtEvent, err := db.StoreEvent(ctx, event, authEventNIDs) + roomNID, stateAtEvent, err := db.StoreEvent(ctx, event, input.TransactionID, authEventNIDs) if err != nil { return err } diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go b/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go index a24dbb1d3..05efa8dd4 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go @@ -30,6 +30,7 @@ type statements struct { roomAliasesStatements inviteStatements membershipStatements + transactionStatements } func (s *statements) prepare(db *sql.DB) error { @@ -47,6 +48,7 @@ func (s *statements) prepare(db *sql.DB) error { s.roomAliasesStatements.prepare, s.inviteStatements.prepare, s.membershipStatements.prepare, + s.transactionStatements.prepare, } { if err = prepare(db); err != nil { return err diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go index b94036c9b..cd434da0c 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go @@ -20,6 +20,7 @@ import ( // Import the postgres database driver. _ "github.com/lib/pq" + "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/gomatrixserverlib" ) @@ -45,7 +46,8 @@ func Open(dataSourceName string) (*Database, error) { // StoreEvent implements input.EventDatabase func (d *Database) StoreEvent( - ctx context.Context, event gomatrixserverlib.Event, authEventNIDs []types.EventNID, + ctx context.Context, event gomatrixserverlib.Event, + txnAndDeviceID *api.TransactionID, authEventNIDs []types.EventNID, ) (types.RoomNID, types.StateAtEvent, error) { var ( roomNID types.RoomNID @@ -56,6 +58,15 @@ func (d *Database) StoreEvent( err error ) + if txnAndDeviceID != nil { + if err = d.statements.insertTransaction( + ctx, txnAndDeviceID.TransactionID, + txnAndDeviceID.DeviceID, event.Sender(), event.EventID(), + ); err != nil { + return 0, types.StateAtEvent{}, err + } + } + if roomNID, err = d.assignRoomNID(ctx, nil, event.RoomID()); err != nil { return 0, types.StateAtEvent{}, err } @@ -308,6 +319,18 @@ func (d *Database) GetLatestEventsForUpdate( }, nil } +// GetTransactionEventID implements input.EventDatabase +func (d *Database) GetTransactionEventID( + ctx context.Context, transactionID string, + deviceID string, userID string, +) (string, error) { + eventID, err := d.statements.selectTransactionEventID(ctx, transactionID, deviceID, userID) + if err == sql.ErrNoRows { + return "", nil + } + return eventID, err +} + type roomRecentEventsUpdater struct { transaction d *Database diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/transactions_table.go b/src/github.com/matrix-org/dendrite/roomserver/storage/transactions_table.go new file mode 100644 index 000000000..9faaae654 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/transactions_table.go @@ -0,0 +1,88 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "database/sql" +) + +const transactionsSchema = ` +-- The transactions table holds transaction IDs with sender's info and event ID it belongs to. +-- This table is used by roomserver to prevent reprocessing of events. +CREATE TABLE IF NOT EXISTS roomserver_transactions ( + -- The transaction ID of the event. + transaction_id TEXT NOT NULL, + -- The device ID of the originating transaction. + device_id TEXT NOT NULL, + -- User ID of the sender who authored the event + user_id TEXT NOT NULL, + -- Event ID corresponding to the transaction + -- Required to return event ID to client on a duplicate request. + event_id TEXT NOT NULL, + -- A transaction ID is unique for a user and device + -- This automatically creates an index. + CONSTRAINT roomserver_transaction_unique PRIMARY KEY (transaction_id, device_id, user_id) +); +` +const insertTransactionSQL = "" + + "INSERT INTO roomserver_transactions (transaction_id, device_id, user_id, event_id)" + + " VALUES ($1, $2, $3, $4)" + + " ON CONFLICT ON CONSTRAINT roomserver_transaction_unique" + + " DO NOTHING" + +const selectTransactionEventIDSQL = "" + + "SELECT event_id FROM roomserver_transactions" + + " WHERE transaction_id = $1 AND device_id = $2 AND user_id = $3" + +type transactionStatements struct { + insertTransactionStmt *sql.Stmt + selectTransactionEventIDStmt *sql.Stmt +} + +func (s *transactionStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(transactionsSchema) + if err != nil { + return + } + + return statementList{ + {&s.insertTransactionStmt, insertTransactionSQL}, + {&s.selectTransactionEventIDStmt, selectTransactionEventIDSQL}, + }.prepare(db) +} + +func (s *transactionStatements) insertTransaction( + ctx context.Context, + transactionID string, + deviceID string, + userID string, + eventID string, +) (err error) { + _, err = s.insertTransactionStmt.ExecContext( + ctx, transactionID, deviceID, userID, eventID, + ) + return +} + +func (s *transactionStatements) selectTransactionEventID( + ctx context.Context, + transactionID string, + deviceID string, + userID string, +) (eventID string, err error) { + err = s.selectTransactionEventIDStmt.QueryRowContext( + ctx, transactionID, deviceID, userID, + ).Scan(&eventID) + return +}