From 64fc48f75691a14bef0c36c4f9e26f98eb6763fb Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 11 Aug 2021 11:31:41 +0100 Subject: [PATCH] Use key change topic --- eduserver/eduserver.go | 16 +-- eduserver/input/input.go | 16 ++- federationsender/consumers/cross_signing.go | 121 ----------------- federationsender/consumers/keychange.go | 56 ++++++++ federationsender/federationsender.go | 6 - keyserver/api/api.go | 6 +- keyserver/consumers/cross_signing.go | 23 +++- keyserver/internal/cross_signing.go | 4 +- keyserver/internal/internal.go | 17 ++- keyserver/keyserver.go | 13 +- keyserver/producers/cross_signing.go | 58 -------- keyserver/producers/keychange.go | 30 +++++ setup/config/config_kafka.go | 13 +- syncapi/consumers/cross_signing.go | 140 -------------------- syncapi/consumers/keychange.go | 54 ++++++-- syncapi/syncapi.go | 8 -- 16 files changed, 191 insertions(+), 390 deletions(-) delete mode 100644 federationsender/consumers/cross_signing.go delete mode 100644 keyserver/producers/cross_signing.go delete mode 100644 syncapi/consumers/cross_signing.go diff --git a/eduserver/eduserver.go b/eduserver/eduserver.go index f9a1e6053..7875e27f1 100644 --- a/eduserver/eduserver.go +++ b/eduserver/eduserver.go @@ -46,13 +46,13 @@ func NewInternalAPI( _, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) return &input.EDUServerInputAPI{ - Cache: eduCache, - UserAPI: userAPI, - Producer: producer, - OutputTypingEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent), - OutputSendToDeviceEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent), - OutputReceiptEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputReceiptEvent), - OutputCrossSigningKeyUpdateTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputCrossSigningKeyUpdate), - ServerName: cfg.Matrix.ServerName, + Cache: eduCache, + UserAPI: userAPI, + Producer: producer, + OutputTypingEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent), + OutputSendToDeviceEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent), + OutputReceiptEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputReceiptEvent), + OutputKeyChangeEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent), + ServerName: cfg.Matrix.ServerName, } } diff --git a/eduserver/input/input.go b/eduserver/input/input.go index 238ede6de..bdc243745 100644 --- a/eduserver/input/input.go +++ b/eduserver/input/input.go @@ -24,6 +24,7 @@ import ( "github.com/Shopify/sarama" "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/eduserver/cache" + keyapi "github.com/matrix-org/dendrite/keyserver/api" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" "github.com/sirupsen/logrus" @@ -39,8 +40,8 @@ type EDUServerInputAPI struct { OutputSendToDeviceEventTopic string // The kafka topic to output new receipt events to OutputReceiptEventTopic string - // The kafka topic to output new signing key changes to - OutputCrossSigningKeyUpdateTopic string + // The kafka topic to output new key change events to + OutputKeyChangeEventTopic string // kafka producer Producer sarama.SyncProducer // Internal user query API @@ -85,8 +86,11 @@ func (t *EDUServerInputAPI) InputCrossSigningKeyUpdate( request *api.InputCrossSigningKeyUpdateRequest, response *api.InputCrossSigningKeyUpdateResponse, ) error { - eventJSON, err := json.Marshal(&api.OutputCrossSigningKeyUpdate{ - CrossSigningKeyUpdate: request.CrossSigningKeyUpdate, + eventJSON, err := json.Marshal(&keyapi.DeviceMessage{ + Type: keyapi.TypeCrossSigningUpdate, + OutputCrossSigningKeyUpdate: &api.OutputCrossSigningKeyUpdate{ + CrossSigningKeyUpdate: request.CrossSigningKeyUpdate, + }, }) if err != nil { return err @@ -94,10 +98,10 @@ func (t *EDUServerInputAPI) InputCrossSigningKeyUpdate( logrus.WithFields(logrus.Fields{ "user_id": request.UserID, - }).Infof("Producing to topic '%s'", t.OutputCrossSigningKeyUpdateTopic) + }).Infof("Producing to topic '%s'", t.OutputKeyChangeEventTopic) m := &sarama.ProducerMessage{ - Topic: string(t.OutputCrossSigningKeyUpdateTopic), + Topic: string(t.OutputKeyChangeEventTopic), Key: sarama.StringEncoder(request.UserID), Value: sarama.ByteEncoder(eventJSON), } diff --git a/federationsender/consumers/cross_signing.go b/federationsender/consumers/cross_signing.go deleted file mode 100644 index 6ed7b64fe..000000000 --- a/federationsender/consumers/cross_signing.go +++ /dev/null @@ -1,121 +0,0 @@ -// Copyright 2020 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package consumers - -import ( - "context" - "encoding/json" - "fmt" - - "github.com/Shopify/sarama" - eduapi "github.com/matrix-org/dendrite/eduserver/api" - "github.com/matrix-org/dendrite/federationsender/queue" - "github.com/matrix-org/dendrite/federationsender/storage" - "github.com/matrix-org/dendrite/internal" - roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" - "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/setup/process" - "github.com/matrix-org/gomatrixserverlib" - "github.com/sirupsen/logrus" - log "github.com/sirupsen/logrus" -) - -type CrossSigningKeyUpdateConsumer struct { - consumer *internal.ContinualConsumer - db storage.Database - queues *queue.OutgoingQueues - serverName gomatrixserverlib.ServerName - rsAPI roomserverAPI.RoomserverInternalAPI -} - -func NewCrossSigningKeyUpdateConsumer( - process *process.ProcessContext, - cfg *config.KeyServer, - kafkaConsumer sarama.Consumer, - queues *queue.OutgoingQueues, - store storage.Database, - rsAPI roomserverAPI.RoomserverInternalAPI, -) *CrossSigningKeyUpdateConsumer { - c := &CrossSigningKeyUpdateConsumer{ - consumer: &internal.ContinualConsumer{ - Process: process, - ComponentName: "federationsender/crosssigning", - Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputCrossSigningKeyUpdate)), - Consumer: kafkaConsumer, - PartitionStore: store, - }, - queues: queues, - db: store, - serverName: cfg.Matrix.ServerName, - rsAPI: rsAPI, - } - c.consumer.ProcessMessage = c.onMessage - - return c -} - -func (t *CrossSigningKeyUpdateConsumer) Start() error { - if err := t.consumer.Start(); err != nil { - return fmt.Errorf("t.consumer.Start: %w", err) - } - return nil -} - -func (t *CrossSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMessage) error { - var output eduapi.OutputCrossSigningKeyUpdate - if err := json.Unmarshal(msg.Value, &output); err != nil { - logrus.WithError(err).Errorf("eduserver output log: message parse failure") - return nil - } - _, host, err := gomatrixserverlib.SplitID('@', output.UserID) - if err != nil { - logrus.WithError(err).Errorf("eduserver output log: user ID parse failure") - return nil - } - if host != gomatrixserverlib.ServerName(t.serverName) { - // Ignore any messages that didn't originate locally, otherwise we'll - // end up parroting information we received from other servers. - return nil - } - logger := log.WithField("user_id", output.UserID) - - var queryRes roomserverAPI.QueryRoomsForUserResponse - err = t.rsAPI.QueryRoomsForUser(context.Background(), &roomserverAPI.QueryRoomsForUserRequest{ - UserID: output.UserID, - WantMembership: "join", - }, &queryRes) - if err != nil { - logger.WithError(err).Error("failed to calculate joined rooms for user") - return nil - } - // send this key change to all servers who share rooms with this user. - destinations, err := t.db.GetJoinedHostsForRooms(context.Background(), queryRes.RoomIDs) - if err != nil { - logger.WithError(err).Error("failed to calculate joined hosts for rooms user is in") - return nil - } - - // Pack the EDU and marshal it - edu := &gomatrixserverlib.EDU{ - Type: eduapi.MSigningKeyUpdate, - Origin: string(t.serverName), - } - if edu.Content, err = json.Marshal(output.CrossSigningKeyUpdate); err != nil { - return err - } - - logger.Infof("Sending cross-signing update message to %q", destinations) - return t.queues.SendEDU(edu, t.serverName, destinations) -} diff --git a/federationsender/consumers/keychange.go b/federationsender/consumers/keychange.go index 9e146390a..e5f1c9724 100644 --- a/federationsender/consumers/keychange.go +++ b/federationsender/consumers/keychange.go @@ -20,6 +20,7 @@ import ( "fmt" "github.com/Shopify/sarama" + eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/federationsender/queue" "github.com/matrix-org/dendrite/federationsender/storage" "github.com/matrix-org/dendrite/internal" @@ -28,6 +29,7 @@ import ( "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus" ) @@ -83,6 +85,17 @@ func (t *KeyChangeConsumer) onMessage(msg *sarama.ConsumerMessage) error { log.WithError(err).Errorf("failed to read device message from key change topic") return nil } + switch m.Type { + case api.TypeCrossSigningUpdate: + return t.onCrossSigningMessage(m) + case api.TypeDeviceKeyUpdate: + fallthrough + default: + return t.onDeviceKeyMessage(m) + } +} + +func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) error { logger := log.WithField("user_id", m.UserID) // only send key change events which originated from us @@ -133,6 +146,49 @@ func (t *KeyChangeConsumer) onMessage(msg *sarama.ConsumerMessage) error { return t.queues.SendEDU(edu, t.serverName, destinations) } +func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) error { + output := m.CrossSigningKeyUpdate + _, host, err := gomatrixserverlib.SplitID('@', output.UserID) + if err != nil { + logrus.WithError(err).Errorf("eduserver output log: user ID parse failure") + return nil + } + if host != gomatrixserverlib.ServerName(t.serverName) { + // Ignore any messages that didn't originate locally, otherwise we'll + // end up parroting information we received from other servers. + return nil + } + logger := log.WithField("user_id", output.UserID) + + var queryRes roomserverAPI.QueryRoomsForUserResponse + err = t.rsAPI.QueryRoomsForUser(context.Background(), &roomserverAPI.QueryRoomsForUserRequest{ + UserID: output.UserID, + WantMembership: "join", + }, &queryRes) + if err != nil { + logger.WithError(err).Error("failed to calculate joined rooms for user") + return nil + } + // send this key change to all servers who share rooms with this user. + destinations, err := t.db.GetJoinedHostsForRooms(context.Background(), queryRes.RoomIDs) + if err != nil { + logger.WithError(err).Error("failed to calculate joined hosts for rooms user is in") + return nil + } + + // Pack the EDU and marshal it + edu := &gomatrixserverlib.EDU{ + Type: eduserverAPI.MSigningKeyUpdate, + Origin: string(t.serverName), + } + if edu.Content, err = json.Marshal(output); err != nil { + return err + } + + logger.Infof("Sending cross-signing update message to %q", destinations) + return t.queues.SendEDU(edu, t.serverName, destinations) +} + func prevID(streamID int) []int { if streamID <= 1 { return nil diff --git a/federationsender/federationsender.go b/federationsender/federationsender.go index eb8be81c5..0732c5d38 100644 --- a/federationsender/federationsender.go +++ b/federationsender/federationsender.go @@ -94,12 +94,6 @@ func NewInternalAPI( if err := keyConsumer.Start(); err != nil { logrus.WithError(err).Panic("failed to start key server consumer") } - signingKeyConsumer := consumers.NewCrossSigningKeyUpdateConsumer( - base.ProcessContext, &base.Cfg.KeyServer, consumer, queues, federationSenderDB, rsAPI, - ) - if err := signingKeyConsumer.Start(); err != nil { - logrus.WithError(err).Panic("failed to start signing key consumer") - } return internal.NewFederationSenderInternalAPI(federationSenderDB, cfg, rsAPI, federation, keyRing, stats, queues) } diff --git a/keyserver/api/api.go b/keyserver/api/api.go index 490f0e41c..449c42969 100644 --- a/keyserver/api/api.go +++ b/keyserver/api/api.go @@ -20,6 +20,7 @@ import ( "strings" "time" + eduapi "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/keyserver/types" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" @@ -62,8 +63,9 @@ const ( // DeviceMessage represents the message produced into Kafka by the key server. type DeviceMessage struct { - Type DeviceMessageType `json:"Type,omitempty"` - *DeviceKeys `json:"DeviceKeys,omitempty"` + Type DeviceMessageType `json:"Type,omitempty"` + *DeviceKeys `json:"DeviceKeys,omitempty"` + *eduapi.OutputCrossSigningKeyUpdate `json:"CrossSigningKeyUpdate,omitempty"` // A monotonically increasing number which represents device changes for this user. StreamID int } diff --git a/keyserver/consumers/cross_signing.go b/keyserver/consumers/cross_signing.go index c1e5e3749..6e1488af3 100644 --- a/keyserver/consumers/cross_signing.go +++ b/keyserver/consumers/cross_signing.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" - eduapi "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/storage" @@ -33,7 +32,7 @@ func NewOutputCrossSigningKeyUpdateConsumer( consumer := internal.ContinualConsumer{ Process: process, ComponentName: "keyserver/crosssigning", - Topic: cfg.Global.Kafka.TopicFor(config.TopicOutputCrossSigningKeyUpdate), + Topic: cfg.Global.Kafka.TopicFor(config.TopicOutputKeyChangeEvent), Consumer: kafkaConsumer, PartitionStore: keyDB, } @@ -52,12 +51,24 @@ func (s *OutputCrossSigningKeyUpdateConsumer) Start() error { return s.eduServerConsumer.Start() } -func (s *OutputCrossSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMessage) error { - var output eduapi.OutputCrossSigningKeyUpdate - if err := json.Unmarshal(msg.Value, &output); err != nil { - logrus.WithError(err).Errorf("eduserver output log: message parse failure") +// onMessage is called in response to a message received on the +// key change events topic from the key server. +func (t *OutputCrossSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMessage) error { + var m api.DeviceMessage + if err := json.Unmarshal(msg.Value, &m); err != nil { + logrus.WithError(err).Errorf("failed to read device message from key change topic") return nil } + switch m.Type { + case api.TypeCrossSigningUpdate: + return t.onCrossSigningMessage(m) + default: + return nil + } +} + +func (s *OutputCrossSigningKeyUpdateConsumer) onCrossSigningMessage(m api.DeviceMessage) error { + output := m.CrossSigningKeyUpdate _, host, err := gomatrixserverlib.SplitID('@', output.UserID) if err != nil { logrus.WithError(err).Errorf("eduserver output log: user ID parse failure") diff --git a/keyserver/internal/cross_signing.go b/keyserver/internal/cross_signing.go index 77d9da21b..216cda186 100644 --- a/keyserver/internal/cross_signing.go +++ b/keyserver/internal/cross_signing.go @@ -235,9 +235,9 @@ func (a *KeyInternalAPI) PerformUploadDeviceKeys(ctx context.Context, req *api.P if _, ok := toStore[gomatrixserverlib.CrossSigningKeyPurposeSelfSigning]; ok { update.SelfSigningKey = &req.SelfSigningKey } - if err := a.CrossSigningProducer.ProduceSigningKeyUpdate(update); err != nil { + if err := a.Producer.ProduceSigningKeyUpdate(update); err != nil { res.Error = &api.KeyError{ - Err: fmt.Sprintf("a.CrossSigningProducer.ProduceSigningKeyUpdate: %s", err), + Err: fmt.Sprintf("a.Producer.ProduceSigningKeyUpdate: %s", err), } return } diff --git a/keyserver/internal/internal.go b/keyserver/internal/internal.go index 77a7d4eda..47eda1798 100644 --- a/keyserver/internal/internal.go +++ b/keyserver/internal/internal.go @@ -35,13 +35,12 @@ import ( ) type KeyInternalAPI struct { - DB storage.Database - ThisServer gomatrixserverlib.ServerName - FedClient fedsenderapi.FederationClient - UserAPI userapi.UserInternalAPI - DeviceKeysProducer *producers.KeyChange - CrossSigningProducer *producers.CrossSigningKeyUpdate - Updater *DeviceListUpdater + DB storage.Database + ThisServer gomatrixserverlib.ServerName + FedClient fedsenderapi.FederationClient + UserAPI userapi.UserInternalAPI + Producer *producers.KeyChange + Updater *DeviceListUpdater } func (a *KeyInternalAPI) SetUserAPI(i userapi.UserInternalAPI) { @@ -61,7 +60,7 @@ func (a *KeyInternalAPI) InputDeviceListUpdate( func (a *KeyInternalAPI) QueryKeyChanges(ctx context.Context, req *api.QueryKeyChangesRequest, res *api.QueryKeyChangesResponse) { if req.Partition < 0 { - req.Partition = a.DeviceKeysProducer.DefaultPartition() + req.Partition = a.Producer.DefaultPartition() } userIDs, latest, err := a.DB.KeyChanges(ctx, req.Partition, req.Offset, req.ToOffset) if err != nil { @@ -599,7 +598,7 @@ func (a *KeyInternalAPI) uploadLocalDeviceKeys(ctx context.Context, req *api.Per } return } - err = emitDeviceKeyChanges(a.DeviceKeysProducer, existingKeys, keysToStore) + err = emitDeviceKeyChanges(a.Producer, existingKeys, keysToStore) if err != nil { util.GetLogger(ctx).Errorf("Failed to emitDeviceKeyChanges: %s", err) } diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go index e17c6fa31..603067552 100644 --- a/keyserver/keyserver.go +++ b/keyserver/keyserver.go @@ -51,16 +51,11 @@ func NewInternalAPI( Producer: producer, DB: db, } - crossSigningKeyUpdateProducer := &producers.CrossSigningKeyUpdate{ - Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputCrossSigningKeyUpdate)), - Producer: producer, - } ap := &internal.KeyInternalAPI{ - DB: db, - ThisServer: cfg.Matrix.ServerName, - FedClient: fedClient, - DeviceKeysProducer: keyChangeProducer, - CrossSigningProducer: crossSigningKeyUpdateProducer, + DB: db, + ThisServer: cfg.Matrix.ServerName, + FedClient: fedClient, + Producer: keyChangeProducer, } updater := internal.NewDeviceListUpdater(db, ap, keyChangeProducer, fedClient, 8) // 8 workers TODO: configurable ap.Updater = updater diff --git a/keyserver/producers/cross_signing.go b/keyserver/producers/cross_signing.go deleted file mode 100644 index bf2a33231..000000000 --- a/keyserver/producers/cross_signing.go +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright 2020 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package producers - -import ( - "encoding/json" - - "github.com/Shopify/sarama" - "github.com/matrix-org/dendrite/eduserver/api" - "github.com/sirupsen/logrus" -) - -type CrossSigningKeyUpdate struct { - Topic string - Producer sarama.SyncProducer -} - -func (p *CrossSigningKeyUpdate) DefaultPartition() int32 { - return 0 -} - -func (p *CrossSigningKeyUpdate) ProduceSigningKeyUpdate(key api.CrossSigningKeyUpdate) error { - var m sarama.ProducerMessage - output := &api.OutputCrossSigningKeyUpdate{ - CrossSigningKeyUpdate: key, - } - - value, err := json.Marshal(output) - if err != nil { - return err - } - - m.Topic = string(p.Topic) - m.Key = sarama.StringEncoder(key.UserID) - m.Value = sarama.ByteEncoder(value) - - _, _, err = p.Producer.SendMessage(&m) - if err != nil { - return err - } - - logrus.WithFields(logrus.Fields{ - "user_id": key.UserID, - }).Infof("Produced to cross-signing update topic '%s'", p.Topic) - return nil -} diff --git a/keyserver/producers/keychange.go b/keyserver/producers/keychange.go index 0fe21d8b1..34cd49422 100644 --- a/keyserver/producers/keychange.go +++ b/keyserver/producers/keychange.go @@ -19,6 +19,7 @@ import ( "encoding/json" "github.com/Shopify/sarama" + eduapi "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/storage" "github.com/sirupsen/logrus" @@ -73,3 +74,32 @@ func (p *KeyChange) ProduceKeyChanges(keys []api.DeviceMessage) error { } return nil } + +func (p *KeyChange) ProduceSigningKeyUpdate(key eduapi.CrossSigningKeyUpdate) error { + var m sarama.ProducerMessage + output := &api.DeviceMessage{ + Type: api.TypeCrossSigningUpdate, + OutputCrossSigningKeyUpdate: &eduapi.OutputCrossSigningKeyUpdate{ + CrossSigningKeyUpdate: key, + }, + } + + value, err := json.Marshal(output) + if err != nil { + return err + } + + m.Topic = string(p.Topic) + m.Key = sarama.StringEncoder(key.UserID) + m.Value = sarama.ByteEncoder(value) + + _, _, err = p.Producer.SendMessage(&m) + if err != nil { + return err + } + + logrus.WithFields(logrus.Fields{ + "user_id": key.UserID, + }).Infof("Produced to cross-signing update topic '%s'", p.Topic) + return nil +} diff --git a/setup/config/config_kafka.go b/setup/config/config_kafka.go index d25ab33e6..361914287 100644 --- a/setup/config/config_kafka.go +++ b/setup/config/config_kafka.go @@ -4,13 +4,12 @@ import "fmt" // Defined Kafka topics. const ( - TopicOutputTypingEvent = "OutputTypingEvent" - TopicOutputSendToDeviceEvent = "OutputSendToDeviceEvent" - TopicOutputKeyChangeEvent = "OutputKeyChangeEvent" - TopicOutputRoomEvent = "OutputRoomEvent" - TopicOutputClientData = "OutputClientData" - TopicOutputReceiptEvent = "OutputReceiptEvent" - TopicOutputCrossSigningKeyUpdate = "OutputCrossSigningKeyUpdate" + TopicOutputTypingEvent = "OutputTypingEvent" + TopicOutputSendToDeviceEvent = "OutputSendToDeviceEvent" + TopicOutputKeyChangeEvent = "OutputKeyChangeEvent" + TopicOutputRoomEvent = "OutputRoomEvent" + TopicOutputClientData = "OutputClientData" + TopicOutputReceiptEvent = "OutputReceiptEvent" ) type Kafka struct { diff --git a/syncapi/consumers/cross_signing.go b/syncapi/consumers/cross_signing.go deleted file mode 100644 index 0d259007a..000000000 --- a/syncapi/consumers/cross_signing.go +++ /dev/null @@ -1,140 +0,0 @@ -// Copyright 2020 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package consumers - -import ( - "context" - "encoding/json" - "sync" - - "github.com/Shopify/sarama" - "github.com/getsentry/sentry-go" - eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" - "github.com/matrix-org/dendrite/internal" - "github.com/matrix-org/dendrite/keyserver/api" - roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" - "github.com/matrix-org/dendrite/setup/process" - "github.com/matrix-org/dendrite/syncapi/notifier" - "github.com/matrix-org/dendrite/syncapi/storage" - "github.com/matrix-org/dendrite/syncapi/types" - "github.com/matrix-org/gomatrixserverlib" - "github.com/sirupsen/logrus" - log "github.com/sirupsen/logrus" -) - -// OutputCrossSigningKeyUpdateConsumer consumes events that originated in the key server. -type OutputCrossSigningKeyUpdateConsumer struct { - keyChangeConsumer *internal.ContinualConsumer - db storage.Database - notifier *notifier.Notifier - stream types.PartitionedStreamProvider - serverName gomatrixserverlib.ServerName // our server name - rsAPI roomserverAPI.RoomserverInternalAPI - keyAPI api.KeyInternalAPI - partitionToOffset map[int32]int64 - partitionToOffsetMu sync.Mutex -} - -// NewOutputCrossSigningKeyUpdateConsumer creates a new OutputCrossSigningKeyUpdateConsumer. -// Call Start() to begin consuming from the key server. -func NewOutputCrossSigningKeyUpdateConsumer( - process *process.ProcessContext, - serverName gomatrixserverlib.ServerName, - topic string, - kafkaConsumer sarama.Consumer, - keyAPI api.KeyInternalAPI, - rsAPI roomserverAPI.RoomserverInternalAPI, - store storage.Database, - notifier *notifier.Notifier, - stream types.PartitionedStreamProvider, -) *OutputCrossSigningKeyUpdateConsumer { - - consumer := internal.ContinualConsumer{ - Process: process, - ComponentName: "syncapi/crosssigning", - Topic: topic, - Consumer: kafkaConsumer, - PartitionStore: store, - } - - s := &OutputCrossSigningKeyUpdateConsumer{ - keyChangeConsumer: &consumer, - db: store, - serverName: serverName, - keyAPI: keyAPI, - rsAPI: rsAPI, - partitionToOffset: make(map[int32]int64), - partitionToOffsetMu: sync.Mutex{}, - notifier: notifier, - stream: stream, - } - - consumer.ProcessMessage = s.onMessage - - return s -} - -// Start consuming from the key server -func (s *OutputCrossSigningKeyUpdateConsumer) Start() error { - offsets, err := s.keyChangeConsumer.StartOffsets() - s.partitionToOffsetMu.Lock() - for _, o := range offsets { - s.partitionToOffset[o.Partition] = o.Offset - } - s.partitionToOffsetMu.Unlock() - return err -} - -func (s *OutputCrossSigningKeyUpdateConsumer) updateOffset(msg *sarama.ConsumerMessage) { - s.partitionToOffsetMu.Lock() - defer s.partitionToOffsetMu.Unlock() - s.partitionToOffset[msg.Partition] = msg.Offset -} - -func (s *OutputCrossSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMessage) error { - defer s.updateOffset(msg) - - var output eduserverAPI.OutputCrossSigningKeyUpdate - if err := json.Unmarshal(msg.Value, &output); err != nil { - logrus.WithError(err).Errorf("eduserver output log: message parse failure") - return nil - } - - logrus.Infof("XXX: Sync API consumed crosssigning key update for %s", output.UserID) - - // work out who we need to notify about the new key - var queryRes roomserverAPI.QuerySharedUsersResponse - err := s.rsAPI.QuerySharedUsers(context.Background(), &roomserverAPI.QuerySharedUsersRequest{ - UserID: output.UserID, - }, &queryRes) - if err != nil { - log.WithError(err).Error("syncapi: failed to QuerySharedUsers for key change event from key server") - sentry.CaptureException(err) - return err - } - // make sure we get our own key updates too! - queryRes.UserIDsToCount[output.UserID] = 1 - posUpdate := types.LogPosition{ - Offset: msg.Offset, - Partition: msg.Partition, - } - - s.stream.Advance(posUpdate) - for userID := range queryRes.UserIDsToCount { - s.notifier.OnNewKeyChange(types.StreamingToken{DeviceListPosition: posUpdate}, userID, output.UserID) - } - - return nil -} diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go index 0d2ecd449..05fcf37d9 100644 --- a/syncapi/consumers/keychange.go +++ b/syncapi/consumers/keychange.go @@ -29,6 +29,7 @@ import ( "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus" ) @@ -104,13 +105,23 @@ func (s *OutputKeyChangeEventConsumer) updateOffset(msg *sarama.ConsumerMessage) func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { defer s.updateOffset(msg) - var output api.DeviceMessage - if err := json.Unmarshal(msg.Value, &output); err != nil { - // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Error("syncapi: failed to unmarshal key change event from key server") - sentry.CaptureException(err) - return err + var m api.DeviceMessage + if err := json.Unmarshal(msg.Value, &m); err != nil { + logrus.WithError(err).Errorf("failed to read device message from key change topic") + return nil } + switch m.Type { + case api.TypeCrossSigningUpdate: + return s.onCrossSigningMessage(m, msg.Offset, msg.Partition) + case api.TypeDeviceKeyUpdate: + fallthrough + default: + return s.onDeviceKeyMessage(m, msg.Offset, msg.Partition) + } +} + +func (s *OutputKeyChangeEventConsumer) onDeviceKeyMessage(m api.DeviceMessage, offset int64, partition int32) error { + output := m.DeviceKeys // work out who we need to notify about the new key var queryRes roomserverAPI.QuerySharedUsersResponse err := s.rsAPI.QuerySharedUsers(context.Background(), &roomserverAPI.QuerySharedUsersRequest{ @@ -124,8 +135,35 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er // make sure we get our own key updates too! queryRes.UserIDsToCount[output.UserID] = 1 posUpdate := types.LogPosition{ - Offset: msg.Offset, - Partition: msg.Partition, + Offset: offset, + Partition: partition, + } + + s.stream.Advance(posUpdate) + for userID := range queryRes.UserIDsToCount { + s.notifier.OnNewKeyChange(types.StreamingToken{DeviceListPosition: posUpdate}, userID, output.UserID) + } + + return nil +} + +func (s *OutputKeyChangeEventConsumer) onCrossSigningMessage(m api.DeviceMessage, offset int64, partition int32) error { + output := m.CrossSigningKeyUpdate + // work out who we need to notify about the new key + var queryRes roomserverAPI.QuerySharedUsersResponse + err := s.rsAPI.QuerySharedUsers(context.Background(), &roomserverAPI.QuerySharedUsersRequest{ + UserID: output.UserID, + }, &queryRes) + if err != nil { + log.WithError(err).Error("syncapi: failed to QuerySharedUsers for key change event from key server") + sentry.CaptureException(err) + return err + } + // make sure we get our own key updates too! + queryRes.UserIDsToCount[output.UserID] = 1 + posUpdate := types.LogPosition{ + Offset: offset, + Partition: partition, } s.stream.Advance(posUpdate) diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 2ecbf1ce9..84c7140ca 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -72,14 +72,6 @@ func AddPublicRoutes( logrus.WithError(err).Panicf("failed to start key change consumer") } - crossSigningKeyUpdateConsumer := consumers.NewOutputCrossSigningKeyUpdateConsumer( - process, cfg.Matrix.ServerName, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputCrossSigningKeyUpdate)), - consumer, keyAPI, rsAPI, syncDB, notifier, streams.DeviceListStreamProvider, - ) - if err = crossSigningKeyUpdateConsumer.Start(); err != nil { - logrus.WithError(err).Panicf("failed to start cross-signing key change consumer") - } - roomConsumer := consumers.NewOutputRoomEventConsumer( process, cfg, consumer, syncDB, notifier, streams.PDUStreamProvider, streams.InviteStreamProvider, rsAPI,