Remove internal.ContinualConsumer from keyserver

This commit is contained in:
Till Faelligen 2022-02-01 19:08:05 +01:00
parent dac2055579
commit 061c3a9e43
8 changed files with 56 additions and 57 deletions

View file

@ -228,7 +228,7 @@ type QueryKeyChangesRequest struct {
// The offset of the last received key event, or sarama.OffsetOldest if this is from the beginning
Offset int64
// The inclusive offset where to track key changes up to. Messages with this offset are included in the response.
// Use sarama.OffsetNewest if the offset is unknown (then check the response Offset to avoid racing).
// Use jetstream.OffsetNewest if the offset is unknown (then check the response Offset to avoid racing).
ToOffset int64
}

View file

@ -18,29 +18,29 @@ import (
"context"
"encoding/json"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/keyserver/storage"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
"github.com/Shopify/sarama"
)
type OutputCrossSigningKeyUpdateConsumer struct {
eduServerConsumer *internal.ContinualConsumer
keyDB storage.Database
keyAPI api.KeyInternalAPI
serverName string
ctx context.Context
keyDB storage.Database
keyAPI api.KeyInternalAPI
serverName string
jetstream nats.JetStreamContext
topic string
}
func NewOutputCrossSigningKeyUpdateConsumer(
process *process.ProcessContext,
cfg *config.Dendrite,
kafkaConsumer sarama.Consumer,
js nats.JetStreamContext,
keyDB storage.Database,
keyAPI api.KeyInternalAPI,
) *OutputCrossSigningKeyUpdateConsumer {
@ -48,60 +48,59 @@ func NewOutputCrossSigningKeyUpdateConsumer(
// topic. We will only produce events where the UserID matches our server name,
// and we will only consume events where the UserID does NOT match our server
// name (because the update came from a remote server).
consumer := internal.ContinualConsumer{
Process: process,
ComponentName: "keyserver/keyserver",
Topic: cfg.Global.JetStream.TopicFor(jetstream.OutputKeyChangeEvent),
Consumer: kafkaConsumer,
PartitionStore: keyDB,
}
s := &OutputCrossSigningKeyUpdateConsumer{
eduServerConsumer: &consumer,
keyDB: keyDB,
keyAPI: keyAPI,
serverName: string(cfg.Global.ServerName),
ctx: process.Context(),
keyDB: keyDB,
jetstream: js,
topic: cfg.Global.JetStream.TopicFor(jetstream.OutputKeyChangeEvent),
keyAPI: keyAPI,
serverName: string(cfg.Global.ServerName),
}
consumer.ProcessMessage = s.onMessage
return s
}
func (s *OutputCrossSigningKeyUpdateConsumer) Start() error {
return s.eduServerConsumer.Start()
_, err := s.jetstream.Subscribe(
s.topic, s.onMessage,
)
return err
}
// onMessage is called in response to a message received on the
// key change events topic from the key server.
func (t *OutputCrossSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMessage) error {
var m api.DeviceMessage
if err := json.Unmarshal(msg.Value, &m); err != nil {
logrus.WithError(err).Errorf("failed to read device message from key change topic")
return nil
}
if m.OutputCrossSigningKeyUpdate == nil {
// This probably shouldn't happen but stops us from panicking if we come
// across an update that doesn't satisfy either types.
return nil
}
switch m.Type {
case api.TypeCrossSigningUpdate:
return t.onCrossSigningMessage(m)
default:
return nil
}
func (t *OutputCrossSigningKeyUpdateConsumer) onMessage(msg *nats.Msg) {
jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool {
var m api.DeviceMessage
if err := json.Unmarshal(msg.Data, &m); err != nil {
logrus.WithError(err).Errorf("failed to read device message from key change topic")
return true
}
if m.OutputCrossSigningKeyUpdate == nil {
// This probably shouldn't happen but stops us from panicking if we come
// across an update that doesn't satisfy either types.
return true
}
switch m.Type {
case api.TypeCrossSigningUpdate:
return t.onCrossSigningMessage(m)
default:
return true
}
})
}
func (s *OutputCrossSigningKeyUpdateConsumer) onCrossSigningMessage(m api.DeviceMessage) error {
func (s *OutputCrossSigningKeyUpdateConsumer) onCrossSigningMessage(m api.DeviceMessage) bool {
output := m.CrossSigningKeyUpdate
_, host, err := gomatrixserverlib.SplitID('@', output.UserID)
if err != nil {
logrus.WithError(err).Errorf("eduserver output log: user ID parse failure")
return nil
return true
}
if host == gomatrixserverlib.ServerName(s.serverName) {
// Ignore any messages that contain information about our own users, as
// they already originated from this server.
return nil
return true
}
uploadReq := &api.PerformUploadDeviceKeysRequest{
UserID: output.UserID,
@ -114,5 +113,8 @@ func (s *OutputCrossSigningKeyUpdateConsumer) onCrossSigningMessage(m api.Device
}
uploadRes := &api.PerformUploadDeviceKeysResponse{}
s.keyAPI.PerformUploadDeviceKeys(context.TODO(), uploadReq, uploadRes)
return uploadRes.Error
if uploadRes.Error != nil {
return false
}
return true
}

View file

@ -40,7 +40,7 @@ func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) {
func NewInternalAPI(
base *base.BaseDendrite, cfg *config.KeyServer, fedClient fedsenderapi.FederationClient,
) api.KeyInternalAPI {
js, consumer, _ := jetstream.Prepare(&cfg.Matrix.JetStream)
js := jetstream.Prepare(&cfg.Matrix.JetStream)
db, err := storage.NewDatabase(&cfg.Database)
if err != nil {
@ -66,7 +66,7 @@ func NewInternalAPI(
}()
keyconsumer := consumers.NewOutputCrossSigningKeyUpdateConsumer(
base.ProcessContext, base.Cfg, consumer, db, ap,
base.ProcessContext, base.Cfg, js, db, ap,
)
if err := keyconsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start keyserver EDU server consumer")

View file

@ -18,15 +18,12 @@ import (
"context"
"encoding/json"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/keyserver/types"
"github.com/matrix-org/gomatrixserverlib"
)
type Database interface {
internal.PartitionStorer
// ExistingOneTimeKeys returns a map of keyIDWithAlgorithm to key JSON for the given parameters. If no keys exist with this combination
// of user/device/key/algorithm 4-uple then it is omitted from the map. Returns an error when failing to communicate with the database.
ExistingOneTimeKeys(ctx context.Context, userID, deviceID string, keyIDsWithAlgorithms []string) (map[string]json.RawMessage, error)
@ -71,7 +68,7 @@ type Database interface {
StoreKeyChange(ctx context.Context, userID string) (int64, error)
// KeyChanges returns a list of user IDs who have modified their keys from the offset given (exclusive) to the offset given (inclusive).
// A to offset of sarama.OffsetNewest means no upper limit.
// A to offset of jetstream.OffsetNewest means no upper limit.
// Returns the offset of the latest key change.
KeyChanges(ctx context.Context, fromOffset, toOffset int64) (userIDs []string, latestOffset int64, err error)

View file

@ -19,9 +19,9 @@ import (
"database/sql"
"math"
"github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/storage/tables"
"github.com/matrix-org/dendrite/setup/jetstream"
)
var keyChangesSchema = `
@ -78,7 +78,7 @@ func (s *keyChangesStatements) InsertKeyChange(ctx context.Context, userID strin
func (s *keyChangesStatements) SelectKeyChanges(
ctx context.Context, fromOffset, toOffset int64,
) (userIDs []string, latestOffset int64, err error) {
if toOffset == sarama.OffsetNewest {
if toOffset == jetstream.OffsetNewest {
toOffset = math.MaxInt64
}
latestOffset = fromOffset

View file

@ -19,9 +19,9 @@ import (
"database/sql"
"math"
"github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/storage/tables"
"github.com/matrix-org/dendrite/setup/jetstream"
)
var keyChangesSchema = `
@ -76,7 +76,7 @@ func (s *keyChangesStatements) InsertKeyChange(ctx context.Context, userID strin
func (s *keyChangesStatements) SelectKeyChanges(
ctx context.Context, fromOffset, toOffset int64,
) (userIDs []string, latestOffset int64, err error) {
if toOffset == sarama.OffsetNewest {
if toOffset == jetstream.OffsetNewest {
toOffset = math.MaxInt64
}
latestOffset = fromOffset

View file

@ -9,9 +9,9 @@ import (
"reflect"
"testing"
"github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
)
var ctx = context.Background()
@ -50,7 +50,7 @@ func TestKeyChanges(t *testing.T) {
MustNotError(t, err)
deviceChangeIDC, err := db.StoreKeyChange(ctx, "@charlie:localhost")
MustNotError(t, err)
userIDs, latest, err := db.KeyChanges(ctx, deviceChangeIDB, sarama.OffsetNewest)
userIDs, latest, err := db.KeyChanges(ctx, deviceChangeIDB, jetstream.OffsetNewest)
if err != nil {
t.Fatalf("Failed to KeyChanges: %s", err)
}
@ -74,7 +74,7 @@ func TestKeyChangesNoDupes(t *testing.T) {
}
deviceChangeID, err := db.StoreKeyChange(ctx, "@alice:localhost")
MustNotError(t, err)
userIDs, latest, err := db.KeyChanges(ctx, 0, sarama.OffsetNewest)
userIDs, latest, err := db.KeyChanges(ctx, 0, jetstream.OffsetNewest)
if err != nil {
t.Fatalf("Failed to KeyChanges: %s", err)
}

View file

@ -46,7 +46,7 @@ type DeviceKeys interface {
type KeyChanges interface {
InsertKeyChange(ctx context.Context, userID string) (int64, error)
// SelectKeyChanges returns the set (de-duplicated) of users who have changed their keys between the two offsets.
// Results are exclusive of fromOffset and inclusive of toOffset. A toOffset of sarama.OffsetNewest means no upper offset.
// Results are exclusive of fromOffset and inclusive of toOffset. A toOffset of jetstream.OffsetNewest means no upper offset.
SelectKeyChanges(ctx context.Context, fromOffset, toOffset int64) (userIDs []string, latestOffset int64, err error)
Prepare() error