Try to notify sync

This commit is contained in:
Neil Alexander 2021-08-10 15:35:47 +01:00
parent 371d71bba8
commit b0d3df8fae
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
7 changed files with 156 additions and 8 deletions

View file

@ -51,7 +51,7 @@ type OutputReceiptEvent struct {
Timestamp gomatrixserverlib.Timestamp `json:"timestamp"` Timestamp gomatrixserverlib.Timestamp `json:"timestamp"`
} }
// OutputSigningKeyUpdate is an entry in the signing key update output kafka log // OutputCrossSigningKeyUpdate is an entry in the signing key update output kafka log
type OutputSigningKeyUpdate struct { type OutputCrossSigningKeyUpdate struct {
CrossSigningKeyUpdate `json:"signing_keys"` CrossSigningKeyUpdate `json:"signing_keys"`
} }

View file

@ -85,7 +85,7 @@ func (t *EDUServerInputAPI) InputCrossSigningKeyUpdate(
request *api.InputCrossSigningKeyUpdateRequest, request *api.InputCrossSigningKeyUpdateRequest,
response *api.InputCrossSigningKeyUpdateResponse, response *api.InputCrossSigningKeyUpdateResponse,
) error { ) error {
eventJSON, err := json.Marshal(&api.OutputSigningKeyUpdate{ eventJSON, err := json.Marshal(&api.OutputCrossSigningKeyUpdate{
CrossSigningKeyUpdate: request.CrossSigningKeyUpdate, CrossSigningKeyUpdate: request.CrossSigningKeyUpdate,
}) })
if err != nil { if err != nil {

View file

@ -51,7 +51,7 @@ func NewCrossSigningKeyUpdateConsumer(
c := &CrossSigningKeyUpdateConsumer{ c := &CrossSigningKeyUpdateConsumer{
consumer: &internal.ContinualConsumer{ consumer: &internal.ContinualConsumer{
Process: process, Process: process,
ComponentName: "federationsender/signingkeys", ComponentName: "federationsender/crosssigning",
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputCrossSigningKeyUpdate)), Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputCrossSigningKeyUpdate)),
Consumer: kafkaConsumer, Consumer: kafkaConsumer,
PartitionStore: store, PartitionStore: store,
@ -74,7 +74,7 @@ func (t *CrossSigningKeyUpdateConsumer) Start() error {
} }
func (t *CrossSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMessage) error { func (t *CrossSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMessage) error {
var output eduapi.OutputSigningKeyUpdate var output eduapi.OutputCrossSigningKeyUpdate
if err := json.Unmarshal(msg.Value, &output); err != nil { if err := json.Unmarshal(msg.Value, &output); err != nil {
logrus.WithError(err).Errorf("eduserver output log: message parse failure") logrus.WithError(err).Errorf("eduserver output log: message parse failure")
return nil return nil

View file

@ -32,7 +32,7 @@ func NewOutputCrossSigningKeyUpdateConsumer(
) *OutputCrossSigningKeyUpdateConsumer { ) *OutputCrossSigningKeyUpdateConsumer {
consumer := internal.ContinualConsumer{ consumer := internal.ContinualConsumer{
Process: process, Process: process,
ComponentName: "keyserver/eduserver", ComponentName: "keyserver/crosssigning",
Topic: cfg.Global.Kafka.TopicFor(config.TopicOutputCrossSigningKeyUpdate), Topic: cfg.Global.Kafka.TopicFor(config.TopicOutputCrossSigningKeyUpdate),
Consumer: kafkaConsumer, Consumer: kafkaConsumer,
PartitionStore: keyDB, PartitionStore: keyDB,
@ -53,7 +53,7 @@ func (s *OutputCrossSigningKeyUpdateConsumer) Start() error {
} }
func (s *OutputCrossSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMessage) error { func (s *OutputCrossSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMessage) error {
var output eduapi.OutputSigningKeyUpdate var output eduapi.OutputCrossSigningKeyUpdate
if err := json.Unmarshal(msg.Value, &output); err != nil { if err := json.Unmarshal(msg.Value, &output); err != nil {
logrus.WithError(err).Errorf("eduserver output log: message parse failure") logrus.WithError(err).Errorf("eduserver output log: message parse failure")
return nil return nil

View file

@ -33,7 +33,7 @@ func (p *CrossSigningKeyUpdate) DefaultPartition() int32 {
func (p *CrossSigningKeyUpdate) ProduceSigningKeyUpdate(key api.CrossSigningKeyUpdate) error { func (p *CrossSigningKeyUpdate) ProduceSigningKeyUpdate(key api.CrossSigningKeyUpdate) error {
var m sarama.ProducerMessage var m sarama.ProducerMessage
output := &api.OutputSigningKeyUpdate{ output := &api.OutputCrossSigningKeyUpdate{
CrossSigningKeyUpdate: key, CrossSigningKeyUpdate: key,
} }

View file

@ -0,0 +1,140 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package consumers
import (
"context"
"encoding/json"
"sync"
"github.com/Shopify/sarama"
"github.com/getsentry/sentry-go"
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
)
// OutputCrossSigningKeyUpdateConsumer consumes events that originated in the key server.
type OutputCrossSigningKeyUpdateConsumer struct {
keyChangeConsumer *internal.ContinualConsumer
db storage.Database
notifier *notifier.Notifier
stream types.PartitionedStreamProvider
serverName gomatrixserverlib.ServerName // our server name
rsAPI roomserverAPI.RoomserverInternalAPI
keyAPI api.KeyInternalAPI
partitionToOffset map[int32]int64
partitionToOffsetMu sync.Mutex
}
// NewOutputCrossSigningKeyUpdateConsumer creates a new OutputCrossSigningKeyUpdateConsumer.
// Call Start() to begin consuming from the key server.
func NewOutputCrossSigningKeyUpdateConsumer(
process *process.ProcessContext,
serverName gomatrixserverlib.ServerName,
topic string,
kafkaConsumer sarama.Consumer,
keyAPI api.KeyInternalAPI,
rsAPI roomserverAPI.RoomserverInternalAPI,
store storage.Database,
notifier *notifier.Notifier,
stream types.PartitionedStreamProvider,
) *OutputCrossSigningKeyUpdateConsumer {
consumer := internal.ContinualConsumer{
Process: process,
ComponentName: "syncapi/crosssigning",
Topic: topic,
Consumer: kafkaConsumer,
PartitionStore: store,
}
s := &OutputCrossSigningKeyUpdateConsumer{
keyChangeConsumer: &consumer,
db: store,
serverName: serverName,
keyAPI: keyAPI,
rsAPI: rsAPI,
partitionToOffset: make(map[int32]int64),
partitionToOffsetMu: sync.Mutex{},
notifier: notifier,
stream: stream,
}
consumer.ProcessMessage = s.onMessage
return s
}
// Start consuming from the key server
func (s *OutputCrossSigningKeyUpdateConsumer) Start() error {
offsets, err := s.keyChangeConsumer.StartOffsets()
s.partitionToOffsetMu.Lock()
for _, o := range offsets {
s.partitionToOffset[o.Partition] = o.Offset
}
s.partitionToOffsetMu.Unlock()
return err
}
func (s *OutputCrossSigningKeyUpdateConsumer) updateOffset(msg *sarama.ConsumerMessage) {
s.partitionToOffsetMu.Lock()
defer s.partitionToOffsetMu.Unlock()
s.partitionToOffset[msg.Partition] = msg.Offset
}
func (s *OutputCrossSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMessage) error {
defer s.updateOffset(msg)
var output eduserverAPI.OutputCrossSigningKeyUpdate
if err := json.Unmarshal(msg.Value, &output); err != nil {
logrus.WithError(err).Errorf("eduserver output log: message parse failure")
return nil
}
logrus.Infof("XXX: Sync API consumed crosssigning key update for %s", output.UserID)
// work out who we need to notify about the new key
var queryRes roomserverAPI.QuerySharedUsersResponse
err := s.rsAPI.QuerySharedUsers(context.Background(), &roomserverAPI.QuerySharedUsersRequest{
UserID: output.UserID,
}, &queryRes)
if err != nil {
log.WithError(err).Error("syncapi: failed to QuerySharedUsers for key change event from key server")
sentry.CaptureException(err)
return err
}
// make sure we get our own key updates too!
queryRes.UserIDsToCount[output.UserID] = 1
posUpdate := types.LogPosition{
Offset: msg.Offset,
Partition: msg.Partition,
}
s.stream.Advance(posUpdate)
for userID := range queryRes.UserIDsToCount {
s.notifier.OnNewKeyChange(types.StreamingToken{DeviceListPosition: posUpdate}, userID, output.UserID)
}
return nil
}

View file

@ -72,6 +72,14 @@ func AddPublicRoutes(
logrus.WithError(err).Panicf("failed to start key change consumer") logrus.WithError(err).Panicf("failed to start key change consumer")
} }
crossSigningKeyUpdateConsumer := consumers.NewOutputCrossSigningKeyUpdateConsumer(
process, cfg.Matrix.ServerName, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputCrossSigningKeyUpdate)),
consumer, keyAPI, rsAPI, syncDB, notifier, streams.DeviceListStreamProvider,
)
if err = crossSigningKeyUpdateConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start cross-signing key change consumer")
}
roomConsumer := consumers.NewOutputRoomEventConsumer( roomConsumer := consumers.NewOutputRoomEventConsumer(
process, cfg, consumer, syncDB, notifier, streams.PDUStreamProvider, process, cfg, consumer, syncDB, notifier, streams.PDUStreamProvider,
streams.InviteStreamProvider, rsAPI, streams.InviteStreamProvider, rsAPI,