Merge remote-tracking branch 'upstream/master' into piotr-kozimor/threepid-sessions-in-userapi

This commit is contained in:
Piotr Kozimor 2021-09-08 09:18:45 +02:00
commit d3f63db611
51 changed files with 897 additions and 317 deletions

View file

@ -1,5 +1,21 @@
# Changelog # Changelog
## Dendrite 0.5.0 (2021-08-24)
### Features
* Support for serverside key backups has been added, allowing your E2EE keys to be backed up and to be restored after logging out or when logging in from a new device
* Experimental support for cross-signing has been added, allowing verifying your own device keys and verifying other user's public keys
* Dendrite can now send logs to a TCP syslog server by using the `syslog` logger type (contributed by [sambhavsaggi](https://github.com/sambhavsaggi))
* Go 1.15 is now the minimum supported version for Dendrite
### Fixes
* Device keys are now cleaned up from the keyserver when the user API removes a device session
* The `M_ROOM_IN_USE` error code is now returned when a room alias is already taken (contributed by [nivekuil](https://github.com/nivekuil))
* A bug in the state storage migration has been fixed where room create events had incorrect state snapshots
* A bug when deactivating accounts caused by only reading the deprecated username field has been fixed
## Dendrite 0.4.1 (2021-07-26) ## Dendrite 0.4.1 (2021-07-26)
### Features ### Features

View file

@ -51,6 +51,10 @@ func (a *AppServiceQueryAPI) RoomAliasExists(
if appservice.URL != "" && appservice.IsInterestedInRoomAlias(request.Alias) { if appservice.URL != "" && appservice.IsInterestedInRoomAlias(request.Alias) {
// The full path to the rooms API, includes hs token // The full path to the rooms API, includes hs token
URL, err := url.Parse(appservice.URL + roomAliasExistsPath) URL, err := url.Parse(appservice.URL + roomAliasExistsPath)
if err != nil {
return err
}
URL.Path += request.Alias URL.Path += request.Alias
apiURL := URL.String() + "?access_token=" + appservice.HSToken apiURL := URL.String() + "?access_token=" + appservice.HSToken
@ -114,6 +118,9 @@ func (a *AppServiceQueryAPI) UserIDExists(
if appservice.URL != "" && appservice.IsInterestedInUserID(request.UserID) { if appservice.URL != "" && appservice.IsInterestedInUserID(request.UserID) {
// The full path to the rooms API, includes hs token // The full path to the rooms API, includes hs token
URL, err := url.Parse(appservice.URL + userIDExistsPath) URL, err := url.Parse(appservice.URL + userIDExistsPath)
if err != nil {
return err
}
URL.Path += request.UserID URL.Path += request.UserID
apiURL := URL.String() + "?access_token=" + appservice.HSToken apiURL := URL.String() + "?access_token=" + appservice.HSToken

View file

@ -1,4 +1,4 @@
FROM docker.io/golang:1.15-alpine AS base FROM docker.io/golang:1.17-alpine AS base
RUN apk --update --no-cache add bash build-base RUN apk --update --no-cache add bash build-base

View file

@ -1,4 +1,4 @@
FROM docker.io/golang:1.15-alpine AS base FROM docker.io/golang:1.17-alpine AS base
RUN apk --update --no-cache add bash build-base RUN apk --update --no-cache add bash build-base

View file

@ -25,7 +25,7 @@ echo "Installing golangci-lint..."
# Make a backup of go.{mod,sum} first # Make a backup of go.{mod,sum} first
cp go.mod go.mod.bak && cp go.sum go.sum.bak cp go.mod go.mod.bak && cp go.sum go.sum.bak
go get github.com/golangci/golangci-lint/cmd/golangci-lint@v1.19.1 go get github.com/golangci/golangci-lint/cmd/golangci-lint@v1.41.1
# Run linting # Run linting
echo "Looking for lint..." echo "Looking for lint..."

View file

@ -111,6 +111,12 @@ func UserInUse(msg string) *MatrixError {
return &MatrixError{"M_USER_IN_USE", msg} return &MatrixError{"M_USER_IN_USE", msg}
} }
// RoomInUse is an error returned when the client tries to make a room
// that already exists
func RoomInUse(msg string) *MatrixError {
return &MatrixError{"M_ROOM_IN_USE", msg}
}
// ASExclusive is an error returned when an application service tries to // ASExclusive is an error returned when an application service tries to
// register an username that is outside of its registered namespace, or if a // register an username that is outside of its registered namespace, or if a
// user attempts to register a username or room alias within an exclusive // user attempts to register a username or room alias within an exclusive
@ -131,6 +137,12 @@ func InvalidSignature(msg string) *MatrixError {
return &MatrixError{"M_INVALID_SIGNATURE", msg} return &MatrixError{"M_INVALID_SIGNATURE", msg}
} }
// InvalidParam is an error that is returned when a parameter was invalid,
// traditionally with cross-signing.
func InvalidParam(msg string) *MatrixError {
return &MatrixError{"M_INVALID_PARAM", msg}
}
// MissingParam is an error that is returned when a parameter was incorrect, // MissingParam is an error that is returned when a parameter was incorrect,
// traditionally with cross-signing. // traditionally with cross-signing.
func MissingParam(msg string) *MatrixError { func MissingParam(msg string) *MatrixError {

View file

@ -325,7 +325,10 @@ func createRoom(
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }
if aliasResp.RoomID != "" { if aliasResp.RoomID != "" {
return util.MessageResponse(400, "Alias already exists") return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.RoomInUse("Room ID already exists."),
}
} }
aliasEvent = &fledglingEvent{ aliasEvent = &fledglingEvent{
@ -484,7 +487,10 @@ func createRoom(
} }
if aliasResp.AliasExists { if aliasResp.AliasExists {
return util.MessageResponse(400, "Alias already exists") return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.RoomInUse("Room alias already exists."),
}
} }
} }

View file

@ -15,11 +15,10 @@
package routing package routing
import ( import (
"encoding/json"
"io/ioutil"
"net/http" "net/http"
"github.com/matrix-org/dendrite/clientapi/auth" "github.com/matrix-org/dendrite/clientapi/auth"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/api"
@ -29,37 +28,52 @@ import (
"github.com/matrix-org/util" "github.com/matrix-org/util"
) )
type crossSigningRequest struct {
api.PerformUploadDeviceKeysRequest
Auth newPasswordAuth `json:"auth"`
}
func UploadCrossSigningDeviceKeys( func UploadCrossSigningDeviceKeys(
req *http.Request, userInteractiveAuth *auth.UserInteractive, req *http.Request, userInteractiveAuth *auth.UserInteractive,
keyserverAPI api.KeyInternalAPI, device *userapi.Device, keyserverAPI api.KeyInternalAPI, device *userapi.Device,
accountDB accounts.Database, cfg *config.ClientAPI, accountDB accounts.Database, cfg *config.ClientAPI,
) util.JSONResponse { ) util.JSONResponse {
uploadReq := &api.PerformUploadDeviceKeysRequest{} uploadReq := &crossSigningRequest{}
uploadRes := &api.PerformUploadDeviceKeysResponse{} uploadRes := &api.PerformUploadDeviceKeysResponse{}
ctx := req.Context() resErr := httputil.UnmarshalJSONRequest(req, &uploadReq)
defer req.Body.Close() // nolint:errcheck if resErr != nil {
bodyBytes, err := ioutil.ReadAll(req.Body) return *resErr
if err != nil { }
sessionID := uploadReq.Auth.Session
if sessionID == "" {
sessionID = util.RandomString(sessionIDLength)
}
if uploadReq.Auth.Type != authtypes.LoginTypePassword {
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusBadRequest, Code: http.StatusUnauthorized,
JSON: jsonerror.BadJSON("The request body could not be read: " + err.Error()), JSON: newUserInteractiveResponse(
sessionID,
[]authtypes.Flow{
{
Stages: []authtypes.LoginType{authtypes.LoginTypePassword},
},
},
nil,
),
} }
} }
typePassword := auth.LoginTypePassword{
if _, err := userInteractiveAuth.Verify(ctx, bodyBytes, device); err != nil { GetAccountByPassword: accountDB.GetAccountByPassword,
return *err Config: cfg,
} }
if _, authErr := typePassword.Login(req.Context(), &uploadReq.Auth.PasswordRequest); authErr != nil {
if err = json.Unmarshal(bodyBytes, &uploadReq); err != nil { return *authErr
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.BadJSON("The request body could not be unmarshalled: " + err.Error()),
}
} }
AddCompletedSessionStage(sessionID, authtypes.LoginTypePassword)
uploadReq.UserID = device.UserID uploadReq.UserID = device.UserID
keyserverAPI.PerformUploadDeviceKeys(req.Context(), uploadReq, uploadRes) keyserverAPI.PerformUploadDeviceKeys(req.Context(), &uploadReq.PerformUploadDeviceKeysRequest, uploadRes)
if err := uploadRes.Error; err != nil { if err := uploadRes.Error; err != nil {
switch { switch {
@ -73,6 +87,11 @@ func UploadCrossSigningDeviceKeys(
Code: http.StatusBadRequest, Code: http.StatusBadRequest,
JSON: jsonerror.MissingParam(err.Error()), JSON: jsonerror.MissingParam(err.Error()),
} }
case err.IsInvalidParam:
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.InvalidParam(err.Error()),
}
default: default:
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusBadRequest, Code: http.StatusBadRequest,
@ -110,6 +129,11 @@ func UploadCrossSigningDeviceSignatures(req *http.Request, keyserverAPI api.KeyI
Code: http.StatusBadRequest, Code: http.StatusBadRequest,
JSON: jsonerror.MissingParam(err.Error()), JSON: jsonerror.MissingParam(err.Error()),
} }
case err.IsInvalidParam:
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.InvalidParam(err.Error()),
}
default: default:
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusBadRequest, Code: http.StatusBadRequest,

View file

@ -65,7 +65,7 @@ func Setup(
userInteractiveAuth := auth.NewUserInteractive(accountDB.GetAccountByPassword, cfg) userInteractiveAuth := auth.NewUserInteractive(accountDB.GetAccountByPassword, cfg)
unstableFeatures := map[string]bool{ unstableFeatures := map[string]bool{
//"org.matrix.e2e_cross_signing": true, "org.matrix.e2e_cross_signing": true,
} }
for _, msc := range cfg.MSCs.MSCs { for _, msc := range cfg.MSCs.MSCs {
unstableFeatures["org.matrix."+msc] = true unstableFeatures["org.matrix."+msc] = true
@ -291,10 +291,7 @@ func Setup(
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
// If there's a trailing slash, remove it // If there's a trailing slash, remove it
eventType := vars["type"] eventType := strings.TrimSuffix(vars["type"], "/")
if strings.HasSuffix(eventType, "/") {
eventType = eventType[:len(eventType)-1]
}
eventFormat := req.URL.Query().Get("format") == "event" eventFormat := req.URL.Query().Get("format") == "event"
return OnIncomingStateTypeRequest(req.Context(), device, rsAPI, vars["roomID"], eventType, "", eventFormat) return OnIncomingStateTypeRequest(req.Context(), device, rsAPI, vars["roomID"], eventType, "", eventFormat)
})).Methods(http.MethodGet, http.MethodOptions) })).Methods(http.MethodGet, http.MethodOptions)
@ -315,11 +312,7 @@ func Setup(
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
emptyString := "" emptyString := ""
eventType := vars["eventType"] eventType := strings.TrimSuffix(vars["eventType"], "/")
// If there's a trailing slash, remove it
if strings.HasSuffix(eventType, "/") {
eventType = eventType[:len(eventType)-1]
}
return SendEvent(req, device, vars["roomID"], eventType, nil, &emptyString, cfg, rsAPI, nil) return SendEvent(req, device, vars["roomID"], eventType, nil, &emptyString, cfg, rsAPI, nil)
}), }),
).Methods(http.MethodPut, http.MethodOptions) ).Methods(http.MethodPut, http.MethodOptions)

View file

