Merge branch 'master' of https://github.com/matrix-org/dendrite into add-health-endpoint

Till Faelligen 2020-10-29 17:17:51 +01:00
commit 10d34b3c39
23 changed files with 219 additions and 77 deletions

View file

@@ -1,5 +1,17 @@
 # Changelog

+## Dendrite 0.2.1 (2020-10-22)
+
+### Fixes
+
+* Forward extremities are now calculated using only references from other extremities, rather than including outliers, which should fix cases where state can become corrupted ([#1556](https://github.com/matrix-org/dendrite/pull/1556))
+* Old state events will no longer be processed by the sync API as new, which should fix some cases where clients incorrectly believe they have joined or left rooms ([#1548](https://github.com/matrix-org/dendrite/pull/1548))
+* More SQLite database locking issues have been resolved in the latest events updater ([#1554](https://github.com/matrix-org/dendrite/pull/1554))
+* Internal HTTP API calls are now made using H2C (HTTP/2) in polylith mode, mitigating some potential head-of-line blocking issues ([#1541](https://github.com/matrix-org/dendrite/pull/1541))
+* Roomserver output events no longer incorrectly flag state rewrites ([#1557](https://github.com/matrix-org/dendrite/pull/1557))
+* Notification levels are now parsed correctly in power level events ([gomatrixserverlib#228](https://github.com/matrix-org/gomatrixserverlib/pull/228), contributed by [Pestdoktor](https://github.com/Pestdoktor))
+* Invalid UTF-8 is now correctly rejected when making federation requests ([gomatrixserverlib#229](https://github.com/matrix-org/gomatrixserverlib/pull/229), contributed by [Pestdoktor](https://github.com/Pestdoktor))
+
 ## Dendrite 0.2.0 (2020-10-20)

 ### Important

View file

@@ -54,22 +54,24 @@ The following instructions are enough to get Dendrite started as a non-federating
 ```bash
 $ git clone https://github.com/matrix-org/dendrite
 $ cd dendrite
+$ ./build.sh

-# generate self-signed certificate and an event signing key for federation
-$ go build ./cmd/generate-keys
-$ ./generate-keys --private-key matrix_key.pem --tls-cert server.crt --tls-key server.key
+# Generate a Matrix signing key for federation (required)
+$ ./generate-keys --private-key matrix_key.pem

-# Copy and modify the config file:
-# you'll need to set a server name and paths to the keys at the very least, along with setting
-# up the database filenames
+# Generate a self-signed certificate (optional, but a valid TLS certificate is normally
+# needed for Matrix federation/clients to work properly!)
+$ ./generate-keys --tls-cert server.crt --tls-key server.key
+
+# Copy and modify the config file - you'll need to set a server name and paths to the keys
+# at the very least, along with setting up the database connection strings.
 $ cp dendrite-config.yaml dendrite.yaml

-# build and run the server
-$ go build ./cmd/dendrite-monolith-server
+# Build and run the server:
 $ ./dendrite-monolith-server --tls-cert server.crt --tls-key server.key --config dendrite.yaml
 ```

-Then point your favourite Matrix client at `http://localhost:8008`.
+Then point your favourite Matrix client at `http://localhost:8008` or `https://localhost:8448`.

 ## Progress

View file

@@ -1,5 +1,6 @@
 version: "3.4"
 services:
+  # PostgreSQL is needed for both polylith and monolith modes.
   postgres:
     hostname: postgres
     image: postgres:9.6
@@ -15,12 +16,14 @@ services:
     networks:
       - internal

+  # Zookeeper is only needed for polylith mode!
   zookeeper:
     hostname: zookeeper
     image: zookeeper
     networks:
       - internal

+  # Kafka is only needed for polylith mode!
   kafka:
     container_name: dendrite_kafka
     hostname: kafka
@@ -29,8 +32,6 @@ services:
       KAFKA_ADVERTISED_HOST_NAME: "kafka"
       KAFKA_DELETE_TOPIC_ENABLE: "true"
       KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
-    ports:
-      - 9092:9092
     depends_on:
       - zookeeper
     networks:

View file

@@ -7,6 +7,9 @@ services:
         "--tls-cert=server.crt",
         "--tls-key=server.key"
       ]
+    ports:
+      - 8008:8008
+      - 8448:8448
     volumes:
       - ./config:/etc/dendrite
     networks:

View file

@@ -36,6 +36,8 @@ global:
   server_name: localhost

   # The path to the signing private key file, used to sign requests and events.
+  # Note that this is NOT the same private key as used for TLS! To generate a
+  # signing key, use "./bin/generate-keys --private-key matrix_key.pem".
   private_key: matrix_key.pem

   # The paths and expiry timestamps (as a UNIX timestamp in millisecond precision)
@@ -74,6 +76,12 @@ global:
     # Kafka.
     use_naffka: true

+    # The max size a Kafka message is allowed to use.
+    # You only need to change this value if you encounter issues with messages
+    # that are too large. Must be less than or equal to "max.message.bytes"
+    # configured in Kafka. Defaults to 8388608 bytes.
+    # max_message_bytes: 8388608
+
     # Naffka database options. Not required when using Kafka.
     naffka_database:
       connection_string: file:naffka.db

View file

@ -12,6 +12,8 @@ Dendrite can be run in one of two configurations:
lightweight implementation called [Naffka](https://github.com/matrix-org/naffka). This lightweight implementation called [Naffka](https://github.com/matrix-org/naffka). This
will usually be the preferred model for low-volume, low-user or experimental deployments. will usually be the preferred model for low-volume, low-user or experimental deployments.
For most deployments, it is **recommended to run in monolith mode with PostgreSQL databases**.
Regardless of whether you are running in polylith or monolith mode, each Dendrite component that Regardless of whether you are running in polylith or monolith mode, each Dendrite component that
requires storage has its own database. Both Postgres and SQLite are supported and can be requires storage has its own database. Both Postgres and SQLite are supported and can be
mixed-and-matched across components as needed in the configuration file. mixed-and-matched across components as needed in the configuration file.
@@ -30,23 +32,9 @@ If you want to run a polylith deployment, you also need:
 * Apache Kafka 0.10.2+

-## Building up a monolith deploment
-
-Start by cloning the code:
-
-```bash
-git clone https://github.com/matrix-org/dendrite
-cd dendrite
-```
-
-Then build it:
-
-```bash
-go build -o bin/dendrite-monolith-server ./cmd/dendrite-monolith-server
-go build -o bin/generate-keys ./cmd/generate-keys
-```
-
-## Building up a polylith deployment
+Please note that Kafka is **not required** for a monolith deployment.
+
+## Building Dendrite

 Start by cloning the code:
@@ -61,6 +49,8 @@ Then build it:
 ./build.sh
 ```

+## Install Kafka (polylith only)
+
 Install and start Kafka (c.f. [scripts/install-local-kafka.sh](scripts/install-local-kafka.sh)):

 ```bash
@@ -96,9 +86,9 @@ Dendrite can use the built-in SQLite database engine for small setups.
 The SQLite databases do not need to be pre-built - Dendrite will
 create them automatically at startup.

-### Postgres database setup
+### PostgreSQL database setup

-Assuming that Postgres 9.6 (or later) is installed:
+Assuming that PostgreSQL 9.6 (or later) is installed:

 * Create role, choosing a new password when prompted:
@@ -118,17 +108,31 @@ Assuming that Postgres 9.6 (or later) is installed:
 ### Server key generation

-Each Dendrite server requires unique server keys.
-
-In order for an instance to federate correctly, you should have a valid
-certificate issued by a trusted authority, and private key to match. If you
-don't and just want to test locally, generate the self-signed SSL certificate
-for federation and the server signing key:
+Each Dendrite installation requires:
+
+- A unique Matrix signing private key
+- A valid and trusted TLS certificate and private key
+
+To generate a Matrix signing private key:

 ```bash
-./bin/generate-keys --private-key matrix_key.pem --tls-cert server.crt --tls-key server.key
+./bin/generate-keys --private-key matrix_key.pem
 ```
+
+**Warning:** Make sure to take a safe backup of this key! You will likely need it if you want to reinstall Dendrite, or
+any other Matrix homeserver, on the same domain name in the future. If you lose this key, you may have trouble joining
+federated rooms.
+
+For testing, you can generate a self-signed certificate and key, although this will not work for public federation:
+
+```bash
+./bin/generate-keys --tls-cert server.crt --tls-key server.key
+```

 If you have server keys from an older Synapse instance,
 [convert them](serverkeyformat.md#converting-synapse-keys) to Dendrite's PEM
 format and configure them as `old_private_keys` in your config.

 ### Configuration file

 Create config file, based on `dendrite-config.yaml`. Call it `dendrite.yaml`. Things that will need editing include *at least*:
@@ -136,9 +140,9 @@ Create config file, based on `dendrite-config.yaml`. Call it `dendrite.yaml`. Things that will need editing include *at least*:
 * The `server_name` entry to reflect the hostname of your Dendrite server
 * The `database` lines with an updated connection string based on your
   desired setup, e.g. replacing `database` with the name of the database:
-  * For Postgres: `postgres://dendrite:password@localhost/database`
-  * For SQLite on disk: `file:component.db` or `file:///path/to/component.db`
-  * Postgres and SQLite can be mixed and matched.
+  * For Postgres: `postgres://dendrite:password@localhost/database`, e.g. `postgres://dendrite:password@localhost/dendrite_userapi_account.db`
+  * For SQLite on disk: `file:component.db` or `file:///path/to/component.db`, e.g. `file:userapi_account.db`
+  * Postgres and SQLite can be mixed and matched on different components as desired.
 * The `use_naffka` option if using Naffka in a monolith deployment

 There are other options which may be useful so review them all. In particular,
@@ -148,7 +152,7 @@ help to improve reliability considerably by allowing your homeserver to fetch
 public keys for dead homeservers from somewhere else.

 **WARNING:** Dendrite supports running all components from the same database in
-Postgres mode, but this is **NOT** a supported configuration with SQLite. When
+PostgreSQL mode, but this is **NOT** a supported configuration with SQLite. When
 using SQLite, all components **MUST** use their own database file.

 ## Starting a monolith server
@@ -160,8 +164,14 @@ Be sure to update the database username and password if needed.
 The monolith server can be started as shown below. By default it listens for
 HTTP connections on port 8008, so you can configure your Matrix client to use
-`http://localhost:8008` as the server. If you set `--tls-cert` and `--tls-key`
-as shown below, it will also listen for HTTPS connections on port 8448.
+`http://servername:8008` as the server:
+
+```bash
+./bin/dendrite-monolith-server
+```
+
+If you set `--tls-cert` and `--tls-key` as shown below, it will also listen
+for HTTPS connections on port 8448:

 ```bash
 ./bin/dendrite-monolith-server --tls-cert=server.crt --tls-key=server.key

View file

@@ -0,0 +1,16 @@
+VirtualHost {
+    ...
+    # route requests to:
+    #   /_matrix/client/.*/sync
+    #   /_matrix/client/.*/user/{userId}/filter
+    #   /_matrix/client/.*/user/{userId}/filter/{filterID}
+    #   /_matrix/client/.*/keys/changes
+    #   /_matrix/client/.*/rooms/{roomId}/messages
+    # to sync_api
+    ReverseProxy = /_matrix/client/.*?/(sync|user/.*?/filter/?.*|keys/changes|rooms/.*?/messages) http://localhost:8073
+    ReverseProxy = /_matrix/client http://localhost:8071
+    ReverseProxy = /_matrix/federation http://localhost:8072
+    ReverseProxy = /_matrix/key http://localhost:8072
+    ReverseProxy = /_matrix/media http://localhost:8074
+    ...
+}

docs/serverkeyformat.md (new file, 29 lines)
View file

@@ -0,0 +1,29 @@
+# Server Key Format
+
+Dendrite stores the server signing key in the PEM format with the following structure.
+
+```
+-----BEGIN MATRIX PRIVATE KEY-----
+Key-ID: ed25519:<Key Handle>
+
+<Base64 Encoded Key Data>
+-----END MATRIX PRIVATE KEY-----
+```
+
+## Converting Synapse Keys
+
+If you have signing keys from a previous Synapse server, you should ideally configure them as `old_private_keys` in your Dendrite config file. Synapse stores signing keys in the following format.
+
+```
+ed25519 <Key Handle> <Base64 Encoded Key Data>
+```
+
+To convert this key to Dendrite's PEM format, use the following template. **It is important to include the equals sign, as the key data needs to be padded to 32 bytes.**
+
+```
+-----BEGIN MATRIX PRIVATE KEY-----
+Key-ID: ed25519:<Key Handle>
+
+<Base64 Encoded Key Data>=
+-----END MATRIX PRIVATE KEY-----
+```
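
For illustration, the conversion described above can be scripted. The following Go sketch is not part of Dendrite; the `convertSynapseKey` helper is hypothetical, and it simply applies the template (including the trailing `=` padding) to a Synapse-format key line:

```go
package main

import (
	"fmt"
	"strings"
)

// convertSynapseKey rewrites a Synapse "ed25519 <handle> <base64>" signing
// key line into Dendrite's PEM layout. Note the trailing "=" appended to
// the key data, as required by the template above.
func convertSynapseKey(synapseKey string) (string, error) {
	parts := strings.Fields(synapseKey)
	if len(parts) != 3 || parts[0] != "ed25519" {
		return "", fmt.Errorf("unexpected key format: %q", synapseKey)
	}
	return fmt.Sprintf(
		"-----BEGIN MATRIX PRIVATE KEY-----\nKey-ID: ed25519:%s\n\n%s=\n-----END MATRIX PRIVATE KEY-----\n",
		parts[1], parts[2],
	), nil
}

func main() {
	pem, err := convertSynapseKey("ed25519 a_AbCd someBase64KeyData")
	if err != nil {
		panic(err)
	}
	fmt.Print(pem)
}
```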

View file

@@ -87,6 +87,12 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
 	case api.OutputTypeNewRoomEvent:
 		ev := &output.NewRoomEvent.Event

+		if output.NewRoomEvent.RewritesState {
+			if err := s.db.PurgeRoomState(context.TODO(), ev.RoomID()); err != nil {
+				return fmt.Errorf("s.db.PurgeRoomState: %w", err)
+			}
+		}
 		if err := s.processMessage(*output.NewRoomEvent); err != nil {
 			// panic rather than continue with an inconsistent database
 			log.WithFields(log.Fields{

View file

@@ -32,6 +32,7 @@ type Database interface {
 	GetAllJoinedHosts(ctx context.Context) ([]gomatrixserverlib.ServerName, error)
 	// GetJoinedHostsForRooms returns the complete set of servers in the rooms given.
 	GetJoinedHostsForRooms(ctx context.Context, roomIDs []string) ([]gomatrixserverlib.ServerName, error)
+	PurgeRoomState(ctx context.Context, roomID string) error
 	StoreJSON(ctx context.Context, js string) (*shared.Receipt, error)

View file

@@ -53,6 +53,9 @@ const insertJoinedHostsSQL = "" +
 const deleteJoinedHostsSQL = "" +
 	"DELETE FROM federationsender_joined_hosts WHERE event_id = ANY($1)"

+const deleteJoinedHostsForRoomSQL = "" +
+	"DELETE FROM federationsender_joined_hosts WHERE room_id = $1"
+
 const selectJoinedHostsSQL = "" +
 	"SELECT event_id, server_name FROM federationsender_joined_hosts" +
 	" WHERE room_id = $1"
@@ -67,6 +70,7 @@ type joinedHostsStatements struct {
 	db                            *sql.DB
 	insertJoinedHostsStmt         *sql.Stmt
 	deleteJoinedHostsStmt         *sql.Stmt
+	deleteJoinedHostsForRoomStmt  *sql.Stmt
 	selectJoinedHostsStmt         *sql.Stmt
 	selectAllJoinedHostsStmt      *sql.Stmt
 	selectJoinedHostsForRoomsStmt *sql.Stmt
@@ -86,6 +90,9 @@ func NewPostgresJoinedHostsTable(db *sql.DB) (s *joinedHostsStatements, err error) {
 	if s.deleteJoinedHostsStmt, err = s.db.Prepare(deleteJoinedHostsSQL); err != nil {
 		return
 	}
+	if s.deleteJoinedHostsForRoomStmt, err = s.db.Prepare(deleteJoinedHostsForRoomSQL); err != nil {
+		return
+	}
 	if s.selectJoinedHostsStmt, err = s.db.Prepare(selectJoinedHostsSQL); err != nil {
 		return
 	}
@@ -117,6 +124,14 @@ func (s *joinedHostsStatements) DeleteJoinedHosts(
 	return err
 }

+func (s *joinedHostsStatements) DeleteJoinedHostsForRoom(
+	ctx context.Context, txn *sql.Tx, roomID string,
+) error {
+	stmt := sqlutil.TxStmt(txn, s.deleteJoinedHostsForRoomStmt)
+	_, err := stmt.ExecContext(ctx, roomID)
+	return err
+}
+
 func (s *joinedHostsStatements) SelectJoinedHostsWithTx(
 	ctx context.Context, txn *sql.Tx, roomID string,
 ) ([]types.JoinedHost, error) {
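
The `sqlutil.TxStmt` call in the new `DeleteJoinedHostsForRoom` method follows a common `database/sql` pattern: when a transaction is supplied, the prepared statement is re-bound to it; otherwise the statement runs on its own connection. A minimal sketch of what such a helper does (an assumption about its behaviour, not a copy of Dendrite's source):

```go
package sqlutil

import "database/sql"

// TxStmt returns a statement scoped to the given transaction when txn is
// non-nil, so the query joins that transaction; with a nil txn the original
// prepared statement is used directly.
func TxStmt(txn *sql.Tx, stmt *sql.Stmt) *sql.Stmt {
	if txn != nil {
		return txn.Stmt(stmt)
	}
	return stmt
}
```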

View file

@@ -148,6 +148,20 @@ func (d *Database) StoreJSON(
 	}, nil
 }

+func (d *Database) PurgeRoomState(
+	ctx context.Context, roomID string,
+) error {
+	return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
+		// If the event is a create event then we'll delete all of the existing
+		// data for the room. The only reason that a create event would be replayed
+		// to us in this way is if we're about to receive the entire room state.
+		if err := d.FederationSenderJoinedHosts.DeleteJoinedHostsForRoom(ctx, txn, roomID); err != nil {
+			return fmt.Errorf("d.FederationSenderJoinedHosts.DeleteJoinedHostsForRoom: %w", err)
+		}
+		return nil
+	})
+}
+
 func (d *Database) AddServerToBlacklist(serverName gomatrixserverlib.ServerName) error {
 	return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
 		return d.FederationSenderBlacklist.InsertBlacklist(context.TODO(), txn, serverName)
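
`d.Writer.Do` funnels the delete through a writer that wraps it in a transaction. Below is a rough sketch of an exclusive writer of this shape, assuming the `Do(db, txn, f)` signature seen above; serialising writes behind a mutex matters mostly for SQLite, which tolerates only one writer at a time:

```go
package sqlutil

import (
	"database/sql"
	"sync"
)

// exclusiveWriter sketches a Writer: every write runs inside a transaction,
// and a mutex guarantees only one write is in flight at a time.
type exclusiveWriter struct {
	mu sync.Mutex
}

func (w *exclusiveWriter) Do(db *sql.DB, txn *sql.Tx, f func(txn *sql.Tx) error) error {
	w.mu.Lock()
	defer w.mu.Unlock()
	if txn != nil {
		// The caller already opened a transaction; just run inside it.
		return f(txn)
	}
	tx, err := db.Begin()
	if err != nil {
		return err
	}
	if err := f(tx); err != nil {
		tx.Rollback() // best effort: the error from f is what matters
		return err
	}
	return tx.Commit()
}
```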

View file

@@ -53,6 +53,9 @@ const insertJoinedHostsSQL = "" +
 const deleteJoinedHostsSQL = "" +
 	"DELETE FROM federationsender_joined_hosts WHERE event_id = $1"

+const deleteJoinedHostsForRoomSQL = "" +
+	"DELETE FROM federationsender_joined_hosts WHERE room_id = $1"
+
 const selectJoinedHostsSQL = "" +
 	"SELECT event_id, server_name FROM federationsender_joined_hosts" +
 	" WHERE room_id = $1"
@@ -67,6 +70,7 @@ type joinedHostsStatements struct {
 	db                           *sql.DB
 	insertJoinedHostsStmt        *sql.Stmt
 	deleteJoinedHostsStmt        *sql.Stmt
+	deleteJoinedHostsForRoomStmt *sql.Stmt
 	selectJoinedHostsStmt        *sql.Stmt
 	selectAllJoinedHostsStmt     *sql.Stmt
 	// selectJoinedHostsForRoomsStmt *sql.Stmt - prepared at runtime due to variadic
@@ -86,6 +90,9 @@ func NewSQLiteJoinedHostsTable(db *sql.DB) (s *joinedHostsStatements, err error) {
 	if s.deleteJoinedHostsStmt, err = db.Prepare(deleteJoinedHostsSQL); err != nil {
 		return
 	}
+	if s.deleteJoinedHostsForRoomStmt, err = s.db.Prepare(deleteJoinedHostsForRoomSQL); err != nil {
+		return
+	}
 	if s.selectJoinedHostsStmt, err = db.Prepare(selectJoinedHostsSQL); err != nil {
 		return
 	}
@@ -118,6 +125,14 @@ func (s *joinedHostsStatements) DeleteJoinedHosts(
 	return nil
 }

+func (s *joinedHostsStatements) DeleteJoinedHostsForRoom(
+	ctx context.Context, txn *sql.Tx, roomID string,
+) error {
+	stmt := sqlutil.TxStmt(txn, s.deleteJoinedHostsForRoomStmt)
+	_, err := stmt.ExecContext(ctx, roomID)
+	return err
+}
+
 func (s *joinedHostsStatements) SelectJoinedHostsWithTx(
 	ctx context.Context, txn *sql.Tx, roomID string,
 ) ([]types.JoinedHost, error) {

View file

@@ -50,6 +50,7 @@ type FederationSenderQueueJSON interface {
 type FederationSenderJoinedHosts interface {
 	InsertJoinedHosts(ctx context.Context, txn *sql.Tx, roomID, eventID string, serverName gomatrixserverlib.ServerName) error
 	DeleteJoinedHosts(ctx context.Context, txn *sql.Tx, eventIDs []string) error
+	DeleteJoinedHostsForRoom(ctx context.Context, txn *sql.Tx, roomID string) error
 	SelectJoinedHostsWithTx(ctx context.Context, txn *sql.Tx, roomID string) ([]types.JoinedHost, error)
 	SelectJoinedHosts(ctx context.Context, roomID string) ([]types.JoinedHost, error)
 	SelectAllJoinedHosts(ctx context.Context) ([]gomatrixserverlib.ServerName, error)

View file

@@ -24,6 +24,9 @@ type Kafka struct {
 	UseNaffka bool `yaml:"use_naffka"`
 	// The Naffka database is used internally by the naffka library, if used.
 	Database DatabaseOptions `yaml:"naffka_database"`
+	// The maximum size a Kafka message passed between consumer/producer can have.
+	// Corresponds roughly to max.message.bytes / fetch.message.max.bytes in Kafka.
+	MaxMessageBytes *int `yaml:"max_message_bytes"`
 }

 func (k *Kafka) TopicFor(name string) string {
@@ -36,6 +39,9 @@ func (c *Kafka) Defaults() {
 	c.Addresses = []string{"localhost:2181"}
 	c.Database.ConnectionString = DataSource("file:naffka.db")
 	c.TopicPrefix = "Dendrite"
+
+	maxBytes := 1024 * 1024 * 8 // about 8MB
+	c.MaxMessageBytes = &maxBytes
 }

 func (c *Kafka) Verify(configErrs *ConfigErrors, isMonolith bool) {
@@ -50,4 +56,5 @@ func (c *Kafka) Verify(configErrs *ConfigErrors, isMonolith bool) {
 		checkNotZero(configErrs, "global.kafka.addresses", int64(len(c.Addresses)))
 	}
 	checkNotEmpty(configErrs, "global.kafka.topic_prefix", string(c.TopicPrefix))
+	checkPositive(configErrs, "global.kafka.max_message_bytes", int64(*c.MaxMessageBytes))
 }
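
The `*int` type for `MaxMessageBytes` lets the config code distinguish "key absent" from an explicit zero, which is why `Defaults()` stores a pointer to the 8 MiB value. A standalone sketch of that pattern, assuming `gopkg.in/yaml.v2` as the YAML library:

```go
package main

import (
	"fmt"

	yaml "gopkg.in/yaml.v2"
)

type kafka struct {
	// A pointer stays nil when max_message_bytes is absent from the YAML,
	// so defaults can tell "unset" apart from an explicit 0.
	MaxMessageBytes *int `yaml:"max_message_bytes"`
}

func (k *kafka) defaults() {
	if k.MaxMessageBytes == nil {
		maxBytes := 1024 * 1024 * 8 // 8388608 bytes, matching the config default
		k.MaxMessageBytes = &maxBytes
	}
}

func main() {
	var k kafka
	if err := yaml.Unmarshal([]byte(`{}`), &k); err != nil {
		panic(err)
	}
	k.defaults()
	fmt.Println(*k.MaxMessageBytes) // prints 8388608
}
```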

View file

@@ -17,12 +17,17 @@ func SetupConsumerProducer(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
 // setupKafka creates kafka consumer/producer pair from the config.
 func setupKafka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
-	consumer, err := sarama.NewConsumer(cfg.Addresses, nil)
+	sCfg := sarama.NewConfig()
+	sCfg.Producer.MaxMessageBytes = *cfg.MaxMessageBytes
+	sCfg.Producer.Return.Successes = true
+	sCfg.Consumer.Fetch.Default = int32(*cfg.MaxMessageBytes)
+
+	consumer, err := sarama.NewConsumer(cfg.Addresses, sCfg)
 	if err != nil {
 		logrus.WithError(err).Panic("failed to start kafka consumer")
 	}
-	producer, err := sarama.NewSyncProducer(cfg.Addresses, nil)
+	producer, err := sarama.NewSyncProducer(cfg.Addresses, sCfg)
 	if err != nil {
 		logrus.WithError(err).Panic("failed to setup kafka producers")
 	}
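
For context, sarama enforces `Producer.MaxMessageBytes` on the client side, so oversized messages fail fast instead of being rejected by the broker. A rough sketch, where the broker address, topic name and sizes are made-up values:

```go
package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Producer.MaxMessageBytes = 8 * 1024 * 1024 // keep <= Kafka's max.message.bytes
	cfg.Producer.Return.Successes = true           // required by SyncProducer

	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, cfg)
	if err != nil {
		panic(err)
	}
	defer producer.Close()

	// A 16MB payload exceeds MaxMessageBytes, so sarama rejects it locally
	// before it is ever sent to the broker.
	msg := &sarama.ProducerMessage{
		Topic: "SomeTopic", // hypothetical topic name
		Value: sarama.ByteEncoder(make([]byte, 16*1024*1024)),
	}
	if _, _, err := producer.SendMessage(msg); err != nil {
		fmt.Println("send failed:", err)
	}
}
```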

View file

@@ -17,7 +17,7 @@ var build string
 const (
 	VersionMajor = 0
 	VersionMinor = 2
-	VersionPatch = 0
+	VersionPatch = 1
 	VersionTag   = "" // example: "rc1"
 )

View file

@@ -102,7 +102,13 @@ func (r *Inputer) WriteOutputEvents(roomID string, updates []api.OutputEvent) error {
 			Value: sarama.ByteEncoder(value),
 		}
 	}
-	return r.Producer.SendMessages(messages)
+	errs := r.Producer.SendMessages(messages)
+	if errs != nil {
+		for _, err := range errs.(sarama.ProducerErrors) {
+			log.WithError(err).WithField("message_bytes", err.Msg.Value.Length()).Error("Write to kafka failed")
+		}
+	}
+	return errs
 }

 // InputRoomEvents implements api.RoomserverInternalAPI
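
The type assertion above works because `SyncProducer.SendMessages` reports failures as a `sarama.ProducerErrors` value (a slice of `*sarama.ProducerError`, one per failed message). A slightly more defensive variant of the same unpacking, shown as a standalone sketch rather than what the commit does:

```go
package kafkalog

import (
	"errors"

	"github.com/Shopify/sarama"
	"github.com/sirupsen/logrus"
)

// logSendErrors is a hypothetical helper: errors.As unpacks the
// sarama.ProducerErrors returned by SendMessages without panicking if some
// other error type ever comes back.
func logSendErrors(err error) {
	if err == nil {
		return
	}
	var perrs sarama.ProducerErrors
	if errors.As(err, &perrs) {
		for _, perr := range perrs {
			logrus.WithError(perr.Err).
				WithField("message_bytes", perr.Msg.Value.Length()).
				Error("Write to kafka failed")
		}
		return
	}
	logrus.WithError(err).Error("Write to kafka failed")
}
```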

View file

@@ -116,7 +116,6 @@ type latestEventsUpdater struct {
 func (u *latestEventsUpdater) doUpdateLatestEvents() error {
 	u.lastEventIDSent = u.updater.LastEventIDSent()
-	u.oldStateNID = u.updater.CurrentStateSnapshotNID()

 	// If we are doing a regular event update then we will get the
 	// previous latest events to use as a part of the calculation. If
@@ -125,7 +124,8 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
 	// then start with an empty set - none of the forward extremities
 	// that we knew about before matter anymore.
 	oldLatest := []types.StateAtEventAndReference{}
-	if !u.stateAtEvent.Overwrite {
+	if !u.rewritesState {
+		u.oldStateNID = u.updater.CurrentStateSnapshotNID()
 		oldLatest = u.updater.LatestEvents()
 	}
@@ -153,7 +153,7 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
 	// Now that we know what the latest events are, it's time to get the
 	// latest state.
 	var updates []api.OutputEvent
-	if extremitiesChanged {
+	if extremitiesChanged || u.rewritesState {
 		if err = u.latestState(); err != nil {
 			return fmt.Errorf("u.latestState: %w", err)
 		}
@@ -324,7 +324,6 @@ func (u *latestEventsUpdater) calculateLatest(
 }

 func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) {
-
 	latestEventIDs := make([]string, len(u.latest))
 	for i := range u.latest {
 		latestEventIDs[i] = u.latest[i].EventID
@@ -365,11 +364,6 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) {
 			return nil, fmt.Errorf("failed to load add_state_events from db: %w", err)
 		}
 	}
-	// State is rewritten if the input room event HasState and we actually produced a delta on state events.
-	// Without this check, /get_missing_events which produce events with associated (but not complete) state
-	// will incorrectly purge the room and set it to no state. TODO: This is likely flakey, as if /gme produced
-	// a state conflict res which just so happens to include 2+ events we might purge the room state downstream.
-	ore.RewritesState = len(ore.AddsStateEventIDs) > 1

 	return &api.OutputEvent{
 		Type: api.OutputTypeNewRoomEvent,

View file

@@ -379,7 +379,7 @@ func TestOutputRewritesState(t *testing.T) {
 	if len(producer.producedMessages) != 1 {
 		t.Fatalf("Rewritten events got output, want only 1 got %d", len(producer.producedMessages))
 	}
-	outputEvent := producer.producedMessages[0]
+	outputEvent := producer.producedMessages[len(producer.producedMessages)-1]
 	if !outputEvent.NewRoomEvent.RewritesState {
 		t.Errorf("RewritesState flag not set on output event")
 	}

View file

@@ -149,7 +149,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
 	}

 	if msg.RewritesState {
-		if err = s.db.PurgeRoom(ctx, ev.RoomID()); err != nil {
+		if err = s.db.PurgeRoomState(ctx, ev.RoomID()); err != nil {
 			return fmt.Errorf("s.db.PurgeRoom: %w", err)
 		}
 	}
@@ -189,6 +189,12 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent(
 ) error {
 	ev := msg.Event

+	// TODO: The state key check when excluding from sync is designed
+	// to stop us from lying to clients with old state, whilst still
+	// allowing normal timeline events through. This is an absolute
+	// hack but until we have some better strategy for dealing with
+	// old events in the sync API, this should at least prevent us
+	// from confusing clients into thinking they've joined/left rooms.
 	pduPos, err := s.db.WriteEvent(
 		ctx,
 		&ev,
@@ -196,7 +202,7 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent(
 		[]string{}, // adds no state
 		[]string{}, // removes no state
 		nil,        // no transaction
-		false,      // not excluded from sync
+		ev.StateKey() != nil, // exclude from sync?
 	)
 	if err != nil {
 		// panic rather than continue with an inconsistent database
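
The `ev.StateKey() != nil` test above hinges on a Matrix invariant: state events (m.room.member, m.room.name, and so on) always carry a state key, while ordinary timeline messages never do. A tiny sketch of the heuristic in isolation (the package and function names here are made up):

```go
package syncutil

import "github.com/matrix-org/gomatrixserverlib"

// excludeOldEventFromSync reports whether a backfilled event should be hidden
// from /sync: state events always have a non-nil state key, so this keeps
// stale membership/state out of client timelines while letting ordinary
// messages (nil state key) through.
func excludeOldEventFromSync(ev gomatrixserverlib.HeaderedEvent) bool {
	return ev.StateKey() != nil
}
```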

View file

@@ -43,9 +43,9 @@ type Database interface {
 	// Returns an error if there was a problem inserting this event.
 	WriteEvent(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent, addStateEvents []gomatrixserverlib.HeaderedEvent,
 		addStateEventIDs []string, removeStateEventIDs []string, transactionID *api.TransactionID, excludeFromSync bool) (types.StreamPosition, error)
-	// PurgeRoom completely purges room state from the sync API. This is done when
+	// PurgeRoomState completely purges room state from the sync API. This is done when
 	// receiving an output event that completely resets the state.
-	PurgeRoom(ctx context.Context, roomID string) error
+	PurgeRoomState(ctx context.Context, roomID string) error
 	// GetStateEvent returns the Matrix state event of a given type for a given room with a given state key
 	// If no event could be found, returns nil
 	// If there was an issue during the retrieval, returns an error

View file

@@ -276,7 +276,7 @@ func (d *Database) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, e
 	return nil
 }

-func (d *Database) PurgeRoom(
+func (d *Database) PurgeRoomState(
 	ctx context.Context, roomID string,
 ) error {
 	return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
@@ -286,15 +286,6 @@ func (d *Database) PurgeRoom(
 		if err := d.CurrentRoomState.DeleteRoomStateForRoom(ctx, txn, roomID); err != nil {
 			return fmt.Errorf("d.CurrentRoomState.DeleteRoomStateForRoom: %w", err)
 		}
-		if err := d.OutputEvents.DeleteEventsForRoom(ctx, txn, roomID); err != nil {
-			return fmt.Errorf("d.Events.DeleteEventsForRoom: %w", err)
-		}
-		if err := d.Topology.DeleteTopologyForRoom(ctx, txn, roomID); err != nil {
-			return fmt.Errorf("d.Topology.DeleteTopologyForRoom: %w", err)
-		}
-		if err := d.BackwardExtremities.DeleteBackwardExtremitiesForRoom(ctx, txn, roomID); err != nil {
-			return fmt.Errorf("d.BackwardExtremities.DeleteBackwardExtremitiesForRoom: %w", err)
-		}
 		return nil
 	})
 }