From 46af57c1d9c9e8b60a7822248b7de315d2f887fd Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 10 Aug 2021 13:21:17 +0100 Subject: [PATCH] Initial work on signing key update EDUs --- eduserver/api/input.go | 12 ++++++++ eduserver/api/output.go | 36 ++--------------------- eduserver/api/types.go | 47 ++++++++++++++++++++++++++++++ eduserver/eduserver.go | 1 + eduserver/input/input.go | 29 ++++++++++++++++++ eduserver/inthttp/client.go | 14 +++++++++ eduserver/inthttp/server.go | 13 +++++++++ federationapi/routing/send.go | 16 ++++++++++ federationapi/routing/send_test.go | 8 +++++ keyserver/consumers/eduserver.go | 38 ++++++++++++++++++------ 10 files changed, 172 insertions(+), 42 deletions(-) create mode 100644 eduserver/api/types.go diff --git a/eduserver/api/input.go b/eduserver/api/input.go index f8599e1cc..2be5cd3fa 100644 --- a/eduserver/api/input.go +++ b/eduserver/api/input.go @@ -75,6 +75,12 @@ type InputReceiptEventRequest struct { // InputReceiptEventResponse is a response to InputReceiptEventRequest type InputReceiptEventResponse struct{} +type InputSigningKeyUpdateRequest struct { + SigningKeyUpdate `json:"signing_keys"` +} + +type InputSigningKeyUpdateResponse struct{} + // EDUServerInputAPI is used to write events to the typing server. type EDUServerInputAPI interface { InputTypingEvent( @@ -94,4 +100,10 @@ type EDUServerInputAPI interface { request *InputReceiptEventRequest, response *InputReceiptEventResponse, ) error + + InputSigningKeyUpdate( + ctx context.Context, + request *InputSigningKeyUpdateRequest, + response *InputSigningKeyUpdateResponse, + ) error } diff --git a/eduserver/api/output.go b/eduserver/api/output.go index 650458a29..c528e71b9 100644 --- a/eduserver/api/output.go +++ b/eduserver/api/output.go @@ -33,14 +33,6 @@ type OutputTypingEvent struct { ExpireTime *time.Time } -// TypingEvent represents a matrix edu event of type 'm.typing'. -type TypingEvent struct { - Type string `json:"type"` - RoomID string `json:"room_id"` - UserID string `json:"user_id"` - Typing bool `json:"typing"` -} - // OutputSendToDeviceEvent is an entry in the send-to-device output kafka log. // This contains the full event content, along with the user ID and device ID // to which it is destined. @@ -50,14 +42,6 @@ type OutputSendToDeviceEvent struct { gomatrixserverlib.SendToDeviceEvent } -type ReceiptEvent struct { - UserID string `json:"user_id"` - RoomID string `json:"room_id"` - EventID string `json:"event_id"` - Type string `json:"type"` - Timestamp gomatrixserverlib.Timestamp `json:"timestamp"` -} - // OutputReceiptEvent is an entry in the receipt output kafka log type OutputReceiptEvent struct { UserID string `json:"user_id"` @@ -67,21 +51,7 @@ type OutputReceiptEvent struct { Timestamp gomatrixserverlib.Timestamp `json:"timestamp"` } -// Helper structs for receipts json creation -type ReceiptMRead struct { - User map[string]ReceiptTS `json:"m.read"` -} - -type ReceiptTS struct { - TS gomatrixserverlib.Timestamp `json:"ts"` -} - -// FederationSender output -type FederationReceiptMRead struct { - User map[string]FederationReceiptData `json:"m.read"` -} - -type FederationReceiptData struct { - Data ReceiptTS `json:"data"` - EventIDs []string `json:"event_ids"` +// OutputSigningKeyUpdate is an entry in the signing key update output kafka log +type OutputSigningKeyUpdate struct { + SigningKeyUpdate `json:"signing_keys"` } diff --git a/eduserver/api/types.go b/eduserver/api/types.go new file mode 100644 index 000000000..e04cff011 --- /dev/null +++ b/eduserver/api/types.go @@ -0,0 +1,47 @@ +package api + +import "github.com/matrix-org/gomatrixserverlib" + +const ( + MSigningKeyUpdate = "m.signing_key_update" + MTyping = "m.typing" + MReceipt = "m.receipt" +) + +type TypingEvent struct { + Type string `json:"type"` + RoomID string `json:"room_id"` + UserID string `json:"user_id"` + Typing bool `json:"typing"` +} + +type ReceiptEvent struct { + UserID string `json:"user_id"` + RoomID string `json:"room_id"` + EventID string `json:"event_id"` + Type string `json:"type"` + Timestamp gomatrixserverlib.Timestamp `json:"timestamp"` +} + +type FederationReceiptMRead struct { + User map[string]FederationReceiptData `json:"m.read"` +} + +type FederationReceiptData struct { + Data ReceiptTS `json:"data"` + EventIDs []string `json:"event_ids"` +} + +type ReceiptMRead struct { + User map[string]ReceiptTS `json:"m.read"` +} + +type ReceiptTS struct { + TS gomatrixserverlib.Timestamp `json:"ts"` +} + +type SigningKeyUpdate struct { + MasterKey gomatrixserverlib.CrossSigningKey `json:"master_key"` + SelfSigningKey gomatrixserverlib.CrossSigningKey `json:"self_signing_key"` + UserID string `json:"user_id"` +} diff --git a/eduserver/eduserver.go b/eduserver/eduserver.go index 7cc405108..40dfcdebd 100644 --- a/eduserver/eduserver.go +++ b/eduserver/eduserver.go @@ -52,6 +52,7 @@ func NewInternalAPI( OutputTypingEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent), OutputSendToDeviceEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent), OutputReceiptEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputReceiptEvent), + OutputSigningKeyUpdateTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputSigningKeyUpdate), ServerName: cfg.Matrix.ServerName, } } diff --git a/eduserver/input/input.go b/eduserver/input/input.go index c54fb9de8..a430776a0 100644 --- a/eduserver/input/input.go +++ b/eduserver/input/input.go @@ -39,6 +39,8 @@ type EDUServerInputAPI struct { OutputSendToDeviceEventTopic string // The kafka topic to output new receipt events to OutputReceiptEventTopic string + // The kafka topic to output new signing key changes to + OutputSigningKeyUpdateTopic string // kafka producer Producer sarama.SyncProducer // Internal user query API @@ -77,6 +79,33 @@ func (t *EDUServerInputAPI) InputSendToDeviceEvent( return t.sendToDeviceEvent(ise) } +// InputSigningKeyUpdate implements api.EDUServerInputAPI +func (t *EDUServerInputAPI) InputSigningKeyUpdate( + ctx context.Context, + request *api.InputSigningKeyUpdateRequest, + response *api.InputSigningKeyUpdateResponse, +) error { + eventJSON, err := json.Marshal(&api.OutputSigningKeyUpdateEvent{ + SigningKeyUpdate: request.SigningKeyUpdate, + }) + if err != nil { + return err + } + + logrus.WithFields(logrus.Fields{ + "user_id": request.UserID, + }).Infof("Producing to topic '%s'", t.OutputSigningKeyUpdateTopic) + + m := &sarama.ProducerMessage{ + Topic: string(t.OutputSigningKeyUpdateTopic), + Key: sarama.StringEncoder(request.UserID), + Value: sarama.ByteEncoder(eventJSON), + } + + _, _, err = t.Producer.SendMessage(m) + return err +} + func (t *EDUServerInputAPI) sendTypingEvent(ite *api.InputTypingEvent) error { ev := &api.TypingEvent{ Type: gomatrixserverlib.MTyping, diff --git a/eduserver/inthttp/client.go b/eduserver/inthttp/client.go index 0690ed827..70870cce6 100644 --- a/eduserver/inthttp/client.go +++ b/eduserver/inthttp/client.go @@ -15,6 +15,7 @@ const ( EDUServerInputTypingEventPath = "/eduserver/input" EDUServerInputSendToDeviceEventPath = "/eduserver/sendToDevice" EDUServerInputReceiptEventPath = "/eduserver/receipt" + EDUServerInputSigningKeyUpdatePath = "/eduserver/signingKeyUpdate" ) // NewEDUServerClient creates a EDUServerInputAPI implemented by talking to a HTTP POST API. @@ -68,3 +69,16 @@ func (h *httpEDUServerInputAPI) InputReceiptEvent( apiURL := h.eduServerURL + EDUServerInputReceiptEventPath return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) } + +// InputSigningKeyUpdate implements EDUServerInputAPI +func (h *httpEDUServerInputAPI) InputSigningKeyUpdate( + ctx context.Context, + request *api.InputSigningKeyUpdateRequest, + response *api.InputSigningKeyUpdateResponse, +) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "InputSigningKeyUpdate") + defer span.Finish() + + apiURL := h.eduServerURL + EDUServerInputSigningKeyUpdatePath + return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) +} diff --git a/eduserver/inthttp/server.go b/eduserver/inthttp/server.go index a34943750..2f8117280 100644 --- a/eduserver/inthttp/server.go +++ b/eduserver/inthttp/server.go @@ -51,4 +51,17 @@ func AddRoutes(t api.EDUServerInputAPI, internalAPIMux *mux.Router) { return util.JSONResponse{Code: http.StatusOK, JSON: &response} }), ) + internalAPIMux.Handle(EDUServerInputSigningKeyUpdatePath, + httputil.MakeInternalAPI("inputSigningKeyUpdate", func(req *http.Request) util.JSONResponse { + var request api.InputSigningKeyUpdateRequest + var response api.InputSigningKeyUpdateResponse + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + if err := t.InputSigningKeyUpdate(req.Context(), &request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) } diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 5f214e0fc..02ac2d8f3 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -502,6 +502,22 @@ func (t *txnReq) processEDUs(ctx context.Context) { } } } + case eduserverAPI.MSigningKeyUpdate: + var updatePayload eduserverAPI.SigningKeyUpdate + if err := json.Unmarshal(e.Content, &updatePayload); err != nil { + util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{ + "user_id": updatePayload.UserID, + }).Error("Failed to send signing key update to edu server") + continue + } + inputReq := &eduserverAPI.InputSigningKeyUpdateRequest{ + SigningKeyUpdate: updatePayload, + } + inputRes := &eduserverAPI.InputSigningKeyUpdateResponse{} + if err := t.eduAPI.InputSigningKeyUpdate(ctx, inputReq, inputRes); err != nil { + util.GetLogger(ctx).WithError(err).Error("Failed to send signing key update to EDU server") + continue + } default: util.GetLogger(ctx).WithField("type", e.Type).Debug("Unhandled EDU") } diff --git a/federationapi/routing/send_test.go b/federationapi/routing/send_test.go index 5b5af9c4d..a10ac359e 100644 --- a/federationapi/routing/send_test.go +++ b/federationapi/routing/send_test.go @@ -84,6 +84,14 @@ func (o *testEDUProducer) InputReceiptEvent( return nil } +func (o *testEDUProducer) InputSigningKeyUpdate( + ctx context.Context, + request *eduAPI.InputSigningKeyUpdateRequest, + response *eduAPI.InputSigningKeyUpdateResponse, +) error { + return nil +} + type testRoomserverAPI struct { api.RoomserverInternalAPITrace inputRoomEvents []api.InputRoomEvent diff --git a/keyserver/consumers/eduserver.go b/keyserver/consumers/eduserver.go index d764950bc..c5eafe860 100644 --- a/keyserver/consumers/eduserver.go +++ b/keyserver/consumers/eduserver.go @@ -1,13 +1,17 @@ package consumers import ( - "fmt" + "context" + "encoding/json" + eduapi "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/storage" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/process" + "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" "github.com/Shopify/sarama" ) @@ -49,13 +53,29 @@ func (s *OutputSigningKeyUpdateConsumer) Start() error { } func (s *OutputSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMessage) error { - /* - var output eduapi.OutputSigningKeyUpdate - if err := json.Unmarshal(msg.Value, &output); err != nil { - log.WithError(err).Errorf("eduserver output log: message parse failure") - return nil - } + var output eduapi.OutputSigningKeyUpdate + if err := json.Unmarshal(msg.Value, &output); err != nil { + logrus.WithError(err).Errorf("eduserver output log: message parse failure") return nil - */ - return fmt.Errorf("TODO") + } + _, host, err := gomatrixserverlib.SplitID('@', output.UserID) + if err != nil { + logrus.WithError(err).Errorf("eduserver output log: user ID parse failure") + return nil + } + if host == gomatrixserverlib.ServerName(s.serverName) { + // Ignore any messages that contain information about our own users, as + // they already originated from this server. + return nil + } + uploadReq := &api.PerformUploadDeviceKeysRequest{ + CrossSigningKeys: gomatrixserverlib.CrossSigningKeys{ + MasterKey: output.MasterKey, + SelfSigningKey: output.SelfSigningKey, + }, + UserID: output.UserID, + } + uploadRes := &api.PerformUploadDeviceKeysResponse{} + s.keyAPI.PerformUploadDeviceKeys(context.TODO(), uploadReq, uploadRes) + return uploadRes.Error }