diff --git a/eduserver/api/input.go b/eduserver/api/input.go index 2be5cd3fa..2fa253f4d 100644 --- a/eduserver/api/input.go +++ b/eduserver/api/input.go @@ -75,11 +75,11 @@ type InputReceiptEventRequest struct { // InputReceiptEventResponse is a response to InputReceiptEventRequest type InputReceiptEventResponse struct{} -type InputSigningKeyUpdateRequest struct { - SigningKeyUpdate `json:"signing_keys"` +type InputCrossSigningKeyUpdateRequest struct { + CrossSigningKeyUpdate `json:"signing_keys"` } -type InputSigningKeyUpdateResponse struct{} +type InputCrossSigningKeyUpdateResponse struct{} // EDUServerInputAPI is used to write events to the typing server. type EDUServerInputAPI interface { @@ -101,9 +101,9 @@ type EDUServerInputAPI interface { response *InputReceiptEventResponse, ) error - InputSigningKeyUpdate( + InputCrossSigningKeyUpdate( ctx context.Context, - request *InputSigningKeyUpdateRequest, - response *InputSigningKeyUpdateResponse, + request *InputCrossSigningKeyUpdateRequest, + response *InputCrossSigningKeyUpdateResponse, ) error } diff --git a/eduserver/api/output.go b/eduserver/api/output.go index c528e71b9..ce99fe914 100644 --- a/eduserver/api/output.go +++ b/eduserver/api/output.go @@ -53,5 +53,5 @@ type OutputReceiptEvent struct { // OutputSigningKeyUpdate is an entry in the signing key update output kafka log type OutputSigningKeyUpdate struct { - SigningKeyUpdate `json:"signing_keys"` + CrossSigningKeyUpdate `json:"signing_keys"` } diff --git a/eduserver/api/types.go b/eduserver/api/types.go index 0332b7e53..6e2209bc4 100644 --- a/eduserver/api/types.go +++ b/eduserver/api/types.go @@ -40,7 +40,7 @@ type ReceiptTS struct { TS gomatrixserverlib.Timestamp `json:"ts"` } -type SigningKeyUpdate struct { +type CrossSigningKeyUpdate struct { MasterKey *gomatrixserverlib.CrossSigningKey `json:"master_key,omitempty"` SelfSigningKey *gomatrixserverlib.CrossSigningKey `json:"self_signing_key,omitempty"` UserID string `json:"user_id"` diff --git a/eduserver/eduserver.go b/eduserver/eduserver.go index 40dfcdebd..f9a1e6053 100644 --- a/eduserver/eduserver.go +++ b/eduserver/eduserver.go @@ -46,13 +46,13 @@ func NewInternalAPI( _, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) return &input.EDUServerInputAPI{ - Cache: eduCache, - UserAPI: userAPI, - Producer: producer, - OutputTypingEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent), - OutputSendToDeviceEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent), - OutputReceiptEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputReceiptEvent), - OutputSigningKeyUpdateTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputSigningKeyUpdate), - ServerName: cfg.Matrix.ServerName, + Cache: eduCache, + UserAPI: userAPI, + Producer: producer, + OutputTypingEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent), + OutputSendToDeviceEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent), + OutputReceiptEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputReceiptEvent), + OutputCrossSigningKeyUpdateTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputCrossSigningKeyUpdate), + ServerName: cfg.Matrix.ServerName, } } diff --git a/eduserver/input/input.go b/eduserver/input/input.go index bce69ec2b..86faabd7f 100644 --- a/eduserver/input/input.go +++ b/eduserver/input/input.go @@ -40,7 +40,7 @@ type EDUServerInputAPI struct { // The kafka topic to output new receipt events to OutputReceiptEventTopic string // The kafka topic to output new signing key changes to - OutputSigningKeyUpdateTopic string + OutputCrossSigningKeyUpdateTopic string // kafka producer Producer sarama.SyncProducer // Internal user query API @@ -79,14 +79,14 @@ func (t *EDUServerInputAPI) InputSendToDeviceEvent( return t.sendToDeviceEvent(ise) } -// InputSigningKeyUpdate implements api.EDUServerInputAPI -func (t *EDUServerInputAPI) InputSigningKeyUpdate( +// InputCrossSigningKeyUpdate implements api.EDUServerInputAPI +func (t *EDUServerInputAPI) InputCrossSigningKeyUpdate( ctx context.Context, - request *api.InputSigningKeyUpdateRequest, - response *api.InputSigningKeyUpdateResponse, + request *api.InputCrossSigningKeyUpdateRequest, + response *api.InputCrossSigningKeyUpdateResponse, ) error { eventJSON, err := json.Marshal(&api.OutputSigningKeyUpdate{ - SigningKeyUpdate: request.SigningKeyUpdate, + CrossSigningKeyUpdate: request.CrossSigningKeyUpdate, }) if err != nil { return err @@ -94,10 +94,10 @@ func (t *EDUServerInputAPI) InputSigningKeyUpdate( logrus.WithFields(logrus.Fields{ "user_id": request.UserID, - }).Infof("Producing to topic '%s'", t.OutputSigningKeyUpdateTopic) + }).Infof("Producing to topic '%s'", t.OutputCrossSigningKeyUpdateTopic) m := &sarama.ProducerMessage{ - Topic: string(t.OutputSigningKeyUpdateTopic), + Topic: string(t.OutputCrossSigningKeyUpdateTopic), Key: sarama.StringEncoder(request.UserID), Value: sarama.ByteEncoder(eventJSON), } diff --git a/eduserver/inthttp/client.go b/eduserver/inthttp/client.go index 70870cce6..9a6f483c2 100644 --- a/eduserver/inthttp/client.go +++ b/eduserver/inthttp/client.go @@ -12,10 +12,10 @@ import ( // HTTP paths for the internal HTTP APIs const ( - EDUServerInputTypingEventPath = "/eduserver/input" - EDUServerInputSendToDeviceEventPath = "/eduserver/sendToDevice" - EDUServerInputReceiptEventPath = "/eduserver/receipt" - EDUServerInputSigningKeyUpdatePath = "/eduserver/signingKeyUpdate" + EDUServerInputTypingEventPath = "/eduserver/input" + EDUServerInputSendToDeviceEventPath = "/eduserver/sendToDevice" + EDUServerInputReceiptEventPath = "/eduserver/receipt" + EDUServerInputCrossSigningKeyUpdatePath = "/eduserver/crossSigningKeyUpdate" ) // NewEDUServerClient creates a EDUServerInputAPI implemented by talking to a HTTP POST API. @@ -70,15 +70,15 @@ func (h *httpEDUServerInputAPI) InputReceiptEvent( return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) } -// InputSigningKeyUpdate implements EDUServerInputAPI -func (h *httpEDUServerInputAPI) InputSigningKeyUpdate( +// InputCrossSigningKeyUpdate implements EDUServerInputAPI +func (h *httpEDUServerInputAPI) InputCrossSigningKeyUpdate( ctx context.Context, - request *api.InputSigningKeyUpdateRequest, - response *api.InputSigningKeyUpdateResponse, + request *api.InputCrossSigningKeyUpdateRequest, + response *api.InputCrossSigningKeyUpdateResponse, ) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "InputSigningKeyUpdate") + span, ctx := opentracing.StartSpanFromContext(ctx, "InputCrossSigningKeyUpdate") defer span.Finish() - apiURL := h.eduServerURL + EDUServerInputSigningKeyUpdatePath + apiURL := h.eduServerURL + EDUServerInputCrossSigningKeyUpdatePath return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) } diff --git a/eduserver/inthttp/server.go b/eduserver/inthttp/server.go index 2f8117280..a50ca84f9 100644 --- a/eduserver/inthttp/server.go +++ b/eduserver/inthttp/server.go @@ -51,14 +51,14 @@ func AddRoutes(t api.EDUServerInputAPI, internalAPIMux *mux.Router) { return util.JSONResponse{Code: http.StatusOK, JSON: &response} }), ) - internalAPIMux.Handle(EDUServerInputSigningKeyUpdatePath, - httputil.MakeInternalAPI("inputSigningKeyUpdate", func(req *http.Request) util.JSONResponse { - var request api.InputSigningKeyUpdateRequest - var response api.InputSigningKeyUpdateResponse + internalAPIMux.Handle(EDUServerInputCrossSigningKeyUpdatePath, + httputil.MakeInternalAPI("inputCrossSigningKeyUpdate", func(req *http.Request) util.JSONResponse { + var request api.InputCrossSigningKeyUpdateRequest + var response api.InputCrossSigningKeyUpdateResponse if err := json.NewDecoder(req.Body).Decode(&request); err != nil { return util.MessageResponse(http.StatusBadRequest, err.Error()) } - if err := t.InputSigningKeyUpdate(req.Context(), &request, &response); err != nil { + if err := t.InputCrossSigningKeyUpdate(req.Context(), &request, &response); err != nil { return util.ErrorResponse(err) } return util.JSONResponse{Code: http.StatusOK, JSON: &response} diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 02ac2d8f3..7e93d1099 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -503,18 +503,18 @@ func (t *txnReq) processEDUs(ctx context.Context) { } } case eduserverAPI.MSigningKeyUpdate: - var updatePayload eduserverAPI.SigningKeyUpdate + var updatePayload eduserverAPI.CrossSigningKeyUpdate if err := json.Unmarshal(e.Content, &updatePayload); err != nil { util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{ "user_id": updatePayload.UserID, }).Error("Failed to send signing key update to edu server") continue } - inputReq := &eduserverAPI.InputSigningKeyUpdateRequest{ - SigningKeyUpdate: updatePayload, + inputReq := &eduserverAPI.InputCrossSigningKeyUpdateRequest{ + CrossSigningKeyUpdate: updatePayload, } - inputRes := &eduserverAPI.InputSigningKeyUpdateResponse{} - if err := t.eduAPI.InputSigningKeyUpdate(ctx, inputReq, inputRes); err != nil { + inputRes := &eduserverAPI.InputCrossSigningKeyUpdateResponse{} + if err := t.eduAPI.InputCrossSigningKeyUpdate(ctx, inputReq, inputRes); err != nil { util.GetLogger(ctx).WithError(err).Error("Failed to send signing key update to EDU server") continue } diff --git a/federationapi/routing/send_test.go b/federationapi/routing/send_test.go index a10ac359e..702884613 100644 --- a/federationapi/routing/send_test.go +++ b/federationapi/routing/send_test.go @@ -84,10 +84,10 @@ func (o *testEDUProducer) InputReceiptEvent( return nil } -func (o *testEDUProducer) InputSigningKeyUpdate( +func (o *testEDUProducer) InputCrossSigningKeyUpdate( ctx context.Context, - request *eduAPI.InputSigningKeyUpdateRequest, - response *eduAPI.InputSigningKeyUpdateResponse, + request *eduAPI.InputCrossSigningKeyUpdateRequest, + response *eduAPI.InputCrossSigningKeyUpdateResponse, ) error { return nil } diff --git a/federationsender/consumers/signingkeys.go b/federationsender/consumers/cross_signing.go similarity index 89% rename from federationsender/consumers/signingkeys.go rename to federationsender/consumers/cross_signing.go index cf54da8a4..f9b3c143a 100644 --- a/federationsender/consumers/signingkeys.go +++ b/federationsender/consumers/cross_signing.go @@ -32,7 +32,7 @@ import ( log "github.com/sirupsen/logrus" ) -type OutputSigningKeyUpdateConsumer struct { +type CrossSigningKeyUpdateConsumer struct { consumer *internal.ContinualConsumer db storage.Database queues *queue.OutgoingQueues @@ -40,19 +40,19 @@ type OutputSigningKeyUpdateConsumer struct { rsAPI roomserverAPI.RoomserverInternalAPI } -func NewOutputSigningKeyUpdateConsumer( +func NewCrossSigningKeyUpdateConsumer( process *process.ProcessContext, cfg *config.KeyServer, kafkaConsumer sarama.Consumer, queues *queue.OutgoingQueues, store storage.Database, rsAPI roomserverAPI.RoomserverInternalAPI, -) *OutputSigningKeyUpdateConsumer { - c := &OutputSigningKeyUpdateConsumer{ +) *CrossSigningKeyUpdateConsumer { + c := &CrossSigningKeyUpdateConsumer{ consumer: &internal.ContinualConsumer{ Process: process, ComponentName: "federationsender/signingkeys", - Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputSigningKeyUpdate)), + Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputCrossSigningKeyUpdate)), Consumer: kafkaConsumer, PartitionStore: store, }, @@ -66,14 +66,14 @@ func NewOutputSigningKeyUpdateConsumer( return c } -func (t *OutputSigningKeyUpdateConsumer) Start() error { +func (t *CrossSigningKeyUpdateConsumer) Start() error { if err := t.consumer.Start(); err != nil { return fmt.Errorf("t.consumer.Start: %w", err) } return nil } -func (t *OutputSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMessage) error { +func (t *CrossSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMessage) error { var output eduapi.OutputSigningKeyUpdate if err := json.Unmarshal(msg.Value, &output); err != nil { logrus.WithError(err).Errorf("eduserver output log: message parse failure") @@ -112,7 +112,7 @@ func (t *OutputSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMessage) Type: eduapi.MSigningKeyUpdate, Origin: string(t.serverName), } - if edu.Content, err = json.Marshal(output.SigningKeyUpdate); err != nil { + if edu.Content, err = json.Marshal(output.CrossSigningKeyUpdate); err != nil { return err } diff --git a/federationsender/federationsender.go b/federationsender/federationsender.go index 886b6557c..eb8be81c5 100644 --- a/federationsender/federationsender.go +++ b/federationsender/federationsender.go @@ -94,7 +94,7 @@ func NewInternalAPI( if err := keyConsumer.Start(); err != nil { logrus.WithError(err).Panic("failed to start key server consumer") } - signingKeyConsumer := consumers.NewOutputSigningKeyUpdateConsumer( + signingKeyConsumer := consumers.NewCrossSigningKeyUpdateConsumer( base.ProcessContext, &base.Cfg.KeyServer, consumer, queues, federationSenderDB, rsAPI, ) if err := signingKeyConsumer.Start(); err != nil { diff --git a/keyserver/consumers/eduserver.go b/keyserver/consumers/cross_signing.go similarity index 83% rename from keyserver/consumers/eduserver.go rename to keyserver/consumers/cross_signing.go index 7a3886e73..38a801be0 100644 --- a/keyserver/consumers/eduserver.go +++ b/keyserver/consumers/cross_signing.go @@ -16,28 +16,28 @@ import ( "github.com/Shopify/sarama" ) -type OutputSigningKeyUpdateConsumer struct { +type OutputCrossSigningKeyUpdateConsumer struct { eduServerConsumer *internal.ContinualConsumer keyDB storage.Database keyAPI api.KeyInternalAPI serverName string } -func NewOutputSigningKeyUpdateConsumer( +func NewOutputCrossSigningKeyUpdateConsumer( process *process.ProcessContext, cfg *config.Dendrite, kafkaConsumer sarama.Consumer, keyDB storage.Database, keyAPI api.KeyInternalAPI, -) *OutputSigningKeyUpdateConsumer { +) *OutputCrossSigningKeyUpdateConsumer { consumer := internal.ContinualConsumer{ Process: process, ComponentName: "keyserver/eduserver", - Topic: cfg.Global.Kafka.TopicFor(config.TopicOutputSigningKeyUpdate), + Topic: cfg.Global.Kafka.TopicFor(config.TopicOutputCrossSigningKeyUpdate), Consumer: kafkaConsumer, PartitionStore: keyDB, } - s := &OutputSigningKeyUpdateConsumer{ + s := &OutputCrossSigningKeyUpdateConsumer{ eduServerConsumer: &consumer, keyDB: keyDB, keyAPI: keyAPI, @@ -48,11 +48,11 @@ func NewOutputSigningKeyUpdateConsumer( return s } -func (s *OutputSigningKeyUpdateConsumer) Start() error { +func (s *OutputCrossSigningKeyUpdateConsumer) Start() error { return s.eduServerConsumer.Start() } -func (s *OutputSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMessage) error { +func (s *OutputCrossSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMessage) error { var output eduapi.OutputSigningKeyUpdate if err := json.Unmarshal(msg.Value, &output); err != nil { logrus.WithError(err).Errorf("eduserver output log: message parse failure") diff --git a/keyserver/internal/cross_signing.go b/keyserver/internal/cross_signing.go index 9a8592040..77d9da21b 100644 --- a/keyserver/internal/cross_signing.go +++ b/keyserver/internal/cross_signing.go @@ -226,7 +226,7 @@ func (a *KeyInternalAPI) PerformUploadDeviceKeys(ctx context.Context, req *api.P // Finally, generate a notification that we updated the keys. if _, host, err := gomatrixserverlib.SplitID('@', req.UserID); err == nil && host == a.ThisServer { - update := eduserverAPI.SigningKeyUpdate{ + update := eduserverAPI.CrossSigningKeyUpdate{ UserID: req.UserID, } if _, ok := toStore[gomatrixserverlib.CrossSigningKeyPurposeMaster]; ok { diff --git a/keyserver/internal/internal.go b/keyserver/internal/internal.go index ba5c74fb0..30932726b 100644 --- a/keyserver/internal/internal.go +++ b/keyserver/internal/internal.go @@ -40,7 +40,7 @@ type KeyInternalAPI struct { FedClient fedsenderapi.FederationClient UserAPI userapi.UserInternalAPI DeviceKeysProducer *producers.KeyChange - CrossSigningProducer *producers.SigningKeyUpdate + CrossSigningProducer *producers.CrossSigningKeyUpdate Updater *DeviceListUpdater } diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go index 5aab15197..e17c6fa31 100644 --- a/keyserver/keyserver.go +++ b/keyserver/keyserver.go @@ -51,8 +51,8 @@ func NewInternalAPI( Producer: producer, DB: db, } - signingKeyUpdateProducer := &producers.SigningKeyUpdate{ - Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputSigningKeyUpdate)), + crossSigningKeyUpdateProducer := &producers.CrossSigningKeyUpdate{ + Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputCrossSigningKeyUpdate)), Producer: producer, } ap := &internal.KeyInternalAPI{ @@ -60,7 +60,7 @@ func NewInternalAPI( ThisServer: cfg.Matrix.ServerName, FedClient: fedClient, DeviceKeysProducer: keyChangeProducer, - CrossSigningProducer: signingKeyUpdateProducer, + CrossSigningProducer: crossSigningKeyUpdateProducer, } updater := internal.NewDeviceListUpdater(db, ap, keyChangeProducer, fedClient, 8) // 8 workers TODO: configurable ap.Updater = updater @@ -70,7 +70,7 @@ func NewInternalAPI( } }() - keyconsumer := consumers.NewOutputSigningKeyUpdateConsumer( + keyconsumer := consumers.NewOutputCrossSigningKeyUpdateConsumer( base.ProcessContext, base.Cfg, consumer, db, ap, ) if err := keyconsumer.Start(); err != nil { diff --git a/keyserver/producers/signingupdate.go b/keyserver/producers/cross_signing.go similarity index 85% rename from keyserver/producers/signingupdate.go rename to keyserver/producers/cross_signing.go index 450ec50d6..3142c8a98 100644 --- a/keyserver/producers/signingupdate.go +++ b/keyserver/producers/cross_signing.go @@ -22,19 +22,19 @@ import ( "github.com/sirupsen/logrus" ) -type SigningKeyUpdate struct { +type CrossSigningKeyUpdate struct { Topic string Producer sarama.SyncProducer } -func (p *SigningKeyUpdate) DefaultPartition() int32 { +func (p *CrossSigningKeyUpdate) DefaultPartition() int32 { return 0 } -func (p *SigningKeyUpdate) ProduceSigningKeyUpdate(key api.SigningKeyUpdate) error { +func (p *CrossSigningKeyUpdate) ProduceSigningKeyUpdate(key api.CrossSigningKeyUpdate) error { var m sarama.ProducerMessage output := &api.OutputSigningKeyUpdate{ - SigningKeyUpdate: key, + CrossSigningKeyUpdate: key, } value, err := json.Marshal(output) diff --git a/setup/config/config_kafka.go b/setup/config/config_kafka.go index 15b3ad713..d25ab33e6 100644 --- a/setup/config/config_kafka.go +++ b/setup/config/config_kafka.go @@ -4,13 +4,13 @@ import "fmt" // Defined Kafka topics. const ( - TopicOutputTypingEvent = "OutputTypingEvent" - TopicOutputSendToDeviceEvent = "OutputSendToDeviceEvent" - TopicOutputKeyChangeEvent = "OutputKeyChangeEvent" - TopicOutputRoomEvent = "OutputRoomEvent" - TopicOutputClientData = "OutputClientData" - TopicOutputReceiptEvent = "OutputReceiptEvent" - TopicOutputSigningKeyUpdate = "OutputSigningKeyUpdate" + TopicOutputTypingEvent = "OutputTypingEvent" + TopicOutputSendToDeviceEvent = "OutputSendToDeviceEvent" + TopicOutputKeyChangeEvent = "OutputKeyChangeEvent" + TopicOutputRoomEvent = "OutputRoomEvent" + TopicOutputClientData = "OutputClientData" + TopicOutputReceiptEvent = "OutputReceiptEvent" + TopicOutputCrossSigningKeyUpdate = "OutputCrossSigningKeyUpdate" ) type Kafka struct {