mirror of
https://github.com/matrix-org/dendrite.git
synced 2024-11-22 14:21:55 -06:00
Remove OutputKeyChangeEvent consumer on keyserver (#2160)
* Remove keyserver consumer * Remove keyserver from eduserver * Directly upload device keys without eduserver * Add passing tests
This commit is contained in:
parent
457a07eac5
commit
2771d93748
|
@ -100,10 +100,4 @@ type EDUServerInputAPI interface {
|
|||
request *InputReceiptEventRequest,
|
||||
response *InputReceiptEventResponse,
|
||||
) error
|
||||
|
||||
InputCrossSigningKeyUpdate(
|
||||
ctx context.Context,
|
||||
request *InputCrossSigningKeyUpdateRequest,
|
||||
response *InputCrossSigningKeyUpdateResponse,
|
||||
) error
|
||||
}
|
||||
|
|
|
@ -51,7 +51,6 @@ func NewInternalAPI(
|
|||
OutputTypingEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent),
|
||||
OutputSendToDeviceEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent),
|
||||
OutputReceiptEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent),
|
||||
OutputKeyChangeEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent),
|
||||
ServerName: cfg.Matrix.ServerName,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,7 +23,6 @@ import (
|
|||
|
||||
"github.com/matrix-org/dendrite/eduserver/api"
|
||||
"github.com/matrix-org/dendrite/eduserver/cache"
|
||||
keyapi "github.com/matrix-org/dendrite/keyserver/api"
|
||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/nats-io/nats.go"
|
||||
|
@ -40,8 +39,6 @@ type EDUServerInputAPI struct {
|
|||
OutputSendToDeviceEventTopic string
|
||||
// The kafka topic to output new receipt events to
|
||||
OutputReceiptEventTopic string
|
||||
// The kafka topic to output new key change events to
|
||||
OutputKeyChangeEventTopic string
|
||||
// kafka producer
|
||||
JetStream nats.JetStreamContext
|
||||
// Internal user query API
|
||||
|
@ -80,34 +77,6 @@ func (t *EDUServerInputAPI) InputSendToDeviceEvent(
|
|||
return t.sendToDeviceEvent(ise)
|
||||
}
|
||||
|
||||
// InputCrossSigningKeyUpdate implements api.EDUServerInputAPI
|
||||
func (t *EDUServerInputAPI) InputCrossSigningKeyUpdate(
|
||||
ctx context.Context,
|
||||
request *api.InputCrossSigningKeyUpdateRequest,
|
||||
response *api.InputCrossSigningKeyUpdateResponse,
|
||||
) error {
|
||||
eventJSON, err := json.Marshal(&keyapi.DeviceMessage{
|
||||
Type: keyapi.TypeCrossSigningUpdate,
|
||||
OutputCrossSigningKeyUpdate: &api.OutputCrossSigningKeyUpdate{
|
||||
CrossSigningKeyUpdate: request.CrossSigningKeyUpdate,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"user_id": request.UserID,
|
||||
}).Tracef("Producing to topic '%s'", t.OutputKeyChangeEventTopic)
|
||||
|
||||
_, err = t.JetStream.PublishMsg(&nats.Msg{
|
||||
Subject: t.OutputKeyChangeEventTopic,
|
||||
Header: nats.Header{},
|
||||
Data: eventJSON,
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
func (t *EDUServerInputAPI) sendTypingEvent(ite *api.InputTypingEvent) error {
|
||||
ev := &api.TypingEvent{
|
||||
Type: gomatrixserverlib.MTyping,
|
||||
|
|
|
@ -15,7 +15,6 @@ const (
|
|||
EDUServerInputTypingEventPath = "/eduserver/input"
|
||||
EDUServerInputSendToDeviceEventPath = "/eduserver/sendToDevice"
|
||||
EDUServerInputReceiptEventPath = "/eduserver/receipt"
|
||||
EDUServerInputCrossSigningKeyUpdatePath = "/eduserver/crossSigningKeyUpdate"
|
||||
)
|
||||
|
||||
// NewEDUServerClient creates a EDUServerInputAPI implemented by talking to a HTTP POST API.
|
||||
|
@ -69,16 +68,3 @@ func (h *httpEDUServerInputAPI) InputReceiptEvent(
|
|||
apiURL := h.eduServerURL + EDUServerInputReceiptEventPath
|
||||
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
||||
}
|
||||
|
||||
// InputCrossSigningKeyUpdate implements EDUServerInputAPI
|
||||
func (h *httpEDUServerInputAPI) InputCrossSigningKeyUpdate(
|
||||
ctx context.Context,
|
||||
request *api.InputCrossSigningKeyUpdateRequest,
|
||||
response *api.InputCrossSigningKeyUpdateResponse,
|
||||
) error {
|
||||
span, ctx := opentracing.StartSpanFromContext(ctx, "InputCrossSigningKeyUpdate")
|
||||
defer span.Finish()
|
||||
|
||||
apiURL := h.eduServerURL + EDUServerInputCrossSigningKeyUpdatePath
|
||||
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
||||
}
|
||||
|
|
|
@ -51,17 +51,4 @@ func AddRoutes(t api.EDUServerInputAPI, internalAPIMux *mux.Router) {
|
|||
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||
}),
|
||||
)
|
||||
internalAPIMux.Handle(EDUServerInputCrossSigningKeyUpdatePath,
|
||||
httputil.MakeInternalAPI("inputCrossSigningKeyUpdate", func(req *http.Request) util.JSONResponse {
|
||||
var request api.InputCrossSigningKeyUpdateRequest
|
||||
var response api.InputCrossSigningKeyUpdateResponse
|
||||
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
|
||||
return util.MessageResponse(http.StatusBadRequest, err.Error())
|
||||
}
|
||||
if err := t.InputCrossSigningKeyUpdate(req.Context(), &request, &response); err != nil {
|
||||
return util.ErrorResponse(err)
|
||||
}
|
||||
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
|
|
@ -18,7 +18,6 @@ import (
|
|||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"github.com/matrix-org/dendrite/federationapi/queue"
|
||||
"github.com/matrix-org/dendrite/federationapi/storage"
|
||||
"github.com/matrix-org/dendrite/federationapi/types"
|
||||
|
|
|
@ -382,20 +382,8 @@ func (t *txnReq) processEDUs(ctx context.Context) {
|
|||
}
|
||||
}
|
||||
case eduserverAPI.MSigningKeyUpdate:
|
||||
var updatePayload eduserverAPI.CrossSigningKeyUpdate
|
||||
if err := json.Unmarshal(e.Content, &updatePayload); err != nil {
|
||||
util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{
|
||||
"user_id": updatePayload.UserID,
|
||||
}).Debug("Failed to send signing key update to edu server")
|
||||
continue
|
||||
}
|
||||
inputReq := &eduserverAPI.InputCrossSigningKeyUpdateRequest{
|
||||
CrossSigningKeyUpdate: updatePayload,
|
||||
}
|
||||
inputRes := &eduserverAPI.InputCrossSigningKeyUpdateResponse{}
|
||||
if err := t.eduAPI.InputCrossSigningKeyUpdate(ctx, inputReq, inputRes); err != nil {
|
||||
util.GetLogger(ctx).WithError(err).Error("Failed to unmarshal cross-signing update")
|
||||
continue
|
||||
if err := t.processSigningKeyUpdate(ctx, e); err != nil {
|
||||
logrus.WithError(err).Errorf("Failed to process signing key update")
|
||||
}
|
||||
default:
|
||||
util.GetLogger(ctx).WithField("type", e.Type).Debug("Unhandled EDU")
|
||||
|
@ -403,6 +391,34 @@ func (t *txnReq) processEDUs(ctx context.Context) {
|
|||
}
|
||||
}
|
||||
|
||||
func (t *txnReq) processSigningKeyUpdate(ctx context.Context, e gomatrixserverlib.EDU) error {
|
||||
var updatePayload eduserverAPI.CrossSigningKeyUpdate
|
||||
if err := json.Unmarshal(e.Content, &updatePayload); err != nil {
|
||||
util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{
|
||||
"user_id": updatePayload.UserID,
|
||||
}).Debug("Failed to unmarshal signing key update")
|
||||
return err
|
||||
}
|
||||
|
||||
keys := gomatrixserverlib.CrossSigningKeys{}
|
||||
if updatePayload.MasterKey != nil {
|
||||
keys.MasterKey = *updatePayload.MasterKey
|
||||
}
|
||||
if updatePayload.SelfSigningKey != nil {
|
||||
keys.SelfSigningKey = *updatePayload.SelfSigningKey
|
||||
}
|
||||
uploadReq := &keyapi.PerformUploadDeviceKeysRequest{
|
||||
CrossSigningKeys: keys,
|
||||
UserID: updatePayload.UserID,
|
||||
}
|
||||
uploadRes := &keyapi.PerformUploadDeviceKeysResponse{}
|
||||
t.keyAPI.PerformUploadDeviceKeys(ctx, uploadReq, uploadRes)
|
||||
if uploadRes.Error != nil {
|
||||
return uploadRes.Error
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// processReceiptEvent sends receipt events to the edu server
|
||||
func (t *txnReq) processReceiptEvent(ctx context.Context,
|
||||
userID, roomID, receiptType string,
|
||||
|
|
|
@ -1,123 +0,0 @@
|
|||
// Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package consumers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/matrix-org/dendrite/keyserver/api"
|
||||
"github.com/matrix-org/dendrite/keyserver/storage"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||
"github.com/matrix-org/dendrite/setup/process"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type OutputCrossSigningKeyUpdateConsumer struct {
|
||||
ctx context.Context
|
||||
keyDB storage.Database
|
||||
keyAPI api.KeyInternalAPI
|
||||
serverName string
|
||||
jetstream nats.JetStreamContext
|
||||
durable string
|
||||
topic string
|
||||
}
|
||||
|
||||
func NewOutputCrossSigningKeyUpdateConsumer(
|
||||
process *process.ProcessContext,
|
||||
cfg *config.Dendrite,
|
||||
js nats.JetStreamContext,
|
||||
keyDB storage.Database,
|
||||
keyAPI api.KeyInternalAPI,
|
||||
) *OutputCrossSigningKeyUpdateConsumer {
|
||||
// The keyserver both produces and consumes on the TopicOutputKeyChangeEvent
|
||||
// topic. We will only produce events where the UserID matches our server name,
|
||||
// and we will only consume events where the UserID does NOT match our server
|
||||
// name (because the update came from a remote server).
|
||||
s := &OutputCrossSigningKeyUpdateConsumer{
|
||||
ctx: process.Context(),
|
||||
keyDB: keyDB,
|
||||
jetstream: js,
|
||||
durable: cfg.Global.JetStream.Durable("KeyServerCrossSigningConsumer"),
|
||||
topic: cfg.Global.JetStream.TopicFor(jetstream.OutputKeyChangeEvent),
|
||||
keyAPI: keyAPI,
|
||||
serverName: string(cfg.Global.ServerName),
|
||||
}
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *OutputCrossSigningKeyUpdateConsumer) Start() error {
|
||||
return jetstream.JetStreamConsumer(
|
||||
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
|
||||
nats.DeliverAll(), nats.ManualAck(),
|
||||
)
|
||||
}
|
||||
|
||||
// onMessage is called in response to a message received on the
|
||||
// key change events topic from the key server.
|
||||
func (t *OutputCrossSigningKeyUpdateConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
|
||||
var m api.DeviceMessage
|
||||
if err := json.Unmarshal(msg.Data, &m); err != nil {
|
||||
logrus.WithError(err).Errorf("failed to read device message from key change topic")
|
||||
return true
|
||||
}
|
||||
if m.OutputCrossSigningKeyUpdate == nil {
|
||||
// This probably shouldn't happen but stops us from panicking if we come
|
||||
// across an update that doesn't satisfy either types.
|
||||
return true
|
||||
}
|
||||
switch m.Type {
|
||||
case api.TypeCrossSigningUpdate:
|
||||
return t.onCrossSigningMessage(m)
|
||||
default:
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
func (s *OutputCrossSigningKeyUpdateConsumer) onCrossSigningMessage(m api.DeviceMessage) bool {
|
||||
output := m.CrossSigningKeyUpdate
|
||||
_, host, err := gomatrixserverlib.SplitID('@', output.UserID)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Errorf("eduserver output log: user ID parse failure")
|
||||
return true
|
||||
}
|
||||
if host == gomatrixserverlib.ServerName(s.serverName) {
|
||||
// Ignore any messages that contain information about our own users, as
|
||||
// they already originated from this server.
|
||||
return true
|
||||
}
|
||||
uploadReq := &api.PerformUploadDeviceKeysRequest{
|
||||
UserID: output.UserID,
|
||||
}
|
||||
if output.MasterKey != nil {
|
||||
uploadReq.MasterKey = *output.MasterKey
|
||||
}
|
||||
if output.SelfSigningKey != nil {
|
||||
uploadReq.SelfSigningKey = *output.SelfSigningKey
|
||||
}
|
||||
uploadRes := &api.PerformUploadDeviceKeysResponse{}
|
||||
s.keyAPI.PerformUploadDeviceKeys(context.TODO(), uploadReq, uploadRes)
|
||||
if uploadRes.Error != nil {
|
||||
// If the error is due to a missing or invalid parameter then we'd might
|
||||
// as well just acknowledge the message, because otherwise otherwise we'll
|
||||
// just keep getting delivered a faulty message over and over again.
|
||||
return uploadRes.Error.IsMissingParam || uploadRes.Error.IsInvalidParam
|
||||
}
|
||||
return true
|
||||
}
|
|
@ -219,7 +219,6 @@ func (a *KeyInternalAPI) PerformUploadDeviceKeys(ctx context.Context, req *api.P
|
|||
}
|
||||
|
||||
// Finally, generate a notification that we updated the keys.
|
||||
if _, host, err := gomatrixserverlib.SplitID('@', req.UserID); err == nil && host == a.ThisServer {
|
||||
update := eduserverAPI.CrossSigningKeyUpdate{
|
||||
UserID: req.UserID,
|
||||
}
|
||||
|
@ -238,7 +237,6 @@ func (a *KeyInternalAPI) PerformUploadDeviceKeys(ctx context.Context, req *api.P
|
|||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (a *KeyInternalAPI) PerformUploadDeviceSignatures(ctx context.Context, req *api.PerformUploadDeviceSignaturesRequest, res *api.PerformUploadDeviceSignaturesResponse) {
|
||||
|
@ -310,7 +308,6 @@ func (a *KeyInternalAPI) PerformUploadDeviceSignatures(ctx context.Context, req
|
|||
|
||||
// Finally, generate a notification that we updated the signatures.
|
||||
for userID := range req.Signatures {
|
||||
if _, host, err := gomatrixserverlib.SplitID('@', userID); err == nil && host == a.ThisServer {
|
||||
update := eduserverAPI.CrossSigningKeyUpdate{
|
||||
UserID: userID,
|
||||
}
|
||||
|
@ -321,7 +318,6 @@ func (a *KeyInternalAPI) PerformUploadDeviceSignatures(ctx context.Context, req
|
|||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (a *KeyInternalAPI) processSelfSignatures(
|
||||
|
|
|
@ -18,7 +18,6 @@ import (
|
|||
"github.com/gorilla/mux"
|
||||
fedsenderapi "github.com/matrix-org/dendrite/federationapi/api"
|
||||
"github.com/matrix-org/dendrite/keyserver/api"
|
||||
"github.com/matrix-org/dendrite/keyserver/consumers"
|
||||
"github.com/matrix-org/dendrite/keyserver/internal"
|
||||
"github.com/matrix-org/dendrite/keyserver/inthttp"
|
||||
"github.com/matrix-org/dendrite/keyserver/producers"
|
||||
|
@ -65,12 +64,5 @@ func NewInternalAPI(
|
|||
}
|
||||
}()
|
||||
|
||||
keyconsumer := consumers.NewOutputCrossSigningKeyUpdateConsumer(
|
||||
base.ProcessContext, base.Cfg, js, db, ap,
|
||||
)
|
||||
if err := keyconsumer.Start(); err != nil {
|
||||
logrus.WithError(err).Panicf("failed to start keyserver EDU server consumer")
|
||||
}
|
||||
|
||||
return ap
|
||||
}
|
||||
|
|
|
@ -282,6 +282,8 @@ func membershipEvents(res *types.Response) (joinUserIDs, leaveUserIDs []string)
|
|||
if ev.Type == gomatrixserverlib.MRoomMember && ev.StateKey != nil {
|
||||
if strings.Contains(string(ev.Content), `"join"`) {
|
||||
joinUserIDs = append(joinUserIDs, *ev.StateKey)
|
||||
} else if strings.Contains(string(ev.Content), `"invite"`) {
|
||||
joinUserIDs = append(joinUserIDs, *ev.StateKey)
|
||||
} else if strings.Contains(string(ev.Content), `"leave"`) {
|
||||
leaveUserIDs = append(leaveUserIDs, *ev.StateKey)
|
||||
} else if strings.Contains(string(ev.Content), `"ban"`) {
|
||||
|
|
|
@ -590,3 +590,4 @@ Can reject invites over federation for rooms with version 9
|
|||
Can receive redactions from regular users over federation in room version 9
|
||||
Forward extremities remain so even after the next events are populated as outliers
|
||||
If a device list update goes missing, the server resyncs on the next one
|
||||
uploading self-signing key notifies over federation
|
||||
|
|
Loading…
Reference in a new issue