diff --git a/src/github.com/matrix-org/dendrite/clientapi/producers/roomserver.go b/src/github.com/matrix-org/dendrite/clientapi/producers/roomserver.go index e7a8497fa..e50561a70 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/producers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/clientapi/producers/roomserver.go @@ -37,7 +37,7 @@ func NewRoomserverProducer(inputAPI api.RoomserverInputAPI) *RoomserverProducer func (c *RoomserverProducer) SendEvents( ctx context.Context, events []gomatrixserverlib.Event, sendAsServer gomatrixserverlib.ServerName, txnID *api.TransactionID, -) error { +) (string, error) { ires := make([]api.InputRoomEvent, len(events)) for i, event := range events { ires[i] = api.InputRoomEvent{ @@ -83,20 +83,27 @@ func (c *RoomserverProducer) SendEventWithState( StateEventIDs: stateEventIDs, } - return c.SendInputRoomEvents(ctx, ires) + _, err = c.SendInputRoomEvents(ctx, ires) + return err } // SendInputRoomEvents writes the given input room events to the roomserver input API. -func (c *RoomserverProducer) SendInputRoomEvents(ctx context.Context, ires []api.InputRoomEvent) error { +func (c *RoomserverProducer) SendInputRoomEvents( + ctx context.Context, ires []api.InputRoomEvent, +) (eventID string, err error) { request := api.InputRoomEventsRequest{InputRoomEvents: ires} var response api.InputRoomEventsResponse - return c.InputAPI.InputRoomEvents(ctx, &request, &response) + err = c.InputAPI.InputRoomEvents(ctx, &request, &response) + eventID = response.EventID + return } // SendInvite writes the invite event to the roomserver input API. // This should only be needed for invite events that occur outside of a known room. // If we are in the room then the event should be sent using the SendEvents method. -func (c *RoomserverProducer) SendInvite(ctx context.Context, inviteEvent gomatrixserverlib.Event) error { +func (c *RoomserverProducer) SendInvite( + ctx context.Context, inviteEvent gomatrixserverlib.Event, +) error { request := api.InputRoomEventsRequest{ InputInviteEvents: []api.InputInviteEvent{{Event: inviteEvent}}, } diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/createroom.go b/src/github.com/matrix-org/dendrite/clientapi/routing/createroom.go index 05751f23a..edb1858d2 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/routing/createroom.go +++ b/src/github.com/matrix-org/dendrite/clientapi/routing/createroom.go @@ -261,7 +261,7 @@ func createRoom(req *http.Request, device *authtypes.Device, } // send events to the room server - err = producer.SendEvents(req.Context(), builtEvents, cfg.Matrix.ServerName, nil) + _, err = producer.SendEvents(req.Context(), builtEvents, cfg.Matrix.ServerName, nil) if err != nil { return httputil.LogThenError(req, err) } diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/joinroom.go b/src/github.com/matrix-org/dendrite/clientapi/routing/joinroom.go index 806c5f08b..fa8425ca1 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/routing/joinroom.go +++ b/src/github.com/matrix-org/dendrite/clientapi/routing/joinroom.go @@ -217,7 +217,7 @@ func (r joinRoomReq) joinRoomUsingServers( var queryRes api.QueryLatestEventsAndStateResponse event, err := common.BuildEvent(r.req.Context(), &eb, r.cfg, r.queryAPI, &queryRes) if err == nil { - if err = r.producer.SendEvents(r.req.Context(), []gomatrixserverlib.Event{*event}, r.cfg.Matrix.ServerName, nil); err != nil { + if _, err = r.producer.SendEvents(r.req.Context(), []gomatrixserverlib.Event{*event}, r.cfg.Matrix.ServerName, nil); err != nil { return httputil.LogThenError(r.req, err) } return util.JSONResponse{ diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/membership.go b/src/github.com/matrix-org/dendrite/clientapi/routing/membership.go index 46de5b78f..05ebbdd82 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/routing/membership.go +++ b/src/github.com/matrix-org/dendrite/clientapi/routing/membership.go @@ -97,7 +97,7 @@ func SendMembership( return httputil.LogThenError(req, err) } - if err := producer.SendEvents( + if _, err := producer.SendEvents( req.Context(), []gomatrixserverlib.Event{*event}, cfg.Matrix.ServerName, nil, ); err != nil { return httputil.LogThenError(req, err) diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/profile.go b/src/github.com/matrix-org/dendrite/clientapi/routing/profile.go index 2013a3580..6fb748fca 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/routing/profile.go +++ b/src/github.com/matrix-org/dendrite/clientapi/routing/profile.go @@ -138,7 +138,7 @@ func SetAvatarURL( return httputil.LogThenError(req, err) } - if err := rsProducer.SendEvents(req.Context(), events, cfg.Matrix.ServerName, nil); err != nil { + if _, err := rsProducer.SendEvents(req.Context(), events, cfg.Matrix.ServerName, nil); err != nil { return httputil.LogThenError(req, err) } @@ -230,7 +230,7 @@ func SetDisplayName( return httputil.LogThenError(req, err) } - if err := rsProducer.SendEvents(req.Context(), events, cfg.Matrix.ServerName, nil); err != nil { + if _, err := rsProducer.SendEvents(req.Context(), events, cfg.Matrix.ServerName, nil); err != nil { return httputil.LogThenError(req, err) } diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/sendevent.go b/src/github.com/matrix-org/dendrite/clientapi/routing/sendevent.go index 46ede5983..1419df404 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/routing/sendevent.go +++ b/src/github.com/matrix-org/dendrite/clientapi/routing/sendevent.go @@ -107,16 +107,18 @@ func SendEvent( } } - // pass the new event to the roomserver - if err := producer.SendEvents( + // pass the new event to the roomserver and receive the correct event ID + // event ID in case of duplicate transaction is discarded + eventID, err := producer.SendEvents( req.Context(), []gomatrixserverlib.Event{*e}, cfg.Matrix.ServerName, txnAndDeviceID, - ); err != nil { + ) + if err != nil { return httputil.LogThenError(req, err) } res := util.JSONResponse{ Code: http.StatusOK, - JSON: sendEventResponse{e.EventID()}, + JSON: sendEventResponse{eventID}, } // Add response to transactionsCache if txnID != nil { diff --git a/src/github.com/matrix-org/dendrite/clientapi/threepid/invites.go b/src/github.com/matrix-org/dendrite/clientapi/threepid/invites.go index a7fdea69a..23b97e93b 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/threepid/invites.go +++ b/src/github.com/matrix-org/dendrite/clientapi/threepid/invites.go @@ -355,5 +355,6 @@ func emit3PIDInviteEvent( return err } - return producer.SendEvents(ctx, []gomatrixserverlib.Event{*event}, cfg.Matrix.ServerName, nil) + _, err = producer.SendEvents(ctx, []gomatrixserverlib.Event{*event}, cfg.Matrix.ServerName, nil) + return err } diff --git a/src/github.com/matrix-org/dendrite/federationapi/routing/join.go b/src/github.com/matrix-org/dendrite/federationapi/routing/join.go index d3e63df79..7bae4e707 100644 --- a/src/github.com/matrix-org/dendrite/federationapi/routing/join.go +++ b/src/github.com/matrix-org/dendrite/federationapi/routing/join.go @@ -169,7 +169,7 @@ func SendJoin( // Send the events to the room server. // We are responsible for notifying other servers that the user has joined // the room, so set SendAsServer to cfg.Matrix.ServerName - err = producer.SendEvents(ctx, []gomatrixserverlib.Event{event}, cfg.Matrix.ServerName, nil) + _, err = producer.SendEvents(ctx, []gomatrixserverlib.Event{event}, cfg.Matrix.ServerName, nil) if err != nil { return httputil.LogThenError(httpReq, err) } diff --git a/src/github.com/matrix-org/dendrite/federationapi/routing/send.go b/src/github.com/matrix-org/dendrite/federationapi/routing/send.go index c0c425f5e..eab248745 100644 --- a/src/github.com/matrix-org/dendrite/federationapi/routing/send.go +++ b/src/github.com/matrix-org/dendrite/federationapi/routing/send.go @@ -170,7 +170,8 @@ func (t *txnReq) processEvent(e gomatrixserverlib.Event) error { // TODO: Check that the event is allowed by its auth_events. // pass the event to the roomserver - return t.producer.SendEvents(t.context, []gomatrixserverlib.Event{e}, api.DoNotSendToOtherServers, nil) + _, err := t.producer.SendEvents(t.context, []gomatrixserverlib.Event{e}, api.DoNotSendToOtherServers, nil) + return err } func checkAllowedByState(e gomatrixserverlib.Event, stateEvents []gomatrixserverlib.Event) error { diff --git a/src/github.com/matrix-org/dendrite/federationapi/routing/threepid.go b/src/github.com/matrix-org/dendrite/federationapi/routing/threepid.go index 8537002f1..bc17060ce 100644 --- a/src/github.com/matrix-org/dendrite/federationapi/routing/threepid.go +++ b/src/github.com/matrix-org/dendrite/federationapi/routing/threepid.go @@ -81,7 +81,7 @@ func CreateInvitesFrom3PIDInvites( } // Send all the events - if err := producer.SendEvents(req.Context(), evs, cfg.Matrix.ServerName, nil); err != nil { + if _, err := producer.SendEvents(req.Context(), evs, cfg.Matrix.ServerName, nil); err != nil { return httputil.LogThenError(req, err) } @@ -153,7 +153,7 @@ func ExchangeThirdPartyInvite( } // Send the event to the roomserver - if err = producer.SendEvents( + if _, err = producer.SendEvents( httpReq.Context(), []gomatrixserverlib.Event{signedEvent.Event}, cfg.Matrix.ServerName, nil, ); err != nil { return httputil.LogThenError(httpReq, err) diff --git a/src/github.com/matrix-org/dendrite/roomserver/api/input.go b/src/github.com/matrix-org/dendrite/roomserver/api/input.go index 504e751f9..e81e79203 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/api/input.go +++ b/src/github.com/matrix-org/dendrite/roomserver/api/input.go @@ -94,7 +94,9 @@ type InputRoomEventsRequest struct { } // InputRoomEventsResponse is a response to InputRoomEvents -type InputRoomEventsResponse struct{} +type InputRoomEventsResponse struct { + EventID string `json:"event_id"` +} // RoomserverInputAPI is used to write events to the room server. type RoomserverInputAPI interface { diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/events.go b/src/github.com/matrix-org/dendrite/roomserver/input/events.go index b498cc058..feb15b3e1 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/events.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/events.go @@ -32,6 +32,7 @@ type RoomEventDatabase interface { StoreEvent( ctx context.Context, event gomatrixserverlib.Event, + txnAndDeviceID *api.TransactionID, authEventNIDs []types.EventNID, ) (types.RoomNID, types.StateAtEvent, error) // Look up the state entries for a list of string event IDs @@ -61,6 +62,13 @@ type RoomEventDatabase interface { MembershipUpdater( ctx context.Context, roomID, targerUserID string, ) (types.MembershipUpdater, error) + // Look up event ID by transaction's info. + // This is used to determine if the room event is processed/processing already. + // Returns an empty string if no such event exists. + GetTransactionEventID( + ctx context.Context, transactionID string, + deviceID string, userID string, + ) (string, error) } // OutputRoomEventWriter has the APIs needed to write an event to the output logs. @@ -79,52 +87,46 @@ func processRoomEvent( db RoomEventDatabase, ow OutputRoomEventWriter, input api.InputRoomEvent, -) error { +) (eventID string, err error) { // Parse and validate the event JSON event := input.Event // Check that the event passes authentication checks and work out the numeric IDs for the auth events. authEventNIDs, err := checkAuthEvents(ctx, db, event, input.AuthEventIDs) if err != nil { - return err + return + } + + if input.TransactionID != nil { + tdID := input.TransactionID + eventID, err = db.GetTransactionEventID( + ctx, tdID.TransactionID, tdID.DeviceID, input.Event.Sender(), + ) + // On error OR event with the transaction already processed/processesing + if err != nil || eventID != "" { + return + } } // Store the event - roomNID, stateAtEvent, err := db.StoreEvent(ctx, event, authEventNIDs) + roomNID, stateAtEvent, err := db.StoreEvent(ctx, event, input.TransactionID, authEventNIDs) if err != nil { - return err + return } if input.Kind == api.KindOutlier { // For outliers we can stop after we've stored the event itself as it // doesn't have any associated state to store and we don't need to // notify anyone about it. - return nil + return event.EventID(), nil } if stateAtEvent.BeforeStateSnapshotNID == 0 { // We haven't calculated a state for this event yet. // Lets calculate one. - if input.HasState { - // We've been told what the state at the event is so we don't need to calculate it. - // Check that those state events are in the database and store the state. - var entries []types.StateEntry - if entries, err = db.StateEntriesForEventIDs(ctx, input.StateEventIDs); err != nil { - return err - } - - if stateAtEvent.BeforeStateSnapshotNID, err = db.AddState(ctx, roomNID, nil, entries); err != nil { - return err - } - } else { - // We haven't been told what the state at the event is so we need to calculate it from the prev_events - if stateAtEvent.BeforeStateSnapshotNID, err = state.CalculateAndStoreStateBeforeEvent(ctx, db, event, roomNID); err != nil { - return err - } - } - err = db.SetState(ctx, stateAtEvent.EventNID, stateAtEvent.BeforeStateSnapshotNID) + err = calculateAndSetState(ctx, db, input, roomNID, &stateAtEvent, event) if err != nil { - return err + return } } @@ -134,7 +136,38 @@ func processRoomEvent( } // Update the extremities of the event graph for the room - return updateLatestEvents(ctx, db, ow, roomNID, stateAtEvent, event, input.SendAsServer, input.TransactionID) + return event.EventID(), updateLatestEvents( + ctx, db, ow, roomNID, stateAtEvent, event, input.SendAsServer, input.TransactionID, + ) +} + +func calculateAndSetState( + ctx context.Context, + db RoomEventDatabase, + input api.InputRoomEvent, + roomNID types.RoomNID, + stateAtEvent *types.StateAtEvent, + event gomatrixserverlib.Event, +) error { + var err error + if input.HasState { + // We've been told what the state at the event is so we don't need to calculate it. + // Check that those state events are in the database and store the state. + var entries []types.StateEntry + if entries, err = db.StateEntriesForEventIDs(ctx, input.StateEventIDs); err != nil { + return err + } + + if stateAtEvent.BeforeStateSnapshotNID, err = db.AddState(ctx, roomNID, nil, entries); err != nil { + return err + } + } else { + // We haven't been told what the state at the event is so we need to calculate it from the prev_events + if stateAtEvent.BeforeStateSnapshotNID, err = state.CalculateAndStoreStateBeforeEvent(ctx, db, event, roomNID); err != nil { + return err + } + } + return db.SetState(ctx, stateAtEvent.EventNID, stateAtEvent.BeforeStateSnapshotNID) } func processInviteEvent( diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/input.go b/src/github.com/matrix-org/dendrite/roomserver/input/input.go index 98971bf8e..bd029d8df 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/input.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/input.go @@ -60,17 +60,17 @@ func (r *RoomserverInputAPI) InputRoomEvents( ctx context.Context, request *api.InputRoomEventsRequest, response *api.InputRoomEventsResponse, -) error { +) (err error) { // We lock as processRoomEvent can only be called once at a time r.mutex.Lock() defer r.mutex.Unlock() for i := range request.InputRoomEvents { - if err := processRoomEvent(ctx, r.DB, r, request.InputRoomEvents[i]); err != nil { + if response.EventID, err = processRoomEvent(ctx, r.DB, r, request.InputRoomEvents[i]); err != nil { return err } } for i := range request.InputInviteEvents { - if err := processInviteEvent(ctx, r.DB, r, request.InputInviteEvents[i]); err != nil { + if err = processInviteEvent(ctx, r.DB, r, request.InputInviteEvents[i]); err != nil { return err } } diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go b/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go index a24dbb1d3..05efa8dd4 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go @@ -30,6 +30,7 @@ type statements struct { roomAliasesStatements inviteStatements membershipStatements + transactionStatements } func (s *statements) prepare(db *sql.DB) error { @@ -47,6 +48,7 @@ func (s *statements) prepare(db *sql.DB) error { s.roomAliasesStatements.prepare, s.inviteStatements.prepare, s.membershipStatements.prepare, + s.transactionStatements.prepare, } { if err = prepare(db); err != nil { return err diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go index b94036c9b..cd434da0c 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go @@ -20,6 +20,7 @@ import ( // Import the postgres database driver. _ "github.com/lib/pq" + "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/gomatrixserverlib" ) @@ -45,7 +46,8 @@ func Open(dataSourceName string) (*Database, error) { // StoreEvent implements input.EventDatabase func (d *Database) StoreEvent( - ctx context.Context, event gomatrixserverlib.Event, authEventNIDs []types.EventNID, + ctx context.Context, event gomatrixserverlib.Event, + txnAndDeviceID *api.TransactionID, authEventNIDs []types.EventNID, ) (types.RoomNID, types.StateAtEvent, error) { var ( roomNID types.RoomNID @@ -56,6 +58,15 @@ func (d *Database) StoreEvent( err error ) + if txnAndDeviceID != nil { + if err = d.statements.insertTransaction( + ctx, txnAndDeviceID.TransactionID, + txnAndDeviceID.DeviceID, event.Sender(), event.EventID(), + ); err != nil { + return 0, types.StateAtEvent{}, err + } + } + if roomNID, err = d.assignRoomNID(ctx, nil, event.RoomID()); err != nil { return 0, types.StateAtEvent{}, err } @@ -308,6 +319,18 @@ func (d *Database) GetLatestEventsForUpdate( }, nil } +// GetTransactionEventID implements input.EventDatabase +func (d *Database) GetTransactionEventID( + ctx context.Context, transactionID string, + deviceID string, userID string, +) (string, error) { + eventID, err := d.statements.selectTransactionEventID(ctx, transactionID, deviceID, userID) + if err == sql.ErrNoRows { + return "", nil + } + return eventID, err +} + type roomRecentEventsUpdater struct { transaction d *Database diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/transactions_table.go b/src/github.com/matrix-org/dendrite/roomserver/storage/transactions_table.go new file mode 100644 index 000000000..e9c904cc8 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/transactions_table.go @@ -0,0 +1,86 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "database/sql" +) + +const transactionsSchema = ` +-- The transactions table holds transaction IDs with sender's info and event ID it belongs to. +-- This table is used by roomserver to prevent reprocessing of events. +CREATE TABLE IF NOT EXISTS roomserver_transactions ( + -- The transaction ID of the event. + transaction_id TEXT NOT NULL, + -- The device ID of the originating transaction. + device_id TEXT NOT NULL, + -- User ID of the sender who authored the event + user_id TEXT NOT NULL, + -- Event ID corresponding to the transaction + -- Required to return event ID to client on a duplicate request. + event_id TEXT NOT NULL, + -- A transaction ID is unique for a user and device + -- This automatically creates an index. + PRIMARY KEY (transaction_id, device_id, user_id) +); +` +const insertTransactionSQL = "" + + "INSERT INTO roomserver_transactions (transaction_id, device_id, user_id, event_id)" + + " VALUES ($1, $2, $3, $4)" + +const selectTransactionEventIDSQL = "" + + "SELECT event_id FROM roomserver_transactions" + + " WHERE transaction_id = $1 AND device_id = $2 AND user_id = $3" + +type transactionStatements struct { + insertTransactionStmt *sql.Stmt + selectTransactionEventIDStmt *sql.Stmt +} + +func (s *transactionStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(transactionsSchema) + if err != nil { + return + } + + return statementList{ + {&s.insertTransactionStmt, insertTransactionSQL}, + {&s.selectTransactionEventIDStmt, selectTransactionEventIDSQL}, + }.prepare(db) +} + +func (s *transactionStatements) insertTransaction( + ctx context.Context, + transactionID string, + deviceID string, + userID string, + eventID string, +) (err error) { + _, err = s.insertTransactionStmt.ExecContext( + ctx, transactionID, deviceID, userID, eventID, + ) + return +} + +func (s *transactionStatements) selectTransactionEventID( + ctx context.Context, + transactionID string, + deviceID string, + userID string, +) (eventID string, err error) { + err = s.selectTransactionEventIDStmt.QueryRowContext( + ctx, transactionID, deviceID, userID, + ).Scan(&eventID) + return +}