Initial work on signing key update EDUs

This commit is contained in:
Neil Alexander 2021-08-10 13:21:17 +01:00
parent b1377d991a
commit 46af57c1d9
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
10 changed files with 172 additions and 42 deletions

View file

@ -75,6 +75,12 @@ type InputReceiptEventRequest struct {
// InputReceiptEventResponse is a response to InputReceiptEventRequest
type InputReceiptEventResponse struct{}
type InputSigningKeyUpdateRequest struct {
SigningKeyUpdate `json:"signing_keys"`
}
type InputSigningKeyUpdateResponse struct{}
// EDUServerInputAPI is used to write events to the typing server.
type EDUServerInputAPI interface {
InputTypingEvent(
@ -94,4 +100,10 @@ type EDUServerInputAPI interface {
request *InputReceiptEventRequest,
response *InputReceiptEventResponse,
) error
InputSigningKeyUpdate(
ctx context.Context,
request *InputSigningKeyUpdateRequest,
response *InputSigningKeyUpdateResponse,
) error
}

View file

@ -33,14 +33,6 @@ type OutputTypingEvent struct {
ExpireTime *time.Time
}
// TypingEvent represents a matrix edu event of type 'm.typing'.
type TypingEvent struct {
Type string `json:"type"`
RoomID string `json:"room_id"`
UserID string `json:"user_id"`
Typing bool `json:"typing"`
}
// OutputSendToDeviceEvent is an entry in the send-to-device output kafka log.
// This contains the full event content, along with the user ID and device ID
// to which it is destined.
@ -50,14 +42,6 @@ type OutputSendToDeviceEvent struct {
gomatrixserverlib.SendToDeviceEvent
}
type ReceiptEvent struct {
UserID string `json:"user_id"`
RoomID string `json:"room_id"`
EventID string `json:"event_id"`
Type string `json:"type"`
Timestamp gomatrixserverlib.Timestamp `json:"timestamp"`
}
// OutputReceiptEvent is an entry in the receipt output kafka log
type OutputReceiptEvent struct {
UserID string `json:"user_id"`
@ -67,21 +51,7 @@ type OutputReceiptEvent struct {
Timestamp gomatrixserverlib.Timestamp `json:"timestamp"`
}
// Helper structs for receipts json creation
type ReceiptMRead struct {
User map[string]ReceiptTS `json:"m.read"`
}
type ReceiptTS struct {
TS gomatrixserverlib.Timestamp `json:"ts"`
}
// FederationSender output
type FederationReceiptMRead struct {
User map[string]FederationReceiptData `json:"m.read"`
}
type FederationReceiptData struct {
Data ReceiptTS `json:"data"`
EventIDs []string `json:"event_ids"`
// OutputSigningKeyUpdate is an entry in the signing key update output kafka log
type OutputSigningKeyUpdate struct {
SigningKeyUpdate `json:"signing_keys"`
}

47
eduserver/api/types.go Normal file
View file

@ -0,0 +1,47 @@
package api
import "github.com/matrix-org/gomatrixserverlib"
const (
MSigningKeyUpdate = "m.signing_key_update"
MTyping = "m.typing"
MReceipt = "m.receipt"
)
type TypingEvent struct {
Type string `json:"type"`
RoomID string `json:"room_id"`
UserID string `json:"user_id"`
Typing bool `json:"typing"`
}
type ReceiptEvent struct {
UserID string `json:"user_id"`
RoomID string `json:"room_id"`
EventID string `json:"event_id"`
Type string `json:"type"`
Timestamp gomatrixserverlib.Timestamp `json:"timestamp"`
}
type FederationReceiptMRead struct {
User map[string]FederationReceiptData `json:"m.read"`
}
type FederationReceiptData struct {
Data ReceiptTS `json:"data"`
EventIDs []string `json:"event_ids"`
}
type ReceiptMRead struct {
User map[string]ReceiptTS `json:"m.read"`
}
type ReceiptTS struct {
TS gomatrixserverlib.Timestamp `json:"ts"`
}
type SigningKeyUpdate struct {
MasterKey gomatrixserverlib.CrossSigningKey `json:"master_key"`
SelfSigningKey gomatrixserverlib.CrossSigningKey `json:"self_signing_key"`
UserID string `json:"user_id"`
}

View file

@ -52,6 +52,7 @@ func NewInternalAPI(
OutputTypingEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent),
OutputSendToDeviceEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent),
OutputReceiptEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputReceiptEvent),
OutputSigningKeyUpdateTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputSigningKeyUpdate),
ServerName: cfg.Matrix.ServerName,
}
}

View file

