From f4cc3d28e6561734741d43e79d57948676a1284e Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 10 Aug 2021 14:43:55 +0100 Subject: [PATCH] Produce/consume EDUs --- eduserver/api/types.go | 6 +- federationsender/consumers/signingkeys.go | 121 ++++++++++++++++++++++ federationsender/federationsender.go | 6 ++ keyserver/consumers/eduserver.go | 10 +- keyserver/internal/cross_signing.go | 18 ++++ keyserver/internal/internal.go | 17 +-- keyserver/keyserver.go | 13 ++- keyserver/producers/signingupdate.go | 51 +++++++++ 8 files changed, 223 insertions(+), 19 deletions(-) create mode 100644 federationsender/consumers/signingkeys.go create mode 100644 keyserver/producers/signingupdate.go diff --git a/eduserver/api/types.go b/eduserver/api/types.go index e04cff011..0332b7e53 100644 --- a/eduserver/api/types.go +++ b/eduserver/api/types.go @@ -41,7 +41,7 @@ type ReceiptTS struct { } type SigningKeyUpdate struct { - MasterKey gomatrixserverlib.CrossSigningKey `json:"master_key"` - SelfSigningKey gomatrixserverlib.CrossSigningKey `json:"self_signing_key"` - UserID string `json:"user_id"` + MasterKey *gomatrixserverlib.CrossSigningKey `json:"master_key,omitempty"` + SelfSigningKey *gomatrixserverlib.CrossSigningKey `json:"self_signing_key,omitempty"` + UserID string `json:"user_id"` } diff --git a/federationsender/consumers/signingkeys.go b/federationsender/consumers/signingkeys.go new file mode 100644 index 000000000..cf54da8a4 --- /dev/null +++ b/federationsender/consumers/signingkeys.go @@ -0,0 +1,121 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumers + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/Shopify/sarama" + eduapi "github.com/matrix-org/dendrite/eduserver/api" + "github.com/matrix-org/dendrite/federationsender/queue" + "github.com/matrix-org/dendrite/federationsender/storage" + "github.com/matrix-org/dendrite/internal" + roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/process" + "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" + log "github.com/sirupsen/logrus" +) + +type OutputSigningKeyUpdateConsumer struct { + consumer *internal.ContinualConsumer + db storage.Database + queues *queue.OutgoingQueues + serverName gomatrixserverlib.ServerName + rsAPI roomserverAPI.RoomserverInternalAPI +} + +func NewOutputSigningKeyUpdateConsumer( + process *process.ProcessContext, + cfg *config.KeyServer, + kafkaConsumer sarama.Consumer, + queues *queue.OutgoingQueues, + store storage.Database, + rsAPI roomserverAPI.RoomserverInternalAPI, +) *OutputSigningKeyUpdateConsumer { + c := &OutputSigningKeyUpdateConsumer{ + consumer: &internal.ContinualConsumer{ + Process: process, + ComponentName: "federationsender/signingkeys", + Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputSigningKeyUpdate)), + Consumer: kafkaConsumer, + PartitionStore: store, + }, + queues: queues, + db: store, + serverName: cfg.Matrix.ServerName, + rsAPI: rsAPI, + } + c.consumer.ProcessMessage = c.onMessage + + return c +} + +func (t *OutputSigningKeyUpdateConsumer) Start() error { + if err := t.consumer.Start(); err != nil { + return fmt.Errorf("t.consumer.Start: %w", err) + } + return nil +} + +func (t *OutputSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMessage) error { + var output eduapi.OutputSigningKeyUpdate + if err := json.Unmarshal(msg.Value, &output); err != nil { + logrus.WithError(err).Errorf("eduserver output log: message parse failure") + return nil + } + _, host, err := gomatrixserverlib.SplitID('@', output.UserID) + if err != nil { + logrus.WithError(err).Errorf("eduserver output log: user ID parse failure") + return nil + } + if host != gomatrixserverlib.ServerName(t.serverName) { + // Ignore any messages that didn't originate locally, otherwise we'll + // end up parroting information we received from other servers. + return nil + } + logger := log.WithField("user_id", output.UserID) + + var queryRes roomserverAPI.QueryRoomsForUserResponse + err = t.rsAPI.QueryRoomsForUser(context.Background(), &roomserverAPI.QueryRoomsForUserRequest{ + UserID: output.UserID, + WantMembership: "join", + }, &queryRes) + if err != nil { + logger.WithError(err).Error("failed to calculate joined rooms for user") + return nil + } + // send this key change to all servers who share rooms with this user. + destinations, err := t.db.GetJoinedHostsForRooms(context.Background(), queryRes.RoomIDs) + if err != nil { + logger.WithError(err).Error("failed to calculate joined hosts for rooms user is in") + return nil + } + + // Pack the EDU and marshal it + edu := &gomatrixserverlib.EDU{ + Type: eduapi.MSigningKeyUpdate, + Origin: string(t.serverName), + } + if edu.Content, err = json.Marshal(output.SigningKeyUpdate); err != nil { + return err + } + + logger.Infof("Sending cross-signing update message to %q", destinations) + return t.queues.SendEDU(edu, t.serverName, destinations) +} diff --git a/federationsender/federationsender.go b/federationsender/federationsender.go index 0732c5d38..886b6557c 100644 --- a/federationsender/federationsender.go +++ b/federationsender/federationsender.go @@ -94,6 +94,12 @@ func NewInternalAPI( if err := keyConsumer.Start(); err != nil { logrus.WithError(err).Panic("failed to start key server consumer") } + signingKeyConsumer := consumers.NewOutputSigningKeyUpdateConsumer( + base.ProcessContext, &base.Cfg.KeyServer, consumer, queues, federationSenderDB, rsAPI, + ) + if err := signingKeyConsumer.Start(); err != nil { + logrus.WithError(err).Panic("failed to start signing key consumer") + } return internal.NewFederationSenderInternalAPI(federationSenderDB, cfg, rsAPI, federation, keyRing, stats, queues) } diff --git a/keyserver/consumers/eduserver.go b/keyserver/consumers/eduserver.go index c5eafe860..7a3886e73 100644 --- a/keyserver/consumers/eduserver.go +++ b/keyserver/consumers/eduserver.go @@ -69,12 +69,14 @@ func (s *OutputSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMessage) return nil } uploadReq := &api.PerformUploadDeviceKeysRequest{ - CrossSigningKeys: gomatrixserverlib.CrossSigningKeys{ - MasterKey: output.MasterKey, - SelfSigningKey: output.SelfSigningKey, - }, UserID: output.UserID, } + if output.MasterKey != nil { + uploadReq.MasterKey = *output.MasterKey + } + if output.SelfSigningKey != nil { + uploadReq.SelfSigningKey = *output.SelfSigningKey + } uploadRes := &api.PerformUploadDeviceKeysResponse{} s.keyAPI.PerformUploadDeviceKeys(context.TODO(), uploadReq, uploadRes) return uploadRes.Error diff --git a/keyserver/internal/cross_signing.go b/keyserver/internal/cross_signing.go index 4009dd459..3f6560117 100644 --- a/keyserver/internal/cross_signing.go +++ b/keyserver/internal/cross_signing.go @@ -23,6 +23,7 @@ import ( "fmt" "strings" + eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/types" "github.com/matrix-org/gomatrixserverlib" @@ -222,6 +223,23 @@ func (a *KeyInternalAPI) PerformUploadDeviceKeys(ctx context.Context, req *api.P } } } + + // Finally, generate a notification that we updated the keys. + update := eduserverAPI.SigningKeyUpdate{ + UserID: req.UserID, + } + if _, ok := toStore[gomatrixserverlib.CrossSigningKeyPurposeMaster]; ok { + update.MasterKey = &req.MasterKey + } + if _, ok := toStore[gomatrixserverlib.CrossSigningKeyPurposeSelfSigning]; ok { + update.SelfSigningKey = &req.SelfSigningKey + } + if err := a.CrossSigningProducer.ProduceSigningKeyUpdate(update); err != nil { + res.Error = &api.KeyError{ + Err: fmt.Sprintf("a.CrossSigningProducer.ProduceSigningKeyUpdate: %s", err), + } + return + } } func (a *KeyInternalAPI) PerformUploadDeviceSignatures(ctx context.Context, req *api.PerformUploadDeviceSignaturesRequest, res *api.PerformUploadDeviceSignaturesResponse) { diff --git a/keyserver/internal/internal.go b/keyserver/internal/internal.go index de2699114..ba5c74fb0 100644 --- a/keyserver/internal/internal.go +++ b/keyserver/internal/internal.go @@ -35,12 +35,13 @@ import ( ) type KeyInternalAPI struct { - DB storage.Database - ThisServer gomatrixserverlib.ServerName - FedClient fedsenderapi.FederationClient - UserAPI userapi.UserInternalAPI - Producer *producers.KeyChange - Updater *DeviceListUpdater + DB storage.Database + ThisServer gomatrixserverlib.ServerName + FedClient fedsenderapi.FederationClient + UserAPI userapi.UserInternalAPI + DeviceKeysProducer *producers.KeyChange + CrossSigningProducer *producers.SigningKeyUpdate + Updater *DeviceListUpdater } func (a *KeyInternalAPI) SetUserAPI(i userapi.UserInternalAPI) { @@ -60,7 +61,7 @@ func (a *KeyInternalAPI) InputDeviceListUpdate( func (a *KeyInternalAPI) QueryKeyChanges(ctx context.Context, req *api.QueryKeyChangesRequest, res *api.QueryKeyChangesResponse) { if req.Partition < 0 { - req.Partition = a.Producer.DefaultPartition() + req.Partition = a.DeviceKeysProducer.DefaultPartition() } userIDs, latest, err := a.DB.KeyChanges(ctx, req.Partition, req.Offset, req.ToOffset) if err != nil { @@ -597,7 +598,7 @@ func (a *KeyInternalAPI) uploadLocalDeviceKeys(ctx context.Context, req *api.Per } return } - err = emitDeviceKeyChanges(a.Producer, existingKeys, keysToStore) + err = emitDeviceKeyChanges(a.DeviceKeysProducer, existingKeys, keysToStore) if err != nil { util.GetLogger(ctx).Errorf("Failed to emitDeviceKeyChanges: %s", err) } diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go index fcfe24de8..5aab15197 100644 --- a/keyserver/keyserver.go +++ b/keyserver/keyserver.go @@ -51,11 +51,16 @@ func NewInternalAPI( Producer: producer, DB: db, } + signingKeyUpdateProducer := &producers.SigningKeyUpdate{ + Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputSigningKeyUpdate)), + Producer: producer, + } ap := &internal.KeyInternalAPI{ - DB: db, - ThisServer: cfg.Matrix.ServerName, - FedClient: fedClient, - Producer: keyChangeProducer, + DB: db, + ThisServer: cfg.Matrix.ServerName, + FedClient: fedClient, + DeviceKeysProducer: keyChangeProducer, + CrossSigningProducer: signingKeyUpdateProducer, } updater := internal.NewDeviceListUpdater(db, ap, keyChangeProducer, fedClient, 8) // 8 workers TODO: configurable ap.Updater = updater diff --git a/keyserver/producers/signingupdate.go b/keyserver/producers/signingupdate.go new file mode 100644 index 000000000..cfe1dde73 --- /dev/null +++ b/keyserver/producers/signingupdate.go @@ -0,0 +1,51 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package producers + +import ( + "encoding/json" + + "github.com/Shopify/sarama" + "github.com/matrix-org/dendrite/eduserver/api" +) + +type SigningKeyUpdate struct { + Topic string + Producer sarama.SyncProducer +} + +func (p *SigningKeyUpdate) DefaultPartition() int32 { + return 0 +} + +func (p *SigningKeyUpdate) ProduceSigningKeyUpdate(key api.SigningKeyUpdate) error { + var m sarama.ProducerMessage + output := &api.OutputSigningKeyUpdate{ + SigningKeyUpdate: key, + } + + value, err := json.Marshal(output) + if err != nil { + return err + } + + m.Topic = string(p.Topic) + m.Key = sarama.StringEncoder(key.UserID) + m.Value = sarama.ByteEncoder(value) + + _, _, err = p.Producer.SendMessage(&m) + + return err +}