Merge branch 'master' of https://github.com/matrix-org/dendrite into add-receipts-2

This commit is contained in:
Till Faelligen 2020-11-06 20:41:01 +01:00
commit fbd3fc626b
50 changed files with 765 additions and 216 deletions

View file

@ -1,5 +1,17 @@
# Changelog
## Dendrite 0.2.1 (2020-10-22)
### Fixes
* Forward extremities are now calculated using only references from other extremities, rather than including outliers, which should fix cases where state can become corrupted ([#1556](https://github.com/matrix-org/dendrite/pull/1556))
* Old state events will no longer be processed by the sync API as new, which should fix some cases where clients incorrectly believe they have joined or left rooms ([#1548](https://github.com/matrix-org/dendrite/pull/1548))
* More SQLite database locking issues have been resolved in the latest events updater ([#1554](https://github.com/matrix-org/dendrite/pull/1554))
* Internal HTTP API calls are now made using H2C (HTTP/2) in polylith mode, mitigating some potential head-of-line blocking issues ([#1541](https://github.com/matrix-org/dendrite/pull/1541))
* Roomserver output events no longer incorrectly flag state rewrites ([#1557](https://github.com/matrix-org/dendrite/pull/1557))
* Notification levels are now parsed correctly in power level events ([gomatrixserverlib#228](https://github.com/matrix-org/gomatrixserverlib/pull/228), contributed by [Pestdoktor](https://github.com/Pestdoktor))
* Invalid UTF-8 is now correctly rejected when making federation requests ([gomatrixserverlib#229](https://github.com/matrix-org/gomatrixserverlib/pull/229), contributed by [Pestdoktor](https://github.com/Pestdoktor))
## Dendrite 0.2.0 (2020-10-20)
### Important

View file

@ -54,22 +54,24 @@ The following instructions are enough to get Dendrite started as a non-federatin
```bash
$ git clone https://github.com/matrix-org/dendrite
$ cd dendrite
$ ./build.sh
# generate self-signed certificate and an event signing key for federation
$ go build ./cmd/generate-keys
$ ./generate-keys --private-key matrix_key.pem --tls-cert server.crt --tls-key server.key
# Generate a Matrix signing key for federation (required)
$ ./generate-keys --private-key matrix_key.pem
# Copy and modify the config file:
# you'll need to set a server name and paths to the keys at the very least, along with setting
# up the database filenames
# Generate a self-signed certificate (optional, but a valid TLS certificate is normally
# needed for Matrix federation/clients to work properly!)
$ ./generate-keys --tls-cert server.crt --tls-key server.key
# Copy and modify the config file - you'll need to set a server name and paths to the keys
# at the very least, along with setting up the database connection strings.
$ cp dendrite-config.yaml dendrite.yaml
# build and run the server
$ go build ./cmd/dendrite-monolith-server
# Build and run the server:
$ ./dendrite-monolith-server --tls-cert server.crt --tls-key server.key --config dendrite.yaml
```
Then point your favourite Matrix client at `http://localhost:8008`.
Then point your favourite Matrix client at `http://localhost:8008` or `https://localhost:8448`.
## Progress

View file

@ -1,5 +1,6 @@
version: "3.4"
services:
# PostgreSQL is needed for both polylith and monolith modes.
postgres:
hostname: postgres
image: postgres:9.6
@ -15,12 +16,14 @@ services:
networks:
- internal
# Zookeeper is only needed for polylith mode!
zookeeper:
hostname: zookeeper
image: zookeeper
networks:
- internal
# Kafka is only needed for polylith mode!
kafka:
container_name: dendrite_kafka
hostname: kafka
@ -29,8 +32,6 @@ services:
KAFKA_ADVERTISED_HOST_NAME: "kafka"
KAFKA_DELETE_TOPIC_ENABLE: "true"
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
ports:
- 9092:9092
depends_on:
- zookeeper
networks:

View file

@ -7,6 +7,9 @@ services:
"--tls-cert=server.crt",
"--tls-key=server.key"
]
ports:
- 8008:8008
- 8448:8448
volumes:
- ./config:/etc/dendrite
networks:

View file

@ -407,3 +407,47 @@ func checkMemberInRoom(ctx context.Context, rsAPI api.RoomserverInternalAPI, use
}
return nil
}
func SendForget(
req *http.Request, device *userapi.Device,
roomID string, rsAPI roomserverAPI.RoomserverInternalAPI,
) util.JSONResponse {
ctx := req.Context()
logger := util.GetLogger(ctx).WithField("roomID", roomID).WithField("userID", device.UserID)
var membershipRes api.QueryMembershipForUserResponse
membershipReq := api.QueryMembershipForUserRequest{
RoomID: roomID,
UserID: device.UserID,
}
err := rsAPI.QueryMembershipForUser(ctx, &membershipReq, &membershipRes)
if err != nil {
logger.WithError(err).Error("QueryMembershipForUser: could not query membership for user")
return jsonerror.InternalServerError()
}
if membershipRes.IsInRoom {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.Forbidden("user is still a member of the room"),
}
}
if !membershipRes.HasBeenInRoom {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.Forbidden("user did not belong to room"),
}
}
request := api.PerformForgetRequest{
RoomID: roomID,
UserID: device.UserID,
}
response := api.PerformForgetResponse{}
if err := rsAPI.PerformForget(ctx, &request, &response); err != nil {
logger.WithError(err).Error("PerformForget: unable to forget room")
return jsonerror.InternalServerError()
}
return util.JSONResponse{
Code: http.StatusOK,
JSON: struct{}{},
}
}

View file

@ -709,6 +709,19 @@ func Setup(
}),
).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/rooms/{roomID}/forget",
httputil.MakeAuthAPI("rooms_forget", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.rateLimit(req); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
if err != nil {
return util.ErrorResponse(err)
}
return SendForget(req, device, vars["roomID"], rsAPI)
}),
).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/devices",
httputil.MakeAuthAPI("get_devices", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
return GetDevicesByLocalpart(req, userAPI, device)

View file

@ -36,6 +36,8 @@ global:
server_name: localhost
# The path to the signing private key file, used to sign requests and events.
# Note that this is NOT the same private key as used for TLS! To generate a
# signing key, use "./bin/generate-keys --private-key matrix_key.pem".
private_key: matrix_key.pem
# The paths and expiry timestamps (as a UNIX timestamp in millisecond precision)
@ -74,6 +76,12 @@ global:
# Kafka.
use_naffka: true
# The max size a Kafka message is allowed to use.
# You only need to change this value, if you encounter issues with too large messages.
# Must be less than/equal to "max.message.bytes" configured in Kafka.
# Defaults to 8388608 bytes.
# max_message_bytes: 8388608
# Naffka database options. Not required when using Kafka.
naffka_database:
connection_string: file:naffka.db

View file

@ -12,6 +12,8 @@ Dendrite can be run in one of two configurations:
lightweight implementation called [Naffka](https://github.com/matrix-org/naffka). This
will usually be the preferred model for low-volume, low-user or experimental deployments.
For most deployments, it is **recommended to run in monolith mode with PostgreSQL databases**.
Regardless of whether you are running in polylith or monolith mode, each Dendrite component that
requires storage has its own database. Both Postgres and SQLite are supported and can be
mixed-and-matched across components as needed in the configuration file.
@ -30,23 +32,9 @@ If you want to run a polylith deployment, you also need:
* Apache Kafka 0.10.2+
## Building up a monolith deploment
Please note that Kafka is **not required** for a monolith deployment.
Start by cloning the code:
```bash
git clone https://github.com/matrix-org/dendrite
cd dendrite
```
Then build it:
```bash
go build -o bin/dendrite-monolith-server ./cmd/dendrite-monolith-server
go build -o bin/generate-keys ./cmd/generate-keys
```
## Building up a polylith deployment
## Building Dendrite
Start by cloning the code:
@ -61,6 +49,8 @@ Then build it:
./build.sh
```
## Install Kafka (polylith only)
Install and start Kafka (c.f. [scripts/install-local-kafka.sh](scripts/install-local-kafka.sh)):
```bash
@ -96,9 +86,9 @@ Dendrite can use the built-in SQLite database engine for small setups.
The SQLite databases do not need to be pre-built - Dendrite will
create them automatically at startup.
### Postgres database setup
### PostgreSQL database setup
Assuming that Postgres 9.6 (or later) is installed:
Assuming that PostgreSQL 9.6 (or later) is installed:
* Create role, choosing a new password when prompted:
@ -118,17 +108,31 @@ Assuming that Postgres 9.6 (or later) is installed:
### Server key generation
Each Dendrite server requires unique server keys.
Each Dendrite installation requires:
In order for an instance to federate correctly, you should have a valid
certificate issued by a trusted authority, and private key to match. If you
don't and just want to test locally, generate the self-signed SSL certificate
for federation and the server signing key:
- A unique Matrix signing private key
- A valid and trusted TLS certificate and private key
To generate a Matrix signing private key:
```bash
./bin/generate-keys --private-key matrix_key.pem --tls-cert server.crt --tls-key server.key
./bin/generate-keys --private-key matrix_key.pem
```
**Warning:** Make sure take a safe backup of this key! You will likely need it if you want to reinstall Dendrite, or
any other Matrix homeserver, on the same domain name in the future. If you lose this key, you may have trouble joining
federated rooms.
For testing, you can generate a self-signed certificate and key, although this will not work for public federation:
```bash
./bin/generate-keys --tls-cert server.crt --tls-key server.key
```
If you have server keys from an older Synapse instance,
[convert them](serverkeyformat.md#converting-synapse-keys) to Dendrite's PEM
format and configure them as `old_private_keys` in your config.
### Configuration file
Create config file, based on `dendrite-config.yaml`. Call it `dendrite.yaml`. Things that will need editing include *at least*:
@ -136,9 +140,9 @@ Create config file, based on `dendrite-config.yaml`. Call it `dendrite.yaml`. Th
* The `server_name` entry to reflect the hostname of your Dendrite server
* The `database` lines with an updated connection string based on your
desired setup, e.g. replacing `database` with the name of the database:
* For Postgres: `postgres://dendrite:password@localhost/database`
* For SQLite on disk: `file:component.db` or `file:///path/to/component.db`
* Postgres and SQLite can be mixed and matched.
* For Postgres: `postgres://dendrite:password@localhost/database`, e.g. `postgres://dendrite:password@localhost/dendrite_userapi_account.db`
* For SQLite on disk: `file:component.db` or `file:///path/to/component.db`, e.g. `file:userapi_account.db`
* Postgres and SQLite can be mixed and matched on different components as desired.
* The `use_naffka` option if using Naffka in a monolith deployment
There are other options which may be useful so review them all. In particular,
@ -148,7 +152,7 @@ help to improve reliability considerably by allowing your homeserver to fetch
public keys for dead homeservers from somewhere else.
**WARNING:** Dendrite supports running all components from the same database in
Postgres mode, but this is **NOT** a supported configuration with SQLite. When
PostgreSQL mode, but this is **NOT** a supported configuration with SQLite. When
using SQLite, all components **MUST** use their own database file.
## Starting a monolith server
@ -160,8 +164,14 @@ Be sure to update the database username and password if needed.
The monolith server can be started as shown below. By default it listens for
HTTP connections on port 8008, so you can configure your Matrix client to use
`http://localhost:8008` as the server. If you set `--tls-cert` and `--tls-key`
as shown below, it will also listen for HTTPS connections on port 8448.
`http://servername:8008` as the server:
```bash
./bin/dendrite-monolith-server
```
If you set `--tls-cert` and `--tls-key` as shown below, it will also listen
for HTTPS connections on port 8448:
```bash
./bin/dendrite-monolith-server --tls-cert=server.crt --tls-key=server.key

View file

@ -0,0 +1,16 @@
VirtualHost {
...
# route requests to:
# /_matrix/client/.*/sync
# /_matrix/client/.*/user/{userId}/filter
# /_matrix/client/.*/user/{userId}/filter/{filterID}
# /_matrix/client/.*/keys/changes
# /_matrix/client/.*/rooms/{roomId}/messages
# to sync_api
ReverseProxy = /_matrix/client/.*?/(sync|user/.*?/filter/?.*|keys/changes|rooms/.*?/messages) http://localhost:8073
ReverseProxy = /_matrix/client http://localhost:8071
ReverseProxy = /_matrix/federation http://localhost:8072
ReverseProxy = /_matrix/key http://localhost:8072
ReverseProxy = /_matrix/media http://localhost:8074
...
}

29
docs/serverkeyformat.md Normal file
View file

@ -0,0 +1,29 @@
# Server Key Format
Dendrite stores the server signing key in the PEM format with the following structure.
```
-----BEGIN MATRIX PRIVATE KEY-----
Key-ID: ed25519:<Key Handle>
<Base64 Encoded Key Data>
-----END MATRIX PRIVATE KEY-----
```
## Converting Synapse Keys
If you have signing keys from a previous synapse server, you should ideally configure them as `old_private_keys` in your Dendrite config file. Synapse stores signing keys in the following format.
```
ed25519 <Key Handle> <Base64 Encoded Key Data>
```
To convert this key to Dendrite's PEM format, use the following template. **It is important to include the equals sign, as the key data needs to be padded to 32 bytes.**
```
-----BEGIN MATRIX PRIVATE KEY-----
Key-ID: ed25519:<Key Handle>
<Base64 Encoded Key Data>=
-----END MATRIX PRIVATE KEY-----
```

View file

@ -92,6 +92,10 @@ type testRoomserverAPI struct {
queryLatestEventsAndState func(*api.QueryLatestEventsAndStateRequest) api.QueryLatestEventsAndStateResponse
}
func (t *testRoomserverAPI) PerformForget(ctx context.Context, req *api.PerformForgetRequest, resp *api.PerformForgetResponse) error {
return nil
}
func (t *testRoomserverAPI) SetFederationSenderAPI(fsAPI fsAPI.FederationSenderInternalAPI) {}
func (t *testRoomserverAPI) InputRoomEvents(

View file

@ -87,6 +87,12 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
case api.OutputTypeNewRoomEvent:
ev := &output.NewRoomEvent.Event
if output.NewRoomEvent.RewritesState {
if err := s.db.PurgeRoomState(context.TODO(), ev.RoomID()); err != nil {
return fmt.Errorf("s.db.PurgeRoom: %w", err)
}
}
if err := s.processMessage(*output.NewRoomEvent); err != nil {
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{

View file

@ -32,6 +32,7 @@ type Database interface {
GetAllJoinedHosts(ctx context.Context) ([]gomatrixserverlib.ServerName, error)
// GetJoinedHostsForRooms returns the complete set of servers in the rooms given.
GetJoinedHostsForRooms(ctx context.Context, roomIDs []string) ([]gomatrixserverlib.ServerName, error)
PurgeRoomState(ctx context.Context, roomID string) error
StoreJSON(ctx context.Context, js string) (*shared.Receipt, error)

View file

@ -53,6 +53,9 @@ const insertJoinedHostsSQL = "" +
const deleteJoinedHostsSQL = "" +
"DELETE FROM federationsender_joined_hosts WHERE event_id = ANY($1)"
const deleteJoinedHostsForRoomSQL = "" +
"DELETE FROM federationsender_joined_hosts WHERE room_id = $1"
const selectJoinedHostsSQL = "" +
"SELECT event_id, server_name FROM federationsender_joined_hosts" +
" WHERE room_id = $1"
@ -67,6 +70,7 @@ type joinedHostsStatements struct {
db *sql.DB
insertJoinedHostsStmt *sql.Stmt
deleteJoinedHostsStmt *sql.Stmt
deleteJoinedHostsForRoomStmt *sql.Stmt
selectJoinedHostsStmt *sql.Stmt
selectAllJoinedHostsStmt *sql.Stmt
selectJoinedHostsForRoomsStmt *sql.Stmt
@ -86,6 +90,9 @@ func NewPostgresJoinedHostsTable(db *sql.DB) (s *joinedHostsStatements, err erro
if s.deleteJoinedHostsStmt, err = s.db.Prepare(deleteJoinedHostsSQL); err != nil {
return
}
if s.deleteJoinedHostsForRoomStmt, err = s.db.Prepare(deleteJoinedHostsForRoomSQL); err != nil {
return
}
if s.selectJoinedHostsStmt, err = s.db.Prepare(selectJoinedHostsSQL); err != nil {
return
}
@ -117,6 +124,14 @@ func (s *joinedHostsStatements) DeleteJoinedHosts(
return err
}
func (s *joinedHostsStatements) DeleteJoinedHostsForRoom(
ctx context.Context, txn *sql.Tx, roomID string,
) error {
stmt := sqlutil.TxStmt(txn, s.deleteJoinedHostsForRoomStmt)
_, err := stmt.ExecContext(ctx, roomID)
return err
}
func (s *joinedHostsStatements) SelectJoinedHostsWithTx(
ctx context.Context, txn *sql.Tx, roomID string,
) ([]types.JoinedHost, error) {

View file

@ -148,6 +148,20 @@ func (d *Database) StoreJSON(
}, nil
}
func (d *Database) PurgeRoomState(
ctx context.Context, roomID string,
) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
// If the event is a create event then we'll delete all of the existing
// data for the room. The only reason that a create event would be replayed
// to us in this way is if we're about to receive the entire room state.
if err := d.FederationSenderJoinedHosts.DeleteJoinedHostsForRoom(ctx, txn, roomID); err != nil {
return fmt.Errorf("d.FederationSenderJoinedHosts.DeleteJoinedHosts: %w", err)
}
return nil
})
}
func (d *Database) AddServerToBlacklist(serverName gomatrixserverlib.ServerName) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
return d.FederationSenderBlacklist.InsertBlacklist(context.TODO(), txn, serverName)

View file

@ -53,6 +53,9 @@ const insertJoinedHostsSQL = "" +
const deleteJoinedHostsSQL = "" +
"DELETE FROM federationsender_joined_hosts WHERE event_id = $1"
const deleteJoinedHostsForRoomSQL = "" +
"DELETE FROM federationsender_joined_hosts WHERE room_id = $1"
const selectJoinedHostsSQL = "" +
"SELECT event_id, server_name FROM federationsender_joined_hosts" +
" WHERE room_id = $1"
@ -64,11 +67,12 @@ const selectJoinedHostsForRoomsSQL = "" +
"SELECT DISTINCT server_name FROM federationsender_joined_hosts WHERE room_id IN ($1)"
type joinedHostsStatements struct {
db *sql.DB
insertJoinedHostsStmt *sql.Stmt
deleteJoinedHostsStmt *sql.Stmt
selectJoinedHostsStmt *sql.Stmt
selectAllJoinedHostsStmt *sql.Stmt
db *sql.DB
insertJoinedHostsStmt *sql.Stmt
deleteJoinedHostsStmt *sql.Stmt
deleteJoinedHostsForRoomStmt *sql.Stmt
selectJoinedHostsStmt *sql.Stmt
selectAllJoinedHostsStmt *sql.Stmt
// selectJoinedHostsForRoomsStmt *sql.Stmt - prepared at runtime due to variadic
}
@ -86,6 +90,9 @@ func NewSQLiteJoinedHostsTable(db *sql.DB) (s *joinedHostsStatements, err error)
if s.deleteJoinedHostsStmt, err = db.Prepare(deleteJoinedHostsSQL); err != nil {
return
}
if s.deleteJoinedHostsForRoomStmt, err = s.db.Prepare(deleteJoinedHostsForRoomSQL); err != nil {
return
}
if s.selectJoinedHostsStmt, err = db.Prepare(selectJoinedHostsSQL); err != nil {
return
}
@ -118,6 +125,14 @@ func (s *joinedHostsStatements) DeleteJoinedHosts(
return nil
}
func (s *joinedHostsStatements) DeleteJoinedHostsForRoom(
ctx context.Context, txn *sql.Tx, roomID string,
) error {
stmt := sqlutil.TxStmt(txn, s.deleteJoinedHostsForRoomStmt)
_, err := stmt.ExecContext(ctx, roomID)
return err
}
func (s *joinedHostsStatements) SelectJoinedHostsWithTx(
ctx context.Context, txn *sql.Tx, roomID string,
) ([]types.JoinedHost, error) {

View file

@ -50,6 +50,7 @@ type FederationSenderQueueJSON interface {
type FederationSenderJoinedHosts interface {
InsertJoinedHosts(ctx context.Context, txn *sql.Tx, roomID, eventID string, serverName gomatrixserverlib.ServerName) error
DeleteJoinedHosts(ctx context.Context, txn *sql.Tx, eventIDs []string) error
DeleteJoinedHostsForRoom(ctx context.Context, txn *sql.Tx, roomID string) error
SelectJoinedHostsWithTx(ctx context.Context, txn *sql.Tx, roomID string) ([]types.JoinedHost, error)
SelectJoinedHosts(ctx context.Context, roomID string) ([]types.JoinedHost, error)
SelectAllJoinedHosts(ctx context.Context) ([]gomatrixserverlib.ServerName, error)

2
go.mod
View file

@ -33,7 +33,7 @@ require (
github.com/pressly/goose v2.7.0-rc5+incompatible
github.com/prometheus/client_golang v1.7.1
github.com/sirupsen/logrus v1.6.0
github.com/tidwall/gjson v1.6.1
github.com/tidwall/gjson v1.6.3
github.com/tidwall/sjson v1.1.1
github.com/uber/jaeger-client-go v2.25.0+incompatible
github.com/uber/jaeger-lib v2.2.0+incompatible

4
go.sum
View file

@ -812,8 +812,8 @@ github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpP
github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cbyO7IOYJZWg1U88JhDg3PB6klq9Hg2pA=
github.com/tidwall/gjson v1.6.0 h1:9VEQWz6LLMUsUl6PueE49ir4Ka6CzLymOAZDxpFsTDc=
github.com/tidwall/gjson v1.6.0/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJHhxOls=
github.com/tidwall/gjson v1.6.1 h1:LRbvNuNuvAiISWg6gxLEFuCe72UKy5hDqhxW/8183ws=
github.com/tidwall/gjson v1.6.1/go.mod h1:BaHyNc5bjzYkPqgLq7mdVzeiRtULKULXLgZFKsxEHI0=
github.com/tidwall/gjson v1.6.3 h1:aHoiiem0dr7GHkW001T1SMTJ7X5PvyekH5WX0whWGnI=
github.com/tidwall/gjson v1.6.3/go.mod h1:BaHyNc5bjzYkPqgLq7mdVzeiRtULKULXLgZFKsxEHI0=
github.com/tidwall/match v1.0.1 h1:PnKP62LPNxHKTwvHHZZzdOAOCtsJTjo6dZLCwpKm5xc=
github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=

View file

@ -25,6 +25,9 @@ type Kafka struct {
UseNaffka bool `yaml:"use_naffka"`
// The Naffka database is used internally by the naffka library, if used.
Database DatabaseOptions `yaml:"naffka_database"`
// The max size a Kafka message passed between consumer/producer can have
// Equals roughly max.message.bytes / fetch.message.max.bytes in Kafka
MaxMessageBytes *int `yaml:"max_message_bytes"`
}
func (k *Kafka) TopicFor(name string) string {
@ -37,6 +40,9 @@ func (c *Kafka) Defaults() {
c.Addresses = []string{"localhost:2181"}
c.Database.ConnectionString = DataSource("file:naffka.db")
c.TopicPrefix = "Dendrite"
maxBytes := 1024 * 1024 * 8 // about 8MB
c.MaxMessageBytes = &maxBytes
}
func (c *Kafka) Verify(configErrs *ConfigErrors, isMonolith bool) {
@ -51,4 +57,5 @@ func (c *Kafka) Verify(configErrs *ConfigErrors, isMonolith bool) {
checkNotZero(configErrs, "global.kafka.addresses", int64(len(c.Addresses)))
}
checkNotEmpty(configErrs, "global.kafka.topic_prefix", string(c.TopicPrefix))
checkPositive(configErrs, "global.kafka.max_message_bytes", int64(*c.MaxMessageBytes))
}

View file

@ -17,12 +17,17 @@ func SetupConsumerProducer(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProdu
// setupKafka creates kafka consumer/producer pair from the config.
func setupKafka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
consumer, err := sarama.NewConsumer(cfg.Addresses, nil)
sCfg := sarama.NewConfig()
sCfg.Producer.MaxMessageBytes = *cfg.MaxMessageBytes
sCfg.Producer.Return.Successes = true
sCfg.Consumer.Fetch.Default = int32(*cfg.MaxMessageBytes)
consumer, err := sarama.NewConsumer(cfg.Addresses, sCfg)
if err != nil {
logrus.WithError(err).Panic("failed to start kafka consumer")
}
producer, err := sarama.NewSyncProducer(cfg.Addresses, nil)
producer, err := sarama.NewSyncProducer(cfg.Addresses, sCfg)
if err != nil {
logrus.WithError(err).Panic("failed to setup kafka producers")
}

View file

@ -17,7 +17,7 @@ var build string
const (
VersionMajor = 0
VersionMinor = 2
VersionPatch = 0
VersionPatch = 1
VersionTag = "" // example: "rc1"
)

View file

@ -147,6 +147,9 @@ type RoomserverInternalAPI interface {
response *PerformBackfillResponse,
) error
// PerformForget forgets a rooms history for a specific user
PerformForget(ctx context.Context, req *PerformForgetRequest, resp *PerformForgetResponse) error
// Asks for the default room version as preferred by the server.
QueryRoomVersionCapabilities(
ctx context.Context,

View file

@ -194,6 +194,16 @@ func (t *RoomserverInternalAPITrace) PerformBackfill(
return err
}
func (t *RoomserverInternalAPITrace) PerformForget(
ctx context.Context,
req *PerformForgetRequest,
res *PerformForgetResponse,
) error {
err := t.Impl.PerformForget(ctx, req, res)
util.GetLogger(ctx).WithError(err).Infof("PerformForget req=%+v res=%+v", js(req), js(res))
return err
}
func (t *RoomserverInternalAPITrace) QueryRoomVersionCapabilities(
ctx context.Context,
req *QueryRoomVersionCapabilitiesRequest,

View file

@ -159,3 +159,11 @@ type PerformPublishResponse struct {
// If non-nil, the publish request failed. Contains more information why it failed.
Error *PerformError
}
// PerformForgetRequest is a request to PerformForget
type PerformForgetRequest struct {
RoomID string `json:"room_id"`
UserID string `json:"user_id"`
}
type PerformForgetResponse struct{}

View file

@ -140,7 +140,9 @@ type QueryMembershipForUserResponse struct {
// True if the user is in room.
IsInRoom bool `json:"is_in_room"`
// The current membership
Membership string
Membership string `json:"membership"`
// True if the user asked to forget this room.
IsRoomForgotten bool `json:"is_room_forgotten"`
}
// QueryMembershipsForRoomRequest is a request to QueryMembershipsForRoom
@ -160,6 +162,8 @@ type QueryMembershipsForRoomResponse struct {
// True if the user has been in room before and has either stayed in it or
// left it.
HasBeenInRoom bool `json:"has_been_in_room"`
// True if the user asked to forget this room.
IsRoomForgotten bool `json:"is_room_forgotten"`
}
// QueryServerJoinedToRoomRequest is a request to QueryServerJoinedToRoom

View file

@ -26,6 +26,7 @@ type RoomserverInternalAPI struct {
*perform.Leaver
*perform.Publisher
*perform.Backfiller
*perform.Forgetter
DB storage.Database
Cfg *config.RoomServer
Producer sarama.SyncProducer
@ -112,6 +113,9 @@ func (r *RoomserverInternalAPI) SetFederationSenderAPI(fsAPI fsAPI.FederationSen
// than trying random servers
PreferServers: r.PerspectiveServerNames,
}
r.Forgetter = &perform.Forgetter{
DB: r.DB,
}
}
func (r *RoomserverInternalAPI) PerformInvite(
@ -143,3 +147,11 @@ func (r *RoomserverInternalAPI) PerformLeave(
}
return r.WriteOutputEvents(req.RoomID, outputEvents)
}
func (r *RoomserverInternalAPI) PerformForget(
ctx context.Context,
req *api.PerformForgetRequest,
resp *api.PerformForgetResponse,
) error {
return r.Forgetter.PerformForget(ctx, req, resp)
}

View file

@ -102,7 +102,13 @@ func (r *Inputer) WriteOutputEvents(roomID string, updates []api.OutputEvent) er
Value: sarama.ByteEncoder(value),
}
}
return r.Producer.SendMessages(messages)
errs := r.Producer.SendMessages(messages)
if errs != nil {
for _, err := range errs.(sarama.ProducerErrors) {
log.WithError(err).WithField("message_bytes", err.Msg.Value.Length()).Error("Write to kafka failed")
}
}
return errs
}
// InputRoomEvents implements api.RoomserverInternalAPI

View file

@ -116,7 +116,6 @@ type latestEventsUpdater struct {
func (u *latestEventsUpdater) doUpdateLatestEvents() error {
u.lastEventIDSent = u.updater.LastEventIDSent()
u.oldStateNID = u.updater.CurrentStateSnapshotNID()
// If we are doing a regular event update then we will get the
// previous latest events to use as a part of the calculation. If
@ -125,7 +124,8 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
// then start with an empty set - none of the forward extremities
// that we knew about before matter anymore.
oldLatest := []types.StateAtEventAndReference{}
if !u.stateAtEvent.Overwrite {
if !u.rewritesState {
u.oldStateNID = u.updater.CurrentStateSnapshotNID()
oldLatest = u.updater.LatestEvents()
}
@ -153,7 +153,7 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
// Now that we know what the latest events are, it's time to get the
// latest state.
var updates []api.OutputEvent
if extremitiesChanged {
if extremitiesChanged || u.rewritesState {
if err = u.latestState(); err != nil {
return fmt.Errorf("u.latestState: %w", err)
}
@ -324,7 +324,6 @@ func (u *latestEventsUpdater) calculateLatest(
}
func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) {
latestEventIDs := make([]string, len(u.latest))
for i := range u.latest {
latestEventIDs[i] = u.latest[i].EventID
@ -365,11 +364,6 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
return nil, fmt.Errorf("failed to load add_state_events from db: %w", err)
}
}
// State is rewritten if the input room event HasState and we actually produced a delta on state events.
// Without this check, /get_missing_events which produce events with associated (but not complete) state
// will incorrectly purge the room and set it to no state. TODO: This is likely flakey, as if /gme produced
// a state conflict res which just so happens to include 2+ events we might purge the room state downstream.
ore.RewritesState = len(ore.AddsStateEventIDs) > 1
return &api.OutputEvent{
Type: api.OutputTypeNewRoomEvent,

View file

@ -0,0 +1,35 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package perform
import (
"context"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/storage"
)
type Forgetter struct {
DB storage.Database
}
// PerformForget implements api.RoomServerQueryAPI
func (f *Forgetter) PerformForget(
ctx context.Context,
request *api.PerformForgetRequest,
response *api.PerformForgetResponse,
) error {
return f.DB.ForgetRoom(ctx, request.UserID, request.RoomID, true)
}

View file

@ -86,7 +86,7 @@ func (r *Inviter) PerformInvite(
var isAlreadyJoined bool
if info != nil {
_, isAlreadyJoined, err = r.DB.GetMembership(ctx, info.RoomNID, *event.StateKey())
_, isAlreadyJoined, _, err = r.DB.GetMembership(ctx, info.RoomNID, *event.StateKey())
if err != nil {
return nil, fmt.Errorf("r.DB.GetMembership: %w", err)
}

View file

@ -204,11 +204,13 @@ func (r *Queryer) QueryMembershipForUser(
return fmt.Errorf("QueryMembershipForUser: unknown room %s", request.RoomID)
}
membershipEventNID, stillInRoom, err := r.DB.GetMembership(ctx, info.RoomNID, request.UserID)
membershipEventNID, stillInRoom, isRoomforgotten, err := r.DB.GetMembership(ctx, info.RoomNID, request.UserID)
if err != nil {
return err
}
response.IsRoomForgotten = isRoomforgotten
if membershipEventNID == 0 {
response.HasBeenInRoom = false
return nil
@ -241,11 +243,13 @@ func (r *Queryer) QueryMembershipsForRoom(
return err
}
membershipEventNID, stillInRoom, err := r.DB.GetMembership(ctx, info.RoomNID, request.Sender)
membershipEventNID, stillInRoom, isRoomforgotten, err := r.DB.GetMembership(ctx, info.RoomNID, request.Sender)
if err != nil {
return err
}
response.IsRoomForgotten = isRoomforgotten
if membershipEventNID == 0 {
response.HasBeenInRoom = false
response.JoinEvents = nil

View file

@ -31,6 +31,7 @@ const (
RoomserverPerformLeavePath = "/roomserver/performLeave"
RoomserverPerformBackfillPath = "/roomserver/performBackfill"
RoomserverPerformPublishPath = "/roomserver/performPublish"
RoomserverPerformForgetPath = "/roomserver/performForget"
// Query operations
RoomserverQueryLatestEventsAndStatePath = "/roomserver/queryLatestEventsAndState"
@ -492,3 +493,12 @@ func (h *httpRoomserverInternalAPI) QueryServerBannedFromRoom(
apiURL := h.roomserverURL + RoomserverQueryServerBannedFromRoomPath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res)
}
func (h *httpRoomserverInternalAPI) PerformForget(ctx context.Context, req *api.PerformForgetRequest, res *api.PerformForgetResponse) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "PerformForget")
defer span.Finish()
apiURL := h.roomserverURL + RoomserverPerformForgetPath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res)
}

View file

@ -251,6 +251,20 @@ func AddRoutes(r api.RoomserverInternalAPI, internalAPIMux *mux.Router) {
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(
RoomserverPerformForgetPath,
httputil.MakeInternalAPI("PerformForget", func(req *http.Request) util.JSONResponse {
var request api.PerformForgetRequest
var response api.PerformForgetResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.ErrorResponse(err)
}
if err := r.PerformForget(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(
RoomserverQueryRoomVersionCapabilitiesPath,
httputil.MakeInternalAPI("QueryRoomVersionCapabilities", func(req *http.Request) util.JSONResponse {

View file

@ -379,7 +379,7 @@ func TestOutputRewritesState(t *testing.T) {
if len(producer.producedMessages) != 1 {
t.Fatalf("Rewritten events got output, want only 1 got %d", len(producer.producedMessages))
}
outputEvent := producer.producedMessages[0]
outputEvent := producer.producedMessages[len(producer.producedMessages)-1]
if !outputEvent.NewRoomEvent.RewritesState {
t.Errorf("RewritesState flag not set on output event")
}

View file

@ -126,7 +126,7 @@ type Database interface {
// in this room, along a boolean set to true if the user is still in this room,
// false if not.
// Returns an error if there was a problem talking to the database.
GetMembership(ctx context.Context, roomNID types.RoomNID, requestSenderUserID string) (membershipEventNID types.EventNID, stillInRoom bool, err error)
GetMembership(ctx context.Context, roomNID types.RoomNID, requestSenderUserID string) (membershipEventNID types.EventNID, stillInRoom, isRoomForgotten bool, err error)
// Lookup the membership event numeric IDs for all user that are or have
// been members of a given room. Only lookup events of "join" membership if
// joinOnly is set to true.
@ -158,4 +158,6 @@ type Database interface {
GetKnownUsers(ctx context.Context, userID, searchString string, limit int) ([]string, error)
// GetKnownRooms returns a list of all rooms we know about.
GetKnownRooms(ctx context.Context) ([]string, error)
// ForgetRoom sets a flag in the membership table, that the user wishes to forget a specific room
ForgetRoom(ctx context.Context, userID, roomID string, forget bool) error
}

View file

@ -0,0 +1,47 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package deltas
import (
"database/sql"
"fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/pressly/goose"
)
func LoadFromGoose() {
goose.AddMigration(UpAddForgottenColumn, DownAddForgottenColumn)
}
func LoadAddForgottenColumn(m *sqlutil.Migrations) {
m.AddMigration(UpAddForgottenColumn, DownAddForgottenColumn)
}
func UpAddForgottenColumn(tx *sql.Tx) error {
_, err := tx.Exec(`ALTER TABLE roomserver_membership ADD COLUMN IF NOT EXISTS forgotten BOOLEAN NOT NULL DEFAULT false;`)
if err != nil {
return fmt.Errorf("failed to execute upgrade: %w", err)
}
return nil
}
func DownAddForgottenColumn(tx *sql.Tx) error {
_, err := tx.Exec(`ALTER TABLE roomserver_membership DROP COLUMN IF EXISTS forgotten;`)
if err != nil {
return fmt.Errorf("failed to execute downgrade: %w", err)
}
return nil
}

View file

@ -60,13 +60,15 @@ CREATE TABLE IF NOT EXISTS roomserver_membership (
-- a federated one. This is an optimisation for resetting state on federated
-- room joins.
target_local BOOLEAN NOT NULL DEFAULT false,
forgotten BOOLEAN NOT NULL DEFAULT FALSE,
UNIQUE (room_nid, target_nid)
);
`
var selectJoinedUsersSetForRoomsSQL = "" +
"SELECT target_nid, COUNT(room_nid) FROM roomserver_membership WHERE room_nid = ANY($1) AND" +
" membership_nid = " + fmt.Sprintf("%d", tables.MembershipStateJoin) + " GROUP BY target_nid"
" membership_nid = " + fmt.Sprintf("%d", tables.MembershipStateJoin) + " and forgotten = false" +
" GROUP BY target_nid"
// Insert a row in to membership table so that it can be locked by the
// SELECT FOR UPDATE
@ -76,37 +78,41 @@ const insertMembershipSQL = "" +
" ON CONFLICT DO NOTHING"
const selectMembershipFromRoomAndTargetSQL = "" +
"SELECT membership_nid, event_nid FROM roomserver_membership" +
"SELECT membership_nid, event_nid, forgotten FROM roomserver_membership" +
" WHERE room_nid = $1 AND target_nid = $2"
const selectMembershipsFromRoomAndMembershipSQL = "" +
"SELECT event_nid FROM roomserver_membership" +
" WHERE room_nid = $1 AND membership_nid = $2"
" WHERE room_nid = $1 AND membership_nid = $2 and forgotten = false"
const selectLocalMembershipsFromRoomAndMembershipSQL = "" +
"SELECT event_nid FROM roomserver_membership" +
" WHERE room_nid = $1 AND membership_nid = $2" +
" AND target_local = true"
" AND target_local = true and forgotten = false"
const selectMembershipsFromRoomSQL = "" +
"SELECT event_nid FROM roomserver_membership" +
" WHERE room_nid = $1"
" WHERE room_nid = $1 and forgotten = false"
const selectLocalMembershipsFromRoomSQL = "" +
"SELECT event_nid FROM roomserver_membership" +
" WHERE room_nid = $1" +
" AND target_local = true"
" AND target_local = true and forgotten = false"
const selectMembershipForUpdateSQL = "" +
"SELECT membership_nid FROM roomserver_membership" +
" WHERE room_nid = $1 AND target_nid = $2 FOR UPDATE"
const updateMembershipSQL = "" +
"UPDATE roomserver_membership SET sender_nid = $3, membership_nid = $4, event_nid = $5" +
"UPDATE roomserver_membership SET sender_nid = $3, membership_nid = $4, event_nid = $5, forgotten = $6" +
" WHERE room_nid = $1 AND target_nid = $2"
const updateMembershipForgetRoom = "" +
"UPDATE roomserver_membership SET forgotten = $3" +
" WHERE room_nid = $1 AND target_nid = $2"
const selectRoomsWithMembershipSQL = "" +
"SELECT room_nid FROM roomserver_membership WHERE membership_nid = $1 AND target_nid = $2"
"SELECT room_nid FROM roomserver_membership WHERE membership_nid = $1 AND target_nid = $2 and forgotten = false"
// selectKnownUsersSQL uses a sub-select statement here to find rooms that the user is
// joined to. Since this information is used to populate the user directory, we will
@ -130,6 +136,7 @@ type membershipStatements struct {
selectRoomsWithMembershipStmt *sql.Stmt
selectJoinedUsersSetForRoomsStmt *sql.Stmt
selectKnownUsersStmt *sql.Stmt
updateMembershipForgetRoomStmt *sql.Stmt
}
func NewPostgresMembershipTable(db *sql.DB) (tables.Membership, error) {
@ -151,9 +158,15 @@ func NewPostgresMembershipTable(db *sql.DB) (tables.Membership, error) {
{&s.selectRoomsWithMembershipStmt, selectRoomsWithMembershipSQL},
{&s.selectJoinedUsersSetForRoomsStmt, selectJoinedUsersSetForRoomsSQL},
{&s.selectKnownUsersStmt, selectKnownUsersSQL},
{&s.updateMembershipForgetRoomStmt, updateMembershipForgetRoom},
}.Prepare(db)
}
func (s *membershipStatements) execSchema(db *sql.DB) error {
_, err := db.Exec(membershipSchema)
return err
}
func (s *membershipStatements) InsertMembership(
ctx context.Context,
txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID,
@ -177,10 +190,10 @@ func (s *membershipStatements) SelectMembershipForUpdate(
func (s *membershipStatements) SelectMembershipFromRoomAndTarget(
ctx context.Context,
roomNID types.RoomNID, targetUserNID types.EventStateKeyNID,
) (eventNID types.EventNID, membership tables.MembershipState, err error) {
) (eventNID types.EventNID, membership tables.MembershipState, forgotten bool, err error) {
err = s.selectMembershipFromRoomAndTargetStmt.QueryRowContext(
ctx, roomNID, targetUserNID,
).Scan(&membership, &eventNID)
).Scan(&membership, &eventNID, &forgotten)
return
}
@ -238,12 +251,11 @@ func (s *membershipStatements) SelectMembershipsFromRoomAndMembership(
func (s *membershipStatements) UpdateMembership(
ctx context.Context,
txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID,
senderUserNID types.EventStateKeyNID, membership tables.MembershipState,
eventNID types.EventNID,
txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID, senderUserNID types.EventStateKeyNID, membership tables.MembershipState,
eventNID types.EventNID, forgotten bool,
) error {
_, err := sqlutil.TxStmt(txn, s.updateMembershipStmt).ExecContext(
ctx, roomNID, targetUserNID, senderUserNID, membership, eventNID,
ctx, roomNID, targetUserNID, senderUserNID, membership, eventNID, forgotten,
)
return err
}
@ -305,3 +317,14 @@ func (s *membershipStatements) SelectKnownUsers(ctx context.Context, userID type
}
return result, rows.Err()
}
func (s *membershipStatements) UpdateForgetMembership(
ctx context.Context,
txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID,
forget bool,
) error {
_, err := sqlutil.TxStmt(txn, s.updateMembershipForgetRoomStmt).ExecContext(
ctx, roomNID, targetUserNID, forget,
)
return err
}

View file

@ -18,12 +18,13 @@ package postgres
import (
"database/sql"
// Import the postgres database driver.
_ "github.com/lib/pq"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/sqlutil"
// Import the postgres database driver.
_ "github.com/lib/pq"
"github.com/matrix-org/dendrite/roomserver/storage/postgres/deltas"
"github.com/matrix-org/dendrite/roomserver/storage/shared"
)
@ -33,7 +34,6 @@ type Database struct {
}
// Open a postgres database.
// nolint: gocyclo
func Open(dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches) (*Database, error) {
var d Database
var db *sql.DB
@ -41,61 +41,82 @@ func Open(dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches)
if db, err = sqlutil.Open(dbProperties); err != nil {
return nil, err
}
// Create tables before executing migrations so we don't fail if the table is missing,
// and THEN prepare statements so we don't fail due to referencing new columns
ms := membershipStatements{}
if err := ms.execSchema(db); err != nil {
return nil, err
}
m := sqlutil.NewMigrations()
deltas.LoadAddForgottenColumn(m)
if err := m.RunDeltas(db, dbProperties); err != nil {
return nil, err
}
if err := d.prepare(db, cache); err != nil {
return nil, err
}
return &d, nil
}
// nolint: gocyclo
func (d *Database) prepare(db *sql.DB, cache caching.RoomServerCaches) (err error) {
eventStateKeys, err := NewPostgresEventStateKeysTable(db)
if err != nil {
return nil, err
return err
}
eventTypes, err := NewPostgresEventTypesTable(db)
if err != nil {
return nil, err
return err
}
eventJSON, err := NewPostgresEventJSONTable(db)
if err != nil {
return nil, err
return err
}
events, err := NewPostgresEventsTable(db)
if err != nil {
return nil, err
return err
}
rooms, err := NewPostgresRoomsTable(db)
if err != nil {
return nil, err
return err
}
transactions, err := NewPostgresTransactionsTable(db)
if err != nil {
return nil, err
return err
}
stateBlock, err := NewPostgresStateBlockTable(db)
if err != nil {
return nil, err
return err
}
stateSnapshot, err := NewPostgresStateSnapshotTable(db)
if err != nil {
return nil, err
return err
}
roomAliases, err := NewPostgresRoomAliasesTable(db)
if err != nil {
return nil, err
return err
}
prevEvents, err := NewPostgresPreviousEventsTable(db)
if err != nil {
return nil, err
return err
}
invites, err := NewPostgresInvitesTable(db)
if err != nil {
return nil, err
return err
}
membership, err := NewPostgresMembershipTable(db)
if err != nil {
return nil, err
return err
}
published, err := NewPostgresPublishedTable(db)
if err != nil {
return nil, err
return err
}
redactions, err := NewPostgresRedactionsTable(db)
if err != nil {
return nil, err
return err
}
d.Database = shared.Database{
DB: db,
@ -116,5 +137,5 @@ func Open(dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches)
PublishedTable: published,
RedactionsTable: redactions,
}
return &d, nil
return nil
}

View file

@ -101,9 +101,7 @@ func (u *MembershipUpdater) SetToInvite(event gomatrixserverlib.Event) (bool, er
return fmt.Errorf("u.d.InvitesTable.InsertInviteEvent: %w", err)
}
if u.membership != tables.MembershipStateInvite {
if err = u.d.MembershipTable.UpdateMembership(
u.ctx, u.txn, u.roomNID, u.targetUserNID, senderUserNID, tables.MembershipStateInvite, 0,
); err != nil {
if err = u.d.MembershipTable.UpdateMembership(u.ctx, u.txn, u.roomNID, u.targetUserNID, senderUserNID, tables.MembershipStateInvite, 0, false); err != nil {
return fmt.Errorf("u.d.MembershipTable.UpdateMembership: %w", err)
}
}
@ -139,10 +137,7 @@ func (u *MembershipUpdater) SetToJoin(senderUserID string, eventID string, isUpd
}
if u.membership != tables.MembershipStateJoin || isUpdate {
if err = u.d.MembershipTable.UpdateMembership(
u.ctx, u.txn, u.roomNID, u.targetUserNID, senderUserNID,
tables.MembershipStateJoin, nIDs[eventID],
); err != nil {
if err = u.d.MembershipTable.UpdateMembership(u.ctx, u.txn, u.roomNID, u.targetUserNID, senderUserNID, tables.MembershipStateJoin, nIDs[eventID], false); err != nil {
return fmt.Errorf("u.d.MembershipTable.UpdateMembership: %w", err)
}
}
@ -176,10 +171,7 @@ func (u *MembershipUpdater) SetToLeave(senderUserID string, eventID string) ([]s
}
if u.membership != tables.MembershipStateLeaveOrBan {
if err = u.d.MembershipTable.UpdateMembership(
u.ctx, u.txn, u.roomNID, u.targetUserNID, senderUserNID,
tables.MembershipStateLeaveOrBan, nIDs[eventID],
); err != nil {
if err = u.d.MembershipTable.UpdateMembership(u.ctx, u.txn, u.roomNID, u.targetUserNID, senderUserNID, tables.MembershipStateLeaveOrBan, nIDs[eventID], false); err != nil {
return fmt.Errorf("u.d.MembershipTable.UpdateMembership: %w", err)
}
}

View file

@ -258,30 +258,28 @@ func (d *Database) RemoveRoomAlias(ctx context.Context, alias string) error {
})
}
func (d *Database) GetMembership(
ctx context.Context, roomNID types.RoomNID, requestSenderUserID string,
) (membershipEventNID types.EventNID, stillInRoom bool, err error) {
func (d *Database) GetMembership(ctx context.Context, roomNID types.RoomNID, requestSenderUserID string) (membershipEventNID types.EventNID, stillInRoom, isRoomforgotten bool, err error) {
var requestSenderUserNID types.EventStateKeyNID
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
requestSenderUserNID, err = d.assignStateKeyNID(ctx, txn, requestSenderUserID)
return err
})
if err != nil {
return 0, false, fmt.Errorf("d.assignStateKeyNID: %w", err)
return 0, false, false, fmt.Errorf("d.assignStateKeyNID: %w", err)
}
senderMembershipEventNID, senderMembership, err :=
senderMembershipEventNID, senderMembership, isRoomforgotten, err :=
d.MembershipTable.SelectMembershipFromRoomAndTarget(
ctx, roomNID, requestSenderUserNID,
)
if err == sql.ErrNoRows {
// The user has never been a member of that room
return 0, false, nil
return 0, false, false, nil
} else if err != nil {
return
}
return senderMembershipEventNID, senderMembership == tables.MembershipStateJoin, nil
return senderMembershipEventNID, senderMembership == tables.MembershipStateJoin, isRoomforgotten, nil
}
func (d *Database) GetMembershipEventNIDsForRoom(
@ -992,6 +990,25 @@ func (d *Database) GetKnownRooms(ctx context.Context) ([]string, error) {
return d.RoomsTable.SelectRoomIDs(ctx)
}
// ForgetRoom sets a users room to forgotten
func (d *Database) ForgetRoom(ctx context.Context, userID, roomID string, forget bool) error {
roomNIDs, err := d.RoomsTable.BulkSelectRoomNIDs(ctx, []string{roomID})
if err != nil {
return err
}
if len(roomNIDs) > 1 {
return fmt.Errorf("expected one room, got %d", len(roomNIDs))
}
stateKeyNID, err := d.EventStateKeysTable.SelectEventStateKeyNID(ctx, nil, userID)
if err != nil {
return err
}
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
return d.MembershipTable.UpdateForgetMembership(ctx, nil, roomNIDs[0], stateKeyNID, forget)
})
}
// FIXME TODO: Remove all this - horrible dupe with roomserver/state. Can't use the original impl because of circular loops
// it should live in this package!

View file

@ -0,0 +1,82 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package deltas
import (
"database/sql"
"fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/pressly/goose"
)
func LoadFromGoose() {
goose.AddMigration(UpAddForgottenColumn, DownAddForgottenColumn)
}
func LoadAddForgottenColumn(m *sqlutil.Migrations) {
m.AddMigration(UpAddForgottenColumn, DownAddForgottenColumn)
}
func UpAddForgottenColumn(tx *sql.Tx) error {
_, err := tx.Exec(` ALTER TABLE roomserver_membership RENAME TO roomserver_membership_tmp;
CREATE TABLE IF NOT EXISTS roomserver_membership (
room_nid INTEGER NOT NULL,
target_nid INTEGER NOT NULL,
sender_nid INTEGER NOT NULL DEFAULT 0,
membership_nid INTEGER NOT NULL DEFAULT 1,
event_nid INTEGER NOT NULL DEFAULT 0,
target_local BOOLEAN NOT NULL DEFAULT false,
forgotten BOOLEAN NOT NULL DEFAULT false,
UNIQUE (room_nid, target_nid)
);
INSERT
INTO roomserver_membership (
room_nid, target_nid, sender_nid, membership_nid, event_nid, target_local
) SELECT
room_nid, target_nid, sender_nid, membership_nid, event_nid, target_local
FROM roomserver_membership_tmp
;
DROP TABLE roomserver_membership_tmp;`)
if err != nil {
return fmt.Errorf("failed to execute upgrade: %w", err)
}
return nil
}
func DownAddForgottenColumn(tx *sql.Tx) error {
_, err := tx.Exec(` ALTER TABLE roomserver_membership RENAME TO roomserver_membership_tmp;
CREATE TABLE IF NOT EXISTS roomserver_membership (
room_nid INTEGER NOT NULL,
target_nid INTEGER NOT NULL,
sender_nid INTEGER NOT NULL DEFAULT 0,
membership_nid INTEGER NOT NULL DEFAULT 1,
event_nid INTEGER NOT NULL DEFAULT 0,
target_local BOOLEAN NOT NULL DEFAULT false,
UNIQUE (room_nid, target_nid)
);
INSERT
INTO roomserver_membership (
room_nid, target_nid, sender_nid, membership_nid, event_nid, target_local
) SELECT
room_nid, target_nid, sender_nid, membership_nid, event_nid, target_local
FROM roomserver_membership_tmp
;
DROP TABLE roomserver_membership_tmp;`)
if err != nil {
return fmt.Errorf("failed to execute downgrade: %w", err)
}
return nil
}

View file

@ -36,13 +36,15 @@ const membershipSchema = `
membership_nid INTEGER NOT NULL DEFAULT 1,
event_nid INTEGER NOT NULL DEFAULT 0,
target_local BOOLEAN NOT NULL DEFAULT false,
forgotten BOOLEAN NOT NULL DEFAULT false,
UNIQUE (room_nid, target_nid)
);
`
var selectJoinedUsersSetForRoomsSQL = "" +
"SELECT target_nid, COUNT(room_nid) FROM roomserver_membership WHERE room_nid IN ($1) AND" +
" membership_nid = " + fmt.Sprintf("%d", tables.MembershipStateJoin) + " GROUP BY target_nid"
" membership_nid = " + fmt.Sprintf("%d", tables.MembershipStateJoin) + " and forgotten = false" +
" GROUP BY target_nid"
// Insert a row in to membership table so that it can be locked by the
// SELECT FOR UPDATE
@ -52,37 +54,41 @@ const insertMembershipSQL = "" +
" ON CONFLICT DO NOTHING"
const selectMembershipFromRoomAndTargetSQL = "" +
"SELECT membership_nid, event_nid FROM roomserver_membership" +
"SELECT membership_nid, event_nid, forgotten FROM roomserver_membership" +
" WHERE room_nid = $1 AND target_nid = $2"
const selectMembershipsFromRoomAndMembershipSQL = "" +
"SELECT event_nid FROM roomserver_membership" +
" WHERE room_nid = $1 AND membership_nid = $2"
" WHERE room_nid = $1 AND membership_nid = $2 and forgotten = false"
const selectLocalMembershipsFromRoomAndMembershipSQL = "" +
"SELECT event_nid FROM roomserver_membership" +
" WHERE room_nid = $1 AND membership_nid = $2" +
" AND target_local = true"
" AND target_local = true and forgotten = false"
const selectMembershipsFromRoomSQL = "" +
"SELECT event_nid FROM roomserver_membership" +
" WHERE room_nid = $1"
" WHERE room_nid = $1 and forgotten = false"
const selectLocalMembershipsFromRoomSQL = "" +
"SELECT event_nid FROM roomserver_membership" +
" WHERE room_nid = $1" +
" AND target_local = true"
" AND target_local = true and forgotten = false"
const selectMembershipForUpdateSQL = "" +
"SELECT membership_nid FROM roomserver_membership" +
" WHERE room_nid = $1 AND target_nid = $2"
const updateMembershipSQL = "" +
"UPDATE roomserver_membership SET sender_nid = $1, membership_nid = $2, event_nid = $3" +
" WHERE room_nid = $4 AND target_nid = $5"
"UPDATE roomserver_membership SET sender_nid = $1, membership_nid = $2, event_nid = $3, forgotten = $4" +
" WHERE room_nid = $5 AND target_nid = $6"
const updateMembershipForgetRoom = "" +
"UPDATE roomserver_membership SET forgotten = $1" +
" WHERE room_nid = $2 AND target_nid = $3"
const selectRoomsWithMembershipSQL = "" +
"SELECT room_nid FROM roomserver_membership WHERE membership_nid = $1 AND target_nid = $2"
"SELECT room_nid FROM roomserver_membership WHERE membership_nid = $1 AND target_nid = $2 and forgotten = false"
// selectKnownUsersSQL uses a sub-select statement here to find rooms that the user is
// joined to. Since this information is used to populate the user directory, we will
@ -106,16 +112,13 @@ type membershipStatements struct {
selectRoomsWithMembershipStmt *sql.Stmt
updateMembershipStmt *sql.Stmt
selectKnownUsersStmt *sql.Stmt
updateMembershipForgetRoomStmt *sql.Stmt
}
func NewSqliteMembershipTable(db *sql.DB) (tables.Membership, error) {
s := &membershipStatements{
db: db,
}
_, err := db.Exec(membershipSchema)
if err != nil {
return nil, err
}
return s, shared.StatementList{
{&s.insertMembershipStmt, insertMembershipSQL},
@ -128,9 +131,15 @@ func NewSqliteMembershipTable(db *sql.DB) (tables.Membership, error) {
{&s.updateMembershipStmt, updateMembershipSQL},
{&s.selectRoomsWithMembershipStmt, selectRoomsWithMembershipSQL},
{&s.selectKnownUsersStmt, selectKnownUsersSQL},
{&s.updateMembershipForgetRoomStmt, updateMembershipForgetRoom},
}.Prepare(db)
}
func (s *membershipStatements) execSchema(db *sql.DB) error {
_, err := db.Exec(membershipSchema)
return err
}
func (s *membershipStatements) InsertMembership(
ctx context.Context, txn *sql.Tx,
roomNID types.RoomNID, targetUserNID types.EventStateKeyNID,
@ -155,10 +164,10 @@ func (s *membershipStatements) SelectMembershipForUpdate(
func (s *membershipStatements) SelectMembershipFromRoomAndTarget(
ctx context.Context,
roomNID types.RoomNID, targetUserNID types.EventStateKeyNID,
) (eventNID types.EventNID, membership tables.MembershipState, err error) {
) (eventNID types.EventNID, membership tables.MembershipState, forgotten bool, err error) {
err = s.selectMembershipFromRoomAndTargetStmt.QueryRowContext(
ctx, roomNID, targetUserNID,
).Scan(&membership, &eventNID)
).Scan(&membership, &eventNID, &forgotten)
return
}
@ -216,13 +225,12 @@ func (s *membershipStatements) SelectMembershipsFromRoomAndMembership(
func (s *membershipStatements) UpdateMembership(
ctx context.Context, txn *sql.Tx,
roomNID types.RoomNID, targetUserNID types.EventStateKeyNID,
senderUserNID types.EventStateKeyNID, membership tables.MembershipState,
eventNID types.EventNID,
roomNID types.RoomNID, targetUserNID types.EventStateKeyNID, senderUserNID types.EventStateKeyNID, membership tables.MembershipState,
eventNID types.EventNID, forgotten bool,
) error {
stmt := sqlutil.TxStmt(txn, s.updateMembershipStmt)
_, err := stmt.ExecContext(
ctx, senderUserNID, membership, eventNID, roomNID, targetUserNID,
ctx, senderUserNID, membership, eventNID, forgotten, roomNID, targetUserNID,
)
return err
}
@ -285,3 +293,14 @@ func (s *membershipStatements) SelectKnownUsers(ctx context.Context, userID type
}
return result, rows.Err()
}
func (s *membershipStatements) UpdateForgetMembership(
ctx context.Context,
txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID,
forget bool,
) error {
_, err := sqlutil.TxStmt(txn, s.updateMembershipForgetRoomStmt).ExecContext(
ctx, forget, roomNID, targetUserNID,
)
return err
}

View file

@ -19,127 +19,138 @@ import (
"context"
"database/sql"
_ "github.com/mattn/go-sqlite3"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/storage/shared"
"github.com/matrix-org/dendrite/roomserver/storage/tables"
"github.com/matrix-org/dendrite/roomserver/storage/sqlite3/deltas"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
_ "github.com/mattn/go-sqlite3"
)
// A Database is used to store room events and stream offsets.
type Database struct {
shared.Database
events tables.Events
eventJSON tables.EventJSON
eventTypes tables.EventTypes
eventStateKeys tables.EventStateKeys
rooms tables.Rooms
transactions tables.Transactions
prevEvents tables.PreviousEvents
invites tables.Invites
membership tables.Membership
db *sql.DB
writer sqlutil.Writer
}
// Open a sqlite database.
// nolint: gocyclo
func Open(dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches) (*Database, error) {
var d Database
var db *sql.DB
var err error
if d.db, err = sqlutil.Open(dbProperties); err != nil {
if db, err = sqlutil.Open(dbProperties); err != nil {
return nil, err
}
d.writer = sqlutil.NewExclusiveWriter()
//d.db.Exec("PRAGMA journal_mode=WAL;")
//d.db.Exec("PRAGMA read_uncommitted = true;")
//db.Exec("PRAGMA journal_mode=WAL;")
//db.Exec("PRAGMA read_uncommitted = true;")
// FIXME: We are leaking connections somewhere. Setting this to 2 will eventually
// cause the roomserver to be unresponsive to new events because something will
// acquire the global mutex and never unlock it because it is waiting for a connection
// which it will never obtain.
d.db.SetMaxOpenConns(20)
db.SetMaxOpenConns(20)
d.eventStateKeys, err = NewSqliteEventStateKeysTable(d.db)
if err != nil {
// Create tables before executing migrations so we don't fail if the table is missing,
// and THEN prepare statements so we don't fail due to referencing new columns
ms := membershipStatements{}
if err := ms.execSchema(db); err != nil {
return nil, err
}
d.eventTypes, err = NewSqliteEventTypesTable(d.db)
if err != nil {
m := sqlutil.NewMigrations()
deltas.LoadAddForgottenColumn(m)
if err := m.RunDeltas(db, dbProperties); err != nil {
return nil, err
}
d.eventJSON, err = NewSqliteEventJSONTable(d.db)
if err != nil {
if err := d.prepare(db, cache); err != nil {
return nil, err
}
d.events, err = NewSqliteEventsTable(d.db)
return &d, nil
}
// nolint: gocyclo
func (d *Database) prepare(db *sql.DB, cache caching.RoomServerCaches) error {
var err error
eventStateKeys, err := NewSqliteEventStateKeysTable(db)
if err != nil {
return nil, err
return err
}
d.rooms, err = NewSqliteRoomsTable(d.db)
eventTypes, err := NewSqliteEventTypesTable(db)
if err != nil {
return nil, err
return err
}
d.transactions, err = NewSqliteTransactionsTable(d.db)
eventJSON, err := NewSqliteEventJSONTable(db)
if err != nil {
return nil, err
return err
}
stateBlock, err := NewSqliteStateBlockTable(d.db)
events, err := NewSqliteEventsTable(db)
if err != nil {
return nil, err
return err
}
stateSnapshot, err := NewSqliteStateSnapshotTable(d.db)
rooms, err := NewSqliteRoomsTable(db)
if err != nil {
return nil, err
return err
}
d.prevEvents, err = NewSqlitePrevEventsTable(d.db)
transactions, err := NewSqliteTransactionsTable(db)
if err != nil {
return nil, err
return err
}
roomAliases, err := NewSqliteRoomAliasesTable(d.db)
stateBlock, err := NewSqliteStateBlockTable(db)
if err != nil {
return nil, err
return err
}
d.invites, err = NewSqliteInvitesTable(d.db)
stateSnapshot, err := NewSqliteStateSnapshotTable(db)
if err != nil {
return nil, err
return err
}
d.membership, err = NewSqliteMembershipTable(d.db)
prevEvents, err := NewSqlitePrevEventsTable(db)
if err != nil {
return nil, err
return err
}
published, err := NewSqlitePublishedTable(d.db)
roomAliases, err := NewSqliteRoomAliasesTable(db)
if err != nil {
return nil, err
return err
}
redactions, err := NewSqliteRedactionsTable(d.db)
invites, err := NewSqliteInvitesTable(db)
if err != nil {
return nil, err
return err
}
membership, err := NewSqliteMembershipTable(db)
if err != nil {
return err
}
published, err := NewSqlitePublishedTable(db)
if err != nil {
return err
}
redactions, err := NewSqliteRedactionsTable(db)
if err != nil {
return err
}
d.Database = shared.Database{
DB: d.db,
DB: db,
Cache: cache,
Writer: d.writer,
EventsTable: d.events,
EventTypesTable: d.eventTypes,
EventStateKeysTable: d.eventStateKeys,
EventJSONTable: d.eventJSON,
RoomsTable: d.rooms,
TransactionsTable: d.transactions,
Writer: sqlutil.NewExclusiveWriter(),
EventsTable: events,
EventTypesTable: eventTypes,
EventStateKeysTable: eventStateKeys,
EventJSONTable: eventJSON,
RoomsTable: rooms,
TransactionsTable: transactions,
StateBlockTable: stateBlock,
StateSnapshotTable: stateSnapshot,
PrevEventsTable: d.prevEvents,
PrevEventsTable: prevEvents,
RoomAliasesTable: roomAliases,
InvitesTable: d.invites,
MembershipTable: d.membership,
InvitesTable: invites,
MembershipTable: membership,
PublishedTable: published,
RedactionsTable: redactions,
GetLatestEventsForUpdateFn: d.GetLatestEventsForUpdate,
}
return &d, nil
return nil
}
func (d *Database) SupportsConcurrentRoomInputs() bool {

View file

@ -123,15 +123,16 @@ const (
type Membership interface {
InsertMembership(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID, localTarget bool) error
SelectMembershipForUpdate(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID) (MembershipState, error)
SelectMembershipFromRoomAndTarget(ctx context.Context, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID) (types.EventNID, MembershipState, error)
SelectMembershipFromRoomAndTarget(ctx context.Context, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID) (types.EventNID, MembershipState, bool, error)
SelectMembershipsFromRoom(ctx context.Context, roomNID types.RoomNID, localOnly bool) (eventNIDs []types.EventNID, err error)
SelectMembershipsFromRoomAndMembership(ctx context.Context, roomNID types.RoomNID, membership MembershipState, localOnly bool) (eventNIDs []types.EventNID, err error)
UpdateMembership(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID, senderUserNID types.EventStateKeyNID, membership MembershipState, eventNID types.EventNID) error
UpdateMembership(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID, senderUserNID types.EventStateKeyNID, membership MembershipState, eventNID types.EventNID, forgotten bool) error
SelectRoomsWithMembership(ctx context.Context, userID types.EventStateKeyNID, membershipState MembershipState) ([]types.RoomNID, error)
// SelectJoinedUsersSetForRooms returns the set of all users in the rooms who are joined to any of these rooms, along with the
// counts of how many rooms they are joined.
SelectJoinedUsersSetForRooms(ctx context.Context, roomNIDs []types.RoomNID) (map[types.EventStateKeyNID]int, error)
SelectKnownUsers(ctx context.Context, userID types.EventStateKeyNID, searchString string, limit int) ([]string, error)
UpdateForgetMembership(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID, forget bool) error
}
type Published interface {

View file

@ -149,7 +149,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
}
if msg.RewritesState {
if err = s.db.PurgeRoom(ctx, ev.RoomID()); err != nil {
if err = s.db.PurgeRoomState(ctx, ev.RoomID()); err != nil {
return fmt.Errorf("s.db.PurgeRoom: %w", err)
}
}
@ -189,14 +189,20 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent(
) error {
ev := msg.Event
// TODO: The state key check when excluding from sync is designed
// to stop us from lying to clients with old state, whilst still
// allowing normal timeline events through. This is an absolute
// hack but until we have some better strategy for dealing with
// old events in the sync API, this should at least prevent us
// from confusing clients into thinking they've joined/left rooms.
pduPos, err := s.db.WriteEvent(
ctx,
&ev,
[]gomatrixserverlib.HeaderedEvent{},
[]string{}, // adds no state
[]string{}, // removes no state
nil, // no transaction
false, // not excluded from sync
[]string{}, // adds no state
[]string{}, // removes no state
nil, // no transaction
ev.StateKey() != nil, // exclude from sync?
)
if err != nil {
// panic rather than continue with an inconsistent database

View file

@ -59,6 +59,7 @@ const defaultMessagesLimit = 10
// OnIncomingMessagesRequest implements the /messages endpoint from the
// client-server API.
// See: https://matrix.org/docs/spec/client_server/latest.html#get-matrix-client-r0-rooms-roomid-messages
// nolint:gocyclo
func OnIncomingMessagesRequest(
req *http.Request, db storage.Database, roomID string, device *userapi.Device,
federation *gomatrixserverlib.FederationClient,
@ -67,6 +68,19 @@ func OnIncomingMessagesRequest(
) util.JSONResponse {
var err error
// check if the user has already forgotten about this room
isForgotten, err := checkIsRoomForgotten(req.Context(), roomID, device.UserID, rsAPI)
if err != nil {
return jsonerror.InternalServerError()
}
if isForgotten {
return util.JSONResponse{
Code: http.StatusForbidden,
JSON: jsonerror.Forbidden("user already forgot about this room"),
}
}
// Extract parameters from the request's URL.
// Pagination tokens.
var fromStream *types.StreamingToken
@ -182,6 +196,19 @@ func OnIncomingMessagesRequest(
}
}
func checkIsRoomForgotten(ctx context.Context, roomID, userID string, rsAPI api.RoomserverInternalAPI) (bool, error) {
req := api.QueryMembershipForUserRequest{
RoomID: roomID,
UserID: userID,
}
resp := api.QueryMembershipForUserResponse{}
if err := rsAPI.QueryMembershipForUser(ctx, &req, &resp); err != nil {
return false, err
}
return resp.IsRoomForgotten, nil
}
// retrieveEvents retrieves events from the local database for a request on
// /messages. If there's not enough events to retrieve, it asks another
// homeserver in the room for older events.

View file

@ -45,9 +45,9 @@ type Database interface {
// Returns an error if there was a problem inserting this event.
WriteEvent(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent, addStateEvents []gomatrixserverlib.HeaderedEvent,
addStateEventIDs []string, removeStateEventIDs []string, transactionID *api.TransactionID, excludeFromSync bool) (types.StreamPosition, error)
// PurgeRoom completely purges room state from the sync API. This is done when
// PurgeRoomState completely purges room state from the sync API. This is done when
// receiving an output event that completely resets the state.
PurgeRoom(ctx context.Context, roomID string) error
PurgeRoomState(ctx context.Context, roomID string) error
// GetStateEvent returns the Matrix state event of a given type for a given room with a given state key
// If no event could be found, returns nil
// If there was an issue during the retrieval, returns an error

View file

@ -280,7 +280,7 @@ func (d *Database) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, e
return nil
}
func (d *Database) PurgeRoom(
func (d *Database) PurgeRoomState(
ctx context.Context, roomID string,
) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
@ -290,15 +290,6 @@ func (d *Database) PurgeRoom(
if err := d.CurrentRoomState.DeleteRoomStateForRoom(ctx, txn, roomID); err != nil {
return fmt.Errorf("d.CurrentRoomState.DeleteRoomStateForRoom: %w", err)
}
if err := d.OutputEvents.DeleteEventsForRoom(ctx, txn, roomID); err != nil {
return fmt.Errorf("d.Events.DeleteEventsForRoom: %w", err)
}
if err := d.Topology.DeleteTopologyForRoom(ctx, txn, roomID); err != nil {
return fmt.Errorf("d.Topology.DeleteTopologyForRoom: %w", err)
}
if err := d.BackwardExtremities.DeleteBackwardExtremitiesForRoom(ctx, txn, roomID); err != nil {
return fmt.Errorf("d.BackwardExtremities.DeleteBackwardExtremitiesForRoom: %w", err)
}
return nil
})
}

View file

@ -492,3 +492,7 @@ Inbound federation rejects receipts from wrong remote
Should not be able to take over the room by pretending there is no PL event
Can get rooms/{roomId}/state for a departed room (SPEC-216)
Users cannot set notifications powerlevel higher than their own
Forgotten room messages cannot be paginated
Forgetting room does not show up in v2 /sync
Can forget room you've been kicked from
Can re-join room if re-invited