From 8336ce972ec75b07b62f7b382c0e1e405d549545 Mon Sep 17 00:00:00 2001 From: S7evinK <2353100+S7evinK@users.noreply.github.com> Date: Mon, 21 Mar 2022 10:47:41 +0100 Subject: [PATCH 1/2] Remove unused partition_offset_table (#2288) --- appservice/storage/postgres/storage.go | 4 - appservice/storage/sqlite3/storage.go | 4 - federationapi/storage/postgres/storage.go | 4 - federationapi/storage/sqlite3/storage.go | 4 - internal/sqlutil/partition_offset_table.go | 133 --------------------- keyserver/storage/postgres/storage.go | 3 - keyserver/storage/shared/storage.go | 1 - keyserver/storage/sqlite3/storage.go | 3 - syncapi/storage/postgres/syncserver.go | 4 - syncapi/storage/sqlite3/syncserver.go | 8 +- 10 files changed, 2 insertions(+), 166 deletions(-) delete mode 100644 internal/sqlutil/partition_offset_table.go diff --git a/appservice/storage/postgres/storage.go b/appservice/storage/postgres/storage.go index d2c3e261e..eaf947ff3 100644 --- a/appservice/storage/postgres/storage.go +++ b/appservice/storage/postgres/storage.go @@ -28,7 +28,6 @@ import ( // Database stores events intended to be later sent to application services type Database struct { - sqlutil.PartitionOffsetStatements events eventsStatements txnID txnStatements db *sql.DB @@ -46,9 +45,6 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) { if err = result.prepare(); err != nil { return nil, err } - if err = result.PartitionOffsetStatements.Prepare(result.db, result.writer, "appservice"); err != nil { - return nil, err - } return &result, nil } diff --git a/appservice/storage/sqlite3/storage.go b/appservice/storage/sqlite3/storage.go index 51bfe7109..9260c7fe7 100644 --- a/appservice/storage/sqlite3/storage.go +++ b/appservice/storage/sqlite3/storage.go @@ -27,7 +27,6 @@ import ( // Database stores events intended to be later sent to application services type Database struct { - sqlutil.PartitionOffsetStatements events eventsStatements txnID txnStatements db *sql.DB @@ -45,9 +44,6 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) { if err = result.prepare(); err != nil { return nil, err } - if err = result.PartitionOffsetStatements.Prepare(result.db, result.writer, "appservice"); err != nil { - return nil, err - } return &result, nil } diff --git a/federationapi/storage/postgres/storage.go b/federationapi/storage/postgres/storage.go index 2e2c08911..b2aea6929 100644 --- a/federationapi/storage/postgres/storage.go +++ b/federationapi/storage/postgres/storage.go @@ -30,7 +30,6 @@ import ( // Database stores information needed by the federation sender type Database struct { shared.Database - sqlutil.PartitionOffsetStatements db *sql.DB writer sqlutil.Writer } @@ -104,8 +103,5 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationC NotaryServerKeysMetadata: notaryMetadata, ServerSigningKeys: serverSigningKeys, } - if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "federationsender"); err != nil { - return nil, err - } return &d, nil } diff --git a/federationapi/storage/sqlite3/storage.go b/federationapi/storage/sqlite3/storage.go index 978dd7136..c2e83211e 100644 --- a/federationapi/storage/sqlite3/storage.go +++ b/federationapi/storage/sqlite3/storage.go @@ -29,7 +29,6 @@ import ( // Database stores information needed by the federation sender type Database struct { shared.Database - sqlutil.PartitionOffsetStatements db *sql.DB writer sqlutil.Writer } @@ -103,8 +102,5 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationC NotaryServerKeysMetadata: notaryKeysMetadata, ServerSigningKeys: serverSigningKeys, } - if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "federationsender"); err != nil { - return nil, err - } return &d, nil } diff --git a/internal/sqlutil/partition_offset_table.go b/internal/sqlutil/partition_offset_table.go deleted file mode 100644 index e19a092f9..000000000 --- a/internal/sqlutil/partition_offset_table.go +++ /dev/null @@ -1,133 +0,0 @@ -// Copyright 2020 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package sqlutil - -import ( - "context" - "database/sql" - "strings" -) - -// A PartitionOffset is the offset into a partition of the input log. -type PartitionOffset struct { - // The ID of the partition. - Partition int32 - // The offset into the partition. - Offset int64 -} - -const partitionOffsetsSchema = ` --- The offsets that the server has processed up to. -CREATE TABLE IF NOT EXISTS ${prefix}_partition_offsets ( - -- The name of the topic. - topic TEXT NOT NULL, - -- The 32-bit partition ID - partition INTEGER NOT NULL, - -- The 64-bit offset. - partition_offset BIGINT NOT NULL, - UNIQUE (topic, partition) -); -` - -const selectPartitionOffsetsSQL = "" + - "SELECT partition, partition_offset FROM ${prefix}_partition_offsets WHERE topic = $1" - -const upsertPartitionOffsetsSQL = "" + - "INSERT INTO ${prefix}_partition_offsets (topic, partition, partition_offset) VALUES ($1, $2, $3)" + - " ON CONFLICT (topic, partition)" + - " DO UPDATE SET partition_offset = $3" - -// PartitionOffsetStatements represents a set of statements that can be run on a partition_offsets table. -type PartitionOffsetStatements struct { - db *sql.DB - writer Writer - selectPartitionOffsetsStmt *sql.Stmt - upsertPartitionOffsetStmt *sql.Stmt -} - -// Prepare converts the raw SQL statements into prepared statements. -// Takes a prefix to prepend to the table name used to store the partition offsets. -// This allows multiple components to share the same database schema. -func (s *PartitionOffsetStatements) Prepare(db *sql.DB, writer Writer, prefix string) (err error) { - s.db = db - s.writer = writer - _, err = db.Exec(strings.Replace(partitionOffsetsSchema, "${prefix}", prefix, -1)) - if err != nil { - return - } - if s.selectPartitionOffsetsStmt, err = db.Prepare( - strings.Replace(selectPartitionOffsetsSQL, "${prefix}", prefix, -1), - ); err != nil { - return - } - if s.upsertPartitionOffsetStmt, err = db.Prepare( - strings.Replace(upsertPartitionOffsetsSQL, "${prefix}", prefix, -1), - ); err != nil { - return - } - return -} - -// PartitionOffsets implements PartitionStorer -func (s *PartitionOffsetStatements) PartitionOffsets( - ctx context.Context, topic string, -) ([]PartitionOffset, error) { - return s.selectPartitionOffsets(ctx, topic) -} - -// SetPartitionOffset implements PartitionStorer -func (s *PartitionOffsetStatements) SetPartitionOffset( - ctx context.Context, topic string, partition int32, offset int64, -) error { - return s.upsertPartitionOffset(ctx, topic, partition, offset) -} - -// selectPartitionOffsets returns all the partition offsets for the given topic. -func (s *PartitionOffsetStatements) selectPartitionOffsets( - ctx context.Context, topic string, -) (results []PartitionOffset, err error) { - rows, err := s.selectPartitionOffsetsStmt.QueryContext(ctx, topic) - if err != nil { - return nil, err - } - defer checkNamedErr(rows.Close, &err) - for rows.Next() { - var offset PartitionOffset - if err = rows.Scan(&offset.Partition, &offset.Offset); err != nil { - return nil, err - } - results = append(results, offset) - } - err = rows.Err() - return results, err -} - -// checkNamedErr calls fn and overwrite err if it was nil and fn returned non-nil -func checkNamedErr(fn func() error, err *error) { - if e := fn(); e != nil && *err == nil { - *err = e - } -} - -// UpsertPartitionOffset updates or inserts the partition offset for the given topic. -func (s *PartitionOffsetStatements) upsertPartitionOffset( - ctx context.Context, topic string, partition int32, offset int64, -) error { - return s.writer.Do(s.db, nil, func(txn *sql.Tx) error { - stmt := TxStmt(txn, s.upsertPartitionOffsetStmt) - _, err := stmt.ExecContext(ctx, topic, partition, offset) - return err - }) -} diff --git a/keyserver/storage/postgres/storage.go b/keyserver/storage/postgres/storage.go index b71cc1a7a..136986885 100644 --- a/keyserver/storage/postgres/storage.go +++ b/keyserver/storage/postgres/storage.go @@ -70,8 +70,5 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*shared.Database, error) CrossSigningKeysTable: csk, CrossSigningSigsTable: css, } - if err = d.PartitionOffsetStatements.Prepare(db, d.Writer, "keyserver"); err != nil { - return nil, err - } return d, nil } diff --git a/keyserver/storage/shared/storage.go b/keyserver/storage/shared/storage.go index 03215b93b..7ba0b3ea1 100644 --- a/keyserver/storage/shared/storage.go +++ b/keyserver/storage/shared/storage.go @@ -36,7 +36,6 @@ type Database struct { StaleDeviceListsTable tables.StaleDeviceLists CrossSigningKeysTable tables.CrossSigningKeys CrossSigningSigsTable tables.CrossSigningSigs - sqlutil.PartitionOffsetStatements } func (d *Database) ExistingOneTimeKeys(ctx context.Context, userID, deviceID string, keyIDsWithAlgorithms []string) (map[string]json.RawMessage, error) { diff --git a/keyserver/storage/sqlite3/storage.go b/keyserver/storage/sqlite3/storage.go index 50ce00d05..0e0adceef 100644 --- a/keyserver/storage/sqlite3/storage.go +++ b/keyserver/storage/sqlite3/storage.go @@ -69,8 +69,5 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*shared.Database, error) CrossSigningKeysTable: csk, CrossSigningSigsTable: css, } - if err = d.PartitionOffsetStatements.Prepare(db, d.Writer, "keyserver"); err != nil { - return nil, err - } return d, nil } diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index 60fe5b54d..4e4b5c0bb 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -32,7 +32,6 @@ type SyncServerDatasource struct { shared.Database db *sql.DB writer sqlutil.Writer - sqlutil.PartitionOffsetStatements } // NewDatabase creates a new sync server database @@ -43,9 +42,6 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e return nil, err } d.writer = sqlutil.NewDummyWriter() - if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "syncapi"); err != nil { - return nil, err - } accountData, err := NewPostgresAccountDataTable(d.db) if err != nil { return nil, err diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index f5ae9fdd7..cb7e3b46f 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -28,9 +28,8 @@ import ( // both the database for PDUs and caches for EDUs. type SyncServerDatasource struct { shared.Database - db *sql.DB - writer sqlutil.Writer - sqlutil.PartitionOffsetStatements + db *sql.DB + writer sqlutil.Writer streamID streamIDStatements } @@ -50,9 +49,6 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e } func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (err error) { - if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "syncapi"); err != nil { - return err - } if err = d.streamID.prepare(d.db); err != nil { return err } From 9572f5ed19abc0b635092108aa6956eaebc60578 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 21 Mar 2022 10:32:34 +0000 Subject: [PATCH 2/2] Wait for safe shutdown of NATS Server (#2289) --- appservice/appservice.go | 2 +- clientapi/clientapi.go | 4 +++- .../personalities/clientapi.go | 6 +++--- eduserver/eduserver.go | 2 +- federationapi/federationapi.go | 2 +- keyserver/keyserver.go | 2 +- roomserver/roomserver.go | 2 +- setup/jetstream/nats.go | 14 ++++++++++++-- setup/monolith.go | 2 +- syncapi/syncapi.go | 2 +- userapi/userapi.go | 2 +- 11 files changed, 26 insertions(+), 14 deletions(-) diff --git a/appservice/appservice.go b/appservice/appservice.go index 3e19e09b2..b99091866 100644 --- a/appservice/appservice.go +++ b/appservice/appservice.go @@ -59,7 +59,7 @@ func NewInternalAPI( }, }, } - js, _ := jetstream.Prepare(&base.Cfg.Global.JetStream) + js, _ := jetstream.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) // Create a connection to the appservice postgres DB appserviceDB, err := storage.NewDatabase(&base.Cfg.AppServiceAPI.Database) diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go index e4279c220..75184d3b7 100644 --- a/clientapi/clientapi.go +++ b/clientapi/clientapi.go @@ -27,6 +27,7 @@ import ( roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/matrix-org/dendrite/setup/process" userapi "github.com/matrix-org/dendrite/userapi/api" userdb "github.com/matrix-org/dendrite/userapi/storage" "github.com/matrix-org/gomatrixserverlib" @@ -34,6 +35,7 @@ import ( // AddPublicRoutes sets up and registers HTTP handlers for the ClientAPI component. func AddPublicRoutes( + process *process.ProcessContext, router *mux.Router, synapseAdminRouter *mux.Router, cfg *config.ClientAPI, @@ -49,7 +51,7 @@ func AddPublicRoutes( extRoomsProvider api.ExtraPublicRoomsProvider, mscCfg *config.MSCs, ) { - js, _ := jetstream.Prepare(&cfg.Matrix.JetStream) + js, _ := jetstream.Prepare(process, &cfg.Matrix.JetStream) syncProducer := &producers.SyncAPIProducer{ JetStream: js, diff --git a/cmd/dendrite-polylith-multi/personalities/clientapi.go b/cmd/dendrite-polylith-multi/personalities/clientapi.go index bd9f7a109..5e67acd03 100644 --- a/cmd/dendrite-polylith-multi/personalities/clientapi.go +++ b/cmd/dendrite-polylith-multi/personalities/clientapi.go @@ -33,9 +33,9 @@ func ClientAPI(base *basepkg.BaseDendrite, cfg *config.Dendrite) { keyAPI := base.KeyServerHTTPClient() clientapi.AddPublicRoutes( - base.PublicClientAPIMux, base.SynapseAdminMux, &base.Cfg.ClientAPI, accountDB, federation, - rsAPI, eduInputAPI, asQuery, transactions.New(), fsAPI, userAPI, keyAPI, nil, - &cfg.MSCs, + base.ProcessContext, base.PublicClientAPIMux, base.SynapseAdminMux, &base.Cfg.ClientAPI, + accountDB, federation, rsAPI, eduInputAPI, asQuery, transactions.New(), fsAPI, userAPI, + keyAPI, nil, &cfg.MSCs, ) base.SetupAndServeHTTP( diff --git a/eduserver/eduserver.go b/eduserver/eduserver.go index 6882399da..9fe8704cf 100644 --- a/eduserver/eduserver.go +++ b/eduserver/eduserver.go @@ -42,7 +42,7 @@ func NewInternalAPI( ) api.EDUServerInputAPI { cfg := &base.Cfg.EDUServer - js, _ := jetstream.Prepare(&cfg.Matrix.JetStream) + js, _ := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) return &input.EDUServerInputAPI{ Cache: eduCache, diff --git a/federationapi/federationapi.go b/federationapi/federationapi.go index 9f149d973..b7f93ecb9 100644 --- a/federationapi/federationapi.go +++ b/federationapi/federationapi.go @@ -92,7 +92,7 @@ func NewInternalAPI( FailuresUntilBlacklist: cfg.FederationMaxRetries, } - js, _ := jetstream.Prepare(&cfg.Matrix.JetStream) + js, _ := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) queues := queue.NewOutgoingQueues( federationDB, base.ProcessContext, diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go index 8a0ce6178..cf66bd387 100644 --- a/keyserver/keyserver.go +++ b/keyserver/keyserver.go @@ -39,7 +39,7 @@ func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) { func NewInternalAPI( base *base.BaseDendrite, cfg *config.KeyServer, fedClient fedsenderapi.FederationClient, ) api.KeyInternalAPI { - js, _ := jetstream.Prepare(&cfg.Matrix.JetStream) + js, _ := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) db, err := storage.NewDatabase(&cfg.Database) if err != nil { diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go index 1992ac335..896773bab 100644 --- a/roomserver/roomserver.go +++ b/roomserver/roomserver.go @@ -50,7 +50,7 @@ func NewInternalAPI( logrus.WithError(err).Panicf("failed to connect to room server db") } - js, nc := jetstream.Prepare(&cfg.Matrix.JetStream) + js, nc := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) return internal.NewRoomserverAPI( base.ProcessContext, cfg, roomserverDB, js, nc, diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go index 37597d584..43cc0331d 100644 --- a/setup/jetstream/nats.go +++ b/setup/jetstream/nats.go @@ -6,6 +6,7 @@ import ( "time" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/process" "github.com/sirupsen/logrus" natsserver "github.com/nats-io/nats-server/v2/server" @@ -15,7 +16,7 @@ import ( var natsServer *natsserver.Server var natsServerMutex sync.Mutex -func Prepare(cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) { +func Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) { // check if we need an in-process NATS Server if len(cfg.Addresses) != 0 { return setupNATS(cfg, nil) @@ -35,7 +36,16 @@ func Prepare(cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Co panic(err) } natsServer.ConfigureLogger() - go natsServer.Start() + go func() { + process.ComponentStarted() + natsServer.Start() + }() + go func() { + <-process.WaitForShutdown() + natsServer.Shutdown() + natsServer.WaitForShutdown() + process.ComponentFinished() + }() } natsServerMutex.Unlock() if !natsServer.ReadyForConnections(time.Second * 10) { diff --git a/setup/monolith.go b/setup/monolith.go index 7dbd2eeaa..fa6d962c4 100644 --- a/setup/monolith.go +++ b/setup/monolith.go @@ -57,7 +57,7 @@ type Monolith struct { // AddAllPublicRoutes attaches all public paths to the given router func (m *Monolith) AddAllPublicRoutes(process *process.ProcessContext, csMux, ssMux, keyMux, wkMux, mediaMux, synapseMux *mux.Router) { clientapi.AddPublicRoutes( - csMux, synapseMux, &m.Config.ClientAPI, m.AccountDB, + process, csMux, synapseMux, &m.Config.ClientAPI, m.AccountDB, m.FedClient, m.RoomserverAPI, m.EDUInternalAPI, m.AppserviceAPI, transactions.New(), m.FederationAPI, m.UserAPI, m.KeyAPI, diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 41635c911..f1f827221 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -49,7 +49,7 @@ func AddPublicRoutes( federation *gomatrixserverlib.FederationClient, cfg *config.SyncAPI, ) { - js, _ := jetstream.Prepare(&cfg.Matrix.JetStream) + js, _ := jetstream.Prepare(process, &cfg.Matrix.JetStream) syncDB, err := storage.NewSyncServerDatasource(&cfg.Database) if err != nil { diff --git a/userapi/userapi.go b/userapi/userapi.go index 1e4ebcb2e..97bdf7b2d 100644 --- a/userapi/userapi.go +++ b/userapi/userapi.go @@ -46,7 +46,7 @@ func NewInternalAPI( appServices []config.ApplicationService, keyAPI keyapi.KeyInternalAPI, rsAPI rsapi.RoomserverInternalAPI, pgClient pushgateway.Client, ) api.UserInternalAPI { - js, _ := jetstream.Prepare(&cfg.Matrix.JetStream) + js, _ := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) syncProducer := producers.NewSyncAPI( db, js,