@ -75,6 +75,12 @@ type InputReceiptEventRequest struct {
// InputReceiptEventResponse is a response to InputReceiptEventRequest // InputReceiptEventResponse is a response to InputReceiptEventRequest
type InputReceiptEventResponse struct{} type InputReceiptEventResponse struct{}
type InputCrossSigningKeyUpdateRequest struct {
CrossSigningKeyUpdate `json:"signing_keys"`
}
type InputCrossSigningKeyUpdateResponse struct{}
// EDUServerInputAPI is used to write events to the typing server. // EDUServerInputAPI is used to write events to the typing server.
type EDUServerInputAPI interface { type EDUServerInputAPI interface {
InputTypingEvent( InputTypingEvent(
@ -94,4 +100,10 @@ type EDUServerInputAPI interface {
request *InputReceiptEventRequest, request *InputReceiptEventRequest,
response *InputReceiptEventResponse, response *InputReceiptEventResponse,
) error ) error
InputCrossSigningKeyUpdate(
ctx context.Context,
request *InputCrossSigningKeyUpdateRequest,
response *InputCrossSigningKeyUpdateResponse,
) error
} }

View file

@ -33,14 +33,6 @@ type OutputTypingEvent struct {
ExpireTime *time.Time ExpireTime *time.Time
} }
// TypingEvent represents a matrix edu event of type 'm.typing'.
type TypingEvent struct {
Type string `json:"type"`
RoomID string `json:"room_id"`
UserID string `json:"user_id"`
Typing bool `json:"typing"`
}
// OutputSendToDeviceEvent is an entry in the send-to-device output kafka log. // OutputSendToDeviceEvent is an entry in the send-to-device output kafka log.
// This contains the full event content, along with the user ID and device ID // This contains the full event content, along with the user ID and device ID
// to which it is destined. // to which it is destined.
@ -50,14 +42,6 @@ type OutputSendToDeviceEvent struct {
gomatrixserverlib.SendToDeviceEvent gomatrixserverlib.SendToDeviceEvent
} }
type ReceiptEvent struct {
UserID string `json:"user_id"`
RoomID string `json:"room_id"`
EventID string `json:"event_id"`
Type string `json:"type"`
Timestamp gomatrixserverlib.Timestamp `json:"timestamp"`
}
// OutputReceiptEvent is an entry in the receipt output kafka log // OutputReceiptEvent is an entry in the receipt output kafka log
type OutputReceiptEvent struct { type OutputReceiptEvent struct {
UserID string `json:"user_id"` UserID string `json:"user_id"`
@ -67,21 +51,7 @@ type OutputReceiptEvent struct {
Timestamp gomatrixserverlib.Timestamp `json:"timestamp"` Timestamp gomatrixserverlib.Timestamp `json:"timestamp"`
} }
// Helper structs for receipts json creation // OutputCrossSigningKeyUpdate is an entry in the signing key update output kafka log
type ReceiptMRead struct { type OutputCrossSigningKeyUpdate struct {
User map[string]ReceiptTS `json:"m.read"` CrossSigningKeyUpdate `json:"signing_keys"`
}
type ReceiptTS struct {
TS gomatrixserverlib.Timestamp `json:"ts"`
}
// FederationSender output
type FederationReceiptMRead struct {
User map[string]FederationReceiptData `json:"m.read"`
}
type FederationReceiptData struct {
Data ReceiptTS `json:"data"`
EventIDs []string `json:"event_ids"`
} }

59
eduserver/api/types.go Normal file
View file

@ -0,0 +1,59 @@
// Copyright 2021 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package api
import "github.com/matrix-org/gomatrixserverlib"
const (
MSigningKeyUpdate = "m.signing_key_update"
)
type TypingEvent struct {
Type string `json:"type"`
RoomID string `json:"room_id"`
UserID string `json:"user_id"`
Typing bool `json:"typing"`
}
type ReceiptEvent struct {
UserID string `json:"user_id"`
RoomID string `json:"room_id"`
EventID string `json:"event_id"`
Type string `json:"type"`
Timestamp gomatrixserverlib.Timestamp `json:"timestamp"`
}
type FederationReceiptMRead struct {
User map[string]FederationReceiptData `json:"m.read"`
}
type FederationReceiptData struct {
Data ReceiptTS `json:"data"`
EventIDs []string `json:"event_ids"`
}
type ReceiptMRead struct {
User map[string]ReceiptTS `json:"m.read"`
}
type ReceiptTS struct {
TS gomatrixserverlib.Timestamp `json:"ts"`
}
type CrossSigningKeyUpdate struct {
MasterKey *gomatrixserverlib.CrossSigningKey `json:"master_key,omitempty"`
SelfSigningKey *gomatrixserverlib.CrossSigningKey `json:"self_signing_key,omitempty"`
UserID string `json:"user_id"`
}

View file

@ -52,6 +52,7 @@ func NewInternalAPI(
OutputTypingEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent), OutputTypingEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent),
OutputSendToDeviceEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent), OutputSendToDeviceEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent),
OutputReceiptEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputReceiptEvent), OutputReceiptEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputReceiptEvent),
OutputKeyChangeEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent),
ServerName: cfg.Matrix.ServerName, ServerName: cfg.Matrix.ServerName,
} }
} }

View file

@ -24,6 +24,7 @@ import (
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/eduserver/cache"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -39,6 +40,8 @@ type EDUServerInputAPI struct {
OutputSendToDeviceEventTopic string OutputSendToDeviceEventTopic string
// The kafka topic to output new receipt events to // The kafka topic to output new receipt events to
OutputReceiptEventTopic string OutputReceiptEventTopic string
// The kafka topic to output new key change events to
OutputKeyChangeEventTopic string
// kafka producer // kafka producer
Producer sarama.SyncProducer Producer sarama.SyncProducer
// Internal user query API // Internal user query API
@ -77,6 +80,36 @@ func (t *EDUServerInputAPI) InputSendToDeviceEvent(
return t.sendToDeviceEvent(ise) return t.sendToDeviceEvent(ise)
} }
// InputCrossSigningKeyUpdate implements api.EDUServerInputAPI
func (t *EDUServerInputAPI) InputCrossSigningKeyUpdate(
ctx context.Context,
request *api.InputCrossSigningKeyUpdateRequest,
response *api.InputCrossSigningKeyUpdateResponse,
) error {
eventJSON, err := json.Marshal(&keyapi.DeviceMessage{
Type: keyapi.TypeCrossSigningUpdate,
OutputCrossSigningKeyUpdate: &api.OutputCrossSigningKeyUpdate{
CrossSigningKeyUpdate: request.CrossSigningKeyUpdate,
},
})
if err != nil {
return err
}
logrus.WithFields(logrus.Fields{
"user_id": request.UserID,
}).Infof("Producing to topic '%s'", t.OutputKeyChangeEventTopic)
m := &sarama.ProducerMessage{
Topic: string(t.OutputKeyChangeEventTopic),
Key: sarama.StringEncoder(request.UserID),
Value: sarama.ByteEncoder(eventJSON),
}
_, _, err = t.Producer.SendMessage(m)
return err
}
func (t *EDUServerInputAPI) sendTypingEvent(ite *api.InputTypingEvent) error { func (t *EDUServerInputAPI) sendTypingEvent(ite *api.InputTypingEvent) error {
ev := &api.TypingEvent{ ev := &api.TypingEvent{
Type: gomatrixserverlib.MTyping, Type: gomatrixserverlib.MTyping,

View file

@ -12,9 +12,10 @@ import (
// HTTP paths for the internal HTTP APIs // HTTP paths for the internal HTTP APIs
const ( const (
EDUServerInputTypingEventPath = "/eduserver/input" EDUServerInputTypingEventPath = "/eduserver/input"
EDUServerInputSendToDeviceEventPath = "/eduserver/sendToDevice" EDUServerInputSendToDeviceEventPath = "/eduserver/sendToDevice"
EDUServerInputReceiptEventPath = "/eduserver/receipt" EDUServerInputReceiptEventPath = "/eduserver/receipt"
EDUServerInputCrossSigningKeyUpdatePath = "/eduserver/crossSigningKeyUpdate"
) )
// NewEDUServerClient creates a EDUServerInputAPI implemented by talking to a HTTP POST API. // NewEDUServerClient creates a EDUServerInputAPI implemented by talking to a HTTP POST API.
@ -68,3 +69,16 @@ func (h *httpEDUServerInputAPI) InputReceiptEvent(
apiURL := h.eduServerURL + EDUServerInputReceiptEventPath apiURL := h.eduServerURL + EDUServerInputReceiptEventPath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
} }
// InputCrossSigningKeyUpdate implements EDUServerInputAPI
func (h *httpEDUServerInputAPI) InputCrossSigningKeyUpdate(
ctx context.Context,
request *api.InputCrossSigningKeyUpdateRequest,
response *api.InputCrossSigningKeyUpdateResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "InputCrossSigningKeyUpdate")
defer span.Finish()
apiURL := h.eduServerURL + EDUServerInputCrossSigningKeyUpdatePath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}

View file

@ -51,4 +51,17 @@ func AddRoutes(t api.EDUServerInputAPI, internalAPIMux *mux.Router) {
return util.JSONResponse{Code: http.StatusOK, JSON: &response} return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}), }),
) )
internalAPIMux.Handle(EDUServerInputCrossSigningKeyUpdatePath,
httputil.MakeInternalAPI("inputCrossSigningKeyUpdate", func(req *http.Request) util.JSONResponse {
var request api.InputCrossSigningKeyUpdateRequest
var response api.InputCrossSigningKeyUpdateResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
if err := t.InputCrossSigningKeyUpdate(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
} }

View file

@ -156,7 +156,9 @@ func fillInRooms(ctx context.Context, roomIDs []string, rsAPI roomserverAPI.Room
case topicTuple: case topicTuple:
pub.Topic = contentVal pub.Topic = contentVal
case canonicalTuple: case canonicalTuple:
pub.CanonicalAlias = contentVal if _, _, err := gomatrixserverlib.SplitID('#', contentVal); err == nil {
pub.CanonicalAlias = contentVal
}
case visibilityTuple: case visibilityTuple:
pub.WorldReadable = contentVal == "world_readable" pub.WorldReadable = contentVal == "world_readable"
// need both of these to determine whether guests can join // need both of these to determine whether guests can join

View file

@ -449,7 +449,7 @@ func Setup(
httputil.MakeExternalAPI("federation_public_rooms", func(req *http.Request) util.JSONResponse { httputil.MakeExternalAPI("federation_public_rooms", func(req *http.Request) util.JSONResponse {
return GetPostPublicRooms(req, rsAPI) return GetPostPublicRooms(req, rsAPI)
}), }),
).Methods(http.MethodGet) ).Methods(http.MethodGet, http.MethodPost)
v1fedmux.Handle("/user/keys/claim", httputil.MakeFedAPI( v1fedmux.Handle("/user/keys/claim", httputil.MakeFedAPI(
"federation_keys_claim", cfg.Matrix.ServerName, keys, wakeup, "federation_keys_claim", cfg.Matrix.ServerName, keys, wakeup,

View file

@ -345,7 +345,7 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res
} }
if c := len(results); c > 0 { if c := len(results); c > 0 {
util.GetLogger(ctx).Infof("Processed %d PDUs from transaction %q", c, t.TransactionID) util.GetLogger(ctx).Infof("Processed %d PDUs from %v in transaction %q", c, t.Origin, t.TransactionID)
} }
return &gomatrixserverlib.RespSend{PDUs: results}, nil return &gomatrixserverlib.RespSend{PDUs: results}, nil
} }
@ -502,6 +502,22 @@ func (t *txnReq) processEDUs(ctx context.Context) {
} }
} }
} }
case eduserverAPI.MSigningKeyUpdate:
var updatePayload eduserverAPI.CrossSigningKeyUpdate
if err := json.Unmarshal(e.Content, &updatePayload); err != nil {
util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{
"user_id": updatePayload.UserID,
}).Error("Failed to send signing key update to edu server")
continue
}
inputReq := &eduserverAPI.InputCrossSigningKeyUpdateRequest{
CrossSigningKeyUpdate: updatePayload,
}
inputRes := &eduserverAPI.InputCrossSigningKeyUpdateResponse{}
if err := t.eduAPI.InputCrossSigningKeyUpdate(ctx, inputReq, inputRes); err != nil {
util.GetLogger(ctx).WithError(err).Error("Failed to unmarshal cross-signing update")
continue
}
default: default:
util.GetLogger(ctx).WithField("type", e.Type).Debug("Unhandled EDU") util.GetLogger(ctx).WithField("type", e.Type).Debug("Unhandled EDU")
} }

View file

