From 061c3a9e43c432d6bd19d277cc10060ef1ef47ea Mon Sep 17 00:00:00 2001 From: Till Faelligen Date: Tue, 1 Feb 2022 19:08:05 +0100 Subject: [PATCH] Remove internal.ContinualConsumer from keyserver --- keyserver/api/api.go | 2 +- keyserver/consumers/cross_signing.go | 86 ++++++++++--------- keyserver/keyserver.go | 4 +- keyserver/storage/interface.go | 5 +- .../storage/postgres/key_changes_table.go | 4 +- .../storage/sqlite3/key_changes_table.go | 4 +- keyserver/storage/storage_test.go | 6 +- keyserver/storage/tables/interface.go | 2 +- 8 files changed, 56 insertions(+), 57 deletions(-) diff --git a/keyserver/api/api.go b/keyserver/api/api.go index 0eea2f0fa..2d9caa1f7 100644 --- a/keyserver/api/api.go +++ b/keyserver/api/api.go @@ -228,7 +228,7 @@ type QueryKeyChangesRequest struct { // The offset of the last received key event, or sarama.OffsetOldest if this is from the beginning Offset int64 // The inclusive offset where to track key changes up to. Messages with this offset are included in the response. - // Use sarama.OffsetNewest if the offset is unknown (then check the response Offset to avoid racing). + // Use jetstream.OffsetNewest if the offset is unknown (then check the response Offset to avoid racing). ToOffset int64 } diff --git a/keyserver/consumers/cross_signing.go b/keyserver/consumers/cross_signing.go index 4b2bd4a9a..a8a160e42 100644 --- a/keyserver/consumers/cross_signing.go +++ b/keyserver/consumers/cross_signing.go @@ -18,29 +18,29 @@ import ( "context" "encoding/json" - "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/storage" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/gomatrixserverlib" + "github.com/nats-io/nats.go" "github.com/sirupsen/logrus" - - "github.com/Shopify/sarama" ) type OutputCrossSigningKeyUpdateConsumer struct { - eduServerConsumer *internal.ContinualConsumer - keyDB storage.Database - keyAPI api.KeyInternalAPI - serverName string + ctx context.Context + keyDB storage.Database + keyAPI api.KeyInternalAPI + serverName string + jetstream nats.JetStreamContext + topic string } func NewOutputCrossSigningKeyUpdateConsumer( process *process.ProcessContext, cfg *config.Dendrite, - kafkaConsumer sarama.Consumer, + js nats.JetStreamContext, keyDB storage.Database, keyAPI api.KeyInternalAPI, ) *OutputCrossSigningKeyUpdateConsumer { @@ -48,60 +48,59 @@ func NewOutputCrossSigningKeyUpdateConsumer( // topic. We will only produce events where the UserID matches our server name, // and we will only consume events where the UserID does NOT match our server // name (because the update came from a remote server). - consumer := internal.ContinualConsumer{ - Process: process, - ComponentName: "keyserver/keyserver", - Topic: cfg.Global.JetStream.TopicFor(jetstream.OutputKeyChangeEvent), - Consumer: kafkaConsumer, - PartitionStore: keyDB, - } s := &OutputCrossSigningKeyUpdateConsumer{ - eduServerConsumer: &consumer, - keyDB: keyDB, - keyAPI: keyAPI, - serverName: string(cfg.Global.ServerName), + ctx: process.Context(), + keyDB: keyDB, + jetstream: js, + topic: cfg.Global.JetStream.TopicFor(jetstream.OutputKeyChangeEvent), + keyAPI: keyAPI, + serverName: string(cfg.Global.ServerName), } - consumer.ProcessMessage = s.onMessage return s } func (s *OutputCrossSigningKeyUpdateConsumer) Start() error { - return s.eduServerConsumer.Start() + _, err := s.jetstream.Subscribe( + s.topic, s.onMessage, + ) + return err } // onMessage is called in response to a message received on the // key change events topic from the key server. -func (t *OutputCrossSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMessage) error { - var m api.DeviceMessage - if err := json.Unmarshal(msg.Value, &m); err != nil { - logrus.WithError(err).Errorf("failed to read device message from key change topic") - return nil - } - if m.OutputCrossSigningKeyUpdate == nil { - // This probably shouldn't happen but stops us from panicking if we come - // across an update that doesn't satisfy either types. - return nil - } - switch m.Type { - case api.TypeCrossSigningUpdate: - return t.onCrossSigningMessage(m) - default: - return nil - } +func (t *OutputCrossSigningKeyUpdateConsumer) onMessage(msg *nats.Msg) { + jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool { + var m api.DeviceMessage + if err := json.Unmarshal(msg.Data, &m); err != nil { + logrus.WithError(err).Errorf("failed to read device message from key change topic") + return true + } + if m.OutputCrossSigningKeyUpdate == nil { + // This probably shouldn't happen but stops us from panicking if we come + // across an update that doesn't satisfy either types. + return true + } + switch m.Type { + case api.TypeCrossSigningUpdate: + return t.onCrossSigningMessage(m) + default: + return true + } + }) } -func (s *OutputCrossSigningKeyUpdateConsumer) onCrossSigningMessage(m api.DeviceMessage) error { +func (s *OutputCrossSigningKeyUpdateConsumer) onCrossSigningMessage(m api.DeviceMessage) bool { output := m.CrossSigningKeyUpdate _, host, err := gomatrixserverlib.SplitID('@', output.UserID) if err != nil { logrus.WithError(err).Errorf("eduserver output log: user ID parse failure") - return nil + return true } if host == gomatrixserverlib.ServerName(s.serverName) { // Ignore any messages that contain information about our own users, as // they already originated from this server. - return nil + return true } uploadReq := &api.PerformUploadDeviceKeysRequest{ UserID: output.UserID, @@ -114,5 +113,8 @@ func (s *OutputCrossSigningKeyUpdateConsumer) onCrossSigningMessage(m api.Device } uploadRes := &api.PerformUploadDeviceKeysResponse{} s.keyAPI.PerformUploadDeviceKeys(context.TODO(), uploadReq, uploadRes) - return uploadRes.Error + if uploadRes.Error != nil { + return false + } + return true } diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go index 8cc50ea0d..61ccc0303 100644 --- a/keyserver/keyserver.go +++ b/keyserver/keyserver.go @@ -40,7 +40,7 @@ func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) { func NewInternalAPI( base *base.BaseDendrite, cfg *config.KeyServer, fedClient fedsenderapi.FederationClient, ) api.KeyInternalAPI { - js, consumer, _ := jetstream.Prepare(&cfg.Matrix.JetStream) + js := jetstream.Prepare(&cfg.Matrix.JetStream) db, err := storage.NewDatabase(&cfg.Database) if err != nil { @@ -66,7 +66,7 @@ func NewInternalAPI( }() keyconsumer := consumers.NewOutputCrossSigningKeyUpdateConsumer( - base.ProcessContext, base.Cfg, consumer, db, ap, + base.ProcessContext, base.Cfg, js, db, ap, ) if err := keyconsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start keyserver EDU server consumer") diff --git a/keyserver/storage/interface.go b/keyserver/storage/interface.go index 87feae47d..de298f979 100644 --- a/keyserver/storage/interface.go +++ b/keyserver/storage/interface.go @@ -18,15 +18,12 @@ import ( "context" "encoding/json" - "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/types" "github.com/matrix-org/gomatrixserverlib" ) type Database interface { - internal.PartitionStorer - // ExistingOneTimeKeys returns a map of keyIDWithAlgorithm to key JSON for the given parameters. If no keys exist with this combination // of user/device/key/algorithm 4-uple then it is omitted from the map. Returns an error when failing to communicate with the database. ExistingOneTimeKeys(ctx context.Context, userID, deviceID string, keyIDsWithAlgorithms []string) (map[string]json.RawMessage, error) @@ -71,7 +68,7 @@ type Database interface { StoreKeyChange(ctx context.Context, userID string) (int64, error) // KeyChanges returns a list of user IDs who have modified their keys from the offset given (exclusive) to the offset given (inclusive). - // A to offset of sarama.OffsetNewest means no upper limit. + // A to offset of jetstream.OffsetNewest means no upper limit. // Returns the offset of the latest key change. KeyChanges(ctx context.Context, fromOffset, toOffset int64) (userIDs []string, latestOffset int64, err error) diff --git a/keyserver/storage/postgres/key_changes_table.go b/keyserver/storage/postgres/key_changes_table.go index 20d227c24..d204784f1 100644 --- a/keyserver/storage/postgres/key_changes_table.go +++ b/keyserver/storage/postgres/key_changes_table.go @@ -19,9 +19,9 @@ import ( "database/sql" "math" - "github.com/Shopify/sarama" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/keyserver/storage/tables" + "github.com/matrix-org/dendrite/setup/jetstream" ) var keyChangesSchema = ` @@ -78,7 +78,7 @@ func (s *keyChangesStatements) InsertKeyChange(ctx context.Context, userID strin func (s *keyChangesStatements) SelectKeyChanges( ctx context.Context, fromOffset, toOffset int64, ) (userIDs []string, latestOffset int64, err error) { - if toOffset == sarama.OffsetNewest { + if toOffset == jetstream.OffsetNewest { toOffset = math.MaxInt64 } latestOffset = fromOffset diff --git a/keyserver/storage/sqlite3/key_changes_table.go b/keyserver/storage/sqlite3/key_changes_table.go index d43c15ca9..6145b8c4f 100644 --- a/keyserver/storage/sqlite3/key_changes_table.go +++ b/keyserver/storage/sqlite3/key_changes_table.go @@ -19,9 +19,9 @@ import ( "database/sql" "math" - "github.com/Shopify/sarama" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/keyserver/storage/tables" + "github.com/matrix-org/dendrite/setup/jetstream" ) var keyChangesSchema = ` @@ -76,7 +76,7 @@ func (s *keyChangesStatements) InsertKeyChange(ctx context.Context, userID strin func (s *keyChangesStatements) SelectKeyChanges( ctx context.Context, fromOffset, toOffset int64, ) (userIDs []string, latestOffset int64, err error) { - if toOffset == sarama.OffsetNewest { + if toOffset == jetstream.OffsetNewest { toOffset = math.MaxInt64 } latestOffset = fromOffset diff --git a/keyserver/storage/storage_test.go b/keyserver/storage/storage_test.go index 2f8cf809b..5453a1eb0 100644 --- a/keyserver/storage/storage_test.go +++ b/keyserver/storage/storage_test.go @@ -9,9 +9,9 @@ import ( "reflect" "testing" - "github.com/Shopify/sarama" "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" ) var ctx = context.Background() @@ -50,7 +50,7 @@ func TestKeyChanges(t *testing.T) { MustNotError(t, err) deviceChangeIDC, err := db.StoreKeyChange(ctx, "@charlie:localhost") MustNotError(t, err) - userIDs, latest, err := db.KeyChanges(ctx, deviceChangeIDB, sarama.OffsetNewest) + userIDs, latest, err := db.KeyChanges(ctx, deviceChangeIDB, jetstream.OffsetNewest) if err != nil { t.Fatalf("Failed to KeyChanges: %s", err) } @@ -74,7 +74,7 @@ func TestKeyChangesNoDupes(t *testing.T) { } deviceChangeID, err := db.StoreKeyChange(ctx, "@alice:localhost") MustNotError(t, err) - userIDs, latest, err := db.KeyChanges(ctx, 0, sarama.OffsetNewest) + userIDs, latest, err := db.KeyChanges(ctx, 0, jetstream.OffsetNewest) if err != nil { t.Fatalf("Failed to KeyChanges: %s", err) } diff --git a/keyserver/storage/tables/interface.go b/keyserver/storage/tables/interface.go index 0d94c94cc..62f4ada65 100644 --- a/keyserver/storage/tables/interface.go +++ b/keyserver/storage/tables/interface.go @@ -46,7 +46,7 @@ type DeviceKeys interface { type KeyChanges interface { InsertKeyChange(ctx context.Context, userID string) (int64, error) // SelectKeyChanges returns the set (de-duplicated) of users who have changed their keys between the two offsets. - // Results are exclusive of fromOffset and inclusive of toOffset. A toOffset of sarama.OffsetNewest means no upper offset. + // Results are exclusive of fromOffset and inclusive of toOffset. A toOffset of jetstream.OffsetNewest means no upper offset. SelectKeyChanges(ctx context.Context, fromOffset, toOffset int64) (userIDs []string, latestOffset int64, err error) Prepare() error