@ -39,6 +39,8 @@ type EDUServerInputAPI struct {
OutputSendToDeviceEventTopic string
// The kafka topic to output new receipt events to
OutputReceiptEventTopic string
// The kafka topic to output new signing key changes to
OutputSigningKeyUpdateTopic string
// kafka producer
Producer sarama.SyncProducer
// Internal user query API
@ -77,6 +79,33 @@ func (t *EDUServerInputAPI) InputSendToDeviceEvent(
return t.sendToDeviceEvent(ise)
}
// InputSigningKeyUpdate implements api.EDUServerInputAPI
func (t *EDUServerInputAPI) InputSigningKeyUpdate(
ctx context.Context,
request *api.InputSigningKeyUpdateRequest,
response *api.InputSigningKeyUpdateResponse,
) error {
eventJSON, err := json.Marshal(&api.OutputSigningKeyUpdateEvent{
SigningKeyUpdate: request.SigningKeyUpdate,
})
if err != nil {
return err
}
logrus.WithFields(logrus.Fields{
"user_id": request.UserID,
}).Infof("Producing to topic '%s'", t.OutputSigningKeyUpdateTopic)
m := &sarama.ProducerMessage{
Topic: string(t.OutputSigningKeyUpdateTopic),
Key: sarama.StringEncoder(request.UserID),
Value: sarama.ByteEncoder(eventJSON),
}
_, _, err = t.Producer.SendMessage(m)
return err
}
func (t *EDUServerInputAPI) sendTypingEvent(ite *api.InputTypingEvent) error {
ev := &api.TypingEvent{
Type: gomatrixserverlib.MTyping,

View file

@ -15,6 +15,7 @@ const (
EDUServerInputTypingEventPath = "/eduserver/input"
EDUServerInputSendToDeviceEventPath = "/eduserver/sendToDevice"
EDUServerInputReceiptEventPath = "/eduserver/receipt"
EDUServerInputSigningKeyUpdatePath = "/eduserver/signingKeyUpdate"
)
// NewEDUServerClient creates a EDUServerInputAPI implemented by talking to a HTTP POST API.
@ -68,3 +69,16 @@ func (h *httpEDUServerInputAPI) InputReceiptEvent(
apiURL := h.eduServerURL + EDUServerInputReceiptEventPath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
// InputSigningKeyUpdate implements EDUServerInputAPI
func (h *httpEDUServerInputAPI) InputSigningKeyUpdate(
ctx context.Context,
request *api.InputSigningKeyUpdateRequest,
response *api.InputSigningKeyUpdateResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "InputSigningKeyUpdate")
defer span.Finish()
apiURL := h.eduServerURL + EDUServerInputSigningKeyUpdatePath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}

View file

@ -51,4 +51,17 @@ func AddRoutes(t api.EDUServerInputAPI, internalAPIMux *mux.Router) {
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(EDUServerInputSigningKeyUpdatePath,
httputil.MakeInternalAPI("inputSigningKeyUpdate", func(req *http.Request) util.JSONResponse {
var request api.InputSigningKeyUpdateRequest
var response api.InputSigningKeyUpdateResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
if err := t.InputSigningKeyUpdate(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
}

View file

@ -502,6 +502,22 @@ func (t *txnReq) processEDUs(ctx context.Context) {
}
}
}
case eduserverAPI.MSigningKeyUpdate:
var updatePayload eduserverAPI.SigningKeyUpdate
if err := json.Unmarshal(e.Content, &updatePayload); err != nil {
util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{
"user_id": updatePayload.UserID,
}).Error("Failed to send signing key update to edu server")
continue
}
inputReq := &eduserverAPI.InputSigningKeyUpdateRequest{
SigningKeyUpdate: updatePayload,
}
inputRes := &eduserverAPI.InputSigningKeyUpdateResponse{}
if err := t.eduAPI.InputSigningKeyUpdate(ctx, inputReq, inputRes); err != nil {
util.GetLogger(ctx).WithError(err).Error("Failed to send signing key update to EDU server")
continue
}
default:
util.GetLogger(ctx).WithField("type", e.Type).Debug("Unhandled EDU")
}

View file

@ -84,6 +84,14 @@ func (o *testEDUProducer) InputReceiptEvent(
return nil
}
func (o *testEDUProducer) InputSigningKeyUpdate(
ctx context.Context,
request *eduAPI.InputSigningKeyUpdateRequest,
response *eduAPI.InputSigningKeyUpdateResponse,
) error {
return nil
}
type testRoomserverAPI struct {
api.RoomserverInternalAPITrace
inputRoomEvents []api.InputRoomEvent

View file

@ -1,13 +1,17 @@
package consumers
import (
"fmt"
"context"
"encoding/json"
eduapi "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/keyserver/storage"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
"github.com/Shopify/sarama"
)
@ -49,13 +53,29 @@ func (s *OutputSigningKeyUpdateConsumer) Start() error {
}
func (s *OutputSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMessage) error {
/*
var output eduapi.OutputSigningKeyUpdate
if err := json.Unmarshal(msg.Value, &output); err != nil {
log.WithError(err).Errorf("eduserver output log: message parse failure")
logrus.WithError(err).Errorf("eduserver output log: message parse failure")
return nil
}
_, host, err := gomatrixserverlib.SplitID('@', output.UserID)
if err != nil {
logrus.WithError(err).Errorf("eduserver output log: user ID parse failure")
return nil
*/
return fmt.Errorf("TODO")
}
if host == gomatrixserverlib.ServerName(s.serverName) {
// Ignore any messages that contain information about our own users, as
// they already originated from this server.
return nil
}
uploadReq := &api.PerformUploadDeviceKeysRequest{
CrossSigningKeys: gomatrixserverlib.CrossSigningKeys{
MasterKey: output.MasterKey,
SelfSigningKey: output.SelfSigningKey,
},
UserID: output.UserID,
}
uploadRes := &api.PerformUploadDeviceKeysResponse{}
s.keyAPI.PerformUploadDeviceKeys(context.TODO(), uploadReq, uploadRes)
return uploadRes.Error
}