From b0d3df8faef340d32bbef72ec4e40244ceda3b55 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 10 Aug 2021 15:35:47 +0100 Subject: [PATCH] Try to notify sync --- eduserver/api/output.go | 4 +- eduserver/input/input.go | 2 +- federationsender/consumers/cross_signing.go | 4 +- keyserver/consumers/cross_signing.go | 4 +- keyserver/producers/cross_signing.go | 2 +- syncapi/consumers/cross_signing.go | 140 ++++++++++++++++++++ syncapi/syncapi.go | 8 ++ 7 files changed, 156 insertions(+), 8 deletions(-) create mode 100644 syncapi/consumers/cross_signing.go diff --git a/eduserver/api/output.go b/eduserver/api/output.go index ce99fe914..c6de4e01c 100644 --- a/eduserver/api/output.go +++ b/eduserver/api/output.go @@ -51,7 +51,7 @@ type OutputReceiptEvent struct { Timestamp gomatrixserverlib.Timestamp `json:"timestamp"` } -// OutputSigningKeyUpdate is an entry in the signing key update output kafka log -type OutputSigningKeyUpdate struct { +// OutputCrossSigningKeyUpdate is an entry in the signing key update output kafka log +type OutputCrossSigningKeyUpdate struct { CrossSigningKeyUpdate `json:"signing_keys"` } diff --git a/eduserver/input/input.go b/eduserver/input/input.go index 86faabd7f..238ede6de 100644 --- a/eduserver/input/input.go +++ b/eduserver/input/input.go @@ -85,7 +85,7 @@ func (t *EDUServerInputAPI) InputCrossSigningKeyUpdate( request *api.InputCrossSigningKeyUpdateRequest, response *api.InputCrossSigningKeyUpdateResponse, ) error { - eventJSON, err := json.Marshal(&api.OutputSigningKeyUpdate{ + eventJSON, err := json.Marshal(&api.OutputCrossSigningKeyUpdate{ CrossSigningKeyUpdate: request.CrossSigningKeyUpdate, }) if err != nil { diff --git a/federationsender/consumers/cross_signing.go b/federationsender/consumers/cross_signing.go index f9b3c143a..6ed7b64fe 100644 --- a/federationsender/consumers/cross_signing.go +++ b/federationsender/consumers/cross_signing.go @@ -51,7 +51,7 @@ func NewCrossSigningKeyUpdateConsumer( c := &CrossSigningKeyUpdateConsumer{ consumer: &internal.ContinualConsumer{ Process: process, - ComponentName: "federationsender/signingkeys", + ComponentName: "federationsender/crosssigning", Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputCrossSigningKeyUpdate)), Consumer: kafkaConsumer, PartitionStore: store, @@ -74,7 +74,7 @@ func (t *CrossSigningKeyUpdateConsumer) Start() error { } func (t *CrossSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMessage) error { - var output eduapi.OutputSigningKeyUpdate + var output eduapi.OutputCrossSigningKeyUpdate if err := json.Unmarshal(msg.Value, &output); err != nil { logrus.WithError(err).Errorf("eduserver output log: message parse failure") return nil diff --git a/keyserver/consumers/cross_signing.go b/keyserver/consumers/cross_signing.go index 38a801be0..c1e5e3749 100644 --- a/keyserver/consumers/cross_signing.go +++ b/keyserver/consumers/cross_signing.go @@ -32,7 +32,7 @@ func NewOutputCrossSigningKeyUpdateConsumer( ) *OutputCrossSigningKeyUpdateConsumer { consumer := internal.ContinualConsumer{ Process: process, - ComponentName: "keyserver/eduserver", + ComponentName: "keyserver/crosssigning", Topic: cfg.Global.Kafka.TopicFor(config.TopicOutputCrossSigningKeyUpdate), Consumer: kafkaConsumer, PartitionStore: keyDB, @@ -53,7 +53,7 @@ func (s *OutputCrossSigningKeyUpdateConsumer) Start() error { } func (s *OutputCrossSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMessage) error { - var output eduapi.OutputSigningKeyUpdate + var output eduapi.OutputCrossSigningKeyUpdate if err := json.Unmarshal(msg.Value, &output); err != nil { logrus.WithError(err).Errorf("eduserver output log: message parse failure") return nil diff --git a/keyserver/producers/cross_signing.go b/keyserver/producers/cross_signing.go index 3142c8a98..bf2a33231 100644 --- a/keyserver/producers/cross_signing.go +++ b/keyserver/producers/cross_signing.go @@ -33,7 +33,7 @@ func (p *CrossSigningKeyUpdate) DefaultPartition() int32 { func (p *CrossSigningKeyUpdate) ProduceSigningKeyUpdate(key api.CrossSigningKeyUpdate) error { var m sarama.ProducerMessage - output := &api.OutputSigningKeyUpdate{ + output := &api.OutputCrossSigningKeyUpdate{ CrossSigningKeyUpdate: key, } diff --git a/syncapi/consumers/cross_signing.go b/syncapi/consumers/cross_signing.go new file mode 100644 index 000000000..0d259007a --- /dev/null +++ b/syncapi/consumers/cross_signing.go @@ -0,0 +1,140 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumers + +import ( + "context" + "encoding/json" + "sync" + + "github.com/Shopify/sarama" + "github.com/getsentry/sentry-go" + eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/keyserver/api" + roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/setup/process" + "github.com/matrix-org/dendrite/syncapi/notifier" + "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" + log "github.com/sirupsen/logrus" +) + +// OutputCrossSigningKeyUpdateConsumer consumes events that originated in the key server. +type OutputCrossSigningKeyUpdateConsumer struct { + keyChangeConsumer *internal.ContinualConsumer + db storage.Database + notifier *notifier.Notifier + stream types.PartitionedStreamProvider + serverName gomatrixserverlib.ServerName // our server name + rsAPI roomserverAPI.RoomserverInternalAPI + keyAPI api.KeyInternalAPI + partitionToOffset map[int32]int64 + partitionToOffsetMu sync.Mutex +} + +// NewOutputCrossSigningKeyUpdateConsumer creates a new OutputCrossSigningKeyUpdateConsumer. +// Call Start() to begin consuming from the key server. +func NewOutputCrossSigningKeyUpdateConsumer( + process *process.ProcessContext, + serverName gomatrixserverlib.ServerName, + topic string, + kafkaConsumer sarama.Consumer, + keyAPI api.KeyInternalAPI, + rsAPI roomserverAPI.RoomserverInternalAPI, + store storage.Database, + notifier *notifier.Notifier, + stream types.PartitionedStreamProvider, +) *OutputCrossSigningKeyUpdateConsumer { + + consumer := internal.ContinualConsumer{ + Process: process, + ComponentName: "syncapi/crosssigning", + Topic: topic, + Consumer: kafkaConsumer, + PartitionStore: store, + } + + s := &OutputCrossSigningKeyUpdateConsumer{ + keyChangeConsumer: &consumer, + db: store, + serverName: serverName, + keyAPI: keyAPI, + rsAPI: rsAPI, + partitionToOffset: make(map[int32]int64), + partitionToOffsetMu: sync.Mutex{}, + notifier: notifier, + stream: stream, + } + + consumer.ProcessMessage = s.onMessage + + return s +} + +// Start consuming from the key server +func (s *OutputCrossSigningKeyUpdateConsumer) Start() error { + offsets, err := s.keyChangeConsumer.StartOffsets() + s.partitionToOffsetMu.Lock() + for _, o := range offsets { + s.partitionToOffset[o.Partition] = o.Offset + } + s.partitionToOffsetMu.Unlock() + return err +} + +func (s *OutputCrossSigningKeyUpdateConsumer) updateOffset(msg *sarama.ConsumerMessage) { + s.partitionToOffsetMu.Lock() + defer s.partitionToOffsetMu.Unlock() + s.partitionToOffset[msg.Partition] = msg.Offset +} + +func (s *OutputCrossSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMessage) error { + defer s.updateOffset(msg) + + var output eduserverAPI.OutputCrossSigningKeyUpdate + if err := json.Unmarshal(msg.Value, &output); err != nil { + logrus.WithError(err).Errorf("eduserver output log: message parse failure") + return nil + } + + logrus.Infof("XXX: Sync API consumed crosssigning key update for %s", output.UserID) + + // work out who we need to notify about the new key + var queryRes roomserverAPI.QuerySharedUsersResponse + err := s.rsAPI.QuerySharedUsers(context.Background(), &roomserverAPI.QuerySharedUsersRequest{ + UserID: output.UserID, + }, &queryRes) + if err != nil { + log.WithError(err).Error("syncapi: failed to QuerySharedUsers for key change event from key server") + sentry.CaptureException(err) + return err + } + // make sure we get our own key updates too! + queryRes.UserIDsToCount[output.UserID] = 1 + posUpdate := types.LogPosition{ + Offset: msg.Offset, + Partition: msg.Partition, + } + + s.stream.Advance(posUpdate) + for userID := range queryRes.UserIDsToCount { + s.notifier.OnNewKeyChange(types.StreamingToken{DeviceListPosition: posUpdate}, userID, output.UserID) + } + + return nil +} diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 84c7140ca..2ecbf1ce9 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -72,6 +72,14 @@ func AddPublicRoutes( logrus.WithError(err).Panicf("failed to start key change consumer") } + crossSigningKeyUpdateConsumer := consumers.NewOutputCrossSigningKeyUpdateConsumer( + process, cfg.Matrix.ServerName, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputCrossSigningKeyUpdate)), + consumer, keyAPI, rsAPI, syncDB, notifier, streams.DeviceListStreamProvider, + ) + if err = crossSigningKeyUpdateConsumer.Start(); err != nil { + logrus.WithError(err).Panicf("failed to start cross-signing key change consumer") + } + roomConsumer := consumers.NewOutputRoomEventConsumer( process, cfg, consumer, syncDB, notifier, streams.PDUStreamProvider, streams.InviteStreamProvider, rsAPI,