@ -84,6 +84,14 @@ func (o *testEDUProducer) InputReceiptEvent(
return nil return nil
} }
func (o *testEDUProducer) InputCrossSigningKeyUpdate(
ctx context.Context,
request *eduAPI.InputCrossSigningKeyUpdateRequest,
response *eduAPI.InputCrossSigningKeyUpdateResponse,
) error {
return nil
}
type testRoomserverAPI struct { type testRoomserverAPI struct {
api.RoomserverInternalAPITrace api.RoomserverInternalAPITrace
inputRoomEvents []api.InputRoomEvent inputRoomEvents []api.InputRoomEvent

View file

@ -20,6 +20,7 @@ import (
"fmt" "fmt"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/federationsender/queue" "github.com/matrix-org/dendrite/federationsender/queue"
"github.com/matrix-org/dendrite/federationsender/storage" "github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
@ -28,6 +29,7 @@ import (
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@ -83,6 +85,17 @@ func (t *KeyChangeConsumer) onMessage(msg *sarama.ConsumerMessage) error {
log.WithError(err).Errorf("failed to read device message from key change topic") log.WithError(err).Errorf("failed to read device message from key change topic")
return nil return nil
} }
switch m.Type {
case api.TypeCrossSigningUpdate:
return t.onCrossSigningMessage(m)
case api.TypeDeviceKeyUpdate:
fallthrough
default:
return t.onDeviceKeyMessage(m)
}
}
func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) error {
logger := log.WithField("user_id", m.UserID) logger := log.WithField("user_id", m.UserID)
// only send key change events which originated from us // only send key change events which originated from us
@ -133,6 +146,50 @@ func (t *KeyChangeConsumer) onMessage(msg *sarama.ConsumerMessage) error {
return t.queues.SendEDU(edu, t.serverName, destinations) return t.queues.SendEDU(edu, t.serverName, destinations)
} }
func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) error {
output := m.CrossSigningKeyUpdate
_, host, err := gomatrixserverlib.SplitID('@', output.UserID)
if err != nil {
logrus.WithError(err).Errorf("fedsender key change consumer: user ID parse failure")
return nil
}
if host != gomatrixserverlib.ServerName(t.serverName) {
// Ignore any messages that didn't originate locally, otherwise we'll
// end up parroting information we received from other servers.
return nil
}
logger := log.WithField("user_id", output.UserID)
var queryRes roomserverAPI.QueryRoomsForUserResponse
err = t.rsAPI.QueryRoomsForUser(context.Background(), &roomserverAPI.QueryRoomsForUserRequest{
UserID: output.UserID,
WantMembership: "join",
}, &queryRes)
if err != nil {
logger.WithError(err).Error("fedsender key change consumer: failed to calculate joined rooms for user")
return nil
}
// send this key change to all servers who share rooms with this user.
destinations, err := t.db.GetJoinedHostsForRooms(context.Background(), queryRes.RoomIDs)
if err != nil {
logger.WithError(err).Error("fedsender key change consumer: failed to calculate joined hosts for rooms user is in")
return nil
}
// Pack the EDU and marshal it
edu := &gomatrixserverlib.EDU{
Type: eduserverAPI.MSigningKeyUpdate,
Origin: string(t.serverName),
}
if edu.Content, err = json.Marshal(output); err != nil {
logger.WithError(err).Error("fedsender key change consumer: failed to marshal output, dropping")
return nil
}
logger.Infof("Sending cross-signing update message to %q", destinations)
return t.queues.SendEDU(edu, t.serverName, destinations)
}
func prevID(streamID int) []int { func prevID(streamID int) []int {
if streamID <= 1 { if streamID <= 1 {
return nil return nil

6
go.mod
View file

@ -17,7 +17,7 @@ require (
github.com/h2non/filetype v1.1.1 // indirect github.com/h2non/filetype v1.1.1 // indirect
github.com/hashicorp/golang-lru v0.5.4 github.com/hashicorp/golang-lru v0.5.4
github.com/juju/testing v0.0.0-20210324180055-18c50b0c2098 // indirect github.com/juju/testing v0.0.0-20210324180055-18c50b0c2098 // indirect
github.com/lib/pq v1.10.2 github.com/lib/pq v1.10.1
github.com/libp2p/go-libp2p v0.13.0 github.com/libp2p/go-libp2p v0.13.0
github.com/libp2p/go-libp2p-circuit v0.4.0 github.com/libp2p/go-libp2p-circuit v0.4.0
github.com/libp2p/go-libp2p-core v0.8.3 github.com/libp2p/go-libp2p-core v0.8.3
@ -31,9 +31,9 @@ require (
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4 github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4
github.com/matrix-org/go-sqlite3-js v0.0.0-20210709140738-b0d1ba599a6d github.com/matrix-org/go-sqlite3-js v0.0.0-20210709140738-b0d1ba599a6d
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16
github.com/matrix-org/gomatrixserverlib v0.0.0-20210809130922-d9c3f400582b github.com/matrix-org/gomatrixserverlib v0.0.0-20210817115641-f9416ac1a723
github.com/matrix-org/naffka v0.0.0-20210623111924-14ff508b58e0 github.com/matrix-org/naffka v0.0.0-20210623111924-14ff508b58e0
github.com/matrix-org/pinecone v0.0.0-20210623102758-74f885644c1b github.com/matrix-org/pinecone v0.0.0-20210819150600-e692df1a5c42
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
github.com/matryer/is v1.4.0 github.com/matryer/is v1.4.0
github.com/mattn/go-sqlite3 v1.14.8 github.com/mattn/go-sqlite3 v1.14.8

12
go.sum
View file

@ -746,8 +746,8 @@ github.com/labstack/echo/v4 v4.1.11/go.mod h1:i541M3Fj6f76NZtHSj7TXnyM8n2gaodfvf
github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k= github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k=
github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y= github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y=
github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII=
github.com/lib/pq v1.10.2 h1:AqzbZs4ZoCBp+GtejcpCpcxM3zlSMx29dXbUSeVtJb8= github.com/lib/pq v1.10.1 h1:6VXZrLU0jHBYyAqrSPa+MgPfnSvTPuMgK+k0o5kVFWo=
github.com/lib/pq v1.10.2/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lib/pq v1.10.1/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/libp2p/go-addr-util v0.0.1/go.mod h1:4ac6O7n9rIAKB1dnd+s8IbbMXkt+oBpzX4/+RACcnlQ= github.com/libp2p/go-addr-util v0.0.1/go.mod h1:4ac6O7n9rIAKB1dnd+s8IbbMXkt+oBpzX4/+RACcnlQ=
github.com/libp2p/go-addr-util v0.0.2 h1:7cWK5cdA5x72jX0g8iLrQWm5TRJZ6CzGdPEhWj7plWU= github.com/libp2p/go-addr-util v0.0.2 h1:7cWK5cdA5x72jX0g8iLrQWm5TRJZ6CzGdPEhWj7plWU=
github.com/libp2p/go-addr-util v0.0.2/go.mod h1:Ecd6Fb3yIuLzq4bD7VcywcVSBtefcAwnUISBM3WG15E= github.com/libp2p/go-addr-util v0.0.2/go.mod h1:Ecd6Fb3yIuLzq4bD7VcywcVSBtefcAwnUISBM3WG15E=
@ -994,12 +994,12 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20210709140738-b0d1ba599a6d/go.mod h1
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4=
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
github.com/matrix-org/gomatrixserverlib v0.0.0-20210809130922-d9c3f400582b h1:8St1B8QmlvMLsOmGqW3++0akUs0250IAi+AGcr5faxw= github.com/matrix-org/gomatrixserverlib v0.0.0-20210817115641-f9416ac1a723 h1:b8cyR4aYv9Lmf1lKgASJ+PFSp/GBv8ZFgb/O42ZXLGA=
github.com/matrix-org/gomatrixserverlib v0.0.0-20210809130922-d9c3f400582b/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= github.com/matrix-org/gomatrixserverlib v0.0.0-20210817115641-f9416ac1a723/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
github.com/matrix-org/naffka v0.0.0-20210623111924-14ff508b58e0 h1:HZCzy4oVzz55e+cOMiX/JtSF2UOY1evBl2raaE7ACcU= github.com/matrix-org/naffka v0.0.0-20210623111924-14ff508b58e0 h1:HZCzy4oVzz55e+cOMiX/JtSF2UOY1evBl2raaE7ACcU=
github.com/matrix-org/naffka v0.0.0-20210623111924-14ff508b58e0/go.mod h1:sjyPyRxKM5uw1nD2cJ6O2OxI6GOqyVBfNXqKjBZTBZE= github.com/matrix-org/naffka v0.0.0-20210623111924-14ff508b58e0/go.mod h1:sjyPyRxKM5uw1nD2cJ6O2OxI6GOqyVBfNXqKjBZTBZE=
github.com/matrix-org/pinecone v0.0.0-20210623102758-74f885644c1b h1:5X5vdWQ13xrNkJVqaJHPsrt7rKkMJH5iac0EtfOuxSg= github.com/matrix-org/pinecone v0.0.0-20210819150600-e692df1a5c42 h1:ZO39w5Kbq9Aw3uHHT5QjFR2kpVHxuJxHD3zOhmU5BVI=
github.com/matrix-org/pinecone v0.0.0-20210623102758-74f885644c1b/go.mod h1:CVlrvs1R5iz7Omy2GqAjJJKbACn07GZgUq1Gli18FYE= github.com/matrix-org/pinecone v0.0.0-20210819150600-e692df1a5c42/go.mod h1:CVlrvs1R5iz7Omy2GqAjJJKbACn07GZgUq1Gli18FYE=
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U= github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk= github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk=
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U= github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=

View file

@ -18,6 +18,7 @@ import (
"context" "context"
"fmt" "fmt"
"io" "io"
"log/syslog"
"net/http" "net/http"
"os" "os"
"path" "path"
@ -30,6 +31,7 @@ import (
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dugong" "github.com/matrix-org/dugong"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
lSyslog "github.com/sirupsen/logrus/hooks/syslog"
) )
type utcFormatter struct { type utcFormatter struct {
@ -128,6 +130,9 @@ func SetupHookLogging(hooks []config.LogrusHook, componentName string) {
case "file": case "file":
checkFileHookParams(hook.Params) checkFileHookParams(hook.Params)
setupFileHook(hook, level, componentName) setupFileHook(hook, level, componentName)
case "syslog":
checkSyslogHookParams(hook.Params)
setupSyslogHook(hook, level, componentName)
default: default:
logrus.Fatalf("Unrecognised logging hook type: %s", hook.Type) logrus.Fatalf("Unrecognised logging hook type: %s", hook.Type)
} }
@ -173,6 +178,34 @@ func setupFileHook(hook config.LogrusHook, level logrus.Level, componentName str
}) })
} }
func checkSyslogHookParams(params map[string]interface{}) {
addr, ok := params["address"]
if !ok {
logrus.Fatalf("Expecting a parameter \"address\" for logging hook of type \"syslog\"")
}
if _, ok := addr.(string); !ok {
logrus.Fatalf("Parameter \"address\" for logging hook of type \"syslog\" should be a string")
}
proto, ok2 := params["protocol"]
if !ok2 {
logrus.Fatalf("Expecting a parameter \"protocol\" for logging hook of type \"syslog\"")
}
if _, ok2 := proto.(string); !ok2 {
logrus.Fatalf("Parameter \"protocol\" for logging hook of type \"syslog\" should be a string")
}
}
func setupSyslogHook(hook config.LogrusHook, level logrus.Level, componentName string) {
syslogHook, err := lSyslog.NewSyslogHook(hook.Params["protocol"].(string), hook.Params["address"].(string), syslog.LOG_INFO, componentName)
if err == nil {
logrus.AddHook(&logLevelHook{level, syslogHook})
}
}
//CloseAndLogIfError Closes io.Closer and logs the error if any //CloseAndLogIfError Closes io.Closer and logs the error if any
func CloseAndLogIfError(ctx context.Context, closer io.Closer, message string) { func CloseAndLogIfError(ctx context.Context, closer io.Closer, message string) {
if closer == nil { if closer == nil {

View file

@ -16,8 +16,8 @@ var build string
const ( const (
VersionMajor = 0 VersionMajor = 0
VersionMinor = 4 VersionMinor = 5
VersionPatch = 1 VersionPatch = 0
VersionTag = "" // example: "rc1" VersionTag = "" // example: "rc1"
) )

View file

@ -20,6 +20,7 @@ import (
"strings" "strings"
"time" "time"
eduapi "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/keyserver/types" "github.com/matrix-org/dendrite/keyserver/types"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -33,6 +34,7 @@ type KeyInternalAPI interface {
PerformUploadKeys(ctx context.Context, req *PerformUploadKeysRequest, res *PerformUploadKeysResponse) PerformUploadKeys(ctx context.Context, req *PerformUploadKeysRequest, res *PerformUploadKeysResponse)
// PerformClaimKeys claims one-time keys for use in pre-key messages // PerformClaimKeys claims one-time keys for use in pre-key messages
PerformClaimKeys(ctx context.Context, req *PerformClaimKeysRequest, res *PerformClaimKeysResponse) PerformClaimKeys(ctx context.Context, req *PerformClaimKeysRequest, res *PerformClaimKeysResponse)
PerformDeleteKeys(ctx context.Context, req *PerformDeleteKeysRequest, res *PerformDeleteKeysResponse)
PerformUploadDeviceKeys(ctx context.Context, req *PerformUploadDeviceKeysRequest, res *PerformUploadDeviceKeysResponse) PerformUploadDeviceKeys(ctx context.Context, req *PerformUploadDeviceKeysRequest, res *PerformUploadDeviceKeysResponse)
PerformUploadDeviceSignatures(ctx context.Context, req *PerformUploadDeviceSignaturesRequest, res *PerformUploadDeviceSignaturesResponse) PerformUploadDeviceSignatures(ctx context.Context, req *PerformUploadDeviceSignaturesRequest, res *PerformUploadDeviceSignaturesResponse)
QueryKeys(ctx context.Context, req *QueryKeysRequest, res *QueryKeysResponse) QueryKeys(ctx context.Context, req *QueryKeysRequest, res *QueryKeysResponse)
@ -47,15 +49,25 @@ type KeyError struct {
Err string `json:"error"` Err string `json:"error"`
IsInvalidSignature bool `json:"is_invalid_signature,omitempty"` // M_INVALID_SIGNATURE IsInvalidSignature bool `json:"is_invalid_signature,omitempty"` // M_INVALID_SIGNATURE
IsMissingParam bool `json:"is_missing_param,omitempty"` // M_MISSING_PARAM IsMissingParam bool `json:"is_missing_param,omitempty"` // M_MISSING_PARAM
IsInvalidParam bool `json:"is_invalid_param,omitempty"` // M_INVALID_PARAM
} }
func (k *KeyError) Error() string { func (k *KeyError) Error() string {
return k.Err return k.Err
} }
type DeviceMessageType int
const (
TypeDeviceKeyUpdate DeviceMessageType = iota
TypeCrossSigningUpdate
)
// DeviceMessage represents the message produced into Kafka by the key server. // DeviceMessage represents the message produced into Kafka by the key server.
type DeviceMessage struct { type DeviceMessage struct {
DeviceKeys Type DeviceMessageType `json:"Type,omitempty"`
*DeviceKeys `json:"DeviceKeys,omitempty"`
*eduapi.OutputCrossSigningKeyUpdate `json:"CrossSigningKeyUpdate,omitempty"`
// A monotonically increasing number which represents device changes for this user. // A monotonically increasing number which represents device changes for this user.
StreamID int StreamID int
} }
@ -76,7 +88,7 @@ type DeviceKeys struct {
// WithStreamID returns a copy of this device message with the given stream ID // WithStreamID returns a copy of this device message with the given stream ID
func (k *DeviceKeys) WithStreamID(streamID int) DeviceMessage { func (k *DeviceKeys) WithStreamID(streamID int) DeviceMessage {
return DeviceMessage{ return DeviceMessage{
DeviceKeys: *k, DeviceKeys: k,
StreamID: streamID, StreamID: streamID,
} }
} }
@ -134,6 +146,18 @@ type PerformUploadKeysResponse struct {
OneTimeKeyCounts []OneTimeKeysCount OneTimeKeyCounts []OneTimeKeysCount
} }
// PerformDeleteKeysRequest asks the keyserver to forget about certain
// keys, and signatures related to those keys.
type PerformDeleteKeysRequest struct {
UserID string
KeyIDs []gomatrixserverlib.KeyID
}
// PerformDeleteKeysResponse is the response to PerformDeleteKeysRequest.
type PerformDeleteKeysResponse struct {
Error *KeyError
}
// KeyError sets a key error field on KeyErrors // KeyError sets a key error field on KeyErrors
func (r *PerformUploadKeysResponse) KeyError(userID, deviceID string, err *KeyError) { func (r *PerformUploadKeysResponse) KeyError(userID, deviceID string, err *KeyError) {
if r.KeyErrors[userID] == nil { if r.KeyErrors[userID] == nil {

View file

@ -0,0 +1,112 @@
// Copyright 2021 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package consumers
import (
"context"
"encoding/json"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/keyserver/storage"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
"github.com/Shopify/sarama"
)
type OutputCrossSigningKeyUpdateConsumer struct {
eduServerConsumer *internal.ContinualConsumer
keyDB storage.Database
keyAPI api.KeyInternalAPI
serverName string
}
func NewOutputCrossSigningKeyUpdateConsumer(
process *process.ProcessContext,
cfg *config.Dendrite,
kafkaConsumer sarama.Consumer,
keyDB storage.Database,
keyAPI api.KeyInternalAPI,
) *OutputCrossSigningKeyUpdateConsumer {
// The keyserver both produces and consumes on the TopicOutputKeyChangeEvent
// topic. We will only produce events where the UserID matches our server name,
// and we will only consume events where the UserID does NOT match our server
// name (because the update came from a remote server).
consumer := internal.ContinualConsumer{
Process: process,
ComponentName: "keyserver/keyserver",
Topic: cfg.Global.Kafka.TopicFor(config.TopicOutputKeyChangeEvent),
Consumer: kafkaConsumer,
PartitionStore: keyDB,
}
s := &OutputCrossSigningKeyUpdateConsumer{
eduServerConsumer: &consumer,
keyDB: keyDB,
keyAPI: keyAPI,
serverName: string(cfg.Global.ServerName),
}
consumer.ProcessMessage = s.onMessage
return s
}
func (s *OutputCrossSigningKeyUpdateConsumer) Start() error {
return s.eduServerConsumer.Start()
}
// onMessage is called in response to a message received on the
// key change events topic from the key server.
func (t *OutputCrossSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMessage) error {
var m api.DeviceMessage
if err := json.Unmarshal(msg.Value, &m); err != nil {
logrus.WithError(err).Errorf("failed to read device message from key change topic")
return nil
}
switch m.Type {
case api.TypeCrossSigningUpdate:
return t.onCrossSigningMessage(m)
default:
return nil
}
}
func (s *OutputCrossSigningKeyUpdateConsumer) onCrossSigningMessage(m api.DeviceMessage) error {
output := m.CrossSigningKeyUpdate
_, host, err := gomatrixserverlib.SplitID('@', output.UserID)
if err != nil {
logrus.WithError(err).Errorf("eduserver output log: user ID parse failure")
return nil
}
if host == gomatrixserverlib.ServerName(s.serverName) {
// Ignore any messages that contain information about our own users, as
// they already originated from this server.
return nil
}
uploadReq := &api.PerformUploadDeviceKeysRequest{
UserID: output.UserID,
}
if output.MasterKey != nil {
uploadReq.MasterKey = *output.MasterKey
}
if output.SelfSigningKey != nil {
uploadReq.SelfSigningKey = *output.SelfSigningKey
}
uploadRes := &api.PerformUploadDeviceKeysResponse{}
s.keyAPI.PerformUploadDeviceKeys(context.TODO(), uploadReq, uploadRes)
return uploadRes.Error
}

View file

@ -1,61 +0,0 @@
package consumers
import (
"fmt"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/keyserver/storage"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/process"
"github.com/Shopify/sarama"
)
type OutputSigningKeyUpdateConsumer struct {
eduServerConsumer *internal.ContinualConsumer
keyDB storage.Database
keyAPI api.KeyInternalAPI
serverName string
}
func NewOutputSigningKeyUpdateConsumer(
process *process.ProcessContext,
cfg *config.Dendrite,
kafkaConsumer sarama.Consumer,
keyDB storage.Database,
keyAPI api.KeyInternalAPI,
) *OutputSigningKeyUpdateConsumer {
consumer := internal.ContinualConsumer{
Process: process,
ComponentName: "keyserver/eduserver",
Topic: cfg.Global.Kafka.TopicFor(config.TopicOutputSigningKeyUpdate),
Consumer: kafkaConsumer,
PartitionStore: keyDB,
}
s := &OutputSigningKeyUpdateConsumer{
eduServerConsumer: &consumer,
keyDB: keyDB,
keyAPI: keyAPI,
serverName: string(cfg.Global.ServerName),
}
consumer.ProcessMessage = s.onMessage
return s
}
func (s *OutputSigningKeyUpdateConsumer) Start() error {
return s.eduServerConsumer.Start()
}
func (s *OutputSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMessage) error {
/*
var output eduapi.OutputSigningKeyUpdate
if err := json.Unmarshal(msg.Value, &output); err != nil {
log.WithError(err).Errorf("eduserver output log: message parse failure")
return nil
}
return nil
*/
return fmt.Errorf("TODO")
}

View file

@ -19,14 +19,15 @@ import (
"context" "context"
"crypto/ed25519" "crypto/ed25519"
"database/sql" "database/sql"
"encoding/json"
"fmt" "fmt"
"strings" "strings"
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/keyserver/types" "github.com/matrix-org/dendrite/keyserver/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"golang.org/x/crypto/curve25519"
) )
func sanityCheckKey(key gomatrixserverlib.CrossSigningKey, userID string, purpose gomatrixserverlib.CrossSigningKeyPurpose) error { func sanityCheckKey(key gomatrixserverlib.CrossSigningKey, userID string, purpose gomatrixserverlib.CrossSigningKeyPurpose) error {
@ -45,6 +46,41 @@ func sanityCheckKey(key gomatrixserverlib.CrossSigningKey, userID string, purpos
if tokens[1] != b64 { if tokens[1] != b64 {
return fmt.Errorf("key ID isn't correct") return fmt.Errorf("key ID isn't correct")
} }
switch tokens[0] {
case "ed25519":
if len(keyData) != ed25519.PublicKeySize {
return fmt.Errorf("ed25519 key is not the correct length")
}
case "curve25519":
if len(keyData) != curve25519.PointSize {
return fmt.Errorf("curve25519 key is not the correct length")
}
default:
// We can't enforce the key length to be correct for an
// algorithm that we don't recognise, so instead we'll
// just make sure that it isn't incredibly excessive.
if l := len(keyData); l > 4096 {
return fmt.Errorf("unknown key type is too long (%d bytes)", l)
}
}
}
// Check to see if the signatures make sense
for _, forOriginUser := range key.Signatures {
for originKeyID, originSignature := range forOriginUser {
switch strings.SplitN(string(originKeyID), ":", 1)[0] {
case "ed25519":
if len(originSignature) != ed25519.SignatureSize {
return fmt.Errorf("ed25519 signature is not the correct length")
}
case "curve25519":
return fmt.Errorf("curve25519 signatures are impossible")
default:
if l := len(originSignature); l > 4096 {
return fmt.Errorf("unknown signature type is too long (%d bytes)", l)
}
}
}
} }
// Does the key claim to be from the right user? // Does the key claim to be from the right user?
@ -69,42 +105,68 @@ func sanityCheckKey(key gomatrixserverlib.CrossSigningKey, userID string, purpos
// nolint:gocyclo // nolint:gocyclo
func (a *KeyInternalAPI) PerformUploadDeviceKeys(ctx context.Context, req *api.PerformUploadDeviceKeysRequest, res *api.PerformUploadDeviceKeysResponse) { func (a *KeyInternalAPI) PerformUploadDeviceKeys(ctx context.Context, req *api.PerformUploadDeviceKeysRequest, res *api.PerformUploadDeviceKeysResponse) {
var masterKey gomatrixserverlib.Base64Bytes // Find the keys to store.
byPurpose := map[gomatrixserverlib.CrossSigningKeyPurpose]gomatrixserverlib.CrossSigningKey{}
toStore := types.CrossSigningKeyMap{}
hasMasterKey := false hasMasterKey := false
if len(req.MasterKey.Keys) > 0 { if len(req.MasterKey.Keys) > 0 {
if err := sanityCheckKey(req.MasterKey, req.UserID, gomatrixserverlib.CrossSigningKeyPurposeMaster); err != nil { if err := sanityCheckKey(req.MasterKey, req.UserID, gomatrixserverlib.CrossSigningKeyPurposeMaster); err != nil {
res.Error = &api.KeyError{ res.Error = &api.KeyError{
Err: "Master key sanity check failed: " + err.Error(), Err: "Master key sanity check failed: " + err.Error(),
IsInvalidParam: true,
} }
return return
} }
for _, keyData := range req.MasterKey.Keys { // iterates once, because sanityCheckKey requires one key
hasMasterKey = true byPurpose[gomatrixserverlib.CrossSigningKeyPurposeMaster] = req.MasterKey
masterKey = keyData for _, key := range req.MasterKey.Keys { // iterates once, see sanityCheckKey
toStore[gomatrixserverlib.CrossSigningKeyPurposeMaster] = key
} }
hasMasterKey = true
} }
if len(req.SelfSigningKey.Keys) > 0 { if len(req.SelfSigningKey.Keys) > 0 {
if err := sanityCheckKey(req.SelfSigningKey, req.UserID, gomatrixserverlib.CrossSigningKeyPurposeSelfSigning); err != nil { if err := sanityCheckKey(req.SelfSigningKey, req.UserID, gomatrixserverlib.CrossSigningKeyPurposeSelfSigning); err != nil {
res.Error = &api.KeyError{ res.Error = &api.KeyError{
Err: "Self-signing key sanity check failed: " + err.Error(), Err: "Self-signing key sanity check failed: " + err.Error(),
IsInvalidParam: true,
} }
return return
} }
byPurpose[gomatrixserverlib.CrossSigningKeyPurposeSelfSigning] = req.SelfSigningKey
for _, key := range req.SelfSigningKey.Keys { // iterates once, see sanityCheckKey
toStore[gomatrixserverlib.CrossSigningKeyPurposeSelfSigning] = key
}
} }
if len(req.UserSigningKey.Keys) > 0 { if len(req.UserSigningKey.Keys) > 0 {
if err := sanityCheckKey(req.UserSigningKey, req.UserID, gomatrixserverlib.CrossSigningKeyPurposeUserSigning); err != nil { if err := sanityCheckKey(req.UserSigningKey, req.UserID, gomatrixserverlib.CrossSigningKeyPurposeUserSigning); err != nil {
res.Error = &api.KeyError{ res.Error = &api.KeyError{
Err: "User-signing key sanity check failed: " + err.Error(), Err: "User-signing key sanity check failed: " + err.Error(),
IsInvalidParam: true,
} }
return return
} }
byPurpose[gomatrixserverlib.CrossSigningKeyPurposeUserSigning] = req.UserSigningKey
for _, key := range req.UserSigningKey.Keys { // iterates once, see sanityCheckKey
toStore[gomatrixserverlib.CrossSigningKeyPurposeUserSigning] = key
}
} }
// If the user hasn't given a new master key, then let's go and get their // If there's nothing to do then stop here.
// existing keys from the database. if len(toStore) == 0 {
res.Error = &api.KeyError{
Err: "No keys were supplied in the request",
IsMissingParam: true,
}
return
}
// We can't have a self-signing or user-signing key without a master
// key, so make sure we have one of those.
if !hasMasterKey { if !hasMasterKey {
existingKeys, err := a.DB.CrossSigningKeysDataForUser(ctx, req.UserID) existingKeys, err := a.DB.CrossSigningKeysDataForUser(ctx, req.UserID)
if err != nil { if err != nil {
@ -114,87 +176,20 @@ func (a *KeyInternalAPI) PerformUploadDeviceKeys(ctx context.Context, req *api.P
return return
} }
masterKey, hasMasterKey = existingKeys[gomatrixserverlib.CrossSigningKeyPurposeMaster] _, hasMasterKey = existingKeys[gomatrixserverlib.CrossSigningKeyPurposeMaster]
} }
// If we still don't have a master key at this point then there's nothing else // If we still can't find a master key for the user then stop the upload.
// we can do - we've checked both the request and the database. // This satisfies the "Fails to upload self-signing key without master key" test.
if !hasMasterKey { if !hasMasterKey {
res.Error = &api.KeyError{ res.Error = &api.KeyError{
Err: "No master key was found either in the database or in the request!", Err: "No master key was found",
IsMissingParam: true,
}
return
}
// The key ID is basically the key itself.
masterKeyID := gomatrixserverlib.KeyID(fmt.Sprintf("ed25519:%s", masterKey.Encode()))
// Work out which things we need to verify the signatures for.
toVerify := make(map[gomatrixserverlib.CrossSigningKeyPurpose]gomatrixserverlib.CrossSigningKey, 3)
toStore := types.CrossSigningKeyMap{}
if len(req.MasterKey.Keys) > 0 {
toVerify[gomatrixserverlib.CrossSigningKeyPurposeMaster] = req.MasterKey
}
if len(req.SelfSigningKey.Keys) > 0 {
toVerify[gomatrixserverlib.CrossSigningKeyPurposeSelfSigning] = req.SelfSigningKey
}
if len(req.UserSigningKey.Keys) > 0 {
toVerify[gomatrixserverlib.CrossSigningKeyPurposeUserSigning] = req.UserSigningKey
}
if len(toVerify) == 0 {
res.Error = &api.KeyError{
Err: "No supplied keys available for verification",
IsMissingParam: true,
}
return
}
for purpose, key := range toVerify {
// Collect together the key IDs we need to verify with. This will include
// all of the key IDs specified in the signatures.
keyJSON, err := json.Marshal(key)
if err != nil {
res.Error = &api.KeyError{
Err: fmt.Sprintf("The JSON of the key section is invalid: %s", err.Error()),
}
return
}
switch purpose {
case gomatrixserverlib.CrossSigningKeyPurposeMaster:
// The master key might have a signature attached to it from the
// previous key, or from a device key, but there's no real need
// to verify it. Clients will perform key checks when the master
// key changes.
default:
// Sub-keys should be signed by the master key.
if err := gomatrixserverlib.VerifyJSON(req.UserID, masterKeyID, ed25519.PublicKey(masterKey), keyJSON); err != nil {
res.Error = &api.KeyError{
Err: fmt.Sprintf("The %q sub-key failed master key signature verification: %s", purpose, err.Error()),
IsInvalidSignature: true,
}
return
}
}
// If we've reached this point then all the signatures are valid so
// add the key to the list of keys to store.
for _, keyData := range key.Keys { // iterates once, see sanityCheckKey
toStore[purpose] = keyData
}
}
if len(toStore) == 0 {
res.Error = &api.KeyError{
Err: "No supplied keys passed verification",
IsMissingParam: true, IsMissingParam: true,
} }
return return
} }
// Store the keys.
if err := a.DB.StoreCrossSigningKeysForUser(ctx, req.UserID, toStore); err != nil { if err := a.DB.StoreCrossSigningKeysForUser(ctx, req.UserID, toStore); err != nil {
res.Error = &api.KeyError{ res.Error = &api.KeyError{
Err: fmt.Sprintf("a.DB.StoreCrossSigningKeysForUser: %s", err), Err: fmt.Sprintf("a.DB.StoreCrossSigningKeysForUser: %s", err),
@ -203,7 +198,7 @@ func (a *KeyInternalAPI) PerformUploadDeviceKeys(ctx context.Context, req *api.P
} }
// Now upload any signatures that were included with the keys. // Now upload any signatures that were included with the keys.
for _, key := range toVerify { for _, key := range byPurpose {
var targetKeyID gomatrixserverlib.KeyID var targetKeyID gomatrixserverlib.KeyID
for targetKey := range key.Keys { // iterates once, see sanityCheckKey for targetKey := range key.Keys { // iterates once, see sanityCheckKey
targetKeyID = targetKey targetKeyID = targetKey
@ -222,6 +217,28 @@ func (a *KeyInternalAPI) PerformUploadDeviceKeys(ctx context.Context, req *api.P
} }
} }
} }
// Finally, generate a notification that we updated the keys.
if _, host, err := gomatrixserverlib.SplitID('@', req.UserID); err == nil && host == a.ThisServer {
update := eduserverAPI.CrossSigningKeyUpdate{
UserID: req.UserID,
}
if mk, ok := byPurpose[gomatrixserverlib.CrossSigningKeyPurposeMaster]; ok {
update.MasterKey = &mk
}
if ssk, ok := byPurpose[gomatrixserverlib.CrossSigningKeyPurposeSelfSigning]; ok {
update.SelfSigningKey = &ssk
}
if update.MasterKey == nil && update.SelfSigningKey == nil {
return
}
if err := a.Producer.ProduceSigningKeyUpdate(update); err != nil {
res.Error = &api.KeyError{
Err: fmt.Sprintf("a.Producer.ProduceSigningKeyUpdate: %s", err),
}
return
}
}
} }
func (a *KeyInternalAPI) PerformUploadDeviceSignatures(ctx context.Context, req *api.PerformUploadDeviceSignaturesRequest, res *api.PerformUploadDeviceSignaturesResponse) { func (a *KeyInternalAPI) PerformUploadDeviceSignatures(ctx context.Context, req *api.PerformUploadDeviceSignaturesRequest, res *api.PerformUploadDeviceSignaturesResponse) {
@ -277,7 +294,7 @@ func (a *KeyInternalAPI) PerformUploadDeviceSignatures(ctx context.Context, req
} }
} }
if err := a.processSelfSignatures(ctx, req.UserID, queryRes, selfSignatures); err != nil { if err := a.processSelfSignatures(ctx, selfSignatures); err != nil {
res.Error = &api.KeyError{ res.Error = &api.KeyError{
Err: fmt.Sprintf("a.processSelfSignatures: %s", err), Err: fmt.Sprintf("a.processSelfSignatures: %s", err),
} }
@ -290,10 +307,25 @@ func (a *KeyInternalAPI) PerformUploadDeviceSignatures(ctx context.Context, req
} }
return return
} }
// Finally, generate a notification that we updated the signatures.
for userID := range req.Signatures {
if _, host, err := gomatrixserverlib.SplitID('@', userID); err == nil && host == a.ThisServer {
update := eduserverAPI.CrossSigningKeyUpdate{
UserID: userID,
}
if err := a.Producer.ProduceSigningKeyUpdate(update); err != nil {
res.Error = &api.KeyError{
Err: fmt.Sprintf("a.Producer.ProduceSigningKeyUpdate: %s", err),
}
return
}
}
}
} }
func (a *KeyInternalAPI) processSelfSignatures( func (a *KeyInternalAPI) processSelfSignatures(
ctx context.Context, _ string, queryRes *api.QueryKeysResponse, ctx context.Context,
signatures map[string]map[gomatrixserverlib.KeyID]gomatrixserverlib.CrossSigningForKeyOrDevice, signatures map[string]map[gomatrixserverlib.KeyID]gomatrixserverlib.CrossSigningForKeyOrDevice,
) error { ) error {
// Here we will process: // Here we will process:
@ -304,37 +336,8 @@ func (a *KeyInternalAPI) processSelfSignatures(
for targetKeyID, signature := range forTargetUserID { for targetKeyID, signature := range forTargetUserID {
switch sig := signature.CrossSigningBody.(type) { switch sig := signature.CrossSigningBody.(type) {
case *gomatrixserverlib.CrossSigningKey: case *gomatrixserverlib.CrossSigningKey:
// The user is signing their master key with one of their devices
// The QueryKeys response should contain the device key hopefully.
// First we need to marshal the blob back into JSON so we can verify
// it.
j, err := json.Marshal(sig)
if err != nil {
return fmt.Errorf("json.Marshal: %w", err)
}
for originUserID, forOriginUserID := range sig.Signatures { for originUserID, forOriginUserID := range sig.Signatures {
originDeviceKeys, ok := queryRes.DeviceKeys[originUserID]
if !ok {
return fmt.Errorf("missing device keys for user %q", originUserID)
}
for originKeyID, originSig := range forOriginUserID { for originKeyID, originSig := range forOriginUserID {
var originKey gomatrixserverlib.DeviceKeys
if err := json.Unmarshal(originDeviceKeys[string(originKeyID)], &originKey); err != nil {
return fmt.Errorf("json.Unmarshal: %w", err)
}
originSigningKey, ok := originKey.Keys[originKeyID]
if !ok {
return fmt.Errorf("missing origin signing key %q", originKeyID)
}
originSigningKeyPublic := ed25519.PublicKey(originSigningKey)
if err := gomatrixserverlib.VerifyJSON(originUserID, originKeyID, originSigningKeyPublic, j); err != nil {
return fmt.Errorf("gomatrixserverlib.VerifyJSON: %w", err)
}
if err := a.DB.StoreCrossSigningSigsForTarget( if err := a.DB.StoreCrossSigningSigsForTarget(
ctx, originUserID, originKeyID, targetUserID, targetKeyID, originSig, ctx, originUserID, originKeyID, targetUserID, targetKeyID, originSig,
); err != nil { ); err != nil {
@ -344,35 +347,8 @@ func (a *KeyInternalAPI) processSelfSignatures(
} }
case *gomatrixserverlib.DeviceKeys: case *gomatrixserverlib.DeviceKeys:
// The user is signing one of their devices with their self-signing key
// The QueryKeys response should contain the master key hopefully.
// First we need to marshal the blob back into JSON so we can verify
// it.
j, err := json.Marshal(sig)
if err != nil {
return fmt.Errorf("json.Marshal: %w", err)
}
for originUserID, forOriginUserID := range sig.Signatures { for originUserID, forOriginUserID := range sig.Signatures {
for originKeyID, originSig := range forOriginUserID { for originKeyID, originSig := range forOriginUserID {
originSelfSigningKeys, ok := queryRes.SelfSigningKeys[originUserID]
if !ok {
return fmt.Errorf("missing self-signing key for user %q", originUserID)
}
var originSelfSigningKeyID gomatrixserverlib.KeyID
var originSelfSigningKey gomatrixserverlib.Base64Bytes
for keyID, key := range originSelfSigningKeys.Keys {
originSelfSigningKeyID, originSelfSigningKey = keyID, key
break
}
originSelfSigningKeyPublic := ed25519.PublicKey(originSelfSigningKey)
if err := gomatrixserverlib.VerifyJSON(originUserID, originSelfSigningKeyID, originSelfSigningKeyPublic, j); err != nil {
return fmt.Errorf("gomatrixserverlib.VerifyJSON: %w", err)
}
if err := a.DB.StoreCrossSigningSigsForTarget( if err := a.DB.StoreCrossSigningSigsForTarget(
ctx, originUserID, originKeyID, targetUserID, targetKeyID, originSig, ctx, originUserID, originKeyID, targetUserID, targetKeyID, originSig,
); err != nil { ); err != nil {

View file

@ -231,7 +231,8 @@ func (u *DeviceListUpdater) update(ctx context.Context, event gomatrixserverlib.
} }
keys := []api.DeviceMessage{ keys := []api.DeviceMessage{
{ {
DeviceKeys: api.DeviceKeys{ Type: api.TypeDeviceKeyUpdate,
DeviceKeys: &api.DeviceKeys{
DeviceID: event.DeviceID, DeviceID: event.DeviceID,
DisplayName: event.DeviceDisplayName, DisplayName: event.DeviceDisplayName,
KeyJSON: k, KeyJSON: k,
@ -417,8 +418,9 @@ func (u *DeviceListUpdater) updateDeviceList(res *gomatrixserverlib.RespUserDevi
continue continue
} }
keys[i] = api.DeviceMessage{ keys[i] = api.DeviceMessage{
Type: api.TypeDeviceKeyUpdate,
StreamID: res.StreamID, StreamID: res.StreamID,
DeviceKeys: api.DeviceKeys{ DeviceKeys: &api.DeviceKeys{
DeviceID: device.DeviceID, DeviceID: device.DeviceID,
DisplayName: device.DisplayName, DisplayName: device.DisplayName,
UserID: res.UserID, UserID: res.UserID,
@ -426,7 +428,8 @@ func (u *DeviceListUpdater) updateDeviceList(res *gomatrixserverlib.RespUserDevi
}, },
} }
existingKeys[i] = api.DeviceMessage{ existingKeys[i] = api.DeviceMessage{
DeviceKeys: api.DeviceKeys{ Type: api.TypeDeviceKeyUpdate,
DeviceKeys: &api.DeviceKeys{
UserID: res.UserID, UserID: res.UserID,
DeviceID: device.DeviceID, DeviceID: device.DeviceID,
}, },

View file

@ -146,8 +146,9 @@ func TestUpdateHavePrevID(t *testing.T) {
t.Fatalf("Update returned an error: %s", err) t.Fatalf("Update returned an error: %s", err)
} }
want := api.DeviceMessage{ want := api.DeviceMessage{
Type: api.TypeDeviceKeyUpdate,
StreamID: event.StreamID, StreamID: event.StreamID,
DeviceKeys: api.DeviceKeys{ DeviceKeys: &api.DeviceKeys{
DeviceID: event.DeviceID, DeviceID: event.DeviceID,
DisplayName: event.DeviceDisplayName, DisplayName: event.DeviceDisplayName,
KeyJSON: event.Keys, KeyJSON: event.Keys,
@ -224,8 +225,9 @@ func TestUpdateNoPrevID(t *testing.T) {
// wait a bit for db to be updated... // wait a bit for db to be updated...
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
want := api.DeviceMessage{ want := api.DeviceMessage{
Type: api.TypeDeviceKeyUpdate,
StreamID: 5, StreamID: 5,
DeviceKeys: api.DeviceKeys{ DeviceKeys: &api.DeviceKeys{
DeviceID: "JLAFKJWSCS", DeviceID: "JLAFKJWSCS",
DisplayName: "Mobile Phone", DisplayName: "Mobile Phone",
UserID: remoteUserID, UserID: remoteUserID,

View file

@ -182,6 +182,14 @@ func (a *KeyInternalAPI) claimRemoteKeys(
util.GetLogger(ctx).WithField("num_keys", keysClaimed).Info("Claimed remote keys") util.GetLogger(ctx).WithField("num_keys", keysClaimed).Info("Claimed remote keys")
} }
func (a *KeyInternalAPI) PerformDeleteKeys(ctx context.Context, req *api.PerformDeleteKeysRequest, res *api.PerformDeleteKeysResponse) {
if err := a.DB.DeleteDeviceKeys(ctx, req.UserID, req.KeyIDs); err != nil {
res.Error = &api.KeyError{
Err: fmt.Sprintf("Failed to delete device keys: %s", err),
}
}
}
func (a *KeyInternalAPI) QueryOneTimeKeys(ctx context.Context, req *api.QueryOneTimeKeysRequest, res *api.QueryOneTimeKeysResponse) { func (a *KeyInternalAPI) QueryOneTimeKeys(ctx context.Context, req *api.QueryOneTimeKeysRequest, res *api.QueryOneTimeKeysResponse) {
count, err := a.DB.OneTimeKeysCount(ctx, req.UserID, req.DeviceID) count, err := a.DB.OneTimeKeysCount(ctx, req.UserID, req.DeviceID)
if err != nil { if err != nil {
@ -573,7 +581,8 @@ func (a *KeyInternalAPI) uploadLocalDeviceKeys(ctx context.Context, req *api.Per
existingKeys := make([]api.DeviceMessage, len(keysToStore)) existingKeys := make([]api.DeviceMessage, len(keysToStore))
for i := range keysToStore { for i := range keysToStore {
existingKeys[i] = api.DeviceMessage{ existingKeys[i] = api.DeviceMessage{
DeviceKeys: api.DeviceKeys{ Type: api.TypeDeviceKeyUpdate,
DeviceKeys: &api.DeviceKeys{
UserID: keysToStore[i].UserID, UserID: keysToStore[i].UserID,
DeviceID: keysToStore[i].DeviceID, DeviceID: keysToStore[i].DeviceID,
}, },

View file

@ -30,6 +30,7 @@ const (
InputDeviceListUpdatePath = "/keyserver/inputDeviceListUpdate" InputDeviceListUpdatePath = "/keyserver/inputDeviceListUpdate"
PerformUploadKeysPath = "/keyserver/performUploadKeys" PerformUploadKeysPath = "/keyserver/performUploadKeys"
PerformClaimKeysPath = "/keyserver/performClaimKeys" PerformClaimKeysPath = "/keyserver/performClaimKeys"
PerformDeleteKeysPath = "/keyserver/performDeleteKeys"
PerformUploadDeviceKeysPath = "/keyserver/performUploadDeviceKeys" PerformUploadDeviceKeysPath = "/keyserver/performUploadDeviceKeys"
PerformUploadDeviceSignaturesPath = "/keyserver/performUploadDeviceSignatures" PerformUploadDeviceSignaturesPath = "/keyserver/performUploadDeviceSignatures"
QueryKeysPath = "/keyserver/queryKeys" QueryKeysPath = "/keyserver/queryKeys"
@ -94,6 +95,23 @@ func (h *httpKeyInternalAPI) PerformClaimKeys(
} }
} }
func (h *httpKeyInternalAPI) PerformDeleteKeys(
ctx context.Context,
request *api.PerformDeleteKeysRequest,
response *api.PerformDeleteKeysResponse,
) {
span, ctx := opentracing.StartSpanFromContext(ctx, "PerformClaimKeys")
defer span.Finish()
apiURL := h.apiURL + PerformClaimKeysPath
err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
if err != nil {
response.Error = &api.KeyError{
Err: err.Error(),
}
}
}
func (h *httpKeyInternalAPI) PerformUploadKeys( func (h *httpKeyInternalAPI) PerformUploadKeys(
ctx context.Context, ctx context.Context,
request *api.PerformUploadKeysRequest, request *api.PerformUploadKeysRequest,

View file

@ -47,6 +47,17 @@ func AddRoutes(internalAPIMux *mux.Router, s api.KeyInternalAPI) {
return util.JSONResponse{Code: http.StatusOK, JSON: &response} return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}), }),
) )
internalAPIMux.Handle(PerformDeleteKeysPath,
httputil.MakeInternalAPI("performDeleteKeys", func(req *http.Request) util.JSONResponse {
request := api.PerformDeleteKeysRequest{}
response := api.PerformDeleteKeysResponse{}
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
s.PerformDeleteKeys(req.Context(), &request, &response)
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(PerformUploadKeysPath, internalAPIMux.Handle(PerformUploadKeysPath,
httputil.MakeInternalAPI("performUploadKeys", func(req *http.Request) util.JSONResponse { httputil.MakeInternalAPI("performUploadKeys", func(req *http.Request) util.JSONResponse {
request := api.PerformUploadKeysRequest{} request := api.PerformUploadKeysRequest{}

View file

@ -65,7 +65,7 @@ func NewInternalAPI(
} }
}() }()
keyconsumer := consumers.NewOutputSigningKeyUpdateConsumer( keyconsumer := consumers.NewOutputCrossSigningKeyUpdateConsumer(
base.ProcessContext, base.Cfg, consumer, db, ap, base.ProcessContext, base.Cfg, consumer, db, ap,
) )
if err := keyconsumer.Start(); err != nil { if err := keyconsumer.Start(); err != nil {

View file

@ -19,6 +19,7 @@ import (
"encoding/json" "encoding/json"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
eduapi "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/keyserver/storage" "github.com/matrix-org/dendrite/keyserver/storage"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -73,3 +74,36 @@ func (p *KeyChange) ProduceKeyChanges(keys []api.DeviceMessage) error {
} }
return nil return nil
} }
func (p *KeyChange) ProduceSigningKeyUpdate(key eduapi.CrossSigningKeyUpdate) error {
var m sarama.ProducerMessage
output := &api.DeviceMessage{
Type: api.TypeCrossSigningUpdate,
OutputCrossSigningKeyUpdate: &eduapi.OutputCrossSigningKeyUpdate{
CrossSigningKeyUpdate: key,
},
}
value, err := json.Marshal(output)
if err != nil {
return err
}
m.Topic = string(p.Topic)
m.Key = sarama.StringEncoder(key.UserID)
m.Value = sarama.ByteEncoder(value)
partition, offset, err := p.Producer.SendMessage(&m)
if err != nil {
return err
}
err = p.DB.StoreKeyChange(context.Background(), partition, offset, key.UserID)
if err != nil {
return err
}
logrus.WithFields(logrus.Fields{
"user_id": key.UserID,
}).Infof("Produced to cross-signing update topic '%s'", p.Topic)
return nil
}

View file

@ -58,6 +58,10 @@ type Database interface {
// If there are some missing keys, they are omitted from the returned slice. There is no ordering on the returned slice. // If there are some missing keys, they are omitted from the returned slice. There is no ordering on the returned slice.
DeviceKeysForUser(ctx context.Context, userID string, deviceIDs []string) ([]api.DeviceMessage, error) DeviceKeysForUser(ctx context.Context, userID string, deviceIDs []string) ([]api.DeviceMessage, error)
// DeleteDeviceKeys removes the device keys for a given user/device, and any accompanying
// cross-signing signatures relating to that device.
DeleteDeviceKeys(ctx context.Context, userID string, deviceIDs []gomatrixserverlib.KeyID) error
// ClaimKeys based on the 3-uple of user_id, device_id and algorithm name. Returns the keys claimed. Returns no error if a key // ClaimKeys based on the 3-uple of user_id, device_id and algorithm name. Returns the keys claimed. Returns no error if a key
// cannot be claimed or if none exist for this (user, device, algorithm), instead it is omitted from the returned slice. // cannot be claimed or if none exist for this (user, device, algorithm), instead it is omitted from the returned slice.
ClaimKeys(ctx context.Context, userToDeviceToAlgorithm map[string]map[string]string) ([]api.OneTimeKeys, error) ClaimKeys(ctx context.Context, userToDeviceToAlgorithm map[string]map[string]string) ([]api.OneTimeKeys, error)

View file

@ -46,10 +46,14 @@ const upsertCrossSigningSigsForTargetSQL = "" +
" VALUES($1, $2, $3, $4, $5)" + " VALUES($1, $2, $3, $4, $5)" +
" ON CONFLICT (origin_user_id, target_user_id, target_key_id) DO UPDATE SET (origin_key_id, signature) = ($2, $5)" " ON CONFLICT (origin_user_id, target_user_id, target_key_id) DO UPDATE SET (origin_key_id, signature) = ($2, $5)"
const deleteCrossSigningSigsForTargetSQL = "" +
"DELETE FROM keyserver_cross_signing_sigs WHERE target_user_id=$1 AND target_key_id=$2"
type crossSigningSigsStatements struct { type crossSigningSigsStatements struct {
db *sql.DB db *sql.DB
selectCrossSigningSigsForTargetStmt *sql.Stmt selectCrossSigningSigsForTargetStmt *sql.Stmt
upsertCrossSigningSigsForTargetStmt *sql.Stmt upsertCrossSigningSigsForTargetStmt *sql.Stmt
deleteCrossSigningSigsForTargetStmt *sql.Stmt
} }
func NewPostgresCrossSigningSigsTable(db *sql.DB) (tables.CrossSigningSigs, error) { func NewPostgresCrossSigningSigsTable(db *sql.DB) (tables.CrossSigningSigs, error) {
@ -63,6 +67,7 @@ func NewPostgresCrossSigningSigsTable(db *sql.DB) (tables.CrossSigningSigs, erro
return s, sqlutil.StatementList{ return s, sqlutil.StatementList{
{&s.selectCrossSigningSigsForTargetStmt, selectCrossSigningSigsForTargetSQL}, {&s.selectCrossSigningSigsForTargetStmt, selectCrossSigningSigsForTargetSQL},
{&s.upsertCrossSigningSigsForTargetStmt, upsertCrossSigningSigsForTargetSQL}, {&s.upsertCrossSigningSigsForTargetStmt, upsertCrossSigningSigsForTargetSQL},
{&s.deleteCrossSigningSigsForTargetStmt, deleteCrossSigningSigsForTargetSQL},
}.Prepare(db) }.Prepare(db)
} }
@ -101,3 +106,13 @@ func (s *crossSigningSigsStatements) UpsertCrossSigningSigsForTarget(
} }
return nil return nil
} }
func (s *crossSigningSigsStatements) DeleteCrossSigningSigsForTarget(
ctx context.Context, txn *sql.Tx,
targetUserID string, targetKeyID gomatrixserverlib.KeyID,
) error {
if _, err := sqlutil.TxStmt(txn, s.deleteCrossSigningSigsForTargetStmt).ExecContext(ctx, targetUserID, targetKeyID); err != nil {
return fmt.Errorf("s.deleteCrossSigningSigsForTargetStmt: %w", err)
}
return nil
}

View file

@ -62,6 +62,9 @@ const selectMaxStreamForUserSQL = "" +
const countStreamIDsForUserSQL = "" + const countStreamIDsForUserSQL = "" +
"SELECT COUNT(*) FROM keyserver_device_keys WHERE user_id=$1 AND stream_id = ANY($2)" "SELECT COUNT(*) FROM keyserver_device_keys WHERE user_id=$1 AND stream_id = ANY($2)"
const deleteDeviceKeysSQL = "" +
"DELETE FROM keyserver_device_keys WHERE user_id=$1 AND device_id=$2"
const deleteAllDeviceKeysSQL = "" + const deleteAllDeviceKeysSQL = "" +
"DELETE FROM keyserver_device_keys WHERE user_id=$1" "DELETE FROM keyserver_device_keys WHERE user_id=$1"
@ -72,6 +75,7 @@ type deviceKeysStatements struct {
selectBatchDeviceKeysStmt *sql.Stmt selectBatchDeviceKeysStmt *sql.Stmt
selectMaxStreamForUserStmt *sql.Stmt selectMaxStreamForUserStmt *sql.Stmt
countStreamIDsForUserStmt *sql.Stmt countStreamIDsForUserStmt *sql.Stmt
deleteDeviceKeysStmt *sql.Stmt
deleteAllDeviceKeysStmt *sql.Stmt deleteAllDeviceKeysStmt *sql.Stmt
} }
@ -98,6 +102,9 @@ func NewPostgresDeviceKeysTable(db *sql.DB) (tables.DeviceKeys, error) {
if s.countStreamIDsForUserStmt, err = db.Prepare(countStreamIDsForUserSQL); err != nil { if s.countStreamIDsForUserStmt, err = db.Prepare(countStreamIDsForUserSQL); err != nil {
return nil, err return nil, err
} }
if s.deleteDeviceKeysStmt, err = db.Prepare(deleteDeviceKeysSQL); err != nil {
return nil, err
}
if s.deleteAllDeviceKeysStmt, err = db.Prepare(deleteAllDeviceKeysSQL); err != nil { if s.deleteAllDeviceKeysStmt, err = db.Prepare(deleteAllDeviceKeysSQL); err != nil {
return nil, err return nil, err
} }
@ -114,6 +121,7 @@ func (s *deviceKeysStatements) SelectDeviceKeysJSON(ctx context.Context, keys []
return err return err
} }
// this will be '' when there is no device // this will be '' when there is no device
keys[i].Type = api.TypeDeviceKeyUpdate
keys[i].KeyJSON = []byte(keyJSONStr) keys[i].KeyJSON = []byte(keyJSONStr)
keys[i].StreamID = streamID keys[i].StreamID = streamID
if displayName.Valid { if displayName.Valid {
@ -162,6 +170,11 @@ func (s *deviceKeysStatements) InsertDeviceKeys(ctx context.Context, txn *sql.Tx
return nil return nil
} }
func (s *deviceKeysStatements) DeleteDeviceKeys(ctx context.Context, txn *sql.Tx, userID, deviceID string) error {
_, err := sqlutil.TxStmt(txn, s.deleteDeviceKeysStmt).ExecContext(ctx, userID, deviceID)
return err
}
func (s *deviceKeysStatements) DeleteAllDeviceKeys(ctx context.Context, txn *sql.Tx, userID string) error { func (s *deviceKeysStatements) DeleteAllDeviceKeys(ctx context.Context, txn *sql.Tx, userID string) error {
_, err := sqlutil.TxStmt(txn, s.deleteAllDeviceKeysStmt).ExecContext(ctx, userID) _, err := sqlutil.TxStmt(txn, s.deleteAllDeviceKeysStmt).ExecContext(ctx, userID)
return err return err
@ -179,7 +192,10 @@ func (s *deviceKeysStatements) SelectBatchDeviceKeys(ctx context.Context, userID
} }
var result []api.DeviceMessage var result []api.DeviceMessage
for rows.Next() { for rows.Next() {
var dk api.DeviceMessage dk := api.DeviceMessage{
Type: api.TypeDeviceKeyUpdate,
DeviceKeys: &api.DeviceKeys{},
}
dk.UserID = userID dk.UserID = userID
var keyJSON string var keyJSON string
var streamID int var streamID int

View file

@ -158,6 +158,22 @@ func (d *Database) MarkDeviceListStale(ctx context.Context, userID string, isSta
}) })
} }
// DeleteDeviceKeys removes the device keys for a given user/device, and any accompanying
// cross-signing signatures relating to that device.
func (d *Database) DeleteDeviceKeys(ctx context.Context, userID string, deviceIDs []gomatrixserverlib.KeyID) error {
return d.Writer.Do(nil, nil, func(txn *sql.Tx) error {
for _, deviceID := range deviceIDs {
if err := d.CrossSigningSigsTable.DeleteCrossSigningSigsForTarget(ctx, txn, userID, deviceID); err != nil && err != sql.ErrNoRows {
return fmt.Errorf("d.CrossSigningSigsTable.DeleteCrossSigningSigsForTarget: %w", err)
}
if err := d.DeviceKeysTable.DeleteDeviceKeys(ctx, txn, userID, string(deviceID)); err != nil && err != sql.ErrNoRows {
return fmt.Errorf("d.DeviceKeysTable.DeleteDeviceKeys: %w", err)
}
}
return nil
})
}
// CrossSigningKeysForUser returns the latest known cross-signing keys for a user, if any. // CrossSigningKeysForUser returns the latest known cross-signing keys for a user, if any.
func (d *Database) CrossSigningKeysForUser(ctx context.Context, userID string) (map[gomatrixserverlib.CrossSigningKeyPurpose]gomatrixserverlib.CrossSigningKey, error) { func (d *Database) CrossSigningKeysForUser(ctx context.Context, userID string) (map[gomatrixserverlib.CrossSigningKeyPurpose]gomatrixserverlib.CrossSigningKey, error) {
keyMap, err := d.CrossSigningKeysTable.SelectCrossSigningKeysForUser(ctx, nil, userID) keyMap, err := d.CrossSigningKeysTable.SelectCrossSigningKeysForUser(ctx, nil, userID)

View file

@ -45,10 +45,14 @@ const upsertCrossSigningSigsForTargetSQL = "" +
"INSERT OR REPLACE INTO keyserver_cross_signing_sigs (origin_user_id, origin_key_id, target_user_id, target_key_id, signature)" + "INSERT OR REPLACE INTO keyserver_cross_signing_sigs (origin_user_id, origin_key_id, target_user_id, target_key_id, signature)" +
" VALUES($1, $2, $3, $4, $5)" " VALUES($1, $2, $3, $4, $5)"
const deleteCrossSigningSigsForTargetSQL = "" +
"DELETE FROM keyserver_cross_signing_sigs WHERE target_user_id=$1 AND target_key_id=$2"
type crossSigningSigsStatements struct { type crossSigningSigsStatements struct {
db *sql.DB db *sql.DB
selectCrossSigningSigsForTargetStmt *sql.Stmt selectCrossSigningSigsForTargetStmt *sql.Stmt
upsertCrossSigningSigsForTargetStmt *sql.Stmt upsertCrossSigningSigsForTargetStmt *sql.Stmt
deleteCrossSigningSigsForTargetStmt *sql.Stmt
} }
func NewSqliteCrossSigningSigsTable(db *sql.DB) (tables.CrossSigningSigs, error) { func NewSqliteCrossSigningSigsTable(db *sql.DB) (tables.CrossSigningSigs, error) {
@ -62,6 +66,7 @@ func NewSqliteCrossSigningSigsTable(db *sql.DB) (tables.CrossSigningSigs, error)
return s, sqlutil.StatementList{ return s, sqlutil.StatementList{
{&s.selectCrossSigningSigsForTargetStmt, selectCrossSigningSigsForTargetSQL}, {&s.selectCrossSigningSigsForTargetStmt, selectCrossSigningSigsForTargetSQL},
{&s.upsertCrossSigningSigsForTargetStmt, upsertCrossSigningSigsForTargetSQL}, {&s.upsertCrossSigningSigsForTargetStmt, upsertCrossSigningSigsForTargetSQL},
{&s.deleteCrossSigningSigsForTargetStmt, deleteCrossSigningSigsForTargetSQL},
}.Prepare(db) }.Prepare(db)
} }
@ -100,3 +105,13 @@ func (s *crossSigningSigsStatements) UpsertCrossSigningSigsForTarget(
} }
return nil return nil
} }
func (s *crossSigningSigsStatements) DeleteCrossSigningSigsForTarget(
ctx context.Context, txn *sql.Tx,
targetUserID string, targetKeyID gomatrixserverlib.KeyID,
) error {
if _, err := sqlutil.TxStmt(txn, s.deleteCrossSigningSigsForTargetStmt).ExecContext(ctx, targetUserID, targetKeyID); err != nil {
return fmt.Errorf("s.deleteCrossSigningSigsForTargetStmt: %w", err)
}
return nil
}

View file

@ -58,6 +58,9 @@ const selectMaxStreamForUserSQL = "" +
const countStreamIDsForUserSQL = "" + const countStreamIDsForUserSQL = "" +
"SELECT COUNT(*) FROM keyserver_device_keys WHERE user_id=$1 AND stream_id IN ($2)" "SELECT COUNT(*) FROM keyserver_device_keys WHERE user_id=$1 AND stream_id IN ($2)"
const deleteDeviceKeysSQL = "" +
"DELETE FROM keyserver_device_keys WHERE user_id=$1 AND device_id=$2"
const deleteAllDeviceKeysSQL = "" + const deleteAllDeviceKeysSQL = "" +
"DELETE FROM keyserver_device_keys WHERE user_id=$1" "DELETE FROM keyserver_device_keys WHERE user_id=$1"
@ -67,6 +70,7 @@ type deviceKeysStatements struct {
selectDeviceKeysStmt *sql.Stmt selectDeviceKeysStmt *sql.Stmt
selectBatchDeviceKeysStmt *sql.Stmt selectBatchDeviceKeysStmt *sql.Stmt
selectMaxStreamForUserStmt *sql.Stmt selectMaxStreamForUserStmt *sql.Stmt
deleteDeviceKeysStmt *sql.Stmt
deleteAllDeviceKeysStmt *sql.Stmt deleteAllDeviceKeysStmt *sql.Stmt
} }
@ -90,12 +94,20 @@ func NewSqliteDeviceKeysTable(db *sql.DB) (tables.DeviceKeys, error) {
if s.selectMaxStreamForUserStmt, err = db.Prepare(selectMaxStreamForUserSQL); err != nil { if s.selectMaxStreamForUserStmt, err = db.Prepare(selectMaxStreamForUserSQL); err != nil {
return nil, err return nil, err
} }
if s.deleteDeviceKeysStmt, err = db.Prepare(deleteDeviceKeysSQL); err != nil {
return nil, err
}
if s.deleteAllDeviceKeysStmt, err = db.Prepare(deleteAllDeviceKeysSQL); err != nil { if s.deleteAllDeviceKeysStmt, err = db.Prepare(deleteAllDeviceKeysSQL); err != nil {
return nil, err return nil, err
} }
return s, nil return s, nil
} }
func (s *deviceKeysStatements) DeleteDeviceKeys(ctx context.Context, txn *sql.Tx, userID, deviceID string) error {
_, err := sqlutil.TxStmt(txn, s.deleteDeviceKeysStmt).ExecContext(ctx, userID, deviceID)
return err
}
func (s *deviceKeysStatements) DeleteAllDeviceKeys(ctx context.Context, txn *sql.Tx, userID string) error { func (s *deviceKeysStatements) DeleteAllDeviceKeys(ctx context.Context, txn *sql.Tx, userID string) error {
_, err := sqlutil.TxStmt(txn, s.deleteAllDeviceKeysStmt).ExecContext(ctx, userID) _, err := sqlutil.TxStmt(txn, s.deleteAllDeviceKeysStmt).ExecContext(ctx, userID)
return err return err
@ -113,7 +125,11 @@ func (s *deviceKeysStatements) SelectBatchDeviceKeys(ctx context.Context, userID
defer internal.CloseAndLogIfError(ctx, rows, "selectBatchDeviceKeysStmt: rows.close() failed") defer internal.CloseAndLogIfError(ctx, rows, "selectBatchDeviceKeysStmt: rows.close() failed")
var result []api.DeviceMessage var result []api.DeviceMessage
for rows.Next() { for rows.Next() {
var dk api.DeviceMessage dk := api.DeviceMessage{
Type: api.TypeDeviceKeyUpdate,
DeviceKeys: &api.DeviceKeys{},
}
dk.Type = api.TypeDeviceKeyUpdate
dk.UserID = userID dk.UserID = userID
var keyJSON string var keyJSON string
var streamID int var streamID int
@ -144,6 +160,7 @@ func (s *deviceKeysStatements) SelectDeviceKeysJSON(ctx context.Context, keys []
return err return err
} }
// this will be '' when there is no device // this will be '' when there is no device
keys[i].Type = api.TypeDeviceKeyUpdate
keys[i].KeyJSON = []byte(keyJSONStr) keys[i].KeyJSON = []byte(keyJSONStr)
keys[i].StreamID = streamID keys[i].StreamID = streamID
if displayName.Valid { if displayName.Valid {

View file

@ -105,7 +105,8 @@ func TestDeviceKeysStreamIDGeneration(t *testing.T) {
bob := "@bob:TestDeviceKeysStreamIDGeneration" bob := "@bob:TestDeviceKeysStreamIDGeneration"
msgs := []api.DeviceMessage{ msgs := []api.DeviceMessage{
{ {
DeviceKeys: api.DeviceKeys{ Type: api.TypeDeviceKeyUpdate,
DeviceKeys: &api.DeviceKeys{
DeviceID: "AAA", DeviceID: "AAA",
UserID: alice, UserID: alice,
KeyJSON: []byte(`{"key":"v1"}`), KeyJSON: []byte(`{"key":"v1"}`),
@ -113,7 +114,8 @@ func TestDeviceKeysStreamIDGeneration(t *testing.T) {
// StreamID: 1 // StreamID: 1
}, },
{ {
DeviceKeys: api.DeviceKeys{ Type: api.TypeDeviceKeyUpdate,
DeviceKeys: &api.DeviceKeys{
DeviceID: "AAA", DeviceID: "AAA",
UserID: bob, UserID: bob,
KeyJSON: []byte(`{"key":"v1"}`), KeyJSON: []byte(`{"key":"v1"}`),
@ -121,7 +123,8 @@ func TestDeviceKeysStreamIDGeneration(t *testing.T) {
// StreamID: 1 as this is a different user // StreamID: 1 as this is a different user
}, },
{ {
DeviceKeys: api.DeviceKeys{ Type: api.TypeDeviceKeyUpdate,
DeviceKeys: &api.DeviceKeys{
DeviceID: "another_device", DeviceID: "another_device",
UserID: alice, UserID: alice,
KeyJSON: []byte(`{"key":"v1"}`), KeyJSON: []byte(`{"key":"v1"}`),
@ -143,7 +146,8 @@ func TestDeviceKeysStreamIDGeneration(t *testing.T) {
// updating a device sets the next stream ID for that user // updating a device sets the next stream ID for that user
msgs = []api.DeviceMessage{ msgs = []api.DeviceMessage{
{ {
DeviceKeys: api.DeviceKeys{ Type: api.TypeDeviceKeyUpdate,
DeviceKeys: &api.DeviceKeys{
DeviceID: "AAA", DeviceID: "AAA",
UserID: alice, UserID: alice,
KeyJSON: []byte(`{"key":"v2"}`), KeyJSON: []byte(`{"key":"v2"}`),

View file

@ -39,6 +39,7 @@ type DeviceKeys interface {
SelectMaxStreamIDForUser(ctx context.Context, txn *sql.Tx, userID string) (streamID int32, err error) SelectMaxStreamIDForUser(ctx context.Context, txn *sql.Tx, userID string) (streamID int32, err error)
CountStreamIDsForUser(ctx context.Context, userID string, streamIDs []int64) (int, error) CountStreamIDsForUser(ctx context.Context, userID string, streamIDs []int64) (int, error)
SelectBatchDeviceKeys(ctx context.Context, userID string, deviceIDs []string) ([]api.DeviceMessage, error) SelectBatchDeviceKeys(ctx context.Context, userID string, deviceIDs []string) ([]api.DeviceMessage, error)
DeleteDeviceKeys(ctx context.Context, txn *sql.Tx, userID, deviceID string) error
DeleteAllDeviceKeys(ctx context.Context, txn *sql.Tx, userID string) error DeleteAllDeviceKeys(ctx context.Context, txn *sql.Tx, userID string) error
} }
@ -62,4 +63,5 @@ type CrossSigningKeys interface {
type CrossSigningSigs interface { type CrossSigningSigs interface {
SelectCrossSigningSigsForTarget(ctx context.Context, txn *sql.Tx, targetUserID string, targetKeyID gomatrixserverlib.KeyID) (r types.CrossSigningSigMap, err error) SelectCrossSigningSigsForTarget(ctx context.Context, txn *sql.Tx, targetUserID string, targetKeyID gomatrixserverlib.KeyID) (r types.CrossSigningSigMap, err error)
UpsertCrossSigningSigsForTarget(ctx context.Context, txn *sql.Tx, originUserID string, originKeyID gomatrixserverlib.KeyID, targetUserID string, targetKeyID gomatrixserverlib.KeyID, signature gomatrixserverlib.Base64Bytes) error UpsertCrossSigningSigsForTarget(ctx context.Context, txn *sql.Tx, originUserID string, originKeyID gomatrixserverlib.KeyID, targetUserID string, targetKeyID gomatrixserverlib.KeyID, signature gomatrixserverlib.Base64Bytes) error
DeleteCrossSigningSigsForTarget(ctx context.Context, txn *sql.Tx, targetUserID string, targetKeyID gomatrixserverlib.KeyID) error
} }

View file

@ -215,7 +215,9 @@ func PopulatePublicRooms(ctx context.Context, roomIDs []string, rsAPI Roomserver
case topicTuple: case topicTuple:
pub.Topic = contentVal pub.Topic = contentVal
case canonicalTuple: case canonicalTuple:
pub.CanonicalAlias = contentVal if _, _, err := gomatrixserverlib.SplitID('#', contentVal); err == nil {
pub.CanonicalAlias = contentVal
}
case visibilityTuple: case visibilityTuple:
pub.WorldReadable = contentVal == "world_readable" pub.WorldReadable = contentVal == "world_readable"
// need both of these to determine whether guests can join // need both of these to determine whether guests can join

View file

@ -220,7 +220,6 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
index := stateSnapshotData{snapshot.StateSnapshotNID, snapshot.RoomNID} index := stateSnapshotData{snapshot.StateSnapshotNID, snapshot.RoomNID}
newsnapshots[index] = append(newsnapshots[index], blocknid) newsnapshots[index] = append(newsnapshots[index], blocknid)
} }
for snapshotdata, newblocks := range newsnapshots { for snapshotdata, newblocks := range newsnapshots {
var newblocksarray pq.Int64Array var newblocksarray pq.Int64Array
for _, b := range newblocks { for _, b := range newblocks {
@ -229,11 +228,11 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
var newNID types.StateSnapshotNID var newNID types.StateSnapshotNID
err = tx.QueryRow(` err = tx.QueryRow(`
INSERT INTO roomserver_state_snapshots (state_snapshot_hash, room_nid, state_block_nids) INSERT INTO roomserver_state_snapshots (state_snapshot_hash, room_nid, state_block_nids)
VALUES ($1, $2, $3) VALUES ($1, $2, $3)
ON CONFLICT (state_snapshot_hash) DO UPDATE SET room_nid=$2 ON CONFLICT (state_snapshot_hash) DO UPDATE SET room_nid=$2
RETURNING state_snapshot_nid RETURNING state_snapshot_nid
`, newblocks.Hash(), snapshotdata.RoomNID, newblocksarray).Scan(&newNID) `, newblocks.Hash(), snapshotdata.RoomNID, newblocksarray).Scan(&newNID)
if err != nil { if err != nil {
return fmt.Errorf("tx.QueryRow.Scan (insert new snapshot): %w", err) return fmt.Errorf("tx.QueryRow.Scan (insert new snapshot): %w", err)
} }
@ -252,16 +251,49 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
// If we do, this is a problem if Dendrite tries to load the snapshot as it will not exist // If we do, this is a problem if Dendrite tries to load the snapshot as it will not exist
// in roomserver_state_snapshots // in roomserver_state_snapshots
var count int64 var count int64
if err = tx.QueryRow(`SELECT COUNT(*) FROM roomserver_events WHERE state_snapshot_nid < $1 AND state_snapshot_nid != 0`, maxsnapshotid).Scan(&count); err != nil { if err = tx.QueryRow(`SELECT COUNT(*) FROM roomserver_events WHERE state_snapshot_nid < $1 AND state_snapshot_nid != 0`, maxsnapshotid).Scan(&count); err != nil {
return fmt.Errorf("assertion query failed: %s", err) return fmt.Errorf("assertion query failed: %s", err)
} }
if count > 0 { if count > 0 {
var debugEventID, debugRoomID string
var debugEventTypeNID, debugStateKeyNID, debugSnapNID, debugDepth int64
err = tx.QueryRow(
`SELECT event_id, event_type_nid, event_state_key_nid, roomserver_events.state_snapshot_nid, depth, room_id FROM roomserver_events
JOIN roomserver_rooms ON roomserver_rooms.room_nid = roomserver_events.room_nid WHERE roomserver_events.state_snapshot_nid < $1 AND roomserver_events.state_snapshot_nid != 0`, maxsnapshotid,
).Scan(&debugEventID, &debugEventTypeNID, &debugStateKeyNID, &debugSnapNID, &debugDepth, &debugRoomID)
if err != nil {
logrus.Errorf("cannot extract debug info: %v", err)
} else {
logrus.Errorf(
"Affected row: event_id=%v room_id=%v type=%v state_key=%v snapshot=%v depth=%v",
debugEventID, debugRoomID, debugEventTypeNID, debugStateKeyNID, debugSnapNID, debugDepth,
)
logrus.Errorf("To fix this manually, run this query first then retry the migration: "+
"UPDATE roomserver_events SET state_snapshot_nid=0 WHERE event_id='%v'", debugEventID)
}
return fmt.Errorf("%d events exist in roomserver_events which have not been converted to a new state_snapshot_nid; this is a bug, please report", count) return fmt.Errorf("%d events exist in roomserver_events which have not been converted to a new state_snapshot_nid; this is a bug, please report", count)
} }
if err = tx.QueryRow(`SELECT COUNT(*) FROM roomserver_rooms WHERE state_snapshot_nid < $1 AND state_snapshot_nid != 0`, maxsnapshotid).Scan(&count); err != nil { if err = tx.QueryRow(`SELECT COUNT(*) FROM roomserver_rooms WHERE state_snapshot_nid < $1 AND state_snapshot_nid != 0`, maxsnapshotid).Scan(&count); err != nil {
return fmt.Errorf("assertion query failed: %s", err) return fmt.Errorf("assertion query failed: %s", err)
} }
if count > 0 { if count > 0 {
var debugRoomID string
var debugSnapNID, debugLastEventNID int64
err = tx.QueryRow(
`SELECT room_id, state_snapshot_nid, last_event_sent_nid FROM roomserver_rooms WHERE state_snapshot_nid < $1 AND state_snapshot_nid != 0`, maxsnapshotid,
).Scan(&debugRoomID, &debugSnapNID, &debugLastEventNID)
if err != nil {
logrus.Errorf("cannot extract debug info: %v", err)
} else {
logrus.Errorf(
"Affected row: room_id=%v snapshot=%v last_sent=%v",
debugRoomID, debugSnapNID, debugLastEventNID,
)
logrus.Errorf("To fix this manually, run this query first then retry the migration: "+
"UPDATE roomserver_rooms SET state_snapshot_nid=0 WHERE room_id='%v'", debugRoomID)
logrus.Errorf("Running this UPDATE will cause the room in question to become unavailable on this server. Leave and re-join the room afterwards.")
}
return fmt.Errorf("%d rooms exist in roomserver_rooms which have not been converted to a new state_snapshot_nid; this is a bug, please report", count) return fmt.Errorf("%d rooms exist in roomserver_rooms which have not been converted to a new state_snapshot_nid; this is a bug, please report", count)
} }

View file

@ -10,7 +10,6 @@ const (
TopicOutputRoomEvent = "OutputRoomEvent" TopicOutputRoomEvent = "OutputRoomEvent"
TopicOutputClientData = "OutputClientData" TopicOutputClientData = "OutputClientData"
TopicOutputReceiptEvent = "OutputReceiptEvent" TopicOutputReceiptEvent = "OutputReceiptEvent"
TopicOutputSigningKeyUpdate = "OutputSigningKeyUpdate"
) )
type Kafka struct { type Kafka struct {

View file

@ -29,6 +29,7 @@ import (
"github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@ -104,13 +105,23 @@ func (s *OutputKeyChangeEventConsumer) updateOffset(msg *sarama.ConsumerMessage)
func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
defer s.updateOffset(msg) defer s.updateOffset(msg)
var output api.DeviceMessage var m api.DeviceMessage
if err := json.Unmarshal(msg.Value, &output); err != nil { if err := json.Unmarshal(msg.Value, &m); err != nil {
// If the message was invalid, log it and move on to the next message in the stream logrus.WithError(err).Errorf("failed to read device message from key change topic")
log.WithError(err).Error("syncapi: failed to unmarshal key change event from key server") return nil
sentry.CaptureException(err)
return err
} }
switch m.Type {
case api.TypeCrossSigningUpdate:
return s.onCrossSigningMessage(m, msg.Offset, msg.Partition)
case api.TypeDeviceKeyUpdate:
fallthrough
default:
return s.onDeviceKeyMessage(m, msg.Offset, msg.Partition)
}
}
func (s *OutputKeyChangeEventConsumer) onDeviceKeyMessage(m api.DeviceMessage, offset int64, partition int32) error {
output := m.DeviceKeys
// work out who we need to notify about the new key // work out who we need to notify about the new key
var queryRes roomserverAPI.QuerySharedUsersResponse var queryRes roomserverAPI.QuerySharedUsersResponse
err := s.rsAPI.QuerySharedUsers(context.Background(), &roomserverAPI.QuerySharedUsersRequest{ err := s.rsAPI.QuerySharedUsers(context.Background(), &roomserverAPI.QuerySharedUsersRequest{
@ -124,8 +135,35 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
// make sure we get our own key updates too! // make sure we get our own key updates too!
queryRes.UserIDsToCount[output.UserID] = 1 queryRes.UserIDsToCount[output.UserID] = 1
posUpdate := types.LogPosition{ posUpdate := types.LogPosition{
Offset: msg.Offset, Offset: offset,
Partition: msg.Partition, Partition: partition,
}
s.stream.Advance(posUpdate)
for userID := range queryRes.UserIDsToCount {
s.notifier.OnNewKeyChange(types.StreamingToken{DeviceListPosition: posUpdate}, userID, output.UserID)
}
return nil
}
func (s *OutputKeyChangeEventConsumer) onCrossSigningMessage(m api.DeviceMessage, offset int64, partition int32) error {
output := m.CrossSigningKeyUpdate
// work out who we need to notify about the new key
var queryRes roomserverAPI.QuerySharedUsersResponse
err := s.rsAPI.QuerySharedUsers(context.Background(), &roomserverAPI.QuerySharedUsersRequest{
UserID: output.UserID,
}, &queryRes)
if err != nil {
log.WithError(err).Error("syncapi: failed to QuerySharedUsers for key change event from key server")
sentry.CaptureException(err)
return err
}
// make sure we get our own key updates too!
queryRes.UserIDsToCount[output.UserID] = 1
posUpdate := types.LogPosition{
Offset: offset,
Partition: partition,
} }
s.stream.Advance(posUpdate) s.stream.Advance(posUpdate)

View file

@ -33,6 +33,8 @@ func (k *mockKeyAPI) SetUserAPI(i userapi.UserInternalAPI) {}
// PerformClaimKeys claims one-time keys for use in pre-key messages // PerformClaimKeys claims one-time keys for use in pre-key messages
func (k *mockKeyAPI) PerformClaimKeys(ctx context.Context, req *keyapi.PerformClaimKeysRequest, res *keyapi.PerformClaimKeysResponse) { func (k *mockKeyAPI) PerformClaimKeys(ctx context.Context, req *keyapi.PerformClaimKeysRequest, res *keyapi.PerformClaimKeysResponse) {
} }
func (k *mockKeyAPI) PerformDeleteKeys(ctx context.Context, req *keyapi.PerformDeleteKeysRequest, res *keyapi.PerformDeleteKeysResponse) {
}
func (k *mockKeyAPI) PerformUploadDeviceKeys(ctx context.Context, req *keyapi.PerformUploadDeviceKeysRequest, res *keyapi.PerformUploadDeviceKeysResponse) { func (k *mockKeyAPI) PerformUploadDeviceKeys(ctx context.Context, req *keyapi.PerformUploadDeviceKeysRequest, res *keyapi.PerformUploadDeviceKeysResponse) {
} }
func (k *mockKeyAPI) PerformUploadDeviceSignatures(ctx context.Context, req *keyapi.PerformUploadDeviceSignaturesRequest, res *keyapi.PerformUploadDeviceSignaturesResponse) { func (k *mockKeyAPI) PerformUploadDeviceSignatures(ctx context.Context, req *keyapi.PerformUploadDeviceSignaturesRequest, res *keyapi.PerformUploadDeviceSignaturesResponse) {

View file

@ -554,3 +554,5 @@ Can upload self-signing keys
Fails to upload self-signing keys with no auth Fails to upload self-signing keys with no auth
Fails to upload self-signing key without master key Fails to upload self-signing key without master key
can fetch self-signing keys over federation can fetch self-signing keys over federation
Changing master key notifies local users
Changing user-signing key notifies local users

View file

@ -149,6 +149,18 @@ func (a *UserInternalAPI) PerformDeviceDeletion(ctx context.Context, req *api.Pe
if err != nil { if err != nil {
return err return err
} }
// Ask the keyserver to delete device keys and signatures for those devices
deleteReq := &keyapi.PerformDeleteKeysRequest{
UserID: req.UserID,
}
for _, keyID := range req.DeviceIDs {
deleteReq.KeyIDs = append(deleteReq.KeyIDs, gomatrixserverlib.KeyID(keyID))
}
deleteRes := &keyapi.PerformDeleteKeysResponse{}
a.KeyAPI.PerformDeleteKeys(ctx, deleteReq, deleteRes)
if err := deleteRes.Error; err != nil {
return fmt.Errorf("a.KeyAPI.PerformDeleteKeys: %w", err)
}
// create empty device keys and upload them to delete what was once there and trigger device list changes // create empty device keys and upload them to delete what was once there and trigger device list changes
return a.deviceListUpdate(req.UserID, deletedDeviceIDs) return a.deviceListUpdate(req.UserID, deletedDeviceIDs)
} }