From 7d38e82f257a64d9fb78a0ab695151b7c809f152 Mon Sep 17 00:00:00 2001
From: Erik Johnston <erikj@jki.re>
Date: Mon, 4 Dec 2017 18:07:52 +0000
Subject: [PATCH] Pass client transaction IDs along the kafka streams (#362)

---
 .../dendrite/clientapi/producers/roomserver.go | 10 ++++++----
 .../dendrite/clientapi/routing/createroom.go   |  2 +-
 .../dendrite/clientapi/routing/joinroom.go     |  2 +-
 .../dendrite/clientapi/routing/membership.go   |  2 +-
 .../dendrite/clientapi/routing/profile.go      |  4 ++--
 .../dendrite/clientapi/routing/sendevent.go    | 12 ++++++++++--
 .../dendrite/clientapi/threepid/invites.go     |  2 +-
 .../cmd/roomserver-integration-tests/main.go   |  3 ++-
 .../dendrite/federationapi/routing/join.go     |  2 +-
 .../dendrite/federationapi/routing/send.go     |  2 +-
 .../dendrite/federationapi/routing/threepid.go |  4 ++--
 .../dendrite/roomserver/api/input.go           | 10 ++++++++++
 .../dendrite/roomserver/api/output.go          |  3 +++
 .../dendrite/roomserver/input/events.go        |  2 +-
 .../dendrite/roomserver/input/latest_events.go | 18 +++++++++++-------
 15 files changed, 53 insertions(+), 25 deletions(-)

diff --git a/src/github.com/matrix-org/dendrite/clientapi/producers/roomserver.go b/src/github.com/matrix-org/dendrite/clientapi/producers/roomserver.go
index 4e02fac5a..e7a8497fa 100644
--- a/src/github.com/matrix-org/dendrite/clientapi/producers/roomserver.go
+++ b/src/github.com/matrix-org/dendrite/clientapi/producers/roomserver.go
@@ -36,14 +36,16 @@ func NewRoomserverProducer(inputAPI api.RoomserverInputAPI) *RoomserverProducer
 // SendEvents writes the given events to the roomserver input log. The events are written with KindNew.
 func (c *RoomserverProducer) SendEvents(
 	ctx context.Context, events []gomatrixserverlib.Event, sendAsServer gomatrixserverlib.ServerName,
+	txnID *api.TransactionID,
 ) error {
 	ires := make([]api.InputRoomEvent, len(events))
 	for i, event := range events {
 		ires[i] = api.InputRoomEvent{
-			Kind:         api.KindNew,
-			Event:        event,
-			AuthEventIDs: event.AuthEventIDs(),
-			SendAsServer: string(sendAsServer),
+			Kind:          api.KindNew,
+			Event:         event,
+			AuthEventIDs:  event.AuthEventIDs(),
+			SendAsServer:  string(sendAsServer),
+			TransactionID: txnID,
 		}
 	}
 	return c.SendInputRoomEvents(ctx, ires)
diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/createroom.go b/src/github.com/matrix-org/dendrite/clientapi/routing/createroom.go
index c84c1f7cf..e495e4482 100644
--- a/src/github.com/matrix-org/dendrite/clientapi/routing/createroom.go
+++ b/src/github.com/matrix-org/dendrite/clientapi/routing/createroom.go
@@ -214,7 +214,7 @@ func createRoom(req *http.Request, device *authtypes.Device,
 	}
 
 	// send events to the room server
-	err = producer.SendEvents(req.Context(), builtEvents, cfg.Matrix.ServerName)
+	err = producer.SendEvents(req.Context(), builtEvents, cfg.Matrix.ServerName, nil)
 	if err != nil {
 		return httputil.LogThenError(req, err)
 	}
diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/joinroom.go b/src/github.com/matrix-org/dendrite/clientapi/routing/joinroom.go
index 73a751acf..aba49cd65 100644
--- a/src/github.com/matrix-org/dendrite/clientapi/routing/joinroom.go
+++ b/src/github.com/matrix-org/dendrite/clientapi/routing/joinroom.go
@@ -217,7 +217,7 @@ func (r joinRoomReq) joinRoomUsingServers(
 	var queryRes api.QueryLatestEventsAndStateResponse
 	event, err := common.BuildEvent(r.req.Context(), &eb, r.cfg, r.queryAPI, &queryRes)
 	if err == nil {
-		if err = r.producer.SendEvents(r.req.Context(), []gomatrixserverlib.Event{*event}, r.cfg.Matrix.ServerName); err != nil {
+		if err = r.producer.SendEvents(r.req.Context(), []gomatrixserverlib.Event{*event}, r.cfg.Matrix.ServerName, nil); err != nil {
 			return httputil.LogThenError(r.req, err)
 		}
 		return util.JSONResponse{
diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/membership.go b/src/github.com/matrix-org/dendrite/clientapi/routing/membership.go
index e94fbde7d..8e2e87ad3 100644
--- a/src/github.com/matrix-org/dendrite/clientapi/routing/membership.go
+++ b/src/github.com/matrix-org/dendrite/clientapi/routing/membership.go
@@ -98,7 +98,7 @@ func SendMembership(
 	}
 
 	if err := producer.SendEvents(
-		req.Context(), []gomatrixserverlib.Event{*event}, cfg.Matrix.ServerName,
+		req.Context(), []gomatrixserverlib.Event{*event}, cfg.Matrix.ServerName, nil,
 	); err != nil {
 		return httputil.LogThenError(req, err)
 	}
diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/profile.go b/src/github.com/matrix-org/dendrite/clientapi/routing/profile.go
index 1403a8292..ddbae50ca 100644
--- a/src/github.com/matrix-org/dendrite/clientapi/routing/profile.go
+++ b/src/github.com/matrix-org/dendrite/clientapi/routing/profile.go
@@ -138,7 +138,7 @@ func SetAvatarURL(
 		return httputil.LogThenError(req, err)
 	}
 
-	if err := rsProducer.SendEvents(req.Context(), events, cfg.Matrix.ServerName); err != nil {
+	if err := rsProducer.SendEvents(req.Context(), events, cfg.Matrix.ServerName, nil); err != nil {
 		return httputil.LogThenError(req, err)
 	}
 
@@ -230,7 +230,7 @@ func SetDisplayName(
 		return httputil.LogThenError(req, err)
 	}
 
-	if err := rsProducer.SendEvents(req.Context(), events, cfg.Matrix.ServerName); err != nil {
+	if err := rsProducer.SendEvents(req.Context(), events, cfg.Matrix.ServerName, nil); err != nil {
 		return httputil.LogThenError(req, err)
 	}
 
diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/sendevent.go b/src/github.com/matrix-org/dendrite/clientapi/routing/sendevent.go
index dc2f58f6a..5b3803bb1 100644
--- a/src/github.com/matrix-org/dendrite/clientapi/routing/sendevent.go
+++ b/src/github.com/matrix-org/dendrite/clientapi/routing/sendevent.go
@@ -41,7 +41,7 @@ type sendEventResponse struct {
 func SendEvent(
 	req *http.Request,
 	device *authtypes.Device,
-	roomID, eventType string, _, stateKey *string,
+	roomID, eventType string, txnID, stateKey *string,
 	cfg config.Dendrite,
 	queryAPI api.RoomserverQueryAPI,
 	producer *producers.RoomserverProducer,
@@ -90,9 +90,17 @@ func SendEvent(
 		}
 	}
 
+	var txnAndDeviceID *api.TransactionID
+	if txnID != nil {
+		txnAndDeviceID = &api.TransactionID{
+			TransactionID: *txnID,
+			DeviceID:      device.ID,
+		}
+	}
+
 	// pass the new event to the roomserver
 	if err := producer.SendEvents(
-		req.Context(), []gomatrixserverlib.Event{*e}, cfg.Matrix.ServerName,
+		req.Context(), []gomatrixserverlib.Event{*e}, cfg.Matrix.ServerName, txnAndDeviceID,
 	); err != nil {
 		return httputil.LogThenError(req, err)
 	}
diff --git a/src/github.com/matrix-org/dendrite/clientapi/threepid/invites.go b/src/github.com/matrix-org/dendrite/clientapi/threepid/invites.go
index ea998e34a..84ba27641 100644
--- a/src/github.com/matrix-org/dendrite/clientapi/threepid/invites.go
+++ b/src/github.com/matrix-org/dendrite/clientapi/threepid/invites.go
@@ -355,5 +355,5 @@ func emit3PIDInviteEvent(
 		return err
 	}
 
-	return producer.SendEvents(ctx, []gomatrixserverlib.Event{*event}, cfg.Matrix.ServerName)
+	return producer.SendEvents(ctx, []gomatrixserverlib.Event{*event}, cfg.Matrix.ServerName, nil)
 }
diff --git a/src/github.com/matrix-org/dendrite/cmd/roomserver-integration-tests/main.go b/src/github.com/matrix-org/dendrite/cmd/roomserver-integration-tests/main.go
index 126a91437..d4a8a1d10 100644
--- a/src/github.com/matrix-org/dendrite/cmd/roomserver-integration-tests/main.go
+++ b/src/github.com/matrix-org/dendrite/cmd/roomserver-integration-tests/main.go
@@ -387,7 +387,8 @@ func main() {
 			"adds_state_event_ids":["$1463671337126266wrSBX:matrix.org", "$1463671339126270PnVwC:matrix.org"],
 			"removes_state_event_ids":null,
 			"last_sent_event_id":"",
-			"send_as_server":""
+			"send_as_server":"",
+			"transaction_id": null
 		}}`,
 	}
 
diff --git a/src/github.com/matrix-org/dendrite/federationapi/routing/join.go b/src/github.com/matrix-org/dendrite/federationapi/routing/join.go
index b52174a84..585c74e6b 100644
--- a/src/github.com/matrix-org/dendrite/federationapi/routing/join.go
+++ b/src/github.com/matrix-org/dendrite/federationapi/routing/join.go
@@ -169,7 +169,7 @@ func SendJoin(
 	// Send the events to the room server.
 	// We are responsible for notifying other servers that the user has joined
 	// the room, so set SendAsServer to cfg.Matrix.ServerName
-	err = producer.SendEvents(ctx, []gomatrixserverlib.Event{event}, cfg.Matrix.ServerName)
+	err = producer.SendEvents(ctx, []gomatrixserverlib.Event{event}, cfg.Matrix.ServerName, nil)
 	if err != nil {
 		return httputil.LogThenError(httpReq, err)
 	}
diff --git a/src/github.com/matrix-org/dendrite/federationapi/routing/send.go b/src/github.com/matrix-org/dendrite/federationapi/routing/send.go
index 2103c2b8f..cbffd4cc8 100644
--- a/src/github.com/matrix-org/dendrite/federationapi/routing/send.go
+++ b/src/github.com/matrix-org/dendrite/federationapi/routing/send.go
@@ -170,7 +170,7 @@ func (t *txnReq) processEvent(e gomatrixserverlib.Event) error {
 	// TODO: Check that the event is allowed by its auth_events.
 
 	// pass the event to the roomserver
-	return t.producer.SendEvents(t.context, []gomatrixserverlib.Event{e}, api.DoNotSendToOtherServers)
+	return t.producer.SendEvents(t.context, []gomatrixserverlib.Event{e}, api.DoNotSendToOtherServers, nil)
 }
 
 func checkAllowedByState(e gomatrixserverlib.Event, stateEvents []gomatrixserverlib.Event) error {
diff --git a/src/github.com/matrix-org/dendrite/federationapi/routing/threepid.go b/src/github.com/matrix-org/dendrite/federationapi/routing/threepid.go
index cf261c450..6227d150d 100644
--- a/src/github.com/matrix-org/dendrite/federationapi/routing/threepid.go
+++ b/src/github.com/matrix-org/dendrite/federationapi/routing/threepid.go
@@ -81,7 +81,7 @@ func CreateInvitesFrom3PIDInvites(
 	}
 
 	// Send all the events
-	if err := producer.SendEvents(req.Context(), evs, cfg.Matrix.ServerName); err != nil {
+	if err := producer.SendEvents(req.Context(), evs, cfg.Matrix.ServerName, nil); err != nil {
 		return httputil.LogThenError(req, err)
 	}
 
@@ -154,7 +154,7 @@ func ExchangeThirdPartyInvite(
 
 	// Send the event to the roomserver
 	if err = producer.SendEvents(
-		httpReq.Context(), []gomatrixserverlib.Event{signedEvent.Event}, cfg.Matrix.ServerName,
+		httpReq.Context(), []gomatrixserverlib.Event{signedEvent.Event}, cfg.Matrix.ServerName, nil,
 	); err != nil {
 		return httputil.LogThenError(httpReq, err)
 	}
diff --git a/src/github.com/matrix-org/dendrite/roomserver/api/input.go b/src/github.com/matrix-org/dendrite/roomserver/api/input.go
index fb4f7a616..504e751f9 100644
--- a/src/github.com/matrix-org/dendrite/roomserver/api/input.go
+++ b/src/github.com/matrix-org/dendrite/roomserver/api/input.go
@@ -68,6 +68,16 @@ type InputRoomEvent struct {
 	// The server name to use to push this event to other servers.
 	// Or empty if this event shouldn't be pushed to other servers.
 	SendAsServer string `json:"send_as_server"`
+	// The transaction ID of the send request if sent by a local user and one
+	// was specified
+	TransactionID *TransactionID `json:"transaction_id"`
+}
+
+// TransactionID contains the transaction ID sent by a client when sending an
+// event, along with the ID of that device.
+type TransactionID struct {
+	DeviceID      string `json:"device_id"`
+	TransactionID string `json:"id"`
 }
 
 // InputInviteEvent is a matrix invite event received over federation without
diff --git a/src/github.com/matrix-org/dendrite/roomserver/api/output.go b/src/github.com/matrix-org/dendrite/roomserver/api/output.go
index 6a5c924c6..c09d5a1e5 100644
--- a/src/github.com/matrix-org/dendrite/roomserver/api/output.go
+++ b/src/github.com/matrix-org/dendrite/roomserver/api/output.go
@@ -107,6 +107,9 @@ type OutputNewRoomEvent struct {
 	// We encode the server name that the event should be sent using here to
 	// future proof the API for virtual hosting.
 	SendAsServer string `json:"send_as_server"`
+	// The transaction ID of the send request if sent by a local user and one
+	// was specified
+	TransactionID *TransactionID `json:"transaction_id"`
 }
 
 // An OutputNewInviteEvent is written whenever an invite becomes active.
diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/events.go b/src/github.com/matrix-org/dendrite/roomserver/input/events.go
index 9032219ee..91de64353 100644
--- a/src/github.com/matrix-org/dendrite/roomserver/input/events.go
+++ b/src/github.com/matrix-org/dendrite/roomserver/input/events.go
@@ -129,7 +129,7 @@ func processRoomEvent(
 	}
 
 	// Update the extremities of the event graph for the room
-	return updateLatestEvents(ctx, db, ow, roomNID, stateAtEvent, event, input.SendAsServer)
+	return updateLatestEvents(ctx, db, ow, roomNID, stateAtEvent, event, input.SendAsServer, input.TransactionID)
 }
 
 func processInviteEvent(
diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go b/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go
index 5767daab8..2b82bcba2 100644
--- a/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go
+++ b/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go
@@ -50,6 +50,7 @@ func updateLatestEvents(
 	stateAtEvent types.StateAtEvent,
 	event gomatrixserverlib.Event,
 	sendAsServer string,
+	transactionID *api.TransactionID,
 ) (err error) {
 	updater, err := db.GetLatestEventsForUpdate(ctx, roomNID)
 	if err != nil {
@@ -61,6 +62,7 @@ func updateLatestEvents(
 	u := latestEventsUpdater{
 		ctx: ctx, db: db, updater: updater, ow: ow, roomNID: roomNID,
 		stateAtEvent: stateAtEvent, event: event, sendAsServer: sendAsServer,
+		transactionID: transactionID,
 	}
 	if err = u.doUpdateLatestEvents(); err != nil {
 		return err
@@ -75,13 +77,14 @@ func updateLatestEvents(
 // The state could be passed using function arguments, but it becomes impractical
 // when there are so many variables to pass around.
 type latestEventsUpdater struct {
-	ctx          context.Context
-	db           RoomEventDatabase
-	updater      types.RoomRecentEventsUpdater
-	ow           OutputRoomEventWriter
-	roomNID      types.RoomNID
-	stateAtEvent types.StateAtEvent
-	event        gomatrixserverlib.Event
+	ctx           context.Context
+	db            RoomEventDatabase
+	updater       types.RoomRecentEventsUpdater
+	ow            OutputRoomEventWriter
+	roomNID       types.RoomNID
+	stateAtEvent  types.StateAtEvent
+	event         gomatrixserverlib.Event
+	transactionID *api.TransactionID
 	// Which server to send this event as.
 	sendAsServer string
 	// The eventID of the event that was processed before this one.
@@ -241,6 +244,7 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
 		Event:           u.event,
 		LastSentEventID: u.lastEventIDSent,
 		LatestEventIDs:  latestEventIDs,
+		TransactionID:   u.transactionID,
 	}
 
 	var stateEventNIDs []types.EventNID