diff --git a/src/github.com/matrix-org/dendrite/clientapi/producers/roomserver.go b/src/github.com/matrix-org/dendrite/clientapi/producers/roomserver.go index e7a8497fa..5e4a1de7e 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/producers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/clientapi/producers/roomserver.go @@ -37,7 +37,7 @@ func NewRoomserverProducer(inputAPI api.RoomserverInputAPI) *RoomserverProducer func (c *RoomserverProducer) SendEvents( ctx context.Context, events []gomatrixserverlib.Event, sendAsServer gomatrixserverlib.ServerName, txnID *api.TransactionID, -) error { +) (string, error) { ires := make([]api.InputRoomEvent, len(events)) for i, event := range events { ires[i] = api.InputRoomEvent{ @@ -55,10 +55,10 @@ func (c *RoomserverProducer) SendEvents( // with the state at the event as KindOutlier before it. func (c *RoomserverProducer) SendEventWithState( ctx context.Context, state gomatrixserverlib.RespState, event gomatrixserverlib.Event, -) error { +) (string, error) { outliers, err := state.Events() if err != nil { - return err + return "", err } ires := make([]api.InputRoomEvent, len(outliers)+1) @@ -87,16 +87,22 @@ func (c *RoomserverProducer) SendEventWithState( } // SendInputRoomEvents writes the given input room events to the roomserver input API. -func (c *RoomserverProducer) SendInputRoomEvents(ctx context.Context, ires []api.InputRoomEvent) error { +func (c *RoomserverProducer) SendInputRoomEvents( + ctx context.Context, ires []api.InputRoomEvent, +) (eventID string, err error) { request := api.InputRoomEventsRequest{InputRoomEvents: ires} var response api.InputRoomEventsResponse - return c.InputAPI.InputRoomEvents(ctx, &request, &response) + err = c.InputAPI.InputRoomEvents(ctx, &request, &response) + eventID = response.EventID + return } // SendInvite writes the invite event to the roomserver input API. // This should only be needed for invite events that occur outside of a known room. // If we are in the room then the event should be sent using the SendEvents method. -func (c *RoomserverProducer) SendInvite(ctx context.Context, inviteEvent gomatrixserverlib.Event) error { +func (c *RoomserverProducer) SendInvite( + ctx context.Context, inviteEvent gomatrixserverlib.Event, +) error { request := api.InputRoomEventsRequest{ InputInviteEvents: []api.InputInviteEvent{{Event: inviteEvent}}, } diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/sendevent.go b/src/github.com/matrix-org/dendrite/clientapi/routing/sendevent.go index 46ede5983..1419df404 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/routing/sendevent.go +++ b/src/github.com/matrix-org/dendrite/clientapi/routing/sendevent.go @@ -107,16 +107,18 @@ func SendEvent( } } - // pass the new event to the roomserver - if err := producer.SendEvents( + // pass the new event to the roomserver and receive the correct event ID + // event ID in case of duplicate transaction is discarded + eventID, err := producer.SendEvents( req.Context(), []gomatrixserverlib.Event{*e}, cfg.Matrix.ServerName, txnAndDeviceID, - ); err != nil { + ) + if err != nil { return httputil.LogThenError(req, err) } res := util.JSONResponse{ Code: http.StatusOK, - JSON: sendEventResponse{e.EventID()}, + JSON: sendEventResponse{eventID}, } // Add response to transactionsCache if txnID != nil { diff --git a/src/github.com/matrix-org/dendrite/roomserver/api/input.go b/src/github.com/matrix-org/dendrite/roomserver/api/input.go index 504e751f9..e81e79203 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/api/input.go +++ b/src/github.com/matrix-org/dendrite/roomserver/api/input.go @@ -94,7 +94,9 @@ type InputRoomEventsRequest struct { } // InputRoomEventsResponse is a response to InputRoomEvents -type InputRoomEventsResponse struct{} +type InputRoomEventsResponse struct { + EventID string `json:"event_id"` +} // RoomserverInputAPI is used to write events to the room server. type RoomserverInputAPI interface { diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/events.go b/src/github.com/matrix-org/dendrite/roomserver/input/events.go index ed7c1f486..feb15b3e1 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/events.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/events.go @@ -87,42 +87,38 @@ func processRoomEvent( db RoomEventDatabase, ow OutputRoomEventWriter, input api.InputRoomEvent, -) error { +) (eventID string, err error) { // Parse and validate the event JSON event := input.Event // Check that the event passes authentication checks and work out the numeric IDs for the auth events. authEventNIDs, err := checkAuthEvents(ctx, db, event, input.AuthEventIDs) if err != nil { - return err + return } if input.TransactionID != nil { - var eventID string tdID := input.TransactionID eventID, err = db.GetTransactionEventID( ctx, tdID.TransactionID, tdID.DeviceID, input.Event.Sender(), ) - if err != nil { - return err - } - - if eventID != "" { - return nil + // On error OR event with the transaction already processed/processesing + if err != nil || eventID != "" { + return } } // Store the event roomNID, stateAtEvent, err := db.StoreEvent(ctx, event, input.TransactionID, authEventNIDs) if err != nil { - return err + return } if input.Kind == api.KindOutlier { // For outliers we can stop after we've stored the event itself as it // doesn't have any associated state to store and we don't need to // notify anyone about it. - return nil + return event.EventID(), nil } if stateAtEvent.BeforeStateSnapshotNID == 0 { @@ -130,7 +126,7 @@ func processRoomEvent( // Lets calculate one. err = calculateAndSetState(ctx, db, input, roomNID, &stateAtEvent, event) if err != nil { - return err + return } } @@ -140,7 +136,9 @@ func processRoomEvent( } // Update the extremities of the event graph for the room - return updateLatestEvents(ctx, db, ow, roomNID, stateAtEvent, event, input.SendAsServer, input.TransactionID) + return event.EventID(), updateLatestEvents( + ctx, db, ow, roomNID, stateAtEvent, event, input.SendAsServer, input.TransactionID, + ) } func calculateAndSetState( diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/input.go b/src/github.com/matrix-org/dendrite/roomserver/input/input.go index 98971bf8e..bd029d8df 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/input.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/input.go @@ -60,17 +60,17 @@ func (r *RoomserverInputAPI) InputRoomEvents( ctx context.Context, request *api.InputRoomEventsRequest, response *api.InputRoomEventsResponse, -) error { +) (err error) { // We lock as processRoomEvent can only be called once at a time r.mutex.Lock() defer r.mutex.Unlock() for i := range request.InputRoomEvents { - if err := processRoomEvent(ctx, r.DB, r, request.InputRoomEvents[i]); err != nil { + if response.EventID, err = processRoomEvent(ctx, r.DB, r, request.InputRoomEvents[i]); err != nil { return err } } for i := range request.InputInviteEvents { - if err := processInviteEvent(ctx, r.DB, r, request.InputInviteEvents[i]); err != nil { + if err = processInviteEvent(ctx, r.DB, r, request.InputInviteEvents[i]); err != nil { return err } }