mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-29 01:33:10 -06:00
Merge branch 'master' into master
This commit is contained in:
commit
f129037433
|
|
@ -51,6 +51,10 @@ func (a *AppServiceQueryAPI) RoomAliasExists(
|
||||||
if appservice.URL != "" && appservice.IsInterestedInRoomAlias(request.Alias) {
|
if appservice.URL != "" && appservice.IsInterestedInRoomAlias(request.Alias) {
|
||||||
// The full path to the rooms API, includes hs token
|
// The full path to the rooms API, includes hs token
|
||||||
URL, err := url.Parse(appservice.URL + roomAliasExistsPath)
|
URL, err := url.Parse(appservice.URL + roomAliasExistsPath)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
URL.Path += request.Alias
|
URL.Path += request.Alias
|
||||||
apiURL := URL.String() + "?access_token=" + appservice.HSToken
|
apiURL := URL.String() + "?access_token=" + appservice.HSToken
|
||||||
|
|
||||||
|
|
@ -114,6 +118,9 @@ func (a *AppServiceQueryAPI) UserIDExists(
|
||||||
if appservice.URL != "" && appservice.IsInterestedInUserID(request.UserID) {
|
if appservice.URL != "" && appservice.IsInterestedInUserID(request.UserID) {
|
||||||
// The full path to the rooms API, includes hs token
|
// The full path to the rooms API, includes hs token
|
||||||
URL, err := url.Parse(appservice.URL + userIDExistsPath)
|
URL, err := url.Parse(appservice.URL + userIDExistsPath)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
URL.Path += request.UserID
|
URL.Path += request.UserID
|
||||||
apiURL := URL.String() + "?access_token=" + appservice.HSToken
|
apiURL := URL.String() + "?access_token=" + appservice.HSToken
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -25,7 +25,7 @@ echo "Installing golangci-lint..."
|
||||||
|
|
||||||
# Make a backup of go.{mod,sum} first
|
# Make a backup of go.{mod,sum} first
|
||||||
cp go.mod go.mod.bak && cp go.sum go.sum.bak
|
cp go.mod go.mod.bak && cp go.sum go.sum.bak
|
||||||
go get github.com/golangci/golangci-lint/cmd/golangci-lint@v1.19.1
|
go get github.com/golangci/golangci-lint/cmd/golangci-lint@v1.41.1
|
||||||
|
|
||||||
# Run linting
|
# Run linting
|
||||||
echo "Looking for lint..."
|
echo "Looking for lint..."
|
||||||
|
|
|
||||||
|
|
@ -137,6 +137,12 @@ func InvalidSignature(msg string) *MatrixError {
|
||||||
return &MatrixError{"M_INVALID_SIGNATURE", msg}
|
return &MatrixError{"M_INVALID_SIGNATURE", msg}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// InvalidParam is an error that is returned when a parameter was invalid,
|
||||||
|
// traditionally with cross-signing.
|
||||||
|
func InvalidParam(msg string) *MatrixError {
|
||||||
|
return &MatrixError{"M_INVALID_PARAM", msg}
|
||||||
|
}
|
||||||
|
|
||||||
// MissingParam is an error that is returned when a parameter was incorrect,
|
// MissingParam is an error that is returned when a parameter was incorrect,
|
||||||
// traditionally with cross-signing.
|
// traditionally with cross-signing.
|
||||||
func MissingParam(msg string) *MatrixError {
|
func MissingParam(msg string) *MatrixError {
|
||||||
|
|
|
||||||
|
|
@ -73,6 +73,11 @@ func UploadCrossSigningDeviceKeys(
|
||||||
Code: http.StatusBadRequest,
|
Code: http.StatusBadRequest,
|
||||||
JSON: jsonerror.MissingParam(err.Error()),
|
JSON: jsonerror.MissingParam(err.Error()),
|
||||||
}
|
}
|
||||||
|
case err.IsInvalidParam:
|
||||||
|
return util.JSONResponse{
|
||||||
|
Code: http.StatusBadRequest,
|
||||||
|
JSON: jsonerror.InvalidParam(err.Error()),
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
Code: http.StatusBadRequest,
|
Code: http.StatusBadRequest,
|
||||||
|
|
@ -110,6 +115,11 @@ func UploadCrossSigningDeviceSignatures(req *http.Request, keyserverAPI api.KeyI
|
||||||
Code: http.StatusBadRequest,
|
Code: http.StatusBadRequest,
|
||||||
JSON: jsonerror.MissingParam(err.Error()),
|
JSON: jsonerror.MissingParam(err.Error()),
|
||||||
}
|
}
|
||||||
|
case err.IsInvalidParam:
|
||||||
|
return util.JSONResponse{
|
||||||
|
Code: http.StatusBadRequest,
|
||||||
|
JSON: jsonerror.InvalidParam(err.Error()),
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
Code: http.StatusBadRequest,
|
Code: http.StatusBadRequest,
|
||||||
|
|
|
||||||
|
|
@ -65,7 +65,7 @@ func Setup(
|
||||||
userInteractiveAuth := auth.NewUserInteractive(accountDB.GetAccountByPassword, cfg)
|
userInteractiveAuth := auth.NewUserInteractive(accountDB.GetAccountByPassword, cfg)
|
||||||
|
|
||||||
unstableFeatures := map[string]bool{
|
unstableFeatures := map[string]bool{
|
||||||
//"org.matrix.e2e_cross_signing": true,
|
"org.matrix.e2e_cross_signing": true,
|
||||||
}
|
}
|
||||||
for _, msc := range cfg.MSCs.MSCs {
|
for _, msc := range cfg.MSCs.MSCs {
|
||||||
unstableFeatures["org.matrix."+msc] = true
|
unstableFeatures["org.matrix."+msc] = true
|
||||||
|
|
@ -291,10 +291,7 @@ func Setup(
|
||||||
return util.ErrorResponse(err)
|
return util.ErrorResponse(err)
|
||||||
}
|
}
|
||||||
// If there's a trailing slash, remove it
|
// If there's a trailing slash, remove it
|
||||||
eventType := vars["type"]
|
eventType := strings.TrimSuffix(vars["type"], "/")
|
||||||
if strings.HasSuffix(eventType, "/") {
|
|
||||||
eventType = eventType[:len(eventType)-1]
|
|
||||||
}
|
|
||||||
eventFormat := req.URL.Query().Get("format") == "event"
|
eventFormat := req.URL.Query().Get("format") == "event"
|
||||||
return OnIncomingStateTypeRequest(req.Context(), device, rsAPI, vars["roomID"], eventType, "", eventFormat)
|
return OnIncomingStateTypeRequest(req.Context(), device, rsAPI, vars["roomID"], eventType, "", eventFormat)
|
||||||
})).Methods(http.MethodGet, http.MethodOptions)
|
})).Methods(http.MethodGet, http.MethodOptions)
|
||||||
|
|
@ -315,11 +312,7 @@ func Setup(
|
||||||
return util.ErrorResponse(err)
|
return util.ErrorResponse(err)
|
||||||
}
|
}
|
||||||
emptyString := ""
|
emptyString := ""
|
||||||
eventType := vars["eventType"]
|
eventType := strings.TrimSuffix(vars["eventType"], "/")
|
||||||
// If there's a trailing slash, remove it
|
|
||||||
if strings.HasSuffix(eventType, "/") {
|
|
||||||
eventType = eventType[:len(eventType)-1]
|
|
||||||
}
|
|
||||||
return SendEvent(req, device, vars["roomID"], eventType, nil, &emptyString, cfg, rsAPI, nil)
|
return SendEvent(req, device, vars["roomID"], eventType, nil, &emptyString, cfg, rsAPI, nil)
|
||||||
}),
|
}),
|
||||||
).Methods(http.MethodPut, http.MethodOptions)
|
).Methods(http.MethodPut, http.MethodOptions)
|
||||||
|
|
|
||||||
|
|
@ -75,6 +75,12 @@ type InputReceiptEventRequest struct {
|
||||||
// InputReceiptEventResponse is a response to InputReceiptEventRequest
|
// InputReceiptEventResponse is a response to InputReceiptEventRequest
|
||||||
type InputReceiptEventResponse struct{}
|
type InputReceiptEventResponse struct{}
|
||||||
|
|
||||||
|
type InputCrossSigningKeyUpdateRequest struct {
|
||||||
|
CrossSigningKeyUpdate `json:"signing_keys"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type InputCrossSigningKeyUpdateResponse struct{}
|
||||||
|
|
||||||
// EDUServerInputAPI is used to write events to the typing server.
|
// EDUServerInputAPI is used to write events to the typing server.
|
||||||
type EDUServerInputAPI interface {
|
type EDUServerInputAPI interface {
|
||||||
InputTypingEvent(
|
InputTypingEvent(
|
||||||
|
|
@ -94,4 +100,10 @@ type EDUServerInputAPI interface {
|
||||||
request *InputReceiptEventRequest,
|
request *InputReceiptEventRequest,
|
||||||
response *InputReceiptEventResponse,
|
response *InputReceiptEventResponse,
|
||||||
) error
|
) error
|
||||||
|
|
||||||
|
InputCrossSigningKeyUpdate(
|
||||||
|
ctx context.Context,
|
||||||
|
request *InputCrossSigningKeyUpdateRequest,
|
||||||
|
response *InputCrossSigningKeyUpdateResponse,
|
||||||
|
) error
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -33,14 +33,6 @@ type OutputTypingEvent struct {
|
||||||
ExpireTime *time.Time
|
ExpireTime *time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
// TypingEvent represents a matrix edu event of type 'm.typing'.
|
|
||||||
type TypingEvent struct {
|
|
||||||
Type string `json:"type"`
|
|
||||||
RoomID string `json:"room_id"`
|
|
||||||
UserID string `json:"user_id"`
|
|
||||||
Typing bool `json:"typing"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// OutputSendToDeviceEvent is an entry in the send-to-device output kafka log.
|
// OutputSendToDeviceEvent is an entry in the send-to-device output kafka log.
|
||||||
// This contains the full event content, along with the user ID and device ID
|
// This contains the full event content, along with the user ID and device ID
|
||||||
// to which it is destined.
|
// to which it is destined.
|
||||||
|
|
@ -50,14 +42,6 @@ type OutputSendToDeviceEvent struct {
|
||||||
gomatrixserverlib.SendToDeviceEvent
|
gomatrixserverlib.SendToDeviceEvent
|
||||||
}
|
}
|
||||||
|
|
||||||
type ReceiptEvent struct {
|
|
||||||
UserID string `json:"user_id"`
|
|
||||||
RoomID string `json:"room_id"`
|
|
||||||
EventID string `json:"event_id"`
|
|
||||||
Type string `json:"type"`
|
|
||||||
Timestamp gomatrixserverlib.Timestamp `json:"timestamp"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// OutputReceiptEvent is an entry in the receipt output kafka log
|
// OutputReceiptEvent is an entry in the receipt output kafka log
|
||||||
type OutputReceiptEvent struct {
|
type OutputReceiptEvent struct {
|
||||||
UserID string `json:"user_id"`
|
UserID string `json:"user_id"`
|
||||||
|
|
@ -67,21 +51,7 @@ type OutputReceiptEvent struct {
|
||||||
Timestamp gomatrixserverlib.Timestamp `json:"timestamp"`
|
Timestamp gomatrixserverlib.Timestamp `json:"timestamp"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Helper structs for receipts json creation
|
// OutputCrossSigningKeyUpdate is an entry in the signing key update output kafka log
|
||||||
type ReceiptMRead struct {
|
type OutputCrossSigningKeyUpdate struct {
|
||||||
User map[string]ReceiptTS `json:"m.read"`
|
CrossSigningKeyUpdate `json:"signing_keys"`
|
||||||
}
|
|
||||||
|
|
||||||
type ReceiptTS struct {
|
|
||||||
TS gomatrixserverlib.Timestamp `json:"ts"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// FederationSender output
|
|
||||||
type FederationReceiptMRead struct {
|
|
||||||
User map[string]FederationReceiptData `json:"m.read"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type FederationReceiptData struct {
|
|
||||||
Data ReceiptTS `json:"data"`
|
|
||||||
EventIDs []string `json:"event_ids"`
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
59
eduserver/api/types.go
Normal file
59
eduserver/api/types.go
Normal file
|
|
@ -0,0 +1,59 @@
|
||||||
|
// Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package api
|
||||||
|
|
||||||
|
import "github.com/matrix-org/gomatrixserverlib"
|
||||||
|
|
||||||
|
const (
|
||||||
|
MSigningKeyUpdate = "m.signing_key_update"
|
||||||
|
)
|
||||||
|
|
||||||
|
type TypingEvent struct {
|
||||||
|
Type string `json:"type"`
|
||||||
|
RoomID string `json:"room_id"`
|
||||||
|
UserID string `json:"user_id"`
|
||||||
|
Typing bool `json:"typing"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ReceiptEvent struct {
|
||||||
|
UserID string `json:"user_id"`
|
||||||
|
RoomID string `json:"room_id"`
|
||||||
|
EventID string `json:"event_id"`
|
||||||
|
Type string `json:"type"`
|
||||||
|
Timestamp gomatrixserverlib.Timestamp `json:"timestamp"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type FederationReceiptMRead struct {
|
||||||
|
User map[string]FederationReceiptData `json:"m.read"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type FederationReceiptData struct {
|
||||||
|
Data ReceiptTS `json:"data"`
|
||||||
|
EventIDs []string `json:"event_ids"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ReceiptMRead struct {
|
||||||
|
User map[string]ReceiptTS `json:"m.read"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ReceiptTS struct {
|
||||||
|
TS gomatrixserverlib.Timestamp `json:"ts"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type CrossSigningKeyUpdate struct {
|
||||||
|
MasterKey *gomatrixserverlib.CrossSigningKey `json:"master_key,omitempty"`
|
||||||
|
SelfSigningKey *gomatrixserverlib.CrossSigningKey `json:"self_signing_key,omitempty"`
|
||||||
|
UserID string `json:"user_id"`
|
||||||
|
}
|
||||||
|
|
@ -52,6 +52,7 @@ func NewInternalAPI(
|
||||||
OutputTypingEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent),
|
OutputTypingEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent),
|
||||||
OutputSendToDeviceEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent),
|
OutputSendToDeviceEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent),
|
||||||
OutputReceiptEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputReceiptEvent),
|
OutputReceiptEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputReceiptEvent),
|
||||||
|
OutputKeyChangeEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent),
|
||||||
ServerName: cfg.Matrix.ServerName,
|
ServerName: cfg.Matrix.ServerName,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -24,6 +24,7 @@ import (
|
||||||
"github.com/Shopify/sarama"
|
"github.com/Shopify/sarama"
|
||||||
"github.com/matrix-org/dendrite/eduserver/api"
|
"github.com/matrix-org/dendrite/eduserver/api"
|
||||||
"github.com/matrix-org/dendrite/eduserver/cache"
|
"github.com/matrix-org/dendrite/eduserver/cache"
|
||||||
|
keyapi "github.com/matrix-org/dendrite/keyserver/api"
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
|
@ -39,6 +40,8 @@ type EDUServerInputAPI struct {
|
||||||
OutputSendToDeviceEventTopic string
|
OutputSendToDeviceEventTopic string
|
||||||
// The kafka topic to output new receipt events to
|
// The kafka topic to output new receipt events to
|
||||||
OutputReceiptEventTopic string
|
OutputReceiptEventTopic string
|
||||||
|
// The kafka topic to output new key change events to
|
||||||
|
OutputKeyChangeEventTopic string
|
||||||
// kafka producer
|
// kafka producer
|
||||||
Producer sarama.SyncProducer
|
Producer sarama.SyncProducer
|
||||||
// Internal user query API
|
// Internal user query API
|
||||||
|
|
@ -77,6 +80,36 @@ func (t *EDUServerInputAPI) InputSendToDeviceEvent(
|
||||||
return t.sendToDeviceEvent(ise)
|
return t.sendToDeviceEvent(ise)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// InputCrossSigningKeyUpdate implements api.EDUServerInputAPI
|
||||||
|
func (t *EDUServerInputAPI) InputCrossSigningKeyUpdate(
|
||||||
|
ctx context.Context,
|
||||||
|
request *api.InputCrossSigningKeyUpdateRequest,
|
||||||
|
response *api.InputCrossSigningKeyUpdateResponse,
|
||||||
|
) error {
|
||||||
|
eventJSON, err := json.Marshal(&keyapi.DeviceMessage{
|
||||||
|
Type: keyapi.TypeCrossSigningUpdate,
|
||||||
|
OutputCrossSigningKeyUpdate: &api.OutputCrossSigningKeyUpdate{
|
||||||
|
CrossSigningKeyUpdate: request.CrossSigningKeyUpdate,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
logrus.WithFields(logrus.Fields{
|
||||||
|
"user_id": request.UserID,
|
||||||
|
}).Infof("Producing to topic '%s'", t.OutputKeyChangeEventTopic)
|
||||||
|
|
||||||
|
m := &sarama.ProducerMessage{
|
||||||
|
Topic: string(t.OutputKeyChangeEventTopic),
|
||||||
|
Key: sarama.StringEncoder(request.UserID),
|
||||||
|
Value: sarama.ByteEncoder(eventJSON),
|
||||||
|
}
|
||||||
|
|
||||||
|
_, _, err = t.Producer.SendMessage(m)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func (t *EDUServerInputAPI) sendTypingEvent(ite *api.InputTypingEvent) error {
|
func (t *EDUServerInputAPI) sendTypingEvent(ite *api.InputTypingEvent) error {
|
||||||
ev := &api.TypingEvent{
|
ev := &api.TypingEvent{
|
||||||
Type: gomatrixserverlib.MTyping,
|
Type: gomatrixserverlib.MTyping,
|
||||||
|
|
|
||||||
|
|
@ -15,6 +15,7 @@ const (
|
||||||
EDUServerInputTypingEventPath = "/eduserver/input"
|
EDUServerInputTypingEventPath = "/eduserver/input"
|
||||||
EDUServerInputSendToDeviceEventPath = "/eduserver/sendToDevice"
|
EDUServerInputSendToDeviceEventPath = "/eduserver/sendToDevice"
|
||||||
EDUServerInputReceiptEventPath = "/eduserver/receipt"
|
EDUServerInputReceiptEventPath = "/eduserver/receipt"
|
||||||
|
EDUServerInputCrossSigningKeyUpdatePath = "/eduserver/crossSigningKeyUpdate"
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewEDUServerClient creates a EDUServerInputAPI implemented by talking to a HTTP POST API.
|
// NewEDUServerClient creates a EDUServerInputAPI implemented by talking to a HTTP POST API.
|
||||||
|
|
@ -68,3 +69,16 @@ func (h *httpEDUServerInputAPI) InputReceiptEvent(
|
||||||
apiURL := h.eduServerURL + EDUServerInputReceiptEventPath
|
apiURL := h.eduServerURL + EDUServerInputReceiptEventPath
|
||||||
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// InputCrossSigningKeyUpdate implements EDUServerInputAPI
|
||||||
|
func (h *httpEDUServerInputAPI) InputCrossSigningKeyUpdate(
|
||||||
|
ctx context.Context,
|
||||||
|
request *api.InputCrossSigningKeyUpdateRequest,
|
||||||
|
response *api.InputCrossSigningKeyUpdateResponse,
|
||||||
|
) error {
|
||||||
|
span, ctx := opentracing.StartSpanFromContext(ctx, "InputCrossSigningKeyUpdate")
|
||||||
|
defer span.Finish()
|
||||||
|
|
||||||
|
apiURL := h.eduServerURL + EDUServerInputCrossSigningKeyUpdatePath
|
||||||
|
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -51,4 +51,17 @@ func AddRoutes(t api.EDUServerInputAPI, internalAPIMux *mux.Router) {
|
||||||
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
|
internalAPIMux.Handle(EDUServerInputCrossSigningKeyUpdatePath,
|
||||||
|
httputil.MakeInternalAPI("inputCrossSigningKeyUpdate", func(req *http.Request) util.JSONResponse {
|
||||||
|
var request api.InputCrossSigningKeyUpdateRequest
|
||||||
|
var response api.InputCrossSigningKeyUpdateResponse
|
||||||
|
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
|
||||||
|
return util.MessageResponse(http.StatusBadRequest, err.Error())
|
||||||
|
}
|
||||||
|
if err := t.InputCrossSigningKeyUpdate(req.Context(), &request, &response); err != nil {
|
||||||
|
return util.ErrorResponse(err)
|
||||||
|
}
|
||||||
|
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||||
|
}),
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -502,6 +502,22 @@ func (t *txnReq) processEDUs(ctx context.Context) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
case eduserverAPI.MSigningKeyUpdate:
|
||||||
|
var updatePayload eduserverAPI.CrossSigningKeyUpdate
|
||||||
|
if err := json.Unmarshal(e.Content, &updatePayload); err != nil {
|
||||||
|
util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{
|
||||||
|
"user_id": updatePayload.UserID,
|
||||||
|
}).Error("Failed to send signing key update to edu server")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
inputReq := &eduserverAPI.InputCrossSigningKeyUpdateRequest{
|
||||||
|
CrossSigningKeyUpdate: updatePayload,
|
||||||
|
}
|
||||||
|
inputRes := &eduserverAPI.InputCrossSigningKeyUpdateResponse{}
|
||||||
|
if err := t.eduAPI.InputCrossSigningKeyUpdate(ctx, inputReq, inputRes); err != nil {
|
||||||
|
util.GetLogger(ctx).WithError(err).Error("Failed to unmarshal cross-signing update")
|
||||||
|
continue
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
util.GetLogger(ctx).WithField("type", e.Type).Debug("Unhandled EDU")
|
util.GetLogger(ctx).WithField("type", e.Type).Debug("Unhandled EDU")
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -84,6 +84,14 @@ func (o *testEDUProducer) InputReceiptEvent(
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (o *testEDUProducer) InputCrossSigningKeyUpdate(
|
||||||
|
ctx context.Context,
|
||||||
|
request *eduAPI.InputCrossSigningKeyUpdateRequest,
|
||||||
|
response *eduAPI.InputCrossSigningKeyUpdateResponse,
|
||||||
|
) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
type testRoomserverAPI struct {
|
type testRoomserverAPI struct {
|
||||||
api.RoomserverInternalAPITrace
|
api.RoomserverInternalAPITrace
|
||||||
inputRoomEvents []api.InputRoomEvent
|
inputRoomEvents []api.InputRoomEvent
|
||||||
|
|
|
||||||
|
|
@ -20,6 +20,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/Shopify/sarama"
|
"github.com/Shopify/sarama"
|
||||||
|
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
|
||||||
"github.com/matrix-org/dendrite/federationsender/queue"
|
"github.com/matrix-org/dendrite/federationsender/queue"
|
||||||
"github.com/matrix-org/dendrite/federationsender/storage"
|
"github.com/matrix-org/dendrite/federationsender/storage"
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
|
|
@ -28,6 +29,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
"github.com/matrix-org/dendrite/setup/process"
|
"github.com/matrix-org/dendrite/setup/process"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -83,6 +85,17 @@ func (t *KeyChangeConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
||||||
log.WithError(err).Errorf("failed to read device message from key change topic")
|
log.WithError(err).Errorf("failed to read device message from key change topic")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
switch m.Type {
|
||||||
|
case api.TypeCrossSigningUpdate:
|
||||||
|
return t.onCrossSigningMessage(m)
|
||||||
|
case api.TypeDeviceKeyUpdate:
|
||||||
|
fallthrough
|
||||||
|
default:
|
||||||
|
return t.onDeviceKeyMessage(m)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) error {
|
||||||
logger := log.WithField("user_id", m.UserID)
|
logger := log.WithField("user_id", m.UserID)
|
||||||
|
|
||||||
// only send key change events which originated from us
|
// only send key change events which originated from us
|
||||||
|
|
@ -133,6 +146,50 @@ func (t *KeyChangeConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
||||||
return t.queues.SendEDU(edu, t.serverName, destinations)
|
return t.queues.SendEDU(edu, t.serverName, destinations)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) error {
|
||||||
|
output := m.CrossSigningKeyUpdate
|
||||||
|
_, host, err := gomatrixserverlib.SplitID('@', output.UserID)
|
||||||
|
if err != nil {
|
||||||
|
logrus.WithError(err).Errorf("fedsender key change consumer: user ID parse failure")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if host != gomatrixserverlib.ServerName(t.serverName) {
|
||||||
|
// Ignore any messages that didn't originate locally, otherwise we'll
|
||||||
|
// end up parroting information we received from other servers.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
logger := log.WithField("user_id", output.UserID)
|
||||||
|
|
||||||
|
var queryRes roomserverAPI.QueryRoomsForUserResponse
|
||||||
|
err = t.rsAPI.QueryRoomsForUser(context.Background(), &roomserverAPI.QueryRoomsForUserRequest{
|
||||||
|
UserID: output.UserID,
|
||||||
|
WantMembership: "join",
|
||||||
|
}, &queryRes)
|
||||||
|
if err != nil {
|
||||||
|
logger.WithError(err).Error("fedsender key change consumer: failed to calculate joined rooms for user")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// send this key change to all servers who share rooms with this user.
|
||||||
|
destinations, err := t.db.GetJoinedHostsForRooms(context.Background(), queryRes.RoomIDs)
|
||||||
|
if err != nil {
|
||||||
|
logger.WithError(err).Error("fedsender key change consumer: failed to calculate joined hosts for rooms user is in")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pack the EDU and marshal it
|
||||||
|
edu := &gomatrixserverlib.EDU{
|
||||||
|
Type: eduserverAPI.MSigningKeyUpdate,
|
||||||
|
Origin: string(t.serverName),
|
||||||
|
}
|
||||||
|
if edu.Content, err = json.Marshal(output); err != nil {
|
||||||
|
logger.WithError(err).Error("fedsender key change consumer: failed to marshal output, dropping")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Infof("Sending cross-signing update message to %q", destinations)
|
||||||
|
return t.queues.SendEDU(edu, t.serverName, destinations)
|
||||||
|
}
|
||||||
|
|
||||||
func prevID(streamID int) []int {
|
func prevID(streamID int) []int {
|
||||||
if streamID <= 1 {
|
if streamID <= 1 {
|
||||||
return nil
|
return nil
|
||||||
|
|
|
||||||
2
go.mod
2
go.mod
|
|
@ -31,7 +31,7 @@ require (
|
||||||
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4
|
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4
|
||||||
github.com/matrix-org/go-sqlite3-js v0.0.0-20210709140738-b0d1ba599a6d
|
github.com/matrix-org/go-sqlite3-js v0.0.0-20210709140738-b0d1ba599a6d
|
||||||
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16
|
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16
|
||||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20210809130922-d9c3f400582b
|
github.com/matrix-org/gomatrixserverlib v0.0.0-20210817115641-f9416ac1a723
|
||||||
github.com/matrix-org/naffka v0.0.0-20210623111924-14ff508b58e0
|
github.com/matrix-org/naffka v0.0.0-20210623111924-14ff508b58e0
|
||||||
github.com/matrix-org/pinecone v0.0.0-20210623102758-74f885644c1b
|
github.com/matrix-org/pinecone v0.0.0-20210623102758-74f885644c1b
|
||||||
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
|
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
|
||||||
|
|
|
||||||
4
go.sum
4
go.sum
|
|
@ -994,8 +994,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20210709140738-b0d1ba599a6d/go.mod h1
|
||||||
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
|
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
|
||||||
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4=
|
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4=
|
||||||
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
|
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
|
||||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20210809130922-d9c3f400582b h1:8St1B8QmlvMLsOmGqW3++0akUs0250IAi+AGcr5faxw=
|
github.com/matrix-org/gomatrixserverlib v0.0.0-20210817115641-f9416ac1a723 h1:b8cyR4aYv9Lmf1lKgASJ+PFSp/GBv8ZFgb/O42ZXLGA=
|
||||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20210809130922-d9c3f400582b/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
|
github.com/matrix-org/gomatrixserverlib v0.0.0-20210817115641-f9416ac1a723/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
|
||||||
github.com/matrix-org/naffka v0.0.0-20210623111924-14ff508b58e0 h1:HZCzy4oVzz55e+cOMiX/JtSF2UOY1evBl2raaE7ACcU=
|
github.com/matrix-org/naffka v0.0.0-20210623111924-14ff508b58e0 h1:HZCzy4oVzz55e+cOMiX/JtSF2UOY1evBl2raaE7ACcU=
|
||||||
github.com/matrix-org/naffka v0.0.0-20210623111924-14ff508b58e0/go.mod h1:sjyPyRxKM5uw1nD2cJ6O2OxI6GOqyVBfNXqKjBZTBZE=
|
github.com/matrix-org/naffka v0.0.0-20210623111924-14ff508b58e0/go.mod h1:sjyPyRxKM5uw1nD2cJ6O2OxI6GOqyVBfNXqKjBZTBZE=
|
||||||
github.com/matrix-org/pinecone v0.0.0-20210623102758-74f885644c1b h1:5X5vdWQ13xrNkJVqaJHPsrt7rKkMJH5iac0EtfOuxSg=
|
github.com/matrix-org/pinecone v0.0.0-20210623102758-74f885644c1b h1:5X5vdWQ13xrNkJVqaJHPsrt7rKkMJH5iac0EtfOuxSg=
|
||||||
|
|
|
||||||
|
|
@ -18,6 +18,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"log/syslog"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
|
|
@ -30,6 +31,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
"github.com/matrix-org/dugong"
|
"github.com/matrix-org/dugong"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
lSyslog "github.com/sirupsen/logrus/hooks/syslog"
|
||||||
)
|
)
|
||||||
|
|
||||||
type utcFormatter struct {
|
type utcFormatter struct {
|
||||||
|
|
@ -128,6 +130,9 @@ func SetupHookLogging(hooks []config.LogrusHook, componentName string) {
|
||||||
case "file":
|
case "file":
|
||||||
checkFileHookParams(hook.Params)
|
checkFileHookParams(hook.Params)
|
||||||
setupFileHook(hook, level, componentName)
|
setupFileHook(hook, level, componentName)
|
||||||
|
case "syslog":
|
||||||
|
checkSyslogHookParams(hook.Params)
|
||||||
|
setupSyslogHook(hook, level, componentName)
|
||||||
default:
|
default:
|
||||||
logrus.Fatalf("Unrecognised logging hook type: %s", hook.Type)
|
logrus.Fatalf("Unrecognised logging hook type: %s", hook.Type)
|
||||||
}
|
}
|
||||||
|
|
@ -173,6 +178,34 @@ func setupFileHook(hook config.LogrusHook, level logrus.Level, componentName str
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func checkSyslogHookParams(params map[string]interface{}) {
|
||||||
|
addr, ok := params["address"]
|
||||||
|
if !ok {
|
||||||
|
logrus.Fatalf("Expecting a parameter \"address\" for logging hook of type \"syslog\"")
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, ok := addr.(string); !ok {
|
||||||
|
logrus.Fatalf("Parameter \"address\" for logging hook of type \"syslog\" should be a string")
|
||||||
|
}
|
||||||
|
|
||||||
|
proto, ok2 := params["protocol"]
|
||||||
|
if !ok2 {
|
||||||
|
logrus.Fatalf("Expecting a parameter \"protocol\" for logging hook of type \"syslog\"")
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, ok2 := proto.(string); !ok2 {
|
||||||
|
logrus.Fatalf("Parameter \"protocol\" for logging hook of type \"syslog\" should be a string")
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func setupSyslogHook(hook config.LogrusHook, level logrus.Level, componentName string) {
|
||||||
|
syslogHook, err := lSyslog.NewSyslogHook(hook.Params["protocol"].(string), hook.Params["address"].(string), syslog.LOG_INFO, componentName)
|
||||||
|
if err == nil {
|
||||||
|
logrus.AddHook(&logLevelHook{level, syslogHook})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
//CloseAndLogIfError Closes io.Closer and logs the error if any
|
//CloseAndLogIfError Closes io.Closer and logs the error if any
|
||||||
func CloseAndLogIfError(ctx context.Context, closer io.Closer, message string) {
|
func CloseAndLogIfError(ctx context.Context, closer io.Closer, message string) {
|
||||||
if closer == nil {
|
if closer == nil {
|
||||||
|
|
|
||||||
|
|
@ -20,6 +20,7 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
eduapi "github.com/matrix-org/dendrite/eduserver/api"
|
||||||
"github.com/matrix-org/dendrite/keyserver/types"
|
"github.com/matrix-org/dendrite/keyserver/types"
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
|
@ -33,6 +34,7 @@ type KeyInternalAPI interface {
|
||||||
PerformUploadKeys(ctx context.Context, req *PerformUploadKeysRequest, res *PerformUploadKeysResponse)
|
PerformUploadKeys(ctx context.Context, req *PerformUploadKeysRequest, res *PerformUploadKeysResponse)
|
||||||
// PerformClaimKeys claims one-time keys for use in pre-key messages
|
// PerformClaimKeys claims one-time keys for use in pre-key messages
|
||||||
PerformClaimKeys(ctx context.Context, req *PerformClaimKeysRequest, res *PerformClaimKeysResponse)
|
PerformClaimKeys(ctx context.Context, req *PerformClaimKeysRequest, res *PerformClaimKeysResponse)
|
||||||
|
PerformDeleteKeys(ctx context.Context, req *PerformDeleteKeysRequest, res *PerformDeleteKeysResponse)
|
||||||
PerformUploadDeviceKeys(ctx context.Context, req *PerformUploadDeviceKeysRequest, res *PerformUploadDeviceKeysResponse)
|
PerformUploadDeviceKeys(ctx context.Context, req *PerformUploadDeviceKeysRequest, res *PerformUploadDeviceKeysResponse)
|
||||||
PerformUploadDeviceSignatures(ctx context.Context, req *PerformUploadDeviceSignaturesRequest, res *PerformUploadDeviceSignaturesResponse)
|
PerformUploadDeviceSignatures(ctx context.Context, req *PerformUploadDeviceSignaturesRequest, res *PerformUploadDeviceSignaturesResponse)
|
||||||
QueryKeys(ctx context.Context, req *QueryKeysRequest, res *QueryKeysResponse)
|
QueryKeys(ctx context.Context, req *QueryKeysRequest, res *QueryKeysResponse)
|
||||||
|
|
@ -47,6 +49,7 @@ type KeyError struct {
|
||||||
Err string `json:"error"`
|
Err string `json:"error"`
|
||||||
IsInvalidSignature bool `json:"is_invalid_signature,omitempty"` // M_INVALID_SIGNATURE
|
IsInvalidSignature bool `json:"is_invalid_signature,omitempty"` // M_INVALID_SIGNATURE
|
||||||
IsMissingParam bool `json:"is_missing_param,omitempty"` // M_MISSING_PARAM
|
IsMissingParam bool `json:"is_missing_param,omitempty"` // M_MISSING_PARAM
|
||||||
|
IsInvalidParam bool `json:"is_invalid_param,omitempty"` // M_INVALID_PARAM
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *KeyError) Error() string {
|
func (k *KeyError) Error() string {
|
||||||
|
|
@ -64,6 +67,7 @@ const (
|
||||||
type DeviceMessage struct {
|
type DeviceMessage struct {
|
||||||
Type DeviceMessageType `json:"Type,omitempty"`
|
Type DeviceMessageType `json:"Type,omitempty"`
|
||||||
*DeviceKeys `json:"DeviceKeys,omitempty"`
|
*DeviceKeys `json:"DeviceKeys,omitempty"`
|
||||||
|
*eduapi.OutputCrossSigningKeyUpdate `json:"CrossSigningKeyUpdate,omitempty"`
|
||||||
// A monotonically increasing number which represents device changes for this user.
|
// A monotonically increasing number which represents device changes for this user.
|
||||||
StreamID int
|
StreamID int
|
||||||
}
|
}
|
||||||
|
|
@ -142,6 +146,18 @@ type PerformUploadKeysResponse struct {
|
||||||
OneTimeKeyCounts []OneTimeKeysCount
|
OneTimeKeyCounts []OneTimeKeysCount
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PerformDeleteKeysRequest asks the keyserver to forget about certain
|
||||||
|
// keys, and signatures related to those keys.
|
||||||
|
type PerformDeleteKeysRequest struct {
|
||||||
|
UserID string
|
||||||
|
KeyIDs []gomatrixserverlib.KeyID
|
||||||
|
}
|
||||||
|
|
||||||
|
// PerformDeleteKeysResponse is the response to PerformDeleteKeysRequest.
|
||||||
|
type PerformDeleteKeysResponse struct {
|
||||||
|
Error *KeyError
|
||||||
|
}
|
||||||
|
|
||||||
// KeyError sets a key error field on KeyErrors
|
// KeyError sets a key error field on KeyErrors
|
||||||
func (r *PerformUploadKeysResponse) KeyError(userID, deviceID string, err *KeyError) {
|
func (r *PerformUploadKeysResponse) KeyError(userID, deviceID string, err *KeyError) {
|
||||||
if r.KeyErrors[userID] == nil {
|
if r.KeyErrors[userID] == nil {
|
||||||
|
|
|
||||||
112
keyserver/consumers/cross_signing.go
Normal file
112
keyserver/consumers/cross_signing.go
Normal file
|
|
@ -0,0 +1,112 @@
|
||||||
|
// Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package consumers
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/internal"
|
||||||
|
"github.com/matrix-org/dendrite/keyserver/api"
|
||||||
|
"github.com/matrix-org/dendrite/keyserver/storage"
|
||||||
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
|
"github.com/matrix-org/dendrite/setup/process"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
"github.com/Shopify/sarama"
|
||||||
|
)
|
||||||
|
|
||||||
|
type OutputCrossSigningKeyUpdateConsumer struct {
|
||||||
|
eduServerConsumer *internal.ContinualConsumer
|
||||||
|
keyDB storage.Database
|
||||||
|
keyAPI api.KeyInternalAPI
|
||||||
|
serverName string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewOutputCrossSigningKeyUpdateConsumer(
|
||||||
|
process *process.ProcessContext,
|
||||||
|
cfg *config.Dendrite,
|
||||||
|
kafkaConsumer sarama.Consumer,
|
||||||
|
keyDB storage.Database,
|
||||||
|
keyAPI api.KeyInternalAPI,
|
||||||
|
) *OutputCrossSigningKeyUpdateConsumer {
|
||||||
|
// The keyserver both produces and consumes on the TopicOutputKeyChangeEvent
|
||||||
|
// topic. We will only produce events where the UserID matches our server name,
|
||||||
|
// and we will only consume events where the UserID does NOT match our server
|
||||||
|
// name (because the update came from a remote server).
|
||||||
|
consumer := internal.ContinualConsumer{
|
||||||
|
Process: process,
|
||||||
|
ComponentName: "keyserver/keyserver",
|
||||||
|
Topic: cfg.Global.Kafka.TopicFor(config.TopicOutputKeyChangeEvent),
|
||||||
|
Consumer: kafkaConsumer,
|
||||||
|
PartitionStore: keyDB,
|
||||||
|
}
|
||||||
|
s := &OutputCrossSigningKeyUpdateConsumer{
|
||||||
|
eduServerConsumer: &consumer,
|
||||||
|
keyDB: keyDB,
|
||||||
|
keyAPI: keyAPI,
|
||||||
|
serverName: string(cfg.Global.ServerName),
|
||||||
|
}
|
||||||
|
consumer.ProcessMessage = s.onMessage
|
||||||
|
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *OutputCrossSigningKeyUpdateConsumer) Start() error {
|
||||||
|
return s.eduServerConsumer.Start()
|
||||||
|
}
|
||||||
|
|
||||||
|
// onMessage is called in response to a message received on the
|
||||||
|
// key change events topic from the key server.
|
||||||
|
func (t *OutputCrossSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
||||||
|
var m api.DeviceMessage
|
||||||
|
if err := json.Unmarshal(msg.Value, &m); err != nil {
|
||||||
|
logrus.WithError(err).Errorf("failed to read device message from key change topic")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
switch m.Type {
|
||||||
|
case api.TypeCrossSigningUpdate:
|
||||||
|
return t.onCrossSigningMessage(m)
|
||||||
|
default:
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *OutputCrossSigningKeyUpdateConsumer) onCrossSigningMessage(m api.DeviceMessage) error {
|
||||||
|
output := m.CrossSigningKeyUpdate
|
||||||
|
_, host, err := gomatrixserverlib.SplitID('@', output.UserID)
|
||||||
|
if err != nil {
|
||||||
|
logrus.WithError(err).Errorf("eduserver output log: user ID parse failure")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if host == gomatrixserverlib.ServerName(s.serverName) {
|
||||||
|
// Ignore any messages that contain information about our own users, as
|
||||||
|
// they already originated from this server.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
uploadReq := &api.PerformUploadDeviceKeysRequest{
|
||||||
|
UserID: output.UserID,
|
||||||
|
}
|
||||||
|
if output.MasterKey != nil {
|
||||||
|
uploadReq.MasterKey = *output.MasterKey
|
||||||
|
}
|
||||||
|
if output.SelfSigningKey != nil {
|
||||||
|
uploadReq.SelfSigningKey = *output.SelfSigningKey
|
||||||
|
}
|
||||||
|
uploadRes := &api.PerformUploadDeviceKeysResponse{}
|
||||||
|
s.keyAPI.PerformUploadDeviceKeys(context.TODO(), uploadReq, uploadRes)
|
||||||
|
return uploadRes.Error
|
||||||
|
}
|
||||||
|
|
@ -1,61 +0,0 @@
|
||||||
package consumers
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal"
|
|
||||||
"github.com/matrix-org/dendrite/keyserver/api"
|
|
||||||
"github.com/matrix-org/dendrite/keyserver/storage"
|
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
|
||||||
"github.com/matrix-org/dendrite/setup/process"
|
|
||||||
|
|
||||||
"github.com/Shopify/sarama"
|
|
||||||
)
|
|
||||||
|
|
||||||
type OutputSigningKeyUpdateConsumer struct {
|
|
||||||
eduServerConsumer *internal.ContinualConsumer
|
|
||||||
keyDB storage.Database
|
|
||||||
keyAPI api.KeyInternalAPI
|
|
||||||
serverName string
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewOutputSigningKeyUpdateConsumer(
|
|
||||||
process *process.ProcessContext,
|
|
||||||
cfg *config.Dendrite,
|
|
||||||
kafkaConsumer sarama.Consumer,
|
|
||||||
keyDB storage.Database,
|
|
||||||
keyAPI api.KeyInternalAPI,
|
|
||||||
) *OutputSigningKeyUpdateConsumer {
|
|
||||||
consumer := internal.ContinualConsumer{
|
|
||||||
Process: process,
|
|
||||||
ComponentName: "keyserver/eduserver",
|
|
||||||
Topic: cfg.Global.Kafka.TopicFor(config.TopicOutputSigningKeyUpdate),
|
|
||||||
Consumer: kafkaConsumer,
|
|
||||||
PartitionStore: keyDB,
|
|
||||||
}
|
|
||||||
s := &OutputSigningKeyUpdateConsumer{
|
|
||||||
eduServerConsumer: &consumer,
|
|
||||||
keyDB: keyDB,
|
|
||||||
keyAPI: keyAPI,
|
|
||||||
serverName: string(cfg.Global.ServerName),
|
|
||||||
}
|
|
||||||
consumer.ProcessMessage = s.onMessage
|
|
||||||
|
|
||||||
return s
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *OutputSigningKeyUpdateConsumer) Start() error {
|
|
||||||
return s.eduServerConsumer.Start()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *OutputSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
|
||||||
/*
|
|
||||||
var output eduapi.OutputSigningKeyUpdate
|
|
||||||
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
|
||||||
log.WithError(err).Errorf("eduserver output log: message parse failure")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
*/
|
|
||||||
return fmt.Errorf("TODO")
|
|
||||||
}
|
|
||||||
|
|
@ -19,14 +19,15 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/ed25519"
|
"crypto/ed25519"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
|
||||||
"github.com/matrix-org/dendrite/keyserver/api"
|
"github.com/matrix-org/dendrite/keyserver/api"
|
||||||
"github.com/matrix-org/dendrite/keyserver/types"
|
"github.com/matrix-org/dendrite/keyserver/types"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
"golang.org/x/crypto/curve25519"
|
||||||
)
|
)
|
||||||
|
|
||||||
func sanityCheckKey(key gomatrixserverlib.CrossSigningKey, userID string, purpose gomatrixserverlib.CrossSigningKeyPurpose) error {
|
func sanityCheckKey(key gomatrixserverlib.CrossSigningKey, userID string, purpose gomatrixserverlib.CrossSigningKeyPurpose) error {
|
||||||
|
|
@ -45,6 +46,41 @@ func sanityCheckKey(key gomatrixserverlib.CrossSigningKey, userID string, purpos
|
||||||
if tokens[1] != b64 {
|
if tokens[1] != b64 {
|
||||||
return fmt.Errorf("key ID isn't correct")
|
return fmt.Errorf("key ID isn't correct")
|
||||||
}
|
}
|
||||||
|
switch tokens[0] {
|
||||||
|
case "ed25519":
|
||||||
|
if len(keyData) != ed25519.PublicKeySize {
|
||||||
|
return fmt.Errorf("ed25519 key is not the correct length")
|
||||||
|
}
|
||||||
|
case "curve25519":
|
||||||
|
if len(keyData) != curve25519.PointSize {
|
||||||
|
return fmt.Errorf("curve25519 key is not the correct length")
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
// We can't enforce the key length to be correct for an
|
||||||
|
// algorithm that we don't recognise, so instead we'll
|
||||||
|
// just make sure that it isn't incredibly excessive.
|
||||||
|
if l := len(keyData); l > 4096 {
|
||||||
|
return fmt.Errorf("unknown key type is too long (%d bytes)", l)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check to see if the signatures make sense
|
||||||
|
for _, forOriginUser := range key.Signatures {
|
||||||
|
for originKeyID, originSignature := range forOriginUser {
|
||||||
|
switch strings.SplitN(string(originKeyID), ":", 1)[0] {
|
||||||
|
case "ed25519":
|
||||||
|
if len(originSignature) != ed25519.SignatureSize {
|
||||||
|
return fmt.Errorf("ed25519 signature is not the correct length")
|
||||||
|
}
|
||||||
|
case "curve25519":
|
||||||
|
return fmt.Errorf("curve25519 signatures are impossible")
|
||||||
|
default:
|
||||||
|
if l := len(originSignature); l > 4096 {
|
||||||
|
return fmt.Errorf("unknown signature type is too long (%d bytes)", l)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Does the key claim to be from the right user?
|
// Does the key claim to be from the right user?
|
||||||
|
|
@ -69,42 +105,68 @@ func sanityCheckKey(key gomatrixserverlib.CrossSigningKey, userID string, purpos
|
||||||
|
|
||||||
// nolint:gocyclo
|
// nolint:gocyclo
|
||||||
func (a *KeyInternalAPI) PerformUploadDeviceKeys(ctx context.Context, req *api.PerformUploadDeviceKeysRequest, res *api.PerformUploadDeviceKeysResponse) {
|
func (a *KeyInternalAPI) PerformUploadDeviceKeys(ctx context.Context, req *api.PerformUploadDeviceKeysRequest, res *api.PerformUploadDeviceKeysResponse) {
|
||||||
var masterKey gomatrixserverlib.Base64Bytes
|
// Find the keys to store.
|
||||||
|
byPurpose := map[gomatrixserverlib.CrossSigningKeyPurpose]gomatrixserverlib.CrossSigningKey{}
|
||||||
|
toStore := types.CrossSigningKeyMap{}
|
||||||
hasMasterKey := false
|
hasMasterKey := false
|
||||||
|
|
||||||
if len(req.MasterKey.Keys) > 0 {
|
if len(req.MasterKey.Keys) > 0 {
|
||||||
if err := sanityCheckKey(req.MasterKey, req.UserID, gomatrixserverlib.CrossSigningKeyPurposeMaster); err != nil {
|
if err := sanityCheckKey(req.MasterKey, req.UserID, gomatrixserverlib.CrossSigningKeyPurposeMaster); err != nil {
|
||||||
res.Error = &api.KeyError{
|
res.Error = &api.KeyError{
|
||||||
Err: "Master key sanity check failed: " + err.Error(),
|
Err: "Master key sanity check failed: " + err.Error(),
|
||||||
|
IsInvalidParam: true,
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
for _, keyData := range req.MasterKey.Keys { // iterates once, because sanityCheckKey requires one key
|
|
||||||
hasMasterKey = true
|
byPurpose[gomatrixserverlib.CrossSigningKeyPurposeMaster] = req.MasterKey
|
||||||
masterKey = keyData
|
for _, key := range req.MasterKey.Keys { // iterates once, see sanityCheckKey
|
||||||
|
toStore[gomatrixserverlib.CrossSigningKeyPurposeMaster] = key
|
||||||
}
|
}
|
||||||
|
hasMasterKey = true
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(req.SelfSigningKey.Keys) > 0 {
|
if len(req.SelfSigningKey.Keys) > 0 {
|
||||||
if err := sanityCheckKey(req.SelfSigningKey, req.UserID, gomatrixserverlib.CrossSigningKeyPurposeSelfSigning); err != nil {
|
if err := sanityCheckKey(req.SelfSigningKey, req.UserID, gomatrixserverlib.CrossSigningKeyPurposeSelfSigning); err != nil {
|
||||||
res.Error = &api.KeyError{
|
res.Error = &api.KeyError{
|
||||||
Err: "Self-signing key sanity check failed: " + err.Error(),
|
Err: "Self-signing key sanity check failed: " + err.Error(),
|
||||||
|
IsInvalidParam: true,
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
byPurpose[gomatrixserverlib.CrossSigningKeyPurposeSelfSigning] = req.SelfSigningKey
|
||||||
|
for _, key := range req.SelfSigningKey.Keys { // iterates once, see sanityCheckKey
|
||||||
|
toStore[gomatrixserverlib.CrossSigningKeyPurposeSelfSigning] = key
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(req.UserSigningKey.Keys) > 0 {
|
if len(req.UserSigningKey.Keys) > 0 {
|
||||||
if err := sanityCheckKey(req.UserSigningKey, req.UserID, gomatrixserverlib.CrossSigningKeyPurposeUserSigning); err != nil {
|
if err := sanityCheckKey(req.UserSigningKey, req.UserID, gomatrixserverlib.CrossSigningKeyPurposeUserSigning); err != nil {
|
||||||
res.Error = &api.KeyError{
|
res.Error = &api.KeyError{
|
||||||
Err: "User-signing key sanity check failed: " + err.Error(),
|
Err: "User-signing key sanity check failed: " + err.Error(),
|
||||||
|
IsInvalidParam: true,
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
byPurpose[gomatrixserverlib.CrossSigningKeyPurposeUserSigning] = req.UserSigningKey
|
||||||
|
for _, key := range req.UserSigningKey.Keys { // iterates once, see sanityCheckKey
|
||||||
|
toStore[gomatrixserverlib.CrossSigningKeyPurposeUserSigning] = key
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If the user hasn't given a new master key, then let's go and get their
|
// If there's nothing to do then stop here.
|
||||||
// existing keys from the database.
|
if len(toStore) == 0 {
|
||||||
|
res.Error = &api.KeyError{
|
||||||
|
Err: "No keys were supplied in the request",
|
||||||
|
IsMissingParam: true,
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// We can't have a self-signing or user-signing key without a master
|
||||||
|
// key, so make sure we have one of those.
|
||||||
if !hasMasterKey {
|
if !hasMasterKey {
|
||||||
existingKeys, err := a.DB.CrossSigningKeysDataForUser(ctx, req.UserID)
|
existingKeys, err := a.DB.CrossSigningKeysDataForUser(ctx, req.UserID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -114,87 +176,20 @@ func (a *KeyInternalAPI) PerformUploadDeviceKeys(ctx context.Context, req *api.P
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
masterKey, hasMasterKey = existingKeys[gomatrixserverlib.CrossSigningKeyPurposeMaster]
|
_, hasMasterKey = existingKeys[gomatrixserverlib.CrossSigningKeyPurposeMaster]
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we still don't have a master key at this point then there's nothing else
|
// If we still can't find a master key for the user then stop the upload.
|
||||||
// we can do - we've checked both the request and the database.
|
// This satisfies the "Fails to upload self-signing key without master key" test.
|
||||||
if !hasMasterKey {
|
if !hasMasterKey {
|
||||||
res.Error = &api.KeyError{
|
res.Error = &api.KeyError{
|
||||||
Err: "No master key was found either in the database or in the request!",
|
Err: "No master key was found",
|
||||||
IsMissingParam: true,
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// The key ID is basically the key itself.
|
|
||||||
masterKeyID := gomatrixserverlib.KeyID(fmt.Sprintf("ed25519:%s", masterKey.Encode()))
|
|
||||||
|
|
||||||
// Work out which things we need to verify the signatures for.
|
|
||||||
toVerify := make(map[gomatrixserverlib.CrossSigningKeyPurpose]gomatrixserverlib.CrossSigningKey, 3)
|
|
||||||
toStore := types.CrossSigningKeyMap{}
|
|
||||||
if len(req.MasterKey.Keys) > 0 {
|
|
||||||
toVerify[gomatrixserverlib.CrossSigningKeyPurposeMaster] = req.MasterKey
|
|
||||||
}
|
|
||||||
if len(req.SelfSigningKey.Keys) > 0 {
|
|
||||||
toVerify[gomatrixserverlib.CrossSigningKeyPurposeSelfSigning] = req.SelfSigningKey
|
|
||||||
}
|
|
||||||
if len(req.UserSigningKey.Keys) > 0 {
|
|
||||||
toVerify[gomatrixserverlib.CrossSigningKeyPurposeUserSigning] = req.UserSigningKey
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(toVerify) == 0 {
|
|
||||||
res.Error = &api.KeyError{
|
|
||||||
Err: "No supplied keys available for verification",
|
|
||||||
IsMissingParam: true,
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
for purpose, key := range toVerify {
|
|
||||||
// Collect together the key IDs we need to verify with. This will include
|
|
||||||
// all of the key IDs specified in the signatures.
|
|
||||||
keyJSON, err := json.Marshal(key)
|
|
||||||
if err != nil {
|
|
||||||
res.Error = &api.KeyError{
|
|
||||||
Err: fmt.Sprintf("The JSON of the key section is invalid: %s", err.Error()),
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
switch purpose {
|
|
||||||
case gomatrixserverlib.CrossSigningKeyPurposeMaster:
|
|
||||||
// The master key might have a signature attached to it from the
|
|
||||||
// previous key, or from a device key, but there's no real need
|
|
||||||
// to verify it. Clients will perform key checks when the master
|
|
||||||
// key changes.
|
|
||||||
|
|
||||||
default:
|
|
||||||
// Sub-keys should be signed by the master key.
|
|
||||||
if err := gomatrixserverlib.VerifyJSON(req.UserID, masterKeyID, ed25519.PublicKey(masterKey), keyJSON); err != nil {
|
|
||||||
res.Error = &api.KeyError{
|
|
||||||
Err: fmt.Sprintf("The %q sub-key failed master key signature verification: %s", purpose, err.Error()),
|
|
||||||
IsInvalidSignature: true,
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// If we've reached this point then all the signatures are valid so
|
|
||||||
// add the key to the list of keys to store.
|
|
||||||
for _, keyData := range key.Keys { // iterates once, see sanityCheckKey
|
|
||||||
toStore[purpose] = keyData
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(toStore) == 0 {
|
|
||||||
res.Error = &api.KeyError{
|
|
||||||
Err: "No supplied keys passed verification",
|
|
||||||
IsMissingParam: true,
|
IsMissingParam: true,
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Store the keys.
|
||||||
if err := a.DB.StoreCrossSigningKeysForUser(ctx, req.UserID, toStore); err != nil {
|
if err := a.DB.StoreCrossSigningKeysForUser(ctx, req.UserID, toStore); err != nil {
|
||||||
res.Error = &api.KeyError{
|
res.Error = &api.KeyError{
|
||||||
Err: fmt.Sprintf("a.DB.StoreCrossSigningKeysForUser: %s", err),
|
Err: fmt.Sprintf("a.DB.StoreCrossSigningKeysForUser: %s", err),
|
||||||
|
|
@ -203,7 +198,7 @@ func (a *KeyInternalAPI) PerformUploadDeviceKeys(ctx context.Context, req *api.P
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now upload any signatures that were included with the keys.
|
// Now upload any signatures that were included with the keys.
|
||||||
for _, key := range toVerify {
|
for _, key := range byPurpose {
|
||||||
var targetKeyID gomatrixserverlib.KeyID
|
var targetKeyID gomatrixserverlib.KeyID
|
||||||
for targetKey := range key.Keys { // iterates once, see sanityCheckKey
|
for targetKey := range key.Keys { // iterates once, see sanityCheckKey
|
||||||
targetKeyID = targetKey
|
targetKeyID = targetKey
|
||||||
|
|
@ -222,6 +217,28 @@ func (a *KeyInternalAPI) PerformUploadDeviceKeys(ctx context.Context, req *api.P
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Finally, generate a notification that we updated the keys.
|
||||||
|
if _, host, err := gomatrixserverlib.SplitID('@', req.UserID); err == nil && host == a.ThisServer {
|
||||||
|
update := eduserverAPI.CrossSigningKeyUpdate{
|
||||||
|
UserID: req.UserID,
|
||||||
|
}
|
||||||
|
if mk, ok := byPurpose[gomatrixserverlib.CrossSigningKeyPurposeMaster]; ok {
|
||||||
|
update.MasterKey = &mk
|
||||||
|
}
|
||||||
|
if ssk, ok := byPurpose[gomatrixserverlib.CrossSigningKeyPurposeSelfSigning]; ok {
|
||||||
|
update.SelfSigningKey = &ssk
|
||||||
|
}
|
||||||
|
if update.MasterKey == nil && update.SelfSigningKey == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := a.Producer.ProduceSigningKeyUpdate(update); err != nil {
|
||||||
|
res.Error = &api.KeyError{
|
||||||
|
Err: fmt.Sprintf("a.Producer.ProduceSigningKeyUpdate: %s", err),
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *KeyInternalAPI) PerformUploadDeviceSignatures(ctx context.Context, req *api.PerformUploadDeviceSignaturesRequest, res *api.PerformUploadDeviceSignaturesResponse) {
|
func (a *KeyInternalAPI) PerformUploadDeviceSignatures(ctx context.Context, req *api.PerformUploadDeviceSignaturesRequest, res *api.PerformUploadDeviceSignaturesResponse) {
|
||||||
|
|
@ -277,7 +294,7 @@ func (a *KeyInternalAPI) PerformUploadDeviceSignatures(ctx context.Context, req
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := a.processSelfSignatures(ctx, req.UserID, queryRes, selfSignatures); err != nil {
|
if err := a.processSelfSignatures(ctx, selfSignatures); err != nil {
|
||||||
res.Error = &api.KeyError{
|
res.Error = &api.KeyError{
|
||||||
Err: fmt.Sprintf("a.processSelfSignatures: %s", err),
|
Err: fmt.Sprintf("a.processSelfSignatures: %s", err),
|
||||||
}
|
}
|
||||||
|
|
@ -290,10 +307,25 @@ func (a *KeyInternalAPI) PerformUploadDeviceSignatures(ctx context.Context, req
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Finally, generate a notification that we updated the signatures.
|
||||||
|
for userID := range req.Signatures {
|
||||||
|
if _, host, err := gomatrixserverlib.SplitID('@', userID); err == nil && host == a.ThisServer {
|
||||||
|
update := eduserverAPI.CrossSigningKeyUpdate{
|
||||||
|
UserID: userID,
|
||||||
|
}
|
||||||
|
if err := a.Producer.ProduceSigningKeyUpdate(update); err != nil {
|
||||||
|
res.Error = &api.KeyError{
|
||||||
|
Err: fmt.Sprintf("a.Producer.ProduceSigningKeyUpdate: %s", err),
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *KeyInternalAPI) processSelfSignatures(
|
func (a *KeyInternalAPI) processSelfSignatures(
|
||||||
ctx context.Context, _ string, queryRes *api.QueryKeysResponse,
|
ctx context.Context,
|
||||||
signatures map[string]map[gomatrixserverlib.KeyID]gomatrixserverlib.CrossSigningForKeyOrDevice,
|
signatures map[string]map[gomatrixserverlib.KeyID]gomatrixserverlib.CrossSigningForKeyOrDevice,
|
||||||
) error {
|
) error {
|
||||||
// Here we will process:
|
// Here we will process:
|
||||||
|
|
@ -304,37 +336,8 @@ func (a *KeyInternalAPI) processSelfSignatures(
|
||||||
for targetKeyID, signature := range forTargetUserID {
|
for targetKeyID, signature := range forTargetUserID {
|
||||||
switch sig := signature.CrossSigningBody.(type) {
|
switch sig := signature.CrossSigningBody.(type) {
|
||||||
case *gomatrixserverlib.CrossSigningKey:
|
case *gomatrixserverlib.CrossSigningKey:
|
||||||
// The user is signing their master key with one of their devices
|
|
||||||
// The QueryKeys response should contain the device key hopefully.
|
|
||||||
// First we need to marshal the blob back into JSON so we can verify
|
|
||||||
// it.
|
|
||||||
j, err := json.Marshal(sig)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("json.Marshal: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
for originUserID, forOriginUserID := range sig.Signatures {
|
for originUserID, forOriginUserID := range sig.Signatures {
|
||||||
originDeviceKeys, ok := queryRes.DeviceKeys[originUserID]
|
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("missing device keys for user %q", originUserID)
|
|
||||||
}
|
|
||||||
|
|
||||||
for originKeyID, originSig := range forOriginUserID {
|
for originKeyID, originSig := range forOriginUserID {
|
||||||
var originKey gomatrixserverlib.DeviceKeys
|
|
||||||
if err := json.Unmarshal(originDeviceKeys[string(originKeyID)], &originKey); err != nil {
|
|
||||||
return fmt.Errorf("json.Unmarshal: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
originSigningKey, ok := originKey.Keys[originKeyID]
|
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("missing origin signing key %q", originKeyID)
|
|
||||||
}
|
|
||||||
originSigningKeyPublic := ed25519.PublicKey(originSigningKey)
|
|
||||||
|
|
||||||
if err := gomatrixserverlib.VerifyJSON(originUserID, originKeyID, originSigningKeyPublic, j); err != nil {
|
|
||||||
return fmt.Errorf("gomatrixserverlib.VerifyJSON: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := a.DB.StoreCrossSigningSigsForTarget(
|
if err := a.DB.StoreCrossSigningSigsForTarget(
|
||||||
ctx, originUserID, originKeyID, targetUserID, targetKeyID, originSig,
|
ctx, originUserID, originKeyID, targetUserID, targetKeyID, originSig,
|
||||||
); err != nil {
|
); err != nil {
|
||||||
|
|
@ -344,35 +347,8 @@ func (a *KeyInternalAPI) processSelfSignatures(
|
||||||
}
|
}
|
||||||
|
|
||||||
case *gomatrixserverlib.DeviceKeys:
|
case *gomatrixserverlib.DeviceKeys:
|
||||||
// The user is signing one of their devices with their self-signing key
|
|
||||||
// The QueryKeys response should contain the master key hopefully.
|
|
||||||
// First we need to marshal the blob back into JSON so we can verify
|
|
||||||
// it.
|
|
||||||
j, err := json.Marshal(sig)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("json.Marshal: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
for originUserID, forOriginUserID := range sig.Signatures {
|
for originUserID, forOriginUserID := range sig.Signatures {
|
||||||
for originKeyID, originSig := range forOriginUserID {
|
for originKeyID, originSig := range forOriginUserID {
|
||||||
originSelfSigningKeys, ok := queryRes.SelfSigningKeys[originUserID]
|
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("missing self-signing key for user %q", originUserID)
|
|
||||||
}
|
|
||||||
|
|
||||||
var originSelfSigningKeyID gomatrixserverlib.KeyID
|
|
||||||
var originSelfSigningKey gomatrixserverlib.Base64Bytes
|
|
||||||
for keyID, key := range originSelfSigningKeys.Keys {
|
|
||||||
originSelfSigningKeyID, originSelfSigningKey = keyID, key
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
originSelfSigningKeyPublic := ed25519.PublicKey(originSelfSigningKey)
|
|
||||||
|
|
||||||
if err := gomatrixserverlib.VerifyJSON(originUserID, originSelfSigningKeyID, originSelfSigningKeyPublic, j); err != nil {
|
|
||||||
return fmt.Errorf("gomatrixserverlib.VerifyJSON: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := a.DB.StoreCrossSigningSigsForTarget(
|
if err := a.DB.StoreCrossSigningSigsForTarget(
|
||||||
ctx, originUserID, originKeyID, targetUserID, targetKeyID, originSig,
|
ctx, originUserID, originKeyID, targetUserID, targetKeyID, originSig,
|
||||||
); err != nil {
|
); err != nil {
|
||||||
|
|
|
||||||
|
|
@ -182,6 +182,14 @@ func (a *KeyInternalAPI) claimRemoteKeys(
|
||||||
util.GetLogger(ctx).WithField("num_keys", keysClaimed).Info("Claimed remote keys")
|
util.GetLogger(ctx).WithField("num_keys", keysClaimed).Info("Claimed remote keys")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *KeyInternalAPI) PerformDeleteKeys(ctx context.Context, req *api.PerformDeleteKeysRequest, res *api.PerformDeleteKeysResponse) {
|
||||||
|
if err := a.DB.DeleteDeviceKeys(ctx, req.UserID, req.KeyIDs); err != nil {
|
||||||
|
res.Error = &api.KeyError{
|
||||||
|
Err: fmt.Sprintf("Failed to delete device keys: %s", err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (a *KeyInternalAPI) QueryOneTimeKeys(ctx context.Context, req *api.QueryOneTimeKeysRequest, res *api.QueryOneTimeKeysResponse) {
|
func (a *KeyInternalAPI) QueryOneTimeKeys(ctx context.Context, req *api.QueryOneTimeKeysRequest, res *api.QueryOneTimeKeysResponse) {
|
||||||
count, err := a.DB.OneTimeKeysCount(ctx, req.UserID, req.DeviceID)
|
count, err := a.DB.OneTimeKeysCount(ctx, req.UserID, req.DeviceID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
|
|
@ -30,6 +30,7 @@ const (
|
||||||
InputDeviceListUpdatePath = "/keyserver/inputDeviceListUpdate"
|
InputDeviceListUpdatePath = "/keyserver/inputDeviceListUpdate"
|
||||||
PerformUploadKeysPath = "/keyserver/performUploadKeys"
|
PerformUploadKeysPath = "/keyserver/performUploadKeys"
|
||||||
PerformClaimKeysPath = "/keyserver/performClaimKeys"
|
PerformClaimKeysPath = "/keyserver/performClaimKeys"
|
||||||
|
PerformDeleteKeysPath = "/keyserver/performDeleteKeys"
|
||||||
PerformUploadDeviceKeysPath = "/keyserver/performUploadDeviceKeys"
|
PerformUploadDeviceKeysPath = "/keyserver/performUploadDeviceKeys"
|
||||||
PerformUploadDeviceSignaturesPath = "/keyserver/performUploadDeviceSignatures"
|
PerformUploadDeviceSignaturesPath = "/keyserver/performUploadDeviceSignatures"
|
||||||
QueryKeysPath = "/keyserver/queryKeys"
|
QueryKeysPath = "/keyserver/queryKeys"
|
||||||
|
|
@ -94,6 +95,23 @@ func (h *httpKeyInternalAPI) PerformClaimKeys(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *httpKeyInternalAPI) PerformDeleteKeys(
|
||||||
|
ctx context.Context,
|
||||||
|
request *api.PerformDeleteKeysRequest,
|
||||||
|
response *api.PerformDeleteKeysResponse,
|
||||||
|
) {
|
||||||
|
span, ctx := opentracing.StartSpanFromContext(ctx, "PerformClaimKeys")
|
||||||
|
defer span.Finish()
|
||||||
|
|
||||||
|
apiURL := h.apiURL + PerformClaimKeysPath
|
||||||
|
err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
||||||
|
if err != nil {
|
||||||
|
response.Error = &api.KeyError{
|
||||||
|
Err: err.Error(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (h *httpKeyInternalAPI) PerformUploadKeys(
|
func (h *httpKeyInternalAPI) PerformUploadKeys(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
request *api.PerformUploadKeysRequest,
|
request *api.PerformUploadKeysRequest,
|
||||||
|
|
|
||||||
|
|
@ -47,6 +47,17 @@ func AddRoutes(internalAPIMux *mux.Router, s api.KeyInternalAPI) {
|
||||||
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
|
internalAPIMux.Handle(PerformDeleteKeysPath,
|
||||||
|
httputil.MakeInternalAPI("performDeleteKeys", func(req *http.Request) util.JSONResponse {
|
||||||
|
request := api.PerformDeleteKeysRequest{}
|
||||||
|
response := api.PerformDeleteKeysResponse{}
|
||||||
|
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
|
||||||
|
return util.MessageResponse(http.StatusBadRequest, err.Error())
|
||||||
|
}
|
||||||
|
s.PerformDeleteKeys(req.Context(), &request, &response)
|
||||||
|
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||||
|
}),
|
||||||
|
)
|
||||||
internalAPIMux.Handle(PerformUploadKeysPath,
|
internalAPIMux.Handle(PerformUploadKeysPath,
|
||||||
httputil.MakeInternalAPI("performUploadKeys", func(req *http.Request) util.JSONResponse {
|
httputil.MakeInternalAPI("performUploadKeys", func(req *http.Request) util.JSONResponse {
|
||||||
request := api.PerformUploadKeysRequest{}
|
request := api.PerformUploadKeysRequest{}
|
||||||
|
|
|
||||||
|
|
@ -65,7 +65,7 @@ func NewInternalAPI(
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
keyconsumer := consumers.NewOutputSigningKeyUpdateConsumer(
|
keyconsumer := consumers.NewOutputCrossSigningKeyUpdateConsumer(
|
||||||
base.ProcessContext, base.Cfg, consumer, db, ap,
|
base.ProcessContext, base.Cfg, consumer, db, ap,
|
||||||
)
|
)
|
||||||
if err := keyconsumer.Start(); err != nil {
|
if err := keyconsumer.Start(); err != nil {
|
||||||
|
|
|
||||||
|
|
@ -19,6 +19,7 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
|
||||||
"github.com/Shopify/sarama"
|
"github.com/Shopify/sarama"
|
||||||
|
eduapi "github.com/matrix-org/dendrite/eduserver/api"
|
||||||
"github.com/matrix-org/dendrite/keyserver/api"
|
"github.com/matrix-org/dendrite/keyserver/api"
|
||||||
"github.com/matrix-org/dendrite/keyserver/storage"
|
"github.com/matrix-org/dendrite/keyserver/storage"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
|
@ -73,3 +74,36 @@ func (p *KeyChange) ProduceKeyChanges(keys []api.DeviceMessage) error {
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *KeyChange) ProduceSigningKeyUpdate(key eduapi.CrossSigningKeyUpdate) error {
|
||||||
|
var m sarama.ProducerMessage
|
||||||
|
output := &api.DeviceMessage{
|
||||||
|
Type: api.TypeCrossSigningUpdate,
|
||||||
|
OutputCrossSigningKeyUpdate: &eduapi.OutputCrossSigningKeyUpdate{
|
||||||
|
CrossSigningKeyUpdate: key,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
value, err := json.Marshal(output)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
m.Topic = string(p.Topic)
|
||||||
|
m.Key = sarama.StringEncoder(key.UserID)
|
||||||
|
m.Value = sarama.ByteEncoder(value)
|
||||||
|
|
||||||
|
partition, offset, err := p.Producer.SendMessage(&m)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = p.DB.StoreKeyChange(context.Background(), partition, offset, key.UserID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
logrus.WithFields(logrus.Fields{
|
||||||
|
"user_id": key.UserID,
|
||||||
|
}).Infof("Produced to cross-signing update topic '%s'", p.Topic)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -58,6 +58,10 @@ type Database interface {
|
||||||
// If there are some missing keys, they are omitted from the returned slice. There is no ordering on the returned slice.
|
// If there are some missing keys, they are omitted from the returned slice. There is no ordering on the returned slice.
|
||||||
DeviceKeysForUser(ctx context.Context, userID string, deviceIDs []string) ([]api.DeviceMessage, error)
|
DeviceKeysForUser(ctx context.Context, userID string, deviceIDs []string) ([]api.DeviceMessage, error)
|
||||||
|
|
||||||
|
// DeleteDeviceKeys removes the device keys for a given user/device, and any accompanying
|
||||||
|
// cross-signing signatures relating to that device.
|
||||||
|
DeleteDeviceKeys(ctx context.Context, userID string, deviceIDs []gomatrixserverlib.KeyID) error
|
||||||
|
|
||||||
// ClaimKeys based on the 3-uple of user_id, device_id and algorithm name. Returns the keys claimed. Returns no error if a key
|
// ClaimKeys based on the 3-uple of user_id, device_id and algorithm name. Returns the keys claimed. Returns no error if a key
|
||||||
// cannot be claimed or if none exist for this (user, device, algorithm), instead it is omitted from the returned slice.
|
// cannot be claimed or if none exist for this (user, device, algorithm), instead it is omitted from the returned slice.
|
||||||
ClaimKeys(ctx context.Context, userToDeviceToAlgorithm map[string]map[string]string) ([]api.OneTimeKeys, error)
|
ClaimKeys(ctx context.Context, userToDeviceToAlgorithm map[string]map[string]string) ([]api.OneTimeKeys, error)
|
||||||
|
|
|
||||||
|
|
@ -46,10 +46,14 @@ const upsertCrossSigningSigsForTargetSQL = "" +
|
||||||
" VALUES($1, $2, $3, $4, $5)" +
|
" VALUES($1, $2, $3, $4, $5)" +
|
||||||
" ON CONFLICT (origin_user_id, target_user_id, target_key_id) DO UPDATE SET (origin_key_id, signature) = ($2, $5)"
|
" ON CONFLICT (origin_user_id, target_user_id, target_key_id) DO UPDATE SET (origin_key_id, signature) = ($2, $5)"
|
||||||
|
|
||||||
|
const deleteCrossSigningSigsForTargetSQL = "" +
|
||||||
|
"DELETE FROM keyserver_cross_signing_sigs WHERE target_user_id=$1 AND target_key_id=$2"
|
||||||
|
|
||||||
type crossSigningSigsStatements struct {
|
type crossSigningSigsStatements struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
selectCrossSigningSigsForTargetStmt *sql.Stmt
|
selectCrossSigningSigsForTargetStmt *sql.Stmt
|
||||||
upsertCrossSigningSigsForTargetStmt *sql.Stmt
|
upsertCrossSigningSigsForTargetStmt *sql.Stmt
|
||||||
|
deleteCrossSigningSigsForTargetStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPostgresCrossSigningSigsTable(db *sql.DB) (tables.CrossSigningSigs, error) {
|
func NewPostgresCrossSigningSigsTable(db *sql.DB) (tables.CrossSigningSigs, error) {
|
||||||
|
|
@ -63,6 +67,7 @@ func NewPostgresCrossSigningSigsTable(db *sql.DB) (tables.CrossSigningSigs, erro
|
||||||
return s, sqlutil.StatementList{
|
return s, sqlutil.StatementList{
|
||||||
{&s.selectCrossSigningSigsForTargetStmt, selectCrossSigningSigsForTargetSQL},
|
{&s.selectCrossSigningSigsForTargetStmt, selectCrossSigningSigsForTargetSQL},
|
||||||
{&s.upsertCrossSigningSigsForTargetStmt, upsertCrossSigningSigsForTargetSQL},
|
{&s.upsertCrossSigningSigsForTargetStmt, upsertCrossSigningSigsForTargetSQL},
|
||||||
|
{&s.deleteCrossSigningSigsForTargetStmt, deleteCrossSigningSigsForTargetSQL},
|
||||||
}.Prepare(db)
|
}.Prepare(db)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -101,3 +106,13 @@ func (s *crossSigningSigsStatements) UpsertCrossSigningSigsForTarget(
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *crossSigningSigsStatements) DeleteCrossSigningSigsForTarget(
|
||||||
|
ctx context.Context, txn *sql.Tx,
|
||||||
|
targetUserID string, targetKeyID gomatrixserverlib.KeyID,
|
||||||
|
) error {
|
||||||
|
if _, err := sqlutil.TxStmt(txn, s.deleteCrossSigningSigsForTargetStmt).ExecContext(ctx, targetUserID, targetKeyID); err != nil {
|
||||||
|
return fmt.Errorf("s.deleteCrossSigningSigsForTargetStmt: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -62,6 +62,9 @@ const selectMaxStreamForUserSQL = "" +
|
||||||
const countStreamIDsForUserSQL = "" +
|
const countStreamIDsForUserSQL = "" +
|
||||||
"SELECT COUNT(*) FROM keyserver_device_keys WHERE user_id=$1 AND stream_id = ANY($2)"
|
"SELECT COUNT(*) FROM keyserver_device_keys WHERE user_id=$1 AND stream_id = ANY($2)"
|
||||||
|
|
||||||
|
const deleteDeviceKeysSQL = "" +
|
||||||
|
"DELETE FROM keyserver_device_keys WHERE user_id=$1 AND device_id=$2"
|
||||||
|
|
||||||
const deleteAllDeviceKeysSQL = "" +
|
const deleteAllDeviceKeysSQL = "" +
|
||||||
"DELETE FROM keyserver_device_keys WHERE user_id=$1"
|
"DELETE FROM keyserver_device_keys WHERE user_id=$1"
|
||||||
|
|
||||||
|
|
@ -72,6 +75,7 @@ type deviceKeysStatements struct {
|
||||||
selectBatchDeviceKeysStmt *sql.Stmt
|
selectBatchDeviceKeysStmt *sql.Stmt
|
||||||
selectMaxStreamForUserStmt *sql.Stmt
|
selectMaxStreamForUserStmt *sql.Stmt
|
||||||
countStreamIDsForUserStmt *sql.Stmt
|
countStreamIDsForUserStmt *sql.Stmt
|
||||||
|
deleteDeviceKeysStmt *sql.Stmt
|
||||||
deleteAllDeviceKeysStmt *sql.Stmt
|
deleteAllDeviceKeysStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -98,6 +102,9 @@ func NewPostgresDeviceKeysTable(db *sql.DB) (tables.DeviceKeys, error) {
|
||||||
if s.countStreamIDsForUserStmt, err = db.Prepare(countStreamIDsForUserSQL); err != nil {
|
if s.countStreamIDsForUserStmt, err = db.Prepare(countStreamIDsForUserSQL); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if s.deleteDeviceKeysStmt, err = db.Prepare(deleteDeviceKeysSQL); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
if s.deleteAllDeviceKeysStmt, err = db.Prepare(deleteAllDeviceKeysSQL); err != nil {
|
if s.deleteAllDeviceKeysStmt, err = db.Prepare(deleteAllDeviceKeysSQL); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
@ -163,6 +170,11 @@ func (s *deviceKeysStatements) InsertDeviceKeys(ctx context.Context, txn *sql.Tx
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *deviceKeysStatements) DeleteDeviceKeys(ctx context.Context, txn *sql.Tx, userID, deviceID string) error {
|
||||||
|
_, err := sqlutil.TxStmt(txn, s.deleteDeviceKeysStmt).ExecContext(ctx, userID, deviceID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func (s *deviceKeysStatements) DeleteAllDeviceKeys(ctx context.Context, txn *sql.Tx, userID string) error {
|
func (s *deviceKeysStatements) DeleteAllDeviceKeys(ctx context.Context, txn *sql.Tx, userID string) error {
|
||||||
_, err := sqlutil.TxStmt(txn, s.deleteAllDeviceKeysStmt).ExecContext(ctx, userID)
|
_, err := sqlutil.TxStmt(txn, s.deleteAllDeviceKeysStmt).ExecContext(ctx, userID)
|
||||||
return err
|
return err
|
||||||
|
|
|
||||||
|
|
@ -158,6 +158,22 @@ func (d *Database) MarkDeviceListStale(ctx context.Context, userID string, isSta
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DeleteDeviceKeys removes the device keys for a given user/device, and any accompanying
|
||||||
|
// cross-signing signatures relating to that device.
|
||||||
|
func (d *Database) DeleteDeviceKeys(ctx context.Context, userID string, deviceIDs []gomatrixserverlib.KeyID) error {
|
||||||
|
return d.Writer.Do(nil, nil, func(txn *sql.Tx) error {
|
||||||
|
for _, deviceID := range deviceIDs {
|
||||||
|
if err := d.CrossSigningSigsTable.DeleteCrossSigningSigsForTarget(ctx, txn, userID, deviceID); err != nil && err != sql.ErrNoRows {
|
||||||
|
return fmt.Errorf("d.CrossSigningSigsTable.DeleteCrossSigningSigsForTarget: %w", err)
|
||||||
|
}
|
||||||
|
if err := d.DeviceKeysTable.DeleteDeviceKeys(ctx, txn, userID, string(deviceID)); err != nil && err != sql.ErrNoRows {
|
||||||
|
return fmt.Errorf("d.DeviceKeysTable.DeleteDeviceKeys: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// CrossSigningKeysForUser returns the latest known cross-signing keys for a user, if any.
|
// CrossSigningKeysForUser returns the latest known cross-signing keys for a user, if any.
|
||||||
func (d *Database) CrossSigningKeysForUser(ctx context.Context, userID string) (map[gomatrixserverlib.CrossSigningKeyPurpose]gomatrixserverlib.CrossSigningKey, error) {
|
func (d *Database) CrossSigningKeysForUser(ctx context.Context, userID string) (map[gomatrixserverlib.CrossSigningKeyPurpose]gomatrixserverlib.CrossSigningKey, error) {
|
||||||
keyMap, err := d.CrossSigningKeysTable.SelectCrossSigningKeysForUser(ctx, nil, userID)
|
keyMap, err := d.CrossSigningKeysTable.SelectCrossSigningKeysForUser(ctx, nil, userID)
|
||||||
|
|
|
||||||
|
|
@ -45,10 +45,14 @@ const upsertCrossSigningSigsForTargetSQL = "" +
|
||||||
"INSERT OR REPLACE INTO keyserver_cross_signing_sigs (origin_user_id, origin_key_id, target_user_id, target_key_id, signature)" +
|
"INSERT OR REPLACE INTO keyserver_cross_signing_sigs (origin_user_id, origin_key_id, target_user_id, target_key_id, signature)" +
|
||||||
" VALUES($1, $2, $3, $4, $5)"
|
" VALUES($1, $2, $3, $4, $5)"
|
||||||
|
|
||||||
|
const deleteCrossSigningSigsForTargetSQL = "" +
|
||||||
|
"DELETE FROM keyserver_cross_signing_sigs WHERE target_user_id=$1 AND target_key_id=$2"
|
||||||
|
|
||||||
type crossSigningSigsStatements struct {
|
type crossSigningSigsStatements struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
selectCrossSigningSigsForTargetStmt *sql.Stmt
|
selectCrossSigningSigsForTargetStmt *sql.Stmt
|
||||||
upsertCrossSigningSigsForTargetStmt *sql.Stmt
|
upsertCrossSigningSigsForTargetStmt *sql.Stmt
|
||||||
|
deleteCrossSigningSigsForTargetStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteCrossSigningSigsTable(db *sql.DB) (tables.CrossSigningSigs, error) {
|
func NewSqliteCrossSigningSigsTable(db *sql.DB) (tables.CrossSigningSigs, error) {
|
||||||
|
|
@ -62,6 +66,7 @@ func NewSqliteCrossSigningSigsTable(db *sql.DB) (tables.CrossSigningSigs, error)
|
||||||
return s, sqlutil.StatementList{
|
return s, sqlutil.StatementList{
|
||||||
{&s.selectCrossSigningSigsForTargetStmt, selectCrossSigningSigsForTargetSQL},
|
{&s.selectCrossSigningSigsForTargetStmt, selectCrossSigningSigsForTargetSQL},
|
||||||
{&s.upsertCrossSigningSigsForTargetStmt, upsertCrossSigningSigsForTargetSQL},
|
{&s.upsertCrossSigningSigsForTargetStmt, upsertCrossSigningSigsForTargetSQL},
|
||||||
|
{&s.deleteCrossSigningSigsForTargetStmt, deleteCrossSigningSigsForTargetSQL},
|
||||||
}.Prepare(db)
|
}.Prepare(db)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -100,3 +105,13 @@ func (s *crossSigningSigsStatements) UpsertCrossSigningSigsForTarget(
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *crossSigningSigsStatements) DeleteCrossSigningSigsForTarget(
|
||||||
|
ctx context.Context, txn *sql.Tx,
|
||||||
|
targetUserID string, targetKeyID gomatrixserverlib.KeyID,
|
||||||
|
) error {
|
||||||
|
if _, err := sqlutil.TxStmt(txn, s.deleteCrossSigningSigsForTargetStmt).ExecContext(ctx, targetUserID, targetKeyID); err != nil {
|
||||||
|
return fmt.Errorf("s.deleteCrossSigningSigsForTargetStmt: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -58,6 +58,9 @@ const selectMaxStreamForUserSQL = "" +
|
||||||
const countStreamIDsForUserSQL = "" +
|
const countStreamIDsForUserSQL = "" +
|
||||||
"SELECT COUNT(*) FROM keyserver_device_keys WHERE user_id=$1 AND stream_id IN ($2)"
|
"SELECT COUNT(*) FROM keyserver_device_keys WHERE user_id=$1 AND stream_id IN ($2)"
|
||||||
|
|
||||||
|
const deleteDeviceKeysSQL = "" +
|
||||||
|
"DELETE FROM keyserver_device_keys WHERE user_id=$1 AND device_id=$2"
|
||||||
|
|
||||||
const deleteAllDeviceKeysSQL = "" +
|
const deleteAllDeviceKeysSQL = "" +
|
||||||
"DELETE FROM keyserver_device_keys WHERE user_id=$1"
|
"DELETE FROM keyserver_device_keys WHERE user_id=$1"
|
||||||
|
|
||||||
|
|
@ -67,6 +70,7 @@ type deviceKeysStatements struct {
|
||||||
selectDeviceKeysStmt *sql.Stmt
|
selectDeviceKeysStmt *sql.Stmt
|
||||||
selectBatchDeviceKeysStmt *sql.Stmt
|
selectBatchDeviceKeysStmt *sql.Stmt
|
||||||
selectMaxStreamForUserStmt *sql.Stmt
|
selectMaxStreamForUserStmt *sql.Stmt
|
||||||
|
deleteDeviceKeysStmt *sql.Stmt
|
||||||
deleteAllDeviceKeysStmt *sql.Stmt
|
deleteAllDeviceKeysStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -90,12 +94,20 @@ func NewSqliteDeviceKeysTable(db *sql.DB) (tables.DeviceKeys, error) {
|
||||||
if s.selectMaxStreamForUserStmt, err = db.Prepare(selectMaxStreamForUserSQL); err != nil {
|
if s.selectMaxStreamForUserStmt, err = db.Prepare(selectMaxStreamForUserSQL); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if s.deleteDeviceKeysStmt, err = db.Prepare(deleteDeviceKeysSQL); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
if s.deleteAllDeviceKeysStmt, err = db.Prepare(deleteAllDeviceKeysSQL); err != nil {
|
if s.deleteAllDeviceKeysStmt, err = db.Prepare(deleteAllDeviceKeysSQL); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *deviceKeysStatements) DeleteDeviceKeys(ctx context.Context, txn *sql.Tx, userID, deviceID string) error {
|
||||||
|
_, err := sqlutil.TxStmt(txn, s.deleteDeviceKeysStmt).ExecContext(ctx, userID, deviceID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func (s *deviceKeysStatements) DeleteAllDeviceKeys(ctx context.Context, txn *sql.Tx, userID string) error {
|
func (s *deviceKeysStatements) DeleteAllDeviceKeys(ctx context.Context, txn *sql.Tx, userID string) error {
|
||||||
_, err := sqlutil.TxStmt(txn, s.deleteAllDeviceKeysStmt).ExecContext(ctx, userID)
|
_, err := sqlutil.TxStmt(txn, s.deleteAllDeviceKeysStmt).ExecContext(ctx, userID)
|
||||||
return err
|
return err
|
||||||
|
|
|
||||||
|
|
@ -39,6 +39,7 @@ type DeviceKeys interface {
|
||||||
SelectMaxStreamIDForUser(ctx context.Context, txn *sql.Tx, userID string) (streamID int32, err error)
|
SelectMaxStreamIDForUser(ctx context.Context, txn *sql.Tx, userID string) (streamID int32, err error)
|
||||||
CountStreamIDsForUser(ctx context.Context, userID string, streamIDs []int64) (int, error)
|
CountStreamIDsForUser(ctx context.Context, userID string, streamIDs []int64) (int, error)
|
||||||
SelectBatchDeviceKeys(ctx context.Context, userID string, deviceIDs []string) ([]api.DeviceMessage, error)
|
SelectBatchDeviceKeys(ctx context.Context, userID string, deviceIDs []string) ([]api.DeviceMessage, error)
|
||||||
|
DeleteDeviceKeys(ctx context.Context, txn *sql.Tx, userID, deviceID string) error
|
||||||
DeleteAllDeviceKeys(ctx context.Context, txn *sql.Tx, userID string) error
|
DeleteAllDeviceKeys(ctx context.Context, txn *sql.Tx, userID string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -62,4 +63,5 @@ type CrossSigningKeys interface {
|
||||||
type CrossSigningSigs interface {
|
type CrossSigningSigs interface {
|
||||||
SelectCrossSigningSigsForTarget(ctx context.Context, txn *sql.Tx, targetUserID string, targetKeyID gomatrixserverlib.KeyID) (r types.CrossSigningSigMap, err error)
|
SelectCrossSigningSigsForTarget(ctx context.Context, txn *sql.Tx, targetUserID string, targetKeyID gomatrixserverlib.KeyID) (r types.CrossSigningSigMap, err error)
|
||||||
UpsertCrossSigningSigsForTarget(ctx context.Context, txn *sql.Tx, originUserID string, originKeyID gomatrixserverlib.KeyID, targetUserID string, targetKeyID gomatrixserverlib.KeyID, signature gomatrixserverlib.Base64Bytes) error
|
UpsertCrossSigningSigsForTarget(ctx context.Context, txn *sql.Tx, originUserID string, originKeyID gomatrixserverlib.KeyID, targetUserID string, targetKeyID gomatrixserverlib.KeyID, signature gomatrixserverlib.Base64Bytes) error
|
||||||
|
DeleteCrossSigningSigsForTarget(ctx context.Context, txn *sql.Tx, targetUserID string, targetKeyID gomatrixserverlib.KeyID) error
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,6 @@ const (
|
||||||
TopicOutputRoomEvent = "OutputRoomEvent"
|
TopicOutputRoomEvent = "OutputRoomEvent"
|
||||||
TopicOutputClientData = "OutputClientData"
|
TopicOutputClientData = "OutputClientData"
|
||||||
TopicOutputReceiptEvent = "OutputReceiptEvent"
|
TopicOutputReceiptEvent = "OutputReceiptEvent"
|
||||||
TopicOutputSigningKeyUpdate = "OutputSigningKeyUpdate"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type Kafka struct {
|
type Kafka struct {
|
||||||
|
|
|
||||||
|
|
@ -29,6 +29,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -104,13 +105,23 @@ func (s *OutputKeyChangeEventConsumer) updateOffset(msg *sarama.ConsumerMessage)
|
||||||
func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
||||||
defer s.updateOffset(msg)
|
defer s.updateOffset(msg)
|
||||||
|
|
||||||
var output api.DeviceMessage
|
var m api.DeviceMessage
|
||||||
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
if err := json.Unmarshal(msg.Value, &m); err != nil {
|
||||||
// If the message was invalid, log it and move on to the next message in the stream
|
logrus.WithError(err).Errorf("failed to read device message from key change topic")
|
||||||
log.WithError(err).Error("syncapi: failed to unmarshal key change event from key server")
|
return nil
|
||||||
sentry.CaptureException(err)
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
switch m.Type {
|
||||||
|
case api.TypeCrossSigningUpdate:
|
||||||
|
return s.onCrossSigningMessage(m, msg.Offset, msg.Partition)
|
||||||
|
case api.TypeDeviceKeyUpdate:
|
||||||
|
fallthrough
|
||||||
|
default:
|
||||||
|
return s.onDeviceKeyMessage(m, msg.Offset, msg.Partition)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *OutputKeyChangeEventConsumer) onDeviceKeyMessage(m api.DeviceMessage, offset int64, partition int32) error {
|
||||||
|
output := m.DeviceKeys
|
||||||
// work out who we need to notify about the new key
|
// work out who we need to notify about the new key
|
||||||
var queryRes roomserverAPI.QuerySharedUsersResponse
|
var queryRes roomserverAPI.QuerySharedUsersResponse
|
||||||
err := s.rsAPI.QuerySharedUsers(context.Background(), &roomserverAPI.QuerySharedUsersRequest{
|
err := s.rsAPI.QuerySharedUsers(context.Background(), &roomserverAPI.QuerySharedUsersRequest{
|
||||||
|
|
@ -124,8 +135,35 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
|
||||||
// make sure we get our own key updates too!
|
// make sure we get our own key updates too!
|
||||||
queryRes.UserIDsToCount[output.UserID] = 1
|
queryRes.UserIDsToCount[output.UserID] = 1
|
||||||
posUpdate := types.LogPosition{
|
posUpdate := types.LogPosition{
|
||||||
Offset: msg.Offset,
|
Offset: offset,
|
||||||
Partition: msg.Partition,
|
Partition: partition,
|
||||||
|
}
|
||||||
|
|
||||||
|
s.stream.Advance(posUpdate)
|
||||||
|
for userID := range queryRes.UserIDsToCount {
|
||||||
|
s.notifier.OnNewKeyChange(types.StreamingToken{DeviceListPosition: posUpdate}, userID, output.UserID)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *OutputKeyChangeEventConsumer) onCrossSigningMessage(m api.DeviceMessage, offset int64, partition int32) error {
|
||||||
|
output := m.CrossSigningKeyUpdate
|
||||||
|
// work out who we need to notify about the new key
|
||||||
|
var queryRes roomserverAPI.QuerySharedUsersResponse
|
||||||
|
err := s.rsAPI.QuerySharedUsers(context.Background(), &roomserverAPI.QuerySharedUsersRequest{
|
||||||
|
UserID: output.UserID,
|
||||||
|
}, &queryRes)
|
||||||
|
if err != nil {
|
||||||
|
log.WithError(err).Error("syncapi: failed to QuerySharedUsers for key change event from key server")
|
||||||
|
sentry.CaptureException(err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// make sure we get our own key updates too!
|
||||||
|
queryRes.UserIDsToCount[output.UserID] = 1
|
||||||
|
posUpdate := types.LogPosition{
|
||||||
|
Offset: offset,
|
||||||
|
Partition: partition,
|
||||||
}
|
}
|
||||||
|
|
||||||
s.stream.Advance(posUpdate)
|
s.stream.Advance(posUpdate)
|
||||||
|
|
|
||||||
|
|
@ -33,6 +33,8 @@ func (k *mockKeyAPI) SetUserAPI(i userapi.UserInternalAPI) {}
|
||||||
// PerformClaimKeys claims one-time keys for use in pre-key messages
|
// PerformClaimKeys claims one-time keys for use in pre-key messages
|
||||||
func (k *mockKeyAPI) PerformClaimKeys(ctx context.Context, req *keyapi.PerformClaimKeysRequest, res *keyapi.PerformClaimKeysResponse) {
|
func (k *mockKeyAPI) PerformClaimKeys(ctx context.Context, req *keyapi.PerformClaimKeysRequest, res *keyapi.PerformClaimKeysResponse) {
|
||||||
}
|
}
|
||||||
|
func (k *mockKeyAPI) PerformDeleteKeys(ctx context.Context, req *keyapi.PerformDeleteKeysRequest, res *keyapi.PerformDeleteKeysResponse) {
|
||||||
|
}
|
||||||
func (k *mockKeyAPI) PerformUploadDeviceKeys(ctx context.Context, req *keyapi.PerformUploadDeviceKeysRequest, res *keyapi.PerformUploadDeviceKeysResponse) {
|
func (k *mockKeyAPI) PerformUploadDeviceKeys(ctx context.Context, req *keyapi.PerformUploadDeviceKeysRequest, res *keyapi.PerformUploadDeviceKeysResponse) {
|
||||||
}
|
}
|
||||||
func (k *mockKeyAPI) PerformUploadDeviceSignatures(ctx context.Context, req *keyapi.PerformUploadDeviceSignaturesRequest, res *keyapi.PerformUploadDeviceSignaturesResponse) {
|
func (k *mockKeyAPI) PerformUploadDeviceSignatures(ctx context.Context, req *keyapi.PerformUploadDeviceSignaturesRequest, res *keyapi.PerformUploadDeviceSignaturesResponse) {
|
||||||
|
|
|
||||||
|
|
@ -554,3 +554,5 @@ Can upload self-signing keys
|
||||||
Fails to upload self-signing keys with no auth
|
Fails to upload self-signing keys with no auth
|
||||||
Fails to upload self-signing key without master key
|
Fails to upload self-signing key without master key
|
||||||
can fetch self-signing keys over federation
|
can fetch self-signing keys over federation
|
||||||
|
Changing master key notifies local users
|
||||||
|
Changing user-signing key notifies local users
|
||||||
|
|
|
||||||
|
|
@ -145,6 +145,18 @@ func (a *UserInternalAPI) PerformDeviceDeletion(ctx context.Context, req *api.Pe
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
// Ask the keyserver to delete device keys and signatures for those devices
|
||||||
|
deleteReq := &keyapi.PerformDeleteKeysRequest{
|
||||||
|
UserID: req.UserID,
|
||||||
|
}
|
||||||
|
for _, keyID := range req.DeviceIDs {
|
||||||
|
deleteReq.KeyIDs = append(deleteReq.KeyIDs, gomatrixserverlib.KeyID(keyID))
|
||||||
|
}
|
||||||
|
deleteRes := &keyapi.PerformDeleteKeysResponse{}
|
||||||
|
a.KeyAPI.PerformDeleteKeys(ctx, deleteReq, deleteRes)
|
||||||
|
if err := deleteRes.Error; err != nil {
|
||||||
|
return fmt.Errorf("a.KeyAPI.PerformDeleteKeys: %w", err)
|
||||||
|
}
|
||||||
// create empty device keys and upload them to delete what was once there and trigger device list changes
|
// create empty device keys and upload them to delete what was once there and trigger device list changes
|
||||||
return a.deviceListUpdate(req.UserID, deletedDeviceIDs)
|
return a.deviceListUpdate(req.UserID, deletedDeviceIDs)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue