mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-29 01:33:10 -06:00
Better naming
This commit is contained in:
parent
b5f5ff8ba4
commit
371d71bba8
|
|
@ -75,11 +75,11 @@ type InputReceiptEventRequest struct {
|
||||||
// InputReceiptEventResponse is a response to InputReceiptEventRequest
|
// InputReceiptEventResponse is a response to InputReceiptEventRequest
|
||||||
type InputReceiptEventResponse struct{}
|
type InputReceiptEventResponse struct{}
|
||||||
|
|
||||||
type InputSigningKeyUpdateRequest struct {
|
type InputCrossSigningKeyUpdateRequest struct {
|
||||||
SigningKeyUpdate `json:"signing_keys"`
|
CrossSigningKeyUpdate `json:"signing_keys"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type InputSigningKeyUpdateResponse struct{}
|
type InputCrossSigningKeyUpdateResponse struct{}
|
||||||
|
|
||||||
// EDUServerInputAPI is used to write events to the typing server.
|
// EDUServerInputAPI is used to write events to the typing server.
|
||||||
type EDUServerInputAPI interface {
|
type EDUServerInputAPI interface {
|
||||||
|
|
@ -101,9 +101,9 @@ type EDUServerInputAPI interface {
|
||||||
response *InputReceiptEventResponse,
|
response *InputReceiptEventResponse,
|
||||||
) error
|
) error
|
||||||
|
|
||||||
InputSigningKeyUpdate(
|
InputCrossSigningKeyUpdate(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
request *InputSigningKeyUpdateRequest,
|
request *InputCrossSigningKeyUpdateRequest,
|
||||||
response *InputSigningKeyUpdateResponse,
|
response *InputCrossSigningKeyUpdateResponse,
|
||||||
) error
|
) error
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -53,5 +53,5 @@ type OutputReceiptEvent struct {
|
||||||
|
|
||||||
// OutputSigningKeyUpdate is an entry in the signing key update output kafka log
|
// OutputSigningKeyUpdate is an entry in the signing key update output kafka log
|
||||||
type OutputSigningKeyUpdate struct {
|
type OutputSigningKeyUpdate struct {
|
||||||
SigningKeyUpdate `json:"signing_keys"`
|
CrossSigningKeyUpdate `json:"signing_keys"`
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -40,7 +40,7 @@ type ReceiptTS struct {
|
||||||
TS gomatrixserverlib.Timestamp `json:"ts"`
|
TS gomatrixserverlib.Timestamp `json:"ts"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type SigningKeyUpdate struct {
|
type CrossSigningKeyUpdate struct {
|
||||||
MasterKey *gomatrixserverlib.CrossSigningKey `json:"master_key,omitempty"`
|
MasterKey *gomatrixserverlib.CrossSigningKey `json:"master_key,omitempty"`
|
||||||
SelfSigningKey *gomatrixserverlib.CrossSigningKey `json:"self_signing_key,omitempty"`
|
SelfSigningKey *gomatrixserverlib.CrossSigningKey `json:"self_signing_key,omitempty"`
|
||||||
UserID string `json:"user_id"`
|
UserID string `json:"user_id"`
|
||||||
|
|
|
||||||
|
|
@ -52,7 +52,7 @@ func NewInternalAPI(
|
||||||
OutputTypingEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent),
|
OutputTypingEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent),
|
||||||
OutputSendToDeviceEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent),
|
OutputSendToDeviceEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent),
|
||||||
OutputReceiptEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputReceiptEvent),
|
OutputReceiptEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputReceiptEvent),
|
||||||
OutputSigningKeyUpdateTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputSigningKeyUpdate),
|
OutputCrossSigningKeyUpdateTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputCrossSigningKeyUpdate),
|
||||||
ServerName: cfg.Matrix.ServerName,
|
ServerName: cfg.Matrix.ServerName,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -40,7 +40,7 @@ type EDUServerInputAPI struct {
|
||||||
// The kafka topic to output new receipt events to
|
// The kafka topic to output new receipt events to
|
||||||
OutputReceiptEventTopic string
|
OutputReceiptEventTopic string
|
||||||
// The kafka topic to output new signing key changes to
|
// The kafka topic to output new signing key changes to
|
||||||
OutputSigningKeyUpdateTopic string
|
OutputCrossSigningKeyUpdateTopic string
|
||||||
// kafka producer
|
// kafka producer
|
||||||
Producer sarama.SyncProducer
|
Producer sarama.SyncProducer
|
||||||
// Internal user query API
|
// Internal user query API
|
||||||
|
|
@ -79,14 +79,14 @@ func (t *EDUServerInputAPI) InputSendToDeviceEvent(
|
||||||
return t.sendToDeviceEvent(ise)
|
return t.sendToDeviceEvent(ise)
|
||||||
}
|
}
|
||||||
|
|
||||||
// InputSigningKeyUpdate implements api.EDUServerInputAPI
|
// InputCrossSigningKeyUpdate implements api.EDUServerInputAPI
|
||||||
func (t *EDUServerInputAPI) InputSigningKeyUpdate(
|
func (t *EDUServerInputAPI) InputCrossSigningKeyUpdate(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
request *api.InputSigningKeyUpdateRequest,
|
request *api.InputCrossSigningKeyUpdateRequest,
|
||||||
response *api.InputSigningKeyUpdateResponse,
|
response *api.InputCrossSigningKeyUpdateResponse,
|
||||||
) error {
|
) error {
|
||||||
eventJSON, err := json.Marshal(&api.OutputSigningKeyUpdate{
|
eventJSON, err := json.Marshal(&api.OutputSigningKeyUpdate{
|
||||||
SigningKeyUpdate: request.SigningKeyUpdate,
|
CrossSigningKeyUpdate: request.CrossSigningKeyUpdate,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
@ -94,10 +94,10 @@ func (t *EDUServerInputAPI) InputSigningKeyUpdate(
|
||||||
|
|
||||||
logrus.WithFields(logrus.Fields{
|
logrus.WithFields(logrus.Fields{
|
||||||
"user_id": request.UserID,
|
"user_id": request.UserID,
|
||||||
}).Infof("Producing to topic '%s'", t.OutputSigningKeyUpdateTopic)
|
}).Infof("Producing to topic '%s'", t.OutputCrossSigningKeyUpdateTopic)
|
||||||
|
|
||||||
m := &sarama.ProducerMessage{
|
m := &sarama.ProducerMessage{
|
||||||
Topic: string(t.OutputSigningKeyUpdateTopic),
|
Topic: string(t.OutputCrossSigningKeyUpdateTopic),
|
||||||
Key: sarama.StringEncoder(request.UserID),
|
Key: sarama.StringEncoder(request.UserID),
|
||||||
Value: sarama.ByteEncoder(eventJSON),
|
Value: sarama.ByteEncoder(eventJSON),
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -15,7 +15,7 @@ const (
|
||||||
EDUServerInputTypingEventPath = "/eduserver/input"
|
EDUServerInputTypingEventPath = "/eduserver/input"
|
||||||
EDUServerInputSendToDeviceEventPath = "/eduserver/sendToDevice"
|
EDUServerInputSendToDeviceEventPath = "/eduserver/sendToDevice"
|
||||||
EDUServerInputReceiptEventPath = "/eduserver/receipt"
|
EDUServerInputReceiptEventPath = "/eduserver/receipt"
|
||||||
EDUServerInputSigningKeyUpdatePath = "/eduserver/signingKeyUpdate"
|
EDUServerInputCrossSigningKeyUpdatePath = "/eduserver/crossSigningKeyUpdate"
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewEDUServerClient creates a EDUServerInputAPI implemented by talking to a HTTP POST API.
|
// NewEDUServerClient creates a EDUServerInputAPI implemented by talking to a HTTP POST API.
|
||||||
|
|
@ -70,15 +70,15 @@ func (h *httpEDUServerInputAPI) InputReceiptEvent(
|
||||||
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
||||||
}
|
}
|
||||||
|
|
||||||
// InputSigningKeyUpdate implements EDUServerInputAPI
|
// InputCrossSigningKeyUpdate implements EDUServerInputAPI
|
||||||
func (h *httpEDUServerInputAPI) InputSigningKeyUpdate(
|
func (h *httpEDUServerInputAPI) InputCrossSigningKeyUpdate(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
request *api.InputSigningKeyUpdateRequest,
|
request *api.InputCrossSigningKeyUpdateRequest,
|
||||||
response *api.InputSigningKeyUpdateResponse,
|
response *api.InputCrossSigningKeyUpdateResponse,
|
||||||
) error {
|
) error {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "InputSigningKeyUpdate")
|
span, ctx := opentracing.StartSpanFromContext(ctx, "InputCrossSigningKeyUpdate")
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
apiURL := h.eduServerURL + EDUServerInputSigningKeyUpdatePath
|
apiURL := h.eduServerURL + EDUServerInputCrossSigningKeyUpdatePath
|
||||||
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -51,14 +51,14 @@ func AddRoutes(t api.EDUServerInputAPI, internalAPIMux *mux.Router) {
|
||||||
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
internalAPIMux.Handle(EDUServerInputSigningKeyUpdatePath,
|
internalAPIMux.Handle(EDUServerInputCrossSigningKeyUpdatePath,
|
||||||
httputil.MakeInternalAPI("inputSigningKeyUpdate", func(req *http.Request) util.JSONResponse {
|
httputil.MakeInternalAPI("inputCrossSigningKeyUpdate", func(req *http.Request) util.JSONResponse {
|
||||||
var request api.InputSigningKeyUpdateRequest
|
var request api.InputCrossSigningKeyUpdateRequest
|
||||||
var response api.InputSigningKeyUpdateResponse
|
var response api.InputCrossSigningKeyUpdateResponse
|
||||||
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
|
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
|
||||||
return util.MessageResponse(http.StatusBadRequest, err.Error())
|
return util.MessageResponse(http.StatusBadRequest, err.Error())
|
||||||
}
|
}
|
||||||
if err := t.InputSigningKeyUpdate(req.Context(), &request, &response); err != nil {
|
if err := t.InputCrossSigningKeyUpdate(req.Context(), &request, &response); err != nil {
|
||||||
return util.ErrorResponse(err)
|
return util.ErrorResponse(err)
|
||||||
}
|
}
|
||||||
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||||
|
|
|
||||||
|
|
@ -503,18 +503,18 @@ func (t *txnReq) processEDUs(ctx context.Context) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case eduserverAPI.MSigningKeyUpdate:
|
case eduserverAPI.MSigningKeyUpdate:
|
||||||
var updatePayload eduserverAPI.SigningKeyUpdate
|
var updatePayload eduserverAPI.CrossSigningKeyUpdate
|
||||||
if err := json.Unmarshal(e.Content, &updatePayload); err != nil {
|
if err := json.Unmarshal(e.Content, &updatePayload); err != nil {
|
||||||
util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{
|
util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{
|
||||||
"user_id": updatePayload.UserID,
|
"user_id": updatePayload.UserID,
|
||||||
}).Error("Failed to send signing key update to edu server")
|
}).Error("Failed to send signing key update to edu server")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
inputReq := &eduserverAPI.InputSigningKeyUpdateRequest{
|
inputReq := &eduserverAPI.InputCrossSigningKeyUpdateRequest{
|
||||||
SigningKeyUpdate: updatePayload,
|
CrossSigningKeyUpdate: updatePayload,
|
||||||
}
|
}
|
||||||
inputRes := &eduserverAPI.InputSigningKeyUpdateResponse{}
|
inputRes := &eduserverAPI.InputCrossSigningKeyUpdateResponse{}
|
||||||
if err := t.eduAPI.InputSigningKeyUpdate(ctx, inputReq, inputRes); err != nil {
|
if err := t.eduAPI.InputCrossSigningKeyUpdate(ctx, inputReq, inputRes); err != nil {
|
||||||
util.GetLogger(ctx).WithError(err).Error("Failed to send signing key update to EDU server")
|
util.GetLogger(ctx).WithError(err).Error("Failed to send signing key update to EDU server")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -84,10 +84,10 @@ func (o *testEDUProducer) InputReceiptEvent(
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (o *testEDUProducer) InputSigningKeyUpdate(
|
func (o *testEDUProducer) InputCrossSigningKeyUpdate(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
request *eduAPI.InputSigningKeyUpdateRequest,
|
request *eduAPI.InputCrossSigningKeyUpdateRequest,
|
||||||
response *eduAPI.InputSigningKeyUpdateResponse,
|
response *eduAPI.InputCrossSigningKeyUpdateResponse,
|
||||||
) error {
|
) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -32,7 +32,7 @@ import (
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
type OutputSigningKeyUpdateConsumer struct {
|
type CrossSigningKeyUpdateConsumer struct {
|
||||||
consumer *internal.ContinualConsumer
|
consumer *internal.ContinualConsumer
|
||||||
db storage.Database
|
db storage.Database
|
||||||
queues *queue.OutgoingQueues
|
queues *queue.OutgoingQueues
|
||||||
|
|
@ -40,19 +40,19 @@ type OutputSigningKeyUpdateConsumer struct {
|
||||||
rsAPI roomserverAPI.RoomserverInternalAPI
|
rsAPI roomserverAPI.RoomserverInternalAPI
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewOutputSigningKeyUpdateConsumer(
|
func NewCrossSigningKeyUpdateConsumer(
|
||||||
process *process.ProcessContext,
|
process *process.ProcessContext,
|
||||||
cfg *config.KeyServer,
|
cfg *config.KeyServer,
|
||||||
kafkaConsumer sarama.Consumer,
|
kafkaConsumer sarama.Consumer,
|
||||||
queues *queue.OutgoingQueues,
|
queues *queue.OutgoingQueues,
|
||||||
store storage.Database,
|
store storage.Database,
|
||||||
rsAPI roomserverAPI.RoomserverInternalAPI,
|
rsAPI roomserverAPI.RoomserverInternalAPI,
|
||||||
) *OutputSigningKeyUpdateConsumer {
|
) *CrossSigningKeyUpdateConsumer {
|
||||||
c := &OutputSigningKeyUpdateConsumer{
|
c := &CrossSigningKeyUpdateConsumer{
|
||||||
consumer: &internal.ContinualConsumer{
|
consumer: &internal.ContinualConsumer{
|
||||||
Process: process,
|
Process: process,
|
||||||
ComponentName: "federationsender/signingkeys",
|
ComponentName: "federationsender/signingkeys",
|
||||||
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputSigningKeyUpdate)),
|
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputCrossSigningKeyUpdate)),
|
||||||
Consumer: kafkaConsumer,
|
Consumer: kafkaConsumer,
|
||||||
PartitionStore: store,
|
PartitionStore: store,
|
||||||
},
|
},
|
||||||
|
|
@ -66,14 +66,14 @@ func NewOutputSigningKeyUpdateConsumer(
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *OutputSigningKeyUpdateConsumer) Start() error {
|
func (t *CrossSigningKeyUpdateConsumer) Start() error {
|
||||||
if err := t.consumer.Start(); err != nil {
|
if err := t.consumer.Start(); err != nil {
|
||||||
return fmt.Errorf("t.consumer.Start: %w", err)
|
return fmt.Errorf("t.consumer.Start: %w", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *OutputSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
func (t *CrossSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
||||||
var output eduapi.OutputSigningKeyUpdate
|
var output eduapi.OutputSigningKeyUpdate
|
||||||
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
||||||
logrus.WithError(err).Errorf("eduserver output log: message parse failure")
|
logrus.WithError(err).Errorf("eduserver output log: message parse failure")
|
||||||
|
|
@ -112,7 +112,7 @@ func (t *OutputSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMessage)
|
||||||
Type: eduapi.MSigningKeyUpdate,
|
Type: eduapi.MSigningKeyUpdate,
|
||||||
Origin: string(t.serverName),
|
Origin: string(t.serverName),
|
||||||
}
|
}
|
||||||
if edu.Content, err = json.Marshal(output.SigningKeyUpdate); err != nil {
|
if edu.Content, err = json.Marshal(output.CrossSigningKeyUpdate); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -94,7 +94,7 @@ func NewInternalAPI(
|
||||||
if err := keyConsumer.Start(); err != nil {
|
if err := keyConsumer.Start(); err != nil {
|
||||||
logrus.WithError(err).Panic("failed to start key server consumer")
|
logrus.WithError(err).Panic("failed to start key server consumer")
|
||||||
}
|
}
|
||||||
signingKeyConsumer := consumers.NewOutputSigningKeyUpdateConsumer(
|
signingKeyConsumer := consumers.NewCrossSigningKeyUpdateConsumer(
|
||||||
base.ProcessContext, &base.Cfg.KeyServer, consumer, queues, federationSenderDB, rsAPI,
|
base.ProcessContext, &base.Cfg.KeyServer, consumer, queues, federationSenderDB, rsAPI,
|
||||||
)
|
)
|
||||||
if err := signingKeyConsumer.Start(); err != nil {
|
if err := signingKeyConsumer.Start(); err != nil {
|
||||||
|
|
|
||||||
|
|
@ -16,28 +16,28 @@ import (
|
||||||
"github.com/Shopify/sarama"
|
"github.com/Shopify/sarama"
|
||||||
)
|
)
|
||||||
|
|
||||||
type OutputSigningKeyUpdateConsumer struct {
|
type OutputCrossSigningKeyUpdateConsumer struct {
|
||||||
eduServerConsumer *internal.ContinualConsumer
|
eduServerConsumer *internal.ContinualConsumer
|
||||||
keyDB storage.Database
|
keyDB storage.Database
|
||||||
keyAPI api.KeyInternalAPI
|
keyAPI api.KeyInternalAPI
|
||||||
serverName string
|
serverName string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewOutputSigningKeyUpdateConsumer(
|
func NewOutputCrossSigningKeyUpdateConsumer(
|
||||||
process *process.ProcessContext,
|
process *process.ProcessContext,
|
||||||
cfg *config.Dendrite,
|
cfg *config.Dendrite,
|
||||||
kafkaConsumer sarama.Consumer,
|
kafkaConsumer sarama.Consumer,
|
||||||
keyDB storage.Database,
|
keyDB storage.Database,
|
||||||
keyAPI api.KeyInternalAPI,
|
keyAPI api.KeyInternalAPI,
|
||||||
) *OutputSigningKeyUpdateConsumer {
|
) *OutputCrossSigningKeyUpdateConsumer {
|
||||||
consumer := internal.ContinualConsumer{
|
consumer := internal.ContinualConsumer{
|
||||||
Process: process,
|
Process: process,
|
||||||
ComponentName: "keyserver/eduserver",
|
ComponentName: "keyserver/eduserver",
|
||||||
Topic: cfg.Global.Kafka.TopicFor(config.TopicOutputSigningKeyUpdate),
|
Topic: cfg.Global.Kafka.TopicFor(config.TopicOutputCrossSigningKeyUpdate),
|
||||||
Consumer: kafkaConsumer,
|
Consumer: kafkaConsumer,
|
||||||
PartitionStore: keyDB,
|
PartitionStore: keyDB,
|
||||||
}
|
}
|
||||||
s := &OutputSigningKeyUpdateConsumer{
|
s := &OutputCrossSigningKeyUpdateConsumer{
|
||||||
eduServerConsumer: &consumer,
|
eduServerConsumer: &consumer,
|
||||||
keyDB: keyDB,
|
keyDB: keyDB,
|
||||||
keyAPI: keyAPI,
|
keyAPI: keyAPI,
|
||||||
|
|
@ -48,11 +48,11 @@ func NewOutputSigningKeyUpdateConsumer(
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *OutputSigningKeyUpdateConsumer) Start() error {
|
func (s *OutputCrossSigningKeyUpdateConsumer) Start() error {
|
||||||
return s.eduServerConsumer.Start()
|
return s.eduServerConsumer.Start()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *OutputSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
func (s *OutputCrossSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
||||||
var output eduapi.OutputSigningKeyUpdate
|
var output eduapi.OutputSigningKeyUpdate
|
||||||
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
||||||
logrus.WithError(err).Errorf("eduserver output log: message parse failure")
|
logrus.WithError(err).Errorf("eduserver output log: message parse failure")
|
||||||
|
|
@ -226,7 +226,7 @@ func (a *KeyInternalAPI) PerformUploadDeviceKeys(ctx context.Context, req *api.P
|
||||||
|
|
||||||
// Finally, generate a notification that we updated the keys.
|
// Finally, generate a notification that we updated the keys.
|
||||||
if _, host, err := gomatrixserverlib.SplitID('@', req.UserID); err == nil && host == a.ThisServer {
|
if _, host, err := gomatrixserverlib.SplitID('@', req.UserID); err == nil && host == a.ThisServer {
|
||||||
update := eduserverAPI.SigningKeyUpdate{
|
update := eduserverAPI.CrossSigningKeyUpdate{
|
||||||
UserID: req.UserID,
|
UserID: req.UserID,
|
||||||
}
|
}
|
||||||
if _, ok := toStore[gomatrixserverlib.CrossSigningKeyPurposeMaster]; ok {
|
if _, ok := toStore[gomatrixserverlib.CrossSigningKeyPurposeMaster]; ok {
|
||||||
|
|
|
||||||
|
|
@ -40,7 +40,7 @@ type KeyInternalAPI struct {
|
||||||
FedClient fedsenderapi.FederationClient
|
FedClient fedsenderapi.FederationClient
|
||||||
UserAPI userapi.UserInternalAPI
|
UserAPI userapi.UserInternalAPI
|
||||||
DeviceKeysProducer *producers.KeyChange
|
DeviceKeysProducer *producers.KeyChange
|
||||||
CrossSigningProducer *producers.SigningKeyUpdate
|
CrossSigningProducer *producers.CrossSigningKeyUpdate
|
||||||
Updater *DeviceListUpdater
|
Updater *DeviceListUpdater
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -51,8 +51,8 @@ func NewInternalAPI(
|
||||||
Producer: producer,
|
Producer: producer,
|
||||||
DB: db,
|
DB: db,
|
||||||
}
|
}
|
||||||
signingKeyUpdateProducer := &producers.SigningKeyUpdate{
|
crossSigningKeyUpdateProducer := &producers.CrossSigningKeyUpdate{
|
||||||
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputSigningKeyUpdate)),
|
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputCrossSigningKeyUpdate)),
|
||||||
Producer: producer,
|
Producer: producer,
|
||||||
}
|
}
|
||||||
ap := &internal.KeyInternalAPI{
|
ap := &internal.KeyInternalAPI{
|
||||||
|
|
@ -60,7 +60,7 @@ func NewInternalAPI(
|
||||||
ThisServer: cfg.Matrix.ServerName,
|
ThisServer: cfg.Matrix.ServerName,
|
||||||
FedClient: fedClient,
|
FedClient: fedClient,
|
||||||
DeviceKeysProducer: keyChangeProducer,
|
DeviceKeysProducer: keyChangeProducer,
|
||||||
CrossSigningProducer: signingKeyUpdateProducer,
|
CrossSigningProducer: crossSigningKeyUpdateProducer,
|
||||||
}
|
}
|
||||||
updater := internal.NewDeviceListUpdater(db, ap, keyChangeProducer, fedClient, 8) // 8 workers TODO: configurable
|
updater := internal.NewDeviceListUpdater(db, ap, keyChangeProducer, fedClient, 8) // 8 workers TODO: configurable
|
||||||
ap.Updater = updater
|
ap.Updater = updater
|
||||||
|
|
@ -70,7 +70,7 @@ func NewInternalAPI(
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
keyconsumer := consumers.NewOutputSigningKeyUpdateConsumer(
|
keyconsumer := consumers.NewOutputCrossSigningKeyUpdateConsumer(
|
||||||
base.ProcessContext, base.Cfg, consumer, db, ap,
|
base.ProcessContext, base.Cfg, consumer, db, ap,
|
||||||
)
|
)
|
||||||
if err := keyconsumer.Start(); err != nil {
|
if err := keyconsumer.Start(); err != nil {
|
||||||
|
|
|
||||||
|
|
@ -22,19 +22,19 @@ import (
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
type SigningKeyUpdate struct {
|
type CrossSigningKeyUpdate struct {
|
||||||
Topic string
|
Topic string
|
||||||
Producer sarama.SyncProducer
|
Producer sarama.SyncProducer
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *SigningKeyUpdate) DefaultPartition() int32 {
|
func (p *CrossSigningKeyUpdate) DefaultPartition() int32 {
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *SigningKeyUpdate) ProduceSigningKeyUpdate(key api.SigningKeyUpdate) error {
|
func (p *CrossSigningKeyUpdate) ProduceSigningKeyUpdate(key api.CrossSigningKeyUpdate) error {
|
||||||
var m sarama.ProducerMessage
|
var m sarama.ProducerMessage
|
||||||
output := &api.OutputSigningKeyUpdate{
|
output := &api.OutputSigningKeyUpdate{
|
||||||
SigningKeyUpdate: key,
|
CrossSigningKeyUpdate: key,
|
||||||
}
|
}
|
||||||
|
|
||||||
value, err := json.Marshal(output)
|
value, err := json.Marshal(output)
|
||||||
|
|
@ -10,7 +10,7 @@ const (
|
||||||
TopicOutputRoomEvent = "OutputRoomEvent"
|
TopicOutputRoomEvent = "OutputRoomEvent"
|
||||||
TopicOutputClientData = "OutputClientData"
|
TopicOutputClientData = "OutputClientData"
|
||||||
TopicOutputReceiptEvent = "OutputReceiptEvent"
|
TopicOutputReceiptEvent = "OutputReceiptEvent"
|
||||||
TopicOutputSigningKeyUpdate = "OutputSigningKeyUpdate"
|
TopicOutputCrossSigningKeyUpdate = "OutputCrossSigningKeyUpdate"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Kafka struct {
|
type Kafka struct {
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue