diff --git a/CHANGES.md b/CHANGES.md index d05b871ac..095ab9c5b 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,17 @@ # Changelog +## Dendrite 0.2.1 (2020-10-22) + +### Fixes + +* Forward extremities are now calculated using only references from other extremities, rather than including outliers, which should fix cases where state can become corrupted ([#1556](https://github.com/matrix-org/dendrite/pull/1556)) +* Old state events will no longer be processed by the sync API as new, which should fix some cases where clients incorrectly believe they have joined or left rooms ([#1548](https://github.com/matrix-org/dendrite/pull/1548)) +* More SQLite database locking issues have been resolved in the latest events updater ([#1554](https://github.com/matrix-org/dendrite/pull/1554)) +* Internal HTTP API calls are now made using H2C (HTTP/2) in polylith mode, mitigating some potential head-of-line blocking issues ([#1541](https://github.com/matrix-org/dendrite/pull/1541)) +* Roomserver output events no longer incorrectly flag state rewrites ([#1557](https://github.com/matrix-org/dendrite/pull/1557)) +* Notification levels are now parsed correctly in power level events ([gomatrixserverlib#228](https://github.com/matrix-org/gomatrixserverlib/pull/228), contributed by [Pestdoktor](https://github.com/Pestdoktor)) +* Invalid UTF-8 is now correctly rejected when making federation requests ([gomatrixserverlib#229](https://github.com/matrix-org/gomatrixserverlib/pull/229), contributed by [Pestdoktor](https://github.com/Pestdoktor)) + ## Dendrite 0.2.0 (2020-10-20) ### Important diff --git a/README.md b/README.md index ea61dac13..6c84cffba 100644 --- a/README.md +++ b/README.md @@ -54,22 +54,24 @@ The following instructions are enough to get Dendrite started as a non-federatin ```bash $ git clone https://github.com/matrix-org/dendrite $ cd dendrite +$ ./build.sh -# generate self-signed certificate and an event signing key for federation -$ go build ./cmd/generate-keys -$ ./generate-keys --private-key matrix_key.pem --tls-cert server.crt --tls-key server.key +# Generate a Matrix signing key for federation (required) +$ ./generate-keys --private-key matrix_key.pem -# Copy and modify the config file: -# you'll need to set a server name and paths to the keys at the very least, along with setting -# up the database filenames +# Generate a self-signed certificate (optional, but a valid TLS certificate is normally +# needed for Matrix federation/clients to work properly!) +$ ./generate-keys --tls-cert server.crt --tls-key server.key + +# Copy and modify the config file - you'll need to set a server name and paths to the keys +# at the very least, along with setting up the database connection strings. $ cp dendrite-config.yaml dendrite.yaml -# build and run the server -$ go build ./cmd/dendrite-monolith-server +# Build and run the server: $ ./dendrite-monolith-server --tls-cert server.crt --tls-key server.key --config dendrite.yaml ``` -Then point your favourite Matrix client at `http://localhost:8008`. +Then point your favourite Matrix client at `http://localhost:8008` or `https://localhost:8448`. ## Progress diff --git a/build/docker/docker-compose.deps.yml b/build/docker/docker-compose.deps.yml index 1a27ffac0..454fddc29 100644 --- a/build/docker/docker-compose.deps.yml +++ b/build/docker/docker-compose.deps.yml @@ -1,5 +1,6 @@ version: "3.4" services: + # PostgreSQL is needed for both polylith and monolith modes. 
postgres: hostname: postgres image: postgres:9.6 @@ -15,12 +16,14 @@ services: networks: - internal + # Zookeeper is only needed for polylith mode! zookeeper: hostname: zookeeper image: zookeeper networks: - internal + # Kafka is only needed for polylith mode! kafka: container_name: dendrite_kafka hostname: kafka @@ -29,8 +32,6 @@ services: KAFKA_ADVERTISED_HOST_NAME: "kafka" KAFKA_DELETE_TOPIC_ENABLE: "true" KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" - ports: - - 9092:9092 depends_on: - zookeeper networks: diff --git a/build/docker/docker-compose.monolith.yml b/build/docker/docker-compose.monolith.yml index 8fb798343..024183aa6 100644 --- a/build/docker/docker-compose.monolith.yml +++ b/build/docker/docker-compose.monolith.yml @@ -7,6 +7,9 @@ services: "--tls-cert=server.crt", "--tls-key=server.key" ] + ports: + - 8008:8008 + - 8448:8448 volumes: - ./config:/etc/dendrite networks: diff --git a/dendrite-config.yaml b/dendrite-config.yaml index 6e87bc709..25503692b 100644 --- a/dendrite-config.yaml +++ b/dendrite-config.yaml @@ -36,6 +36,8 @@ global: server_name: localhost # The path to the signing private key file, used to sign requests and events. + # Note that this is NOT the same private key as used for TLS! To generate a + # signing key, use "./bin/generate-keys --private-key matrix_key.pem". private_key: matrix_key.pem # The paths and expiry timestamps (as a UNIX timestamp in millisecond precision) @@ -74,6 +76,12 @@ global: # Kafka. use_naffka: true + # The max size a Kafka message is allowed to use. + # You only need to change this value, if you encounter issues with too large messages. + # Must be less than/equal to "max.message.bytes" configured in Kafka. + # Defaults to 8388608 bytes. + # max_message_bytes: 8388608 + # Naffka database options. Not required when using Kafka. naffka_database: connection_string: file:naffka.db diff --git a/docs/INSTALL.md b/docs/INSTALL.md index f804193cd..0b3f932be 100644 --- a/docs/INSTALL.md +++ b/docs/INSTALL.md @@ -12,6 +12,8 @@ Dendrite can be run in one of two configurations: lightweight implementation called [Naffka](https://github.com/matrix-org/naffka). This will usually be the preferred model for low-volume, low-user or experimental deployments. +For most deployments, it is **recommended to run in monolith mode with PostgreSQL databases**. + Regardless of whether you are running in polylith or monolith mode, each Dendrite component that requires storage has its own database. Both Postgres and SQLite are supported and can be mixed-and-matched across components as needed in the configuration file. @@ -30,23 +32,9 @@ If you want to run a polylith deployment, you also need: * Apache Kafka 0.10.2+ -## Building up a monolith deploment +Please note that Kafka is **not required** for a monolith deployment. -Start by cloning the code: - -```bash -git clone https://github.com/matrix-org/dendrite -cd dendrite -``` - -Then build it: - -```bash -go build -o bin/dendrite-monolith-server ./cmd/dendrite-monolith-server -go build -o bin/generate-keys ./cmd/generate-keys -``` - -## Building up a polylith deployment +## Building Dendrite Start by cloning the code: @@ -61,6 +49,8 @@ Then build it: ./build.sh ``` +## Install Kafka (polylith only) + Install and start Kafka (c.f. [scripts/install-local-kafka.sh](scripts/install-local-kafka.sh)): ```bash @@ -96,9 +86,9 @@ Dendrite can use the built-in SQLite database engine for small setups. The SQLite databases do not need to be pre-built - Dendrite will create them automatically at startup. 
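The INSTALL.md changes above rely on each component being able to point at either database engine independently ("Postgres and SQLite ... can be mixed-and-matched across components"). As a rough illustration of that mix-and-match, and not code from this changeset (the helper name and error handling are invented for the example), a component could pick its SQL driver from the connection string prefix:

```go
package main

import (
	"database/sql"
	"fmt"
	"strings"

	_ "github.com/lib/pq"           // PostgreSQL driver ("postgres://..." strings)
	_ "github.com/mattn/go-sqlite3" // SQLite driver ("file:..." strings)
)

// openComponentDB is a hypothetical helper: it chooses a driver based on the
// connection string prefix, mirroring how each Dendrite component can be
// configured with either PostgreSQL or SQLite independently of the others.
func openComponentDB(connString string) (*sql.DB, error) {
	driver := "postgres"
	if strings.HasPrefix(connString, "file:") {
		driver = "sqlite3"
	}
	db, err := sql.Open(driver, connString)
	if err != nil {
		return nil, fmt.Errorf("sql.Open: %w", err)
	}
	if err = db.Ping(); err != nil {
		return nil, fmt.Errorf("db.Ping: %w", err)
	}
	return db, nil
}

func main() {
	// One component on SQLite, another on PostgreSQL - mixing the two is fine,
	// as long as SQLite components each keep their own database file.
	for _, conn := range []string{
		"file:userapi_account.db",
		"postgres://dendrite:password@localhost/dendrite_userapi_account",
	} {
		if _, err := openComponentDB(conn); err != nil {
			fmt.Println(conn, "->", err)
		}
	}
}
```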
-### Postgres database setup
+### PostgreSQL database setup

-Assuming that Postgres 9.6 (or later) is installed:
+Assuming that PostgreSQL 9.6 (or later) is installed:

* Create role, choosing a new password when prompted:

@@ -118,17 +108,31 @@ Assuming that PostgreSQL 9.6 (or later) is installed:

### Server key generation

-Each Dendrite server requires unique server keys.
+Each Dendrite installation requires:

-In order for an instance to federate correctly, you should have a valid
-certificate issued by a trusted authority, and private key to match. If you
-don't and just want to test locally, generate the self-signed SSL certificate
-for federation and the server signing key:
+- A unique Matrix signing private key
+- A valid and trusted TLS certificate and private key
+
+To generate a Matrix signing private key:

```bash
-./bin/generate-keys --private-key matrix_key.pem --tls-cert server.crt --tls-key server.key
+./bin/generate-keys --private-key matrix_key.pem
```

+**Warning:** Make sure to take a safe backup of this key! You will likely need it if you want to reinstall Dendrite, or
+any other Matrix homeserver, on the same domain name in the future. If you lose this key, you may have trouble joining
+federated rooms.
+
+For testing, you can generate a self-signed certificate and key, although this will not work for public federation:
+
+```bash
+./bin/generate-keys --tls-cert server.crt --tls-key server.key
+```
+
+If you have server keys from an older Synapse instance,
+[convert them](serverkeyformat.md#converting-synapse-keys) to Dendrite's PEM
+format and configure them as `old_private_keys` in your config.
+
### Configuration file

Create config file, based on `dendrite-config.yaml`. Call it `dendrite.yaml`. Things that will need editing include *at least*:
@@ -136,9 +140,9 @@ Create config file, based on `dendrite-config.yaml`. Call it `dendrite.yaml`. Th

* The `server_name` entry to reflect the hostname of your Dendrite server
* The `database` lines with an updated connection string based on your desired setup, e.g. replacing `database` with the name of the database:
-  * For Postgres: `postgres://dendrite:password@localhost/database`
-  * For SQLite on disk: `file:component.db` or `file:///path/to/component.db`
-  * Postgres and SQLite can be mixed and matched.
+  * For Postgres: `postgres://dendrite:password@localhost/database`, e.g. `postgres://dendrite:password@localhost/dendrite_userapi_account`
+  * For SQLite on disk: `file:component.db` or `file:///path/to/component.db`, e.g. `file:userapi_account.db`
+  * Postgres and SQLite can be mixed and matched on different components as desired.
* The `use_naffka` option if using Naffka in a monolith deployment

There are other options which may be useful so review them all. In particular,
@@ -148,7 +152,7 @@ help to improve reliability considerably by allowing your homeserver to fetch
public keys for dead homeservers from somewhere else.

**WARNING:** Dendrite supports running all components from the same database in
-Postgres mode, but this is **NOT** a supported configuration with SQLite. When
+PostgreSQL mode, but this is **NOT** a supported configuration with SQLite. When
using SQLite, all components **MUST** use their own database file.

## Starting a monolith server
@@ -160,8 +164,14 @@ Be sure to update the database username and password if needed.

The monolith server can be started as shown below.
By default it listens for HTTP connections on port 8008, so you can configure your Matrix client to use
-`http://localhost:8008` as the server. If you set `--tls-cert` and `--tls-key`
-as shown below, it will also listen for HTTPS connections on port 8448.
+`http://servername:8008` as the server:
+
+```bash
+./bin/dendrite-monolith-server
+```
+
+If you set `--tls-cert` and `--tls-key` as shown below, it will also listen
+for HTTPS connections on port 8448:

```bash
./bin/dendrite-monolith-server --tls-cert=server.crt --tls-key=server.key
diff --git a/docs/hiawatha/polylith-sample.conf b/docs/hiawatha/polylith-sample.conf
new file mode 100644
index 000000000..99730efdb
--- /dev/null
+++ b/docs/hiawatha/polylith-sample.conf
@@ -0,0 +1,16 @@
+VirtualHost {
+    ...
+    # route requests to:
+    #   /_matrix/client/.*/sync
+    #   /_matrix/client/.*/user/{userId}/filter
+    #   /_matrix/client/.*/user/{userId}/filter/{filterID}
+    #   /_matrix/client/.*/keys/changes
+    #   /_matrix/client/.*/rooms/{roomId}/messages
+    # to sync_api
+    ReverseProxy = /_matrix/client/.*?/(sync|user/.*?/filter/?.*|keys/changes|rooms/.*?/messages) http://localhost:8073
+    ReverseProxy = /_matrix/client http://localhost:8071
+    ReverseProxy = /_matrix/federation http://localhost:8072
+    ReverseProxy = /_matrix/key http://localhost:8072
+    ReverseProxy = /_matrix/media http://localhost:8074
+    ...
+}
diff --git a/docs/serverkeyformat.md b/docs/serverkeyformat.md
new file mode 100644
index 000000000..feda93454
--- /dev/null
+++ b/docs/serverkeyformat.md
@@ -0,0 +1,29 @@
+# Server Key Format
+
+Dendrite stores the server signing key in the PEM format with the following structure.
+
+```
+-----BEGIN MATRIX PRIVATE KEY-----
+Key-ID: ed25519:<key id>
+
+<base64-encoded key data>
+-----END MATRIX PRIVATE KEY-----
+```
+
+## Converting Synapse Keys
+
+If you have signing keys from a previous Synapse server, you should ideally configure them as `old_private_keys` in your Dendrite config file. Synapse stores signing keys in the following format.
+
+```
+ed25519 <key id> <base64-encoded key data>
+```
+
+To convert this key to Dendrite's PEM format, use the following template. **It is important to include the equals sign, as the key data needs to be padded to 32 bytes.**
+
+```
+-----BEGIN MATRIX PRIVATE KEY-----
+Key-ID: ed25519:<key id>
+
+<base64-encoded key data>=
+-----END MATRIX PRIVATE KEY-----
+```
\ No newline at end of file
diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go
index efeb53fa6..ef945694c 100644
--- a/federationsender/consumers/roomserver.go
+++ b/federationsender/consumers/roomserver.go
@@ -87,6 +87,12 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
	case api.OutputTypeNewRoomEvent:
		ev := &output.NewRoomEvent.Event

+		if output.NewRoomEvent.RewritesState {
+			if err := s.db.PurgeRoomState(context.TODO(), ev.RoomID()); err != nil {
+				return fmt.Errorf("s.db.PurgeRoom: %w", err)
+			}
+		}
+
		if err := s.processMessage(*output.NewRoomEvent); err != nil {
			// panic rather than continue with an inconsistent database
			log.WithFields(log.Fields{
diff --git a/federationsender/storage/interface.go b/federationsender/storage/interface.go
index 734b368fe..a3f5073f9 100644
--- a/federationsender/storage/interface.go
+++ b/federationsender/storage/interface.go
@@ -32,6 +32,7 @@ type Database interface {
	GetAllJoinedHosts(ctx context.Context) ([]gomatrixserverlib.ServerName, error)
	// GetJoinedHostsForRooms returns the complete set of servers in the rooms given.
GetJoinedHostsForRooms(ctx context.Context, roomIDs []string) ([]gomatrixserverlib.ServerName, error) + PurgeRoomState(ctx context.Context, roomID string) error StoreJSON(ctx context.Context, js string) (*shared.Receipt, error) diff --git a/federationsender/storage/postgres/joined_hosts_table.go b/federationsender/storage/postgres/joined_hosts_table.go index cc112b7f3..0bc9335dd 100644 --- a/federationsender/storage/postgres/joined_hosts_table.go +++ b/federationsender/storage/postgres/joined_hosts_table.go @@ -53,6 +53,9 @@ const insertJoinedHostsSQL = "" + const deleteJoinedHostsSQL = "" + "DELETE FROM federationsender_joined_hosts WHERE event_id = ANY($1)" +const deleteJoinedHostsForRoomSQL = "" + + "DELETE FROM federationsender_joined_hosts WHERE room_id = $1" + const selectJoinedHostsSQL = "" + "SELECT event_id, server_name FROM federationsender_joined_hosts" + " WHERE room_id = $1" @@ -67,6 +70,7 @@ type joinedHostsStatements struct { db *sql.DB insertJoinedHostsStmt *sql.Stmt deleteJoinedHostsStmt *sql.Stmt + deleteJoinedHostsForRoomStmt *sql.Stmt selectJoinedHostsStmt *sql.Stmt selectAllJoinedHostsStmt *sql.Stmt selectJoinedHostsForRoomsStmt *sql.Stmt @@ -86,6 +90,9 @@ func NewPostgresJoinedHostsTable(db *sql.DB) (s *joinedHostsStatements, err erro if s.deleteJoinedHostsStmt, err = s.db.Prepare(deleteJoinedHostsSQL); err != nil { return } + if s.deleteJoinedHostsForRoomStmt, err = s.db.Prepare(deleteJoinedHostsForRoomSQL); err != nil { + return + } if s.selectJoinedHostsStmt, err = s.db.Prepare(selectJoinedHostsSQL); err != nil { return } @@ -117,6 +124,14 @@ func (s *joinedHostsStatements) DeleteJoinedHosts( return err } +func (s *joinedHostsStatements) DeleteJoinedHostsForRoom( + ctx context.Context, txn *sql.Tx, roomID string, +) error { + stmt := sqlutil.TxStmt(txn, s.deleteJoinedHostsForRoomStmt) + _, err := stmt.ExecContext(ctx, roomID) + return err +} + func (s *joinedHostsStatements) SelectJoinedHostsWithTx( ctx context.Context, txn *sql.Tx, roomID string, ) ([]types.JoinedHost, error) { diff --git a/federationsender/storage/shared/storage.go b/federationsender/storage/shared/storage.go index 4c80c0792..d5731f31c 100644 --- a/federationsender/storage/shared/storage.go +++ b/federationsender/storage/shared/storage.go @@ -148,6 +148,20 @@ func (d *Database) StoreJSON( }, nil } +func (d *Database) PurgeRoomState( + ctx context.Context, roomID string, +) error { + return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + // If the event is a create event then we'll delete all of the existing + // data for the room. The only reason that a create event would be replayed + // to us in this way is if we're about to receive the entire room state. 
+ if err := d.FederationSenderJoinedHosts.DeleteJoinedHostsForRoom(ctx, txn, roomID); err != nil { + return fmt.Errorf("d.FederationSenderJoinedHosts.DeleteJoinedHosts: %w", err) + } + return nil + }) +} + func (d *Database) AddServerToBlacklist(serverName gomatrixserverlib.ServerName) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { return d.FederationSenderBlacklist.InsertBlacklist(context.TODO(), txn, serverName) diff --git a/federationsender/storage/sqlite3/joined_hosts_table.go b/federationsender/storage/sqlite3/joined_hosts_table.go index 3bc45e7d5..1f906808d 100644 --- a/federationsender/storage/sqlite3/joined_hosts_table.go +++ b/federationsender/storage/sqlite3/joined_hosts_table.go @@ -53,6 +53,9 @@ const insertJoinedHostsSQL = "" + const deleteJoinedHostsSQL = "" + "DELETE FROM federationsender_joined_hosts WHERE event_id = $1" +const deleteJoinedHostsForRoomSQL = "" + + "DELETE FROM federationsender_joined_hosts WHERE room_id = $1" + const selectJoinedHostsSQL = "" + "SELECT event_id, server_name FROM federationsender_joined_hosts" + " WHERE room_id = $1" @@ -64,11 +67,12 @@ const selectJoinedHostsForRoomsSQL = "" + "SELECT DISTINCT server_name FROM federationsender_joined_hosts WHERE room_id IN ($1)" type joinedHostsStatements struct { - db *sql.DB - insertJoinedHostsStmt *sql.Stmt - deleteJoinedHostsStmt *sql.Stmt - selectJoinedHostsStmt *sql.Stmt - selectAllJoinedHostsStmt *sql.Stmt + db *sql.DB + insertJoinedHostsStmt *sql.Stmt + deleteJoinedHostsStmt *sql.Stmt + deleteJoinedHostsForRoomStmt *sql.Stmt + selectJoinedHostsStmt *sql.Stmt + selectAllJoinedHostsStmt *sql.Stmt // selectJoinedHostsForRoomsStmt *sql.Stmt - prepared at runtime due to variadic } @@ -86,6 +90,9 @@ func NewSQLiteJoinedHostsTable(db *sql.DB) (s *joinedHostsStatements, err error) if s.deleteJoinedHostsStmt, err = db.Prepare(deleteJoinedHostsSQL); err != nil { return } + if s.deleteJoinedHostsForRoomStmt, err = s.db.Prepare(deleteJoinedHostsForRoomSQL); err != nil { + return + } if s.selectJoinedHostsStmt, err = db.Prepare(selectJoinedHostsSQL); err != nil { return } @@ -118,6 +125,14 @@ func (s *joinedHostsStatements) DeleteJoinedHosts( return nil } +func (s *joinedHostsStatements) DeleteJoinedHostsForRoom( + ctx context.Context, txn *sql.Tx, roomID string, +) error { + stmt := sqlutil.TxStmt(txn, s.deleteJoinedHostsForRoomStmt) + _, err := stmt.ExecContext(ctx, roomID) + return err +} + func (s *joinedHostsStatements) SelectJoinedHostsWithTx( ctx context.Context, txn *sql.Tx, roomID string, ) ([]types.JoinedHost, error) { diff --git a/federationsender/storage/tables/interface.go b/federationsender/storage/tables/interface.go index c6f8a2d52..1167a212a 100644 --- a/federationsender/storage/tables/interface.go +++ b/federationsender/storage/tables/interface.go @@ -50,6 +50,7 @@ type FederationSenderQueueJSON interface { type FederationSenderJoinedHosts interface { InsertJoinedHosts(ctx context.Context, txn *sql.Tx, roomID, eventID string, serverName gomatrixserverlib.ServerName) error DeleteJoinedHosts(ctx context.Context, txn *sql.Tx, eventIDs []string) error + DeleteJoinedHostsForRoom(ctx context.Context, txn *sql.Tx, roomID string) error SelectJoinedHostsWithTx(ctx context.Context, txn *sql.Tx, roomID string) ([]types.JoinedHost, error) SelectJoinedHosts(ctx context.Context, roomID string) ([]types.JoinedHost, error) SelectAllJoinedHosts(ctx context.Context) ([]gomatrixserverlib.ServerName, error) diff --git a/internal/config/config_kafka.go b/internal/config/config_kafka.go index 
e2bd6538e..707c92a71 100644 --- a/internal/config/config_kafka.go +++ b/internal/config/config_kafka.go @@ -24,6 +24,9 @@ type Kafka struct { UseNaffka bool `yaml:"use_naffka"` // The Naffka database is used internally by the naffka library, if used. Database DatabaseOptions `yaml:"naffka_database"` + // The max size a Kafka message passed between consumer/producer can have + // Equals roughly max.message.bytes / fetch.message.max.bytes in Kafka + MaxMessageBytes *int `yaml:"max_message_bytes"` } func (k *Kafka) TopicFor(name string) string { @@ -36,6 +39,9 @@ func (c *Kafka) Defaults() { c.Addresses = []string{"localhost:2181"} c.Database.ConnectionString = DataSource("file:naffka.db") c.TopicPrefix = "Dendrite" + + maxBytes := 1024 * 1024 * 8 // about 8MB + c.MaxMessageBytes = &maxBytes } func (c *Kafka) Verify(configErrs *ConfigErrors, isMonolith bool) { @@ -50,4 +56,5 @@ func (c *Kafka) Verify(configErrs *ConfigErrors, isMonolith bool) { checkNotZero(configErrs, "global.kafka.addresses", int64(len(c.Addresses))) } checkNotEmpty(configErrs, "global.kafka.topic_prefix", string(c.TopicPrefix)) + checkPositive(configErrs, "global.kafka.max_message_bytes", int64(*c.MaxMessageBytes)) } diff --git a/internal/setup/kafka/kafka.go b/internal/setup/kafka/kafka.go index 9855ae156..091025ec7 100644 --- a/internal/setup/kafka/kafka.go +++ b/internal/setup/kafka/kafka.go @@ -17,12 +17,17 @@ func SetupConsumerProducer(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProdu // setupKafka creates kafka consumer/producer pair from the config. func setupKafka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { - consumer, err := sarama.NewConsumer(cfg.Addresses, nil) + sCfg := sarama.NewConfig() + sCfg.Producer.MaxMessageBytes = *cfg.MaxMessageBytes + sCfg.Producer.Return.Successes = true + sCfg.Consumer.Fetch.Default = int32(*cfg.MaxMessageBytes) + + consumer, err := sarama.NewConsumer(cfg.Addresses, sCfg) if err != nil { logrus.WithError(err).Panic("failed to start kafka consumer") } - producer, err := sarama.NewSyncProducer(cfg.Addresses, nil) + producer, err := sarama.NewSyncProducer(cfg.Addresses, sCfg) if err != nil { logrus.WithError(err).Panic("failed to setup kafka producers") } diff --git a/internal/version.go b/internal/version.go index 040ffa32a..21f697086 100644 --- a/internal/version.go +++ b/internal/version.go @@ -17,7 +17,7 @@ var build string const ( VersionMajor = 0 VersionMinor = 2 - VersionPatch = 0 + VersionPatch = 1 VersionTag = "" // example: "rc1" ) diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index d340ac218..99c15f77a 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -102,7 +102,13 @@ func (r *Inputer) WriteOutputEvents(roomID string, updates []api.OutputEvent) er Value: sarama.ByteEncoder(value), } } - return r.Producer.SendMessages(messages) + errs := r.Producer.SendMessages(messages) + if errs != nil { + for _, err := range errs.(sarama.ProducerErrors) { + log.WithError(err).WithField("message_bytes", err.Msg.Value.Length()).Error("Write to kafka failed") + } + } + return errs } // InputRoomEvents implements api.RoomserverInternalAPI diff --git a/roomserver/internal/input/input_latest_events.go b/roomserver/internal/input/input_latest_events.go index dc758e9b0..2bf6b9f8a 100644 --- a/roomserver/internal/input/input_latest_events.go +++ b/roomserver/internal/input/input_latest_events.go @@ -116,7 +116,6 @@ type latestEventsUpdater struct { func (u *latestEventsUpdater) 
doUpdateLatestEvents() error { u.lastEventIDSent = u.updater.LastEventIDSent() - u.oldStateNID = u.updater.CurrentStateSnapshotNID() // If we are doing a regular event update then we will get the // previous latest events to use as a part of the calculation. If @@ -125,7 +124,8 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error { // then start with an empty set - none of the forward extremities // that we knew about before matter anymore. oldLatest := []types.StateAtEventAndReference{} - if !u.stateAtEvent.Overwrite { + if !u.rewritesState { + u.oldStateNID = u.updater.CurrentStateSnapshotNID() oldLatest = u.updater.LatestEvents() } @@ -153,7 +153,7 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error { // Now that we know what the latest events are, it's time to get the // latest state. var updates []api.OutputEvent - if extremitiesChanged { + if extremitiesChanged || u.rewritesState { if err = u.latestState(); err != nil { return fmt.Errorf("u.latestState: %w", err) } @@ -324,7 +324,6 @@ func (u *latestEventsUpdater) calculateLatest( } func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) { - latestEventIDs := make([]string, len(u.latest)) for i := range u.latest { latestEventIDs[i] = u.latest[i].EventID @@ -365,11 +364,6 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) return nil, fmt.Errorf("failed to load add_state_events from db: %w", err) } } - // State is rewritten if the input room event HasState and we actually produced a delta on state events. - // Without this check, /get_missing_events which produce events with associated (but not complete) state - // will incorrectly purge the room and set it to no state. TODO: This is likely flakey, as if /gme produced - // a state conflict res which just so happens to include 2+ events we might purge the room state downstream. - ore.RewritesState = len(ore.AddsStateEventIDs) > 1 return &api.OutputEvent{ Type: api.OutputTypeNewRoomEvent, diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go index c8e60efa3..41cbd2637 100644 --- a/roomserver/roomserver_test.go +++ b/roomserver/roomserver_test.go @@ -379,7 +379,7 @@ func TestOutputRewritesState(t *testing.T) { if len(producer.producedMessages) != 1 { t.Fatalf("Rewritten events got output, want only 1 got %d", len(producer.producedMessages)) } - outputEvent := producer.producedMessages[0] + outputEvent := producer.producedMessages[len(producer.producedMessages)-1] if !outputEvent.NewRoomEvent.RewritesState { t.Errorf("RewritesState flag not set on output event") } diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 373baea54..ac1128c11 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -149,7 +149,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( } if msg.RewritesState { - if err = s.db.PurgeRoom(ctx, ev.RoomID()); err != nil { + if err = s.db.PurgeRoomState(ctx, ev.RoomID()); err != nil { return fmt.Errorf("s.db.PurgeRoom: %w", err) } } @@ -189,14 +189,20 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent( ) error { ev := msg.Event + // TODO: The state key check when excluding from sync is designed + // to stop us from lying to clients with old state, whilst still + // allowing normal timeline events through. 
This is an absolute + // hack but until we have some better strategy for dealing with + // old events in the sync API, this should at least prevent us + // from confusing clients into thinking they've joined/left rooms. pduPos, err := s.db.WriteEvent( ctx, &ev, []gomatrixserverlib.HeaderedEvent{}, - []string{}, // adds no state - []string{}, // removes no state - nil, // no transaction - false, // not excluded from sync + []string{}, // adds no state + []string{}, // removes no state + nil, // no transaction + ev.StateKey() != nil, // exclude from sync? ) if err != nil { // panic rather than continue with an inconsistent database diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index ce7f1c152..e12a1166e 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -43,9 +43,9 @@ type Database interface { // Returns an error if there was a problem inserting this event. WriteEvent(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent, addStateEvents []gomatrixserverlib.HeaderedEvent, addStateEventIDs []string, removeStateEventIDs []string, transactionID *api.TransactionID, excludeFromSync bool) (types.StreamPosition, error) - // PurgeRoom completely purges room state from the sync API. This is done when + // PurgeRoomState completely purges room state from the sync API. This is done when // receiving an output event that completely resets the state. - PurgeRoom(ctx context.Context, roomID string) error + PurgeRoomState(ctx context.Context, roomID string) error // GetStateEvent returns the Matrix state event of a given type for a given room with a given state key // If no event could be found, returns nil // If there was an issue during the retrieval, returns an error diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index b9f21913e..a7c07f943 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -276,7 +276,7 @@ func (d *Database) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, e return nil } -func (d *Database) PurgeRoom( +func (d *Database) PurgeRoomState( ctx context.Context, roomID string, ) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { @@ -286,15 +286,6 @@ func (d *Database) PurgeRoom( if err := d.CurrentRoomState.DeleteRoomStateForRoom(ctx, txn, roomID); err != nil { return fmt.Errorf("d.CurrentRoomState.DeleteRoomStateForRoom: %w", err) } - if err := d.OutputEvents.DeleteEventsForRoom(ctx, txn, roomID); err != nil { - return fmt.Errorf("d.Events.DeleteEventsForRoom: %w", err) - } - if err := d.Topology.DeleteTopologyForRoom(ctx, txn, roomID); err != nil { - return fmt.Errorf("d.Topology.DeleteTopologyForRoom: %w", err) - } - if err := d.BackwardExtremities.DeleteBackwardExtremitiesForRoom(ctx, txn, roomID); err != nil { - return fmt.Errorf("d.BackwardExtremities.DeleteBackwardExtremitiesForRoom: %w", err) - } return nil }) }