From 2771d93748380aa7dc21adca0ef690348d79f002 Mon Sep 17 00:00:00 2001 From: S7evinK <2353100+S7evinK@users.noreply.github.com> Date: Tue, 8 Feb 2022 18:13:38 +0100 Subject: [PATCH] Remove OutputKeyChangeEvent consumer on keyserver (#2160) * Remove keyserver consumer * Remove keyserver from eduserver * Directly upload device keys without eduserver * Add passing tests --- eduserver/api/input.go | 6 -- eduserver/eduserver.go | 1 - eduserver/input/input.go | 31 ------- eduserver/inthttp/client.go | 20 +---- eduserver/inthttp/server.go | 13 --- federationapi/consumers/roomserver.go | 1 - federationapi/routing/send.go | 44 ++++++--- keyserver/consumers/cross_signing.go | 123 -------------------------- keyserver/internal/cross_signing.go | 50 +++++------ keyserver/keyserver.go | 8 -- syncapi/internal/keychange.go | 2 + sytest-whitelist | 1 + 12 files changed, 59 insertions(+), 241 deletions(-) delete mode 100644 keyserver/consumers/cross_signing.go diff --git a/eduserver/api/input.go b/eduserver/api/input.go index 2fa253f4d..2aab107b2 100644 --- a/eduserver/api/input.go +++ b/eduserver/api/input.go @@ -100,10 +100,4 @@ type EDUServerInputAPI interface { request *InputReceiptEventRequest, response *InputReceiptEventResponse, ) error - - InputCrossSigningKeyUpdate( - ctx context.Context, - request *InputCrossSigningKeyUpdateRequest, - response *InputCrossSigningKeyUpdateResponse, - ) error } diff --git a/eduserver/eduserver.go b/eduserver/eduserver.go index febcf2864..9b7e21651 100644 --- a/eduserver/eduserver.go +++ b/eduserver/eduserver.go @@ -51,7 +51,6 @@ func NewInternalAPI( OutputTypingEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent), OutputSendToDeviceEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent), OutputReceiptEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), - OutputKeyChangeEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent), ServerName: cfg.Matrix.ServerName, } } diff --git a/eduserver/input/input.go b/eduserver/input/input.go index 4f8ab3e34..e58f0dd34 100644 --- a/eduserver/input/input.go +++ b/eduserver/input/input.go @@ -23,7 +23,6 @@ import ( "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/eduserver/cache" - keyapi "github.com/matrix-org/dendrite/keyserver/api" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" "github.com/nats-io/nats.go" @@ -40,8 +39,6 @@ type EDUServerInputAPI struct { OutputSendToDeviceEventTopic string // The kafka topic to output new receipt events to OutputReceiptEventTopic string - // The kafka topic to output new key change events to - OutputKeyChangeEventTopic string // kafka producer JetStream nats.JetStreamContext // Internal user query API @@ -80,34 +77,6 @@ func (t *EDUServerInputAPI) InputSendToDeviceEvent( return t.sendToDeviceEvent(ise) } -// InputCrossSigningKeyUpdate implements api.EDUServerInputAPI -func (t *EDUServerInputAPI) InputCrossSigningKeyUpdate( - ctx context.Context, - request *api.InputCrossSigningKeyUpdateRequest, - response *api.InputCrossSigningKeyUpdateResponse, -) error { - eventJSON, err := json.Marshal(&keyapi.DeviceMessage{ - Type: keyapi.TypeCrossSigningUpdate, - OutputCrossSigningKeyUpdate: &api.OutputCrossSigningKeyUpdate{ - CrossSigningKeyUpdate: request.CrossSigningKeyUpdate, - }, - }) - if err != nil { - return err - } - - logrus.WithFields(logrus.Fields{ - "user_id": request.UserID, - }).Tracef("Producing to topic '%s'", t.OutputKeyChangeEventTopic) - - _, err = t.JetStream.PublishMsg(&nats.Msg{ - Subject: t.OutputKeyChangeEventTopic, - Header: nats.Header{}, - Data: eventJSON, - }) - return err -} - func (t *EDUServerInputAPI) sendTypingEvent(ite *api.InputTypingEvent) error { ev := &api.TypingEvent{ Type: gomatrixserverlib.MTyping, diff --git a/eduserver/inthttp/client.go b/eduserver/inthttp/client.go index 9a6f483c2..0690ed827 100644 --- a/eduserver/inthttp/client.go +++ b/eduserver/inthttp/client.go @@ -12,10 +12,9 @@ import ( // HTTP paths for the internal HTTP APIs const ( - EDUServerInputTypingEventPath = "/eduserver/input" - EDUServerInputSendToDeviceEventPath = "/eduserver/sendToDevice" - EDUServerInputReceiptEventPath = "/eduserver/receipt" - EDUServerInputCrossSigningKeyUpdatePath = "/eduserver/crossSigningKeyUpdate" + EDUServerInputTypingEventPath = "/eduserver/input" + EDUServerInputSendToDeviceEventPath = "/eduserver/sendToDevice" + EDUServerInputReceiptEventPath = "/eduserver/receipt" ) // NewEDUServerClient creates a EDUServerInputAPI implemented by talking to a HTTP POST API. @@ -69,16 +68,3 @@ func (h *httpEDUServerInputAPI) InputReceiptEvent( apiURL := h.eduServerURL + EDUServerInputReceiptEventPath return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) } - -// InputCrossSigningKeyUpdate implements EDUServerInputAPI -func (h *httpEDUServerInputAPI) InputCrossSigningKeyUpdate( - ctx context.Context, - request *api.InputCrossSigningKeyUpdateRequest, - response *api.InputCrossSigningKeyUpdateResponse, -) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "InputCrossSigningKeyUpdate") - defer span.Finish() - - apiURL := h.eduServerURL + EDUServerInputCrossSigningKeyUpdatePath - return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) -} diff --git a/eduserver/inthttp/server.go b/eduserver/inthttp/server.go index a50ca84f9..a34943750 100644 --- a/eduserver/inthttp/server.go +++ b/eduserver/inthttp/server.go @@ -51,17 +51,4 @@ func AddRoutes(t api.EDUServerInputAPI, internalAPIMux *mux.Router) { return util.JSONResponse{Code: http.StatusOK, JSON: &response} }), ) - internalAPIMux.Handle(EDUServerInputCrossSigningKeyUpdatePath, - httputil.MakeInternalAPI("inputCrossSigningKeyUpdate", func(req *http.Request) util.JSONResponse { - var request api.InputCrossSigningKeyUpdateRequest - var response api.InputCrossSigningKeyUpdateResponse - if err := json.NewDecoder(req.Body).Decode(&request); err != nil { - return util.MessageResponse(http.StatusBadRequest, err.Error()) - } - if err := t.InputCrossSigningKeyUpdate(req.Context(), &request, &response); err != nil { - return util.ErrorResponse(err) - } - return util.JSONResponse{Code: http.StatusOK, JSON: &response} - }), - ) } diff --git a/federationapi/consumers/roomserver.go b/federationapi/consumers/roomserver.go index e9862000a..60066bb2f 100644 --- a/federationapi/consumers/roomserver.go +++ b/federationapi/consumers/roomserver.go @@ -18,7 +18,6 @@ import ( "context" "encoding/json" "fmt" - "github.com/matrix-org/dendrite/federationapi/queue" "github.com/matrix-org/dendrite/federationapi/storage" "github.com/matrix-org/dendrite/federationapi/types" diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 524fd510e..dd4fe13a8 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -382,20 +382,8 @@ func (t *txnReq) processEDUs(ctx context.Context) { } } case eduserverAPI.MSigningKeyUpdate: - var updatePayload eduserverAPI.CrossSigningKeyUpdate - if err := json.Unmarshal(e.Content, &updatePayload); err != nil { - util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{ - "user_id": updatePayload.UserID, - }).Debug("Failed to send signing key update to edu server") - continue - } - inputReq := &eduserverAPI.InputCrossSigningKeyUpdateRequest{ - CrossSigningKeyUpdate: updatePayload, - } - inputRes := &eduserverAPI.InputCrossSigningKeyUpdateResponse{} - if err := t.eduAPI.InputCrossSigningKeyUpdate(ctx, inputReq, inputRes); err != nil { - util.GetLogger(ctx).WithError(err).Error("Failed to unmarshal cross-signing update") - continue + if err := t.processSigningKeyUpdate(ctx, e); err != nil { + logrus.WithError(err).Errorf("Failed to process signing key update") } default: util.GetLogger(ctx).WithField("type", e.Type).Debug("Unhandled EDU") @@ -403,6 +391,34 @@ func (t *txnReq) processEDUs(ctx context.Context) { } } +func (t *txnReq) processSigningKeyUpdate(ctx context.Context, e gomatrixserverlib.EDU) error { + var updatePayload eduserverAPI.CrossSigningKeyUpdate + if err := json.Unmarshal(e.Content, &updatePayload); err != nil { + util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{ + "user_id": updatePayload.UserID, + }).Debug("Failed to unmarshal signing key update") + return err + } + + keys := gomatrixserverlib.CrossSigningKeys{} + if updatePayload.MasterKey != nil { + keys.MasterKey = *updatePayload.MasterKey + } + if updatePayload.SelfSigningKey != nil { + keys.SelfSigningKey = *updatePayload.SelfSigningKey + } + uploadReq := &keyapi.PerformUploadDeviceKeysRequest{ + CrossSigningKeys: keys, + UserID: updatePayload.UserID, + } + uploadRes := &keyapi.PerformUploadDeviceKeysResponse{} + t.keyAPI.PerformUploadDeviceKeys(ctx, uploadReq, uploadRes) + if uploadRes.Error != nil { + return uploadRes.Error + } + return nil +} + // processReceiptEvent sends receipt events to the edu server func (t *txnReq) processReceiptEvent(ctx context.Context, userID, roomID, receiptType string, diff --git a/keyserver/consumers/cross_signing.go b/keyserver/consumers/cross_signing.go deleted file mode 100644 index aae69e960..000000000 --- a/keyserver/consumers/cross_signing.go +++ /dev/null @@ -1,123 +0,0 @@ -// Copyright 2021 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package consumers - -import ( - "context" - "encoding/json" - - "github.com/matrix-org/dendrite/keyserver/api" - "github.com/matrix-org/dendrite/keyserver/storage" - "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/setup/jetstream" - "github.com/matrix-org/dendrite/setup/process" - "github.com/matrix-org/gomatrixserverlib" - "github.com/nats-io/nats.go" - "github.com/sirupsen/logrus" -) - -type OutputCrossSigningKeyUpdateConsumer struct { - ctx context.Context - keyDB storage.Database - keyAPI api.KeyInternalAPI - serverName string - jetstream nats.JetStreamContext - durable string - topic string -} - -func NewOutputCrossSigningKeyUpdateConsumer( - process *process.ProcessContext, - cfg *config.Dendrite, - js nats.JetStreamContext, - keyDB storage.Database, - keyAPI api.KeyInternalAPI, -) *OutputCrossSigningKeyUpdateConsumer { - // The keyserver both produces and consumes on the TopicOutputKeyChangeEvent - // topic. We will only produce events where the UserID matches our server name, - // and we will only consume events where the UserID does NOT match our server - // name (because the update came from a remote server). - s := &OutputCrossSigningKeyUpdateConsumer{ - ctx: process.Context(), - keyDB: keyDB, - jetstream: js, - durable: cfg.Global.JetStream.Durable("KeyServerCrossSigningConsumer"), - topic: cfg.Global.JetStream.TopicFor(jetstream.OutputKeyChangeEvent), - keyAPI: keyAPI, - serverName: string(cfg.Global.ServerName), - } - - return s -} - -func (s *OutputCrossSigningKeyUpdateConsumer) Start() error { - return jetstream.JetStreamConsumer( - s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, - nats.DeliverAll(), nats.ManualAck(), - ) -} - -// onMessage is called in response to a message received on the -// key change events topic from the key server. -func (t *OutputCrossSigningKeyUpdateConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { - var m api.DeviceMessage - if err := json.Unmarshal(msg.Data, &m); err != nil { - logrus.WithError(err).Errorf("failed to read device message from key change topic") - return true - } - if m.OutputCrossSigningKeyUpdate == nil { - // This probably shouldn't happen but stops us from panicking if we come - // across an update that doesn't satisfy either types. - return true - } - switch m.Type { - case api.TypeCrossSigningUpdate: - return t.onCrossSigningMessage(m) - default: - return true - } -} - -func (s *OutputCrossSigningKeyUpdateConsumer) onCrossSigningMessage(m api.DeviceMessage) bool { - output := m.CrossSigningKeyUpdate - _, host, err := gomatrixserverlib.SplitID('@', output.UserID) - if err != nil { - logrus.WithError(err).Errorf("eduserver output log: user ID parse failure") - return true - } - if host == gomatrixserverlib.ServerName(s.serverName) { - // Ignore any messages that contain information about our own users, as - // they already originated from this server. - return true - } - uploadReq := &api.PerformUploadDeviceKeysRequest{ - UserID: output.UserID, - } - if output.MasterKey != nil { - uploadReq.MasterKey = *output.MasterKey - } - if output.SelfSigningKey != nil { - uploadReq.SelfSigningKey = *output.SelfSigningKey - } - uploadRes := &api.PerformUploadDeviceKeysResponse{} - s.keyAPI.PerformUploadDeviceKeys(context.TODO(), uploadReq, uploadRes) - if uploadRes.Error != nil { - // If the error is due to a missing or invalid parameter then we'd might - // as well just acknowledge the message, because otherwise otherwise we'll - // just keep getting delivered a faulty message over and over again. - return uploadRes.Error.IsMissingParam || uploadRes.Error.IsInvalidParam - } - return true -} diff --git a/keyserver/internal/cross_signing.go b/keyserver/internal/cross_signing.go index 1e1871b8b..527990cf9 100644 --- a/keyserver/internal/cross_signing.go +++ b/keyserver/internal/cross_signing.go @@ -219,25 +219,23 @@ func (a *KeyInternalAPI) PerformUploadDeviceKeys(ctx context.Context, req *api.P } // Finally, generate a notification that we updated the keys. - if _, host, err := gomatrixserverlib.SplitID('@', req.UserID); err == nil && host == a.ThisServer { - update := eduserverAPI.CrossSigningKeyUpdate{ - UserID: req.UserID, - } - if mk, ok := byPurpose[gomatrixserverlib.CrossSigningKeyPurposeMaster]; ok { - update.MasterKey = &mk - } - if ssk, ok := byPurpose[gomatrixserverlib.CrossSigningKeyPurposeSelfSigning]; ok { - update.SelfSigningKey = &ssk - } - if update.MasterKey == nil && update.SelfSigningKey == nil { - return - } - if err := a.Producer.ProduceSigningKeyUpdate(update); err != nil { - res.Error = &api.KeyError{ - Err: fmt.Sprintf("a.Producer.ProduceSigningKeyUpdate: %s", err), - } - return + update := eduserverAPI.CrossSigningKeyUpdate{ + UserID: req.UserID, + } + if mk, ok := byPurpose[gomatrixserverlib.CrossSigningKeyPurposeMaster]; ok { + update.MasterKey = &mk + } + if ssk, ok := byPurpose[gomatrixserverlib.CrossSigningKeyPurposeSelfSigning]; ok { + update.SelfSigningKey = &ssk + } + if update.MasterKey == nil && update.SelfSigningKey == nil { + return + } + if err := a.Producer.ProduceSigningKeyUpdate(update); err != nil { + res.Error = &api.KeyError{ + Err: fmt.Sprintf("a.Producer.ProduceSigningKeyUpdate: %s", err), } + return } } @@ -310,16 +308,14 @@ func (a *KeyInternalAPI) PerformUploadDeviceSignatures(ctx context.Context, req // Finally, generate a notification that we updated the signatures. for userID := range req.Signatures { - if _, host, err := gomatrixserverlib.SplitID('@', userID); err == nil && host == a.ThisServer { - update := eduserverAPI.CrossSigningKeyUpdate{ - UserID: userID, - } - if err := a.Producer.ProduceSigningKeyUpdate(update); err != nil { - res.Error = &api.KeyError{ - Err: fmt.Sprintf("a.Producer.ProduceSigningKeyUpdate: %s", err), - } - return + update := eduserverAPI.CrossSigningKeyUpdate{ + UserID: userID, + } + if err := a.Producer.ProduceSigningKeyUpdate(update); err != nil { + res.Error = &api.KeyError{ + Err: fmt.Sprintf("a.Producer.ProduceSigningKeyUpdate: %s", err), } + return } } } diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go index 61ccc0303..bd36fd9f9 100644 --- a/keyserver/keyserver.go +++ b/keyserver/keyserver.go @@ -18,7 +18,6 @@ import ( "github.com/gorilla/mux" fedsenderapi "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/keyserver/api" - "github.com/matrix-org/dendrite/keyserver/consumers" "github.com/matrix-org/dendrite/keyserver/internal" "github.com/matrix-org/dendrite/keyserver/inthttp" "github.com/matrix-org/dendrite/keyserver/producers" @@ -65,12 +64,5 @@ func NewInternalAPI( } }() - keyconsumer := consumers.NewOutputCrossSigningKeyUpdateConsumer( - base.ProcessContext, base.Cfg, js, db, ap, - ) - if err := keyconsumer.Start(); err != nil { - logrus.WithError(err).Panicf("failed to start keyserver EDU server consumer") - } - return ap } diff --git a/syncapi/internal/keychange.go b/syncapi/internal/keychange.go index fa1064b70..37a9e2d39 100644 --- a/syncapi/internal/keychange.go +++ b/syncapi/internal/keychange.go @@ -282,6 +282,8 @@ func membershipEvents(res *types.Response) (joinUserIDs, leaveUserIDs []string) if ev.Type == gomatrixserverlib.MRoomMember && ev.StateKey != nil { if strings.Contains(string(ev.Content), `"join"`) { joinUserIDs = append(joinUserIDs, *ev.StateKey) + } else if strings.Contains(string(ev.Content), `"invite"`) { + joinUserIDs = append(joinUserIDs, *ev.StateKey) } else if strings.Contains(string(ev.Content), `"leave"`) { leaveUserIDs = append(leaveUserIDs, *ev.StateKey) } else if strings.Contains(string(ev.Content), `"ban"`) { diff --git a/sytest-whitelist b/sytest-whitelist index 7d26c610e..c6ce1daad 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -590,3 +590,4 @@ Can reject invites over federation for rooms with version 9 Can receive redactions from regular users over federation in room version 9 Forward extremities remain so even after the next events are populated as outliers If a device list update goes missing, the server resyncs on the next one +uploading self-signing key notifies over federation