mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-10 23:53:09 -06:00
Merge branch 'main' into jb-add-client-well-known
This commit is contained in:
commit
dd5216d1b3
|
|
@ -129,7 +129,7 @@ app_service_api:
|
|||
|
||||
# Database configuration for this component.
|
||||
database:
|
||||
connection_string: postgresql://username@password:hostname/dendrite_appservice?sslmode=disable
|
||||
connection_string: postgresql://username:password@hostname/dendrite_appservice?sslmode=disable
|
||||
max_open_conns: 10
|
||||
max_idle_conns: 2
|
||||
conn_max_lifetime: -1
|
||||
|
|
@ -203,7 +203,7 @@ federation_api:
|
|||
external_api:
|
||||
listen: http://[::]:8072
|
||||
database:
|
||||
connection_string: postgresql://username@password:hostname/dendrite_federationapi?sslmode=disable
|
||||
connection_string: postgresql://username:password@hostname/dendrite_federationapi?sslmode=disable
|
||||
max_open_conns: 10
|
||||
max_idle_conns: 2
|
||||
conn_max_lifetime: -1
|
||||
|
|
@ -240,7 +240,7 @@ key_server:
|
|||
listen: http://[::]:7779 # The listen address for incoming API requests
|
||||
connect: http://key_server:7779 # The connect address for other components to use
|
||||
database:
|
||||
connection_string: postgresql://username@password:hostname/dendrite_keyserver?sslmode=disable
|
||||
connection_string: postgresql://username:password@hostname/dendrite_keyserver?sslmode=disable
|
||||
max_open_conns: 10
|
||||
max_idle_conns: 2
|
||||
conn_max_lifetime: -1
|
||||
|
|
@ -253,7 +253,7 @@ media_api:
|
|||
external_api:
|
||||
listen: http://[::]:8074
|
||||
database:
|
||||
connection_string: postgresql://username@password:hostname/dendrite_mediaapi?sslmode=disable
|
||||
connection_string: postgresql://username:password@hostname/dendrite_mediaapi?sslmode=disable
|
||||
max_open_conns: 5
|
||||
max_idle_conns: 2
|
||||
conn_max_lifetime: -1
|
||||
|
|
@ -290,7 +290,7 @@ mscs:
|
|||
# - msc2836 # (Threading, see https://github.com/matrix-org/matrix-doc/pull/2836)
|
||||
# - msc2946 # (Spaces Summary, see https://github.com/matrix-org/matrix-doc/pull/2946)
|
||||
database:
|
||||
connection_string: postgresql://username@password:hostname/dendrite_mscs?sslmode=disable
|
||||
connection_string: postgresql://username:password@hostname/dendrite_mscs?sslmode=disable
|
||||
max_open_conns: 5
|
||||
max_idle_conns: 2
|
||||
conn_max_lifetime: -1
|
||||
|
|
@ -301,7 +301,7 @@ room_server:
|
|||
listen: http://[::]:7770 # The listen address for incoming API requests
|
||||
connect: http://room_server:7770 # The connect address for other components to use
|
||||
database:
|
||||
connection_string: postgresql://username@password:hostname/dendrite_roomserver?sslmode=disable
|
||||
connection_string: postgresql://username:password@hostname/dendrite_roomserver?sslmode=disable
|
||||
max_open_conns: 10
|
||||
max_idle_conns: 2
|
||||
conn_max_lifetime: -1
|
||||
|
|
@ -314,7 +314,7 @@ sync_api:
|
|||
external_api:
|
||||
listen: http://[::]:8073
|
||||
database:
|
||||
connection_string: postgresql://username@password:hostname/dendrite_syncapi?sslmode=disable
|
||||
connection_string: postgresql://username:password@hostname/dendrite_syncapi?sslmode=disable
|
||||
max_open_conns: 10
|
||||
max_idle_conns: 2
|
||||
conn_max_lifetime: -1
|
||||
|
|
@ -330,7 +330,7 @@ user_api:
|
|||
listen: http://[::]:7781 # The listen address for incoming API requests
|
||||
connect: http://user_api:7781 # The connect address for other components to use
|
||||
account_database:
|
||||
connection_string: postgresql://username@password:hostname/dendrite_userapi?sslmode=disable
|
||||
connection_string: postgresql://username:password@hostname/dendrite_userapi?sslmode=disable
|
||||
max_open_conns: 10
|
||||
max_idle_conns: 2
|
||||
conn_max_lifetime: -1
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@ Unfortunately we can't accept contributions without it.
|
|||
|
||||
## Getting up and running
|
||||
|
||||
See the [Installation](INSTALL.md) section for information on how to build an
|
||||
See the [Installation](installation) section for information on how to build an
|
||||
instance of Dendrite. You will likely need this in order to test your changes.
|
||||
|
||||
## Code style
|
||||
|
|
|
|||
|
|
@ -86,9 +86,12 @@ would be a huge help too, as that will help us to understand where the memory us
|
|||
|
||||
You may need to revisit the connection limit of your PostgreSQL server and/or make changes to the `max_connections` lines in your Dendrite configuration. Be aware that each Dendrite component opens its own database connections and has its own connection limit, even in monolith mode!
|
||||
|
||||
## What is being reported when enabling anonymous stats?
|
||||
## What is being reported when enabling phone-home statistics?
|
||||
|
||||
If anonymous stats reporting is enabled, the following data is send to the defined endpoint.
|
||||
Phone-home statistics contain your server's domain name, some configuration information about
|
||||
your deployment and aggregated information about active users on your deployment. They are sent
|
||||
to the endpoint URL configured in your Dendrite configuration file only. The following is an
|
||||
example of the data that is sent:
|
||||
|
||||
```json
|
||||
{
|
||||
|
|
@ -106,7 +109,7 @@ If anonymous stats reporting is enabled, the following data is send to the defin
|
|||
"go_arch": "amd64",
|
||||
"go_os": "linux",
|
||||
"go_version": "go1.16.13",
|
||||
"homeserver": "localhost:8800",
|
||||
"homeserver": "my.domain.com",
|
||||
"log_level": "trace",
|
||||
"memory_rss": 93452,
|
||||
"monolith": true,
|
||||
|
|
|
|||
|
|
@ -233,6 +233,8 @@ GEM
|
|||
multipart-post (2.1.1)
|
||||
nokogiri (1.13.6-arm64-darwin)
|
||||
racc (~> 1.4)
|
||||
nokogiri (1.13.6-x86_64-linux)
|
||||
racc (~> 1.4)
|
||||
octokit (4.22.0)
|
||||
faraday (>= 0.9)
|
||||
sawyer (~> 0.8.0, >= 0.5.3)
|
||||
|
|
@ -263,7 +265,7 @@ GEM
|
|||
thread_safe (0.3.6)
|
||||
typhoeus (1.4.0)
|
||||
ethon (>= 0.9.0)
|
||||
tzinfo (1.2.9)
|
||||
tzinfo (1.2.10)
|
||||
thread_safe (~> 0.1)
|
||||
unf (0.1.4)
|
||||
unf_ext
|
||||
|
|
@ -273,11 +275,11 @@ GEM
|
|||
|
||||
PLATFORMS
|
||||
arm64-darwin-21
|
||||
x86_64-linux
|
||||
|
||||
DEPENDENCIES
|
||||
github-pages (~> 226)
|
||||
jekyll-feed (~> 0.15.1)
|
||||
minima (~> 2.5.1)
|
||||
|
||||
BUNDLED WITH
|
||||
2.3.7
|
||||
|
|
|
|||
|
|
@ -1,68 +0,0 @@
|
|||
{
|
||||
# debug
|
||||
admin off
|
||||
email example@example.com
|
||||
default_sni example.com
|
||||
# Debug endpoint
|
||||
# acme_ca https://acme-staging-v02.api.letsencrypt.org/directory
|
||||
}
|
||||
|
||||
#######################################################################
|
||||
# Snippets
|
||||
#______________________________________________________________________
|
||||
|
||||
(handle_errors_maintenance) {
|
||||
handle_errors {
|
||||
@maintenance expression {http.error.status_code} == 502
|
||||
rewrite @maintenance maintenance.html
|
||||
root * "/path/to/service/pages"
|
||||
file_server
|
||||
}
|
||||
}
|
||||
|
||||
(matrix-well-known-header) {
|
||||
# Headers
|
||||
header Access-Control-Allow-Origin "*"
|
||||
header Access-Control-Allow-Methods "GET, POST, PUT, DELETE, OPTIONS"
|
||||
header Access-Control-Allow-Headers "Origin, X-Requested-With, Content-Type, Accept, Authorization"
|
||||
header Content-Type "application/json"
|
||||
}
|
||||
|
||||
#######################################################################
|
||||
|
||||
example.com {
|
||||
|
||||
# ...
|
||||
|
||||
handle /.well-known/matrix/server {
|
||||
import matrix-well-known-header
|
||||
respond `{ "m.server": "matrix.example.com:443" }` 200
|
||||
}
|
||||
|
||||
handle /.well-known/matrix/client {
|
||||
import matrix-well-known-header
|
||||
respond `{ "m.homeserver": { "base_url": "https://matrix.example.com" } }` 200
|
||||
}
|
||||
|
||||
import handle_errors_maintenance
|
||||
}
|
||||
|
||||
example.com:8448 {
|
||||
# server<->server HTTPS traffic
|
||||
reverse_proxy http://dendrite-host:8008
|
||||
}
|
||||
|
||||
matrix.example.com {
|
||||
|
||||
handle /_matrix/* {
|
||||
# client<->server HTTPS traffic
|
||||
reverse_proxy http://dendrite-host:8008
|
||||
}
|
||||
|
||||
handle_path /* {
|
||||
# Client webapp (Element SPA or ...)
|
||||
file_server {
|
||||
root /path/to/www/example.com/matrix-web-client/
|
||||
}
|
||||
}
|
||||
}
|
||||
57
docs/caddy/monolith/Caddyfile
Normal file
57
docs/caddy/monolith/Caddyfile
Normal file
|
|
@ -0,0 +1,57 @@
|
|||
# Sample Caddyfile for using Caddy in front of Dendrite.
|
||||
#
|
||||
# Customize email address and domain names.
|
||||
# Optional settings commented out.
|
||||
#
|
||||
# BE SURE YOUR DOMAINS ARE POINTED AT YOUR SERVER FIRST.
|
||||
# Documentation: https://caddyserver.com/docs/
|
||||
#
|
||||
# Bonus tip: If your IP address changes, use Caddy's
|
||||
# dynamic DNS plugin to update your DNS records to
|
||||
# point to your new IP automatically:
|
||||
# https://github.com/mholt/caddy-dynamicdns
|
||||
#
|
||||
|
||||
|
||||
# Global options block
|
||||
{
|
||||
# In case there is a problem with your certificates.
|
||||
# email example@example.com
|
||||
|
||||
# Turn off the admin endpoint if you don't need graceful config
|
||||
# changes and/or are running untrusted code on your machine.
|
||||
# admin off
|
||||
|
||||
# Enable this if your clients don't send ServerName in TLS handshakes.
|
||||
# default_sni example.com
|
||||
|
||||
# Enable debug mode for verbose logging.
|
||||
# debug
|
||||
|
||||
# Use Let's Encrypt's staging endpoint for testing.
|
||||
# acme_ca https://acme-staging-v02.api.letsencrypt.org/directory
|
||||
|
||||
# If you're port-forwarding HTTP/HTTPS ports from 80/443 to something
|
||||
# else, enable these and put the alternate port numbers here.
|
||||
# http_port 8080
|
||||
# https_port 8443
|
||||
}
|
||||
|
||||
# The server name of your matrix homeserver. This example shows
|
||||
# "well-known delegation" from the registered domain to a subdomain,
|
||||
# which is only needed if your server_name doesn't match your Matrix
|
||||
# homeserver URL (i.e. you can show users a vanity domain that looks
|
||||
# nice and is easy to remember but still have your Matrix server on
|
||||
# its own subdomain or hosted service).
|
||||
example.com {
|
||||
header /.well-known/matrix/* Content-Type application/json
|
||||
header /.well-known/matrix/* Access-Control-Allow-Origin *
|
||||
respond /.well-known/matrix/server `{"m.server": "matrix.example.com:443"}`
|
||||
respond /.well-known/matrix/client `{"m.homeserver": {"base_url": "https://matrix.example.com"}}`
|
||||
}
|
||||
|
||||
# The actual domain name whereby your Matrix server is accessed.
|
||||
matrix.example.com {
|
||||
# Set localhost:8008 to the address of your Dendrite server, if different
|
||||
reverse_proxy /_matrix/* localhost:8008
|
||||
}
|
||||
66
docs/caddy/polylith/Caddyfile
Normal file
66
docs/caddy/polylith/Caddyfile
Normal file
|
|
@ -0,0 +1,66 @@
|
|||
# Sample Caddyfile for using Caddy in front of Dendrite.
|
||||
#
|
||||
# Customize email address and domain names.
|
||||
# Optional settings commented out.
|
||||
#
|
||||
# BE SURE YOUR DOMAINS ARE POINTED AT YOUR SERVER FIRST.
|
||||
# Documentation: https://caddyserver.com/docs/
|
||||
#
|
||||
# Bonus tip: If your IP address changes, use Caddy's
|
||||
# dynamic DNS plugin to update your DNS records to
|
||||
# point to your new IP automatically:
|
||||
# https://github.com/mholt/caddy-dynamicdns
|
||||
#
|
||||
|
||||
|
||||
# Global options block
|
||||
{
|
||||
# In case there is a problem with your certificates.
|
||||
# email example@example.com
|
||||
|
||||
# Turn off the admin endpoint if you don't need graceful config
|
||||
# changes and/or are running untrusted code on your machine.
|
||||
# admin off
|
||||
|
||||
# Enable this if your clients don't send ServerName in TLS handshakes.
|
||||
# default_sni example.com
|
||||
|
||||
# Enable debug mode for verbose logging.
|
||||
# debug
|
||||
|
||||
# Use Let's Encrypt's staging endpoint for testing.
|
||||
# acme_ca https://acme-staging-v02.api.letsencrypt.org/directory
|
||||
|
||||
# If you're port-forwarding HTTP/HTTPS ports from 80/443 to something
|
||||
# else, enable these and put the alternate port numbers here.
|
||||
# http_port 8080
|
||||
# https_port 8443
|
||||
}
|
||||
|
||||
# The server name of your matrix homeserver. This example shows
|
||||
# "well-known delegation" from the registered domain to a subdomain,
|
||||
# which is only needed if your server_name doesn't match your Matrix
|
||||
# homeserver URL (i.e. you can show users a vanity domain that looks
|
||||
# nice and is easy to remember but still have your Matrix server on
|
||||
# its own subdomain or hosted service).
|
||||
example.com {
|
||||
header /.well-known/matrix/* Content-Type application/json
|
||||
header /.well-known/matrix/* Access-Control-Allow-Origin *
|
||||
respond /.well-known/matrix/server `{"m.server": "matrix.example.com:443"}`
|
||||
respond /.well-known/matrix/client `{"m.homeserver": {"base_url": "https://matrix.example.com"}}`
|
||||
}
|
||||
|
||||
# The actual domain name whereby your Matrix server is accessed.
|
||||
matrix.example.com {
|
||||
# Change the end of each reverse_proxy line to the correct
|
||||
# address for your various services.
|
||||
@sync_api {
|
||||
path_regexp /_matrix/client/.*?/(sync|user/.*?/filter/?.*|keys/changes|rooms/.*?/messages)$
|
||||
}
|
||||
reverse_proxy @sync_api sync_api:8073
|
||||
|
||||
reverse_proxy /_matrix/client* client_api:8071
|
||||
reverse_proxy /_matrix/federation* federation_api:8071
|
||||
reverse_proxy /_matrix/key* federation_api:8071
|
||||
reverse_proxy /_matrix/media* media_api:8071
|
||||
}
|
||||
|
|
@ -2,7 +2,7 @@
|
|||
title: Starting the polylith
|
||||
parent: Installation
|
||||
has_toc: true
|
||||
nav_order: 9
|
||||
nav_order: 10
|
||||
permalink: /installation/start/polylith
|
||||
---
|
||||
|
||||
|
|
@ -2,7 +2,7 @@
|
|||
title: Optimise your installation
|
||||
parent: Installation
|
||||
has_toc: true
|
||||
nav_order: 10
|
||||
nav_order: 11
|
||||
permalink: /installation/start/optimisation
|
||||
---
|
||||
|
||||
|
|
@ -95,12 +95,13 @@ enabled.
|
|||
To do so, follow the [NATS Server installation instructions](https://docs.nats.io/running-a-nats-service/introduction/installation) and then [start your NATS deployment](https://docs.nats.io/running-a-nats-service/introduction/running). JetStream must be enabled, either by passing the `-js` flag to `nats-server`,
|
||||
or by specifying the `store_dir` option in the the `jetstream` configuration.
|
||||
|
||||
### Reverse proxy (polylith deployments)
|
||||
### Reverse proxy
|
||||
|
||||
Polylith deployments require a reverse proxy, such as [NGINX](https://www.nginx.com) or
|
||||
[HAProxy](http://www.haproxy.org). Configuring those is not covered in this documentation,
|
||||
although a [sample configuration for NGINX](https://github.com/matrix-org/dendrite/blob/main/docs/nginx/polylith-sample.conf)
|
||||
is provided.
|
||||
A reverse proxy such as [Caddy](https://caddyserver.com), [NGINX](https://www.nginx.com) or
|
||||
[HAProxy](http://www.haproxy.org) is required for polylith deployments and is useful for monolith
|
||||
deployments. Configuring those is not covered in this documentation, although sample configurations
|
||||
for [Caddy](https://github.com/matrix-org/dendrite/blob/main/docs/caddy) and
|
||||
[NGINX](https://github.com/matrix-org/dendrite/blob/main/docs/nginx) are provided.
|
||||
|
||||
### Windows
|
||||
|
||||
|
|
|
|||
|
|
@ -14,27 +14,38 @@ that take the format `@user:example.com`.
|
|||
For federation to work, the server name must be resolvable by other homeservers on the internet
|
||||
— that is, the domain must be registered and properly configured with the relevant DNS records.
|
||||
|
||||
Matrix servers discover each other when federating using the following methods:
|
||||
Matrix servers usually discover each other when federating using the following methods:
|
||||
|
||||
1. If a well-known delegation exists on `example.com`, use the path server from the
|
||||
1. If a well-known delegation exists on `example.com`, use the domain and port from the
|
||||
well-known file to connect to the remote homeserver;
|
||||
2. If a DNS SRV delegation exists on `example.com`, use the hostname and port from the DNS SRV
|
||||
2. If a DNS SRV delegation exists on `example.com`, use the IP address and port from the DNS SRV
|
||||
record to connect to the remote homeserver;
|
||||
3. If neither well-known or DNS SRV delegation are configured, attempt to connect to the remote
|
||||
homeserver by connecting to `example.com` port TCP/8448 using HTTPS.
|
||||
|
||||
The exact details of how server name resolution works can be found in
|
||||
[the spec](https://spec.matrix.org/v1.3/server-server-api/#resolving-server-names).
|
||||
|
||||
## TLS certificates
|
||||
|
||||
Matrix federation requires that valid TLS certificates are present on the domain. You must
|
||||
obtain certificates from a publicly accepted Certificate Authority (CA). [LetsEncrypt](https://letsencrypt.org)
|
||||
is an example of such a CA that can be used. Self-signed certificates are not suitable for
|
||||
federation and will typically not be accepted by other homeservers.
|
||||
obtain certificates from a publicly-trusted certificate authority (CA). [Let's Encrypt](https://letsencrypt.org)
|
||||
is a popular choice of CA because the certificates are publicly-trusted, free, and automated
|
||||
via the ACME protocol. (Self-signed certificates are not suitable for federation and will typically
|
||||
not be accepted by other homeservers.)
|
||||
|
||||
A common practice to help ease the management of certificates is to install a reverse proxy in
|
||||
front of Dendrite which manages the TLS certificates and HTTPS proxying itself. Software such as
|
||||
[NGINX](https://www.nginx.com) and [HAProxy](http://www.haproxy.org) can be used for the task.
|
||||
Although the finer details of configuring these are not described here, you must reverse proxy
|
||||
all `/_matrix` paths to your Dendrite server.
|
||||
Automating the renewal of TLS certificates is best practice. There are many tools for this,
|
||||
but the simplest way to achieve TLS automation is to have your reverse proxy do it for you.
|
||||
[Caddy](https://caddyserver.com) is recommended as a production-grade reverse proxy with
|
||||
automatic TLS which is commonly used in front of Dendrite. It obtains and renews TLS certificates
|
||||
automatically and by default as long as your domain name is pointed at your server first.
|
||||
Although the finer details of [configuring Caddy](https://caddyserver.com/docs/) is not described
|
||||
here, in general, you must reverse proxy all `/_matrix` paths to your Dendrite server. For example,
|
||||
with Caddy:
|
||||
|
||||
```
|
||||
reverse_proxy /_matrix/* localhost:8008
|
||||
```
|
||||
|
||||
It is possible for the reverse proxy to listen on the standard HTTPS port TCP/443 so long as your
|
||||
domain delegation is configured to point to port TCP/443.
|
||||
|
|
@ -51,17 +62,12 @@ you will be able to delegate from `example.com` to `matrix.example.com` so that
|
|||
|
||||
Delegation can be performed in one of two ways:
|
||||
|
||||
* **Well-known delegation**: A well-known text file is served over HTTPS on the domain name
|
||||
that you want to use, pointing to your server on `matrix.example.com` port 8448;
|
||||
* **DNS SRV delegation**: A DNS SRV record is created on the domain name that you want to
|
||||
use, pointing to your server on `matrix.example.com` port TCP/8448.
|
||||
* **Well-known delegation (preferred)**: A well-known text file is served over HTTPS on the domain
|
||||
name that you want to use, pointing to your server on `matrix.example.com` port 8448;
|
||||
* **DNS SRV delegation (not recommended)**: See the SRV delegation section below for details.
|
||||
|
||||
If you are using a reverse proxy to forward `/_matrix` to Dendrite, your well-known or DNS SRV
|
||||
delegation must refer to the hostname and port that the reverse proxy is listening on instead.
|
||||
|
||||
Well-known delegation is typically easier to set up and usually preferred. However, you can use
|
||||
either or both methods to delegate. If you configure both methods of delegation, it is important
|
||||
that they both agree and refer to the same hostname and port.
|
||||
If you are using a reverse proxy to forward `/_matrix` to Dendrite, your well-known or delegation
|
||||
must refer to the hostname and port that the reverse proxy is listening on instead.
|
||||
|
||||
## Well-known delegation
|
||||
|
||||
|
|
@ -74,20 +80,46 @@ and contain the following JSON document:
|
|||
|
||||
```json
|
||||
{
|
||||
"m.server": "https://matrix.example.com:8448"
|
||||
"m.server": "matrix.example.com:8448"
|
||||
}
|
||||
```
|
||||
|
||||
For example, this can be done with the following Caddy config:
|
||||
|
||||
```
|
||||
handle /.well-known/matrix/client {
|
||||
header Content-Type application/json
|
||||
header Access-Control-Allow-Origin *
|
||||
respond `{"m.homeserver": {"base_url": "https://matrix.example.com:8448"}}`
|
||||
}
|
||||
```
|
||||
|
||||
You can also serve `.well-known` with Dendrite itself by setting the `well_known_server_name` config
|
||||
option to the value you want for `m.server`. This is primarily useful if Dendrite is exposed on
|
||||
`example.com:443` and you don't want to set up a separate webserver just for serving the `.well-known`
|
||||
file.
|
||||
|
||||
```yaml
|
||||
global:
|
||||
...
|
||||
well_known_server_name: "example.com:443"
|
||||
```
|
||||
|
||||
## DNS SRV delegation
|
||||
|
||||
Using DNS SRV delegation requires creating DNS SRV records on the `example.com` zone which
|
||||
refer to your Dendrite installation.
|
||||
This method is not recommended, as the behavior of SRV records in Matrix is rather unintuitive:
|
||||
SRV records will only change the IP address and port that other servers connect to, they won't
|
||||
affect the domain name. In technical terms, the `Host` header and TLS SNI of federation requests
|
||||
will still be `example.com` even if the SRV record points at `matrix.example.com`.
|
||||
|
||||
Assuming that your Dendrite installation is listening for HTTPS connections at `matrix.example.com`
|
||||
port 8448, the DNS SRV record must have the following fields:
|
||||
In practice, this means that the server must be configured with valid TLS certificates for
|
||||
`example.com`, rather than `matrix.example.com` as one might intuitively expect. If there's a
|
||||
reverse proxy in between, the proxy configuration must be written as if it's `example.com`, as the
|
||||
proxy will never see the name `matrix.example.com` in incoming requests.
|
||||
|
||||
* Name: `@` (or whichever term your DNS provider uses to signal the root)
|
||||
* Service: `_matrix`
|
||||
* Protocol: `_tcp`
|
||||
* Port: `8448`
|
||||
* Target: `matrix.example.com`
|
||||
This behavior also means that if `example.com` and `matrix.example.com` point at the same IP
|
||||
address, there is no reason to have a SRV record pointing at `matrix.example.com`. It can still
|
||||
be used to change the port number, but it won't do anything else.
|
||||
|
||||
If you understand how SRV records work and still want to use them, the service name is `_matrix` and
|
||||
the protocol is `_tcp`.
|
||||
|
|
|
|||
38
docs/installation/3_build.md
Normal file
38
docs/installation/3_build.md
Normal file
|
|
@ -0,0 +1,38 @@
|
|||
---
|
||||
title: Building Dendrite
|
||||
parent: Installation
|
||||
has_toc: true
|
||||
nav_order: 3
|
||||
permalink: /installation/build
|
||||
---
|
||||
|
||||
# Build all Dendrite commands
|
||||
|
||||
Dendrite has numerous utility commands in addition to the actual server binaries.
|
||||
Build them all from the root of the source repo with `build.sh` (Linux/Mac):
|
||||
|
||||
```sh
|
||||
./build.sh
|
||||
```
|
||||
|
||||
or `build.cmd` (Windows):
|
||||
|
||||
```powershell
|
||||
build.cmd
|
||||
```
|
||||
|
||||
The resulting binaries will be placed in the `bin` subfolder.
|
||||
|
||||
# Installing as a monolith
|
||||
|
||||
You can install the Dendrite monolith binary into `$GOPATH/bin` by using `go install`:
|
||||
|
||||
```sh
|
||||
go install ./cmd/dendrite-monolith-server
|
||||
```
|
||||
|
||||
Alternatively, you can specify a custom path for the binary to be written to using `go build`:
|
||||
|
||||
```sh
|
||||
go build -o /usr/local/bin/ ./cmd/dendrite-monolith-server
|
||||
```
|
||||
|
|
@ -17,7 +17,9 @@ filenames in the Dendrite configuration file and start Dendrite. The databases w
|
|||
and populated automatically.
|
||||
|
||||
Note that Dendrite **cannot share a single SQLite database across multiple components**. Each
|
||||
component must be configured with its own SQLite database filename.
|
||||
component must be configured with its own SQLite database filename. You will have to remove
|
||||
the `global.database` section from your Dendrite config and add it to each individual section
|
||||
instead in order to use SQLite.
|
||||
|
||||
### Connection strings
|
||||
|
||||
|
|
@ -29,5 +29,6 @@ Polylith deployments require a reverse proxy in order to ensure that requests ar
|
|||
sent to the correct endpoint. You must ensure that a suitable reverse proxy is installed
|
||||
and configured.
|
||||
|
||||
A [sample configuration file](https://github.com/matrix-org/dendrite/blob/main/docs/nginx/polylith-sample.conf)
|
||||
is provided for [NGINX](https://www.nginx.com).
|
||||
Sample configurations are provided
|
||||
for [Caddy](https://github.com/matrix-org/dendrite/blob/main/docs/caddy/polylith/Caddyfile)
|
||||
and [NGINX](https://github.com/matrix-org/dendrite/blob/main/docs/nginx/polylith-sample.conf).
|
||||
|
|
@ -1,13 +1,13 @@
|
|||
---
|
||||
title: Populate the configuration
|
||||
title: Configuring Dendrite
|
||||
parent: Installation
|
||||
nav_order: 7
|
||||
permalink: /installation/configuration
|
||||
---
|
||||
|
||||
# Populate the configuration
|
||||
# Configuring Dendrite
|
||||
|
||||
The configuration file is used to configure Dendrite. Sample configuration files are
|
||||
A YAML configuration file is used to configure Dendrite. Sample configuration files are
|
||||
present in the top level of the Dendrite repository:
|
||||
|
||||
* [`dendrite-sample.monolith.yaml`](https://github.com/matrix-org/dendrite/blob/main/dendrite-sample.monolith.yaml)
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
---
|
||||
title: Generating signing keys
|
||||
parent: Installation
|
||||
nav_order: 4
|
||||
nav_order: 8
|
||||
permalink: /installation/signingkeys
|
||||
---
|
||||
|
||||
|
|
@ -15,8 +15,9 @@ you can start your Dendrite monolith deployment by starting the `dendrite-monoli
|
|||
./dendrite-monolith-server -config /path/to/dendrite.yaml
|
||||
```
|
||||
|
||||
If you want to change the addresses or ports that Dendrite listens on, you
|
||||
can use the `-http-bind-address` and `-https-bind-address` command line arguments:
|
||||
By default, Dendrite will listen HTTP on port 8008. If you want to change the addresses
|
||||
or ports that Dendrite listens on, you can use the `-http-bind-address` and
|
||||
`-https-bind-address` command line arguments:
|
||||
|
||||
```bash
|
||||
./dendrite-monolith-server -config /path/to/dendrite.yaml \
|
||||
|
|
@ -26,7 +26,6 @@ import (
|
|||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// InviteV2 implements /_matrix/federation/v2/invite/{roomID}/{eventID}
|
||||
|
|
@ -144,7 +143,6 @@ func processInvite(
|
|||
// Check that the event is signed by the server sending the request.
|
||||
redacted, err := gomatrixserverlib.RedactEventJSON(event.JSON(), event.Version())
|
||||
if err != nil {
|
||||
logrus.WithError(err).Errorf("XXX: invite.go")
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
JSON: jsonerror.BadJSON("The event JSON could not be redacted"),
|
||||
|
|
|
|||
2
go.mod
2
go.mod
|
|
@ -25,7 +25,7 @@ require (
|
|||
github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e
|
||||
github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91
|
||||
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220713083127-fc2ea1e62e46
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220718085240-f08f98af7d2d
|
||||
github.com/matrix-org/pinecone v0.0.0-20220708135211-1ce778fcde6a
|
||||
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
|
||||
github.com/mattn/go-sqlite3 v1.14.13
|
||||
|
|
|
|||
4
go.sum
4
go.sum
|
|
@ -341,8 +341,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1
|
|||
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
|
||||
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4=
|
||||
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220713083127-fc2ea1e62e46 h1:5X/kXY3nwqKOwwrE9tnMKrjbsi3PHigQYvrvDBSntO8=
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220713083127-fc2ea1e62e46/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk=
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220718085240-f08f98af7d2d h1:BWInUURXVOW+OiifMapoRIS7i122KWdEKj6fnDFXgBo=
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220718085240-f08f98af7d2d/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk=
|
||||
github.com/matrix-org/pinecone v0.0.0-20220708135211-1ce778fcde6a h1:DdG8vXMlZ65EAtc4V+3t7zHZ2Gqs24pSnyXS+4BRHUs=
|
||||
github.com/matrix-org/pinecone v0.0.0-20220708135211-1ce778fcde6a/go.mod h1:ulJzsVOTssIVp1j/m5eI//4VpAGDkMt5NrRuAVX7wpc=
|
||||
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
|
||||
|
|
|
|||
18
internal/caching/cache_eventstatekeys.go
Normal file
18
internal/caching/cache_eventstatekeys.go
Normal file
|
|
@ -0,0 +1,18 @@
|
|||
package caching
|
||||
|
||||
import "github.com/matrix-org/dendrite/roomserver/types"
|
||||
|
||||
// EventStateKeyCache contains the subset of functions needed for
|
||||
// a room event state key cache.
|
||||
type EventStateKeyCache interface {
|
||||
GetEventStateKey(eventStateKeyNID types.EventStateKeyNID) (string, bool)
|
||||
StoreEventStateKey(eventStateKeyNID types.EventStateKeyNID, eventStateKey string)
|
||||
}
|
||||
|
||||
func (c Caches) GetEventStateKey(eventStateKeyNID types.EventStateKeyNID) (string, bool) {
|
||||
return c.RoomServerStateKeys.Get(eventStateKeyNID)
|
||||
}
|
||||
|
||||
func (c Caches) StoreEventStateKey(eventStateKeyNID types.EventStateKeyNID, eventStateKey string) {
|
||||
c.RoomServerStateKeys.Set(eventStateKeyNID, eventStateKey)
|
||||
}
|
||||
|
|
@ -9,6 +9,7 @@ type RoomServerCaches interface {
|
|||
RoomVersionCache
|
||||
RoomInfoCache
|
||||
RoomServerEventsCache
|
||||
EventStateKeyCache
|
||||
}
|
||||
|
||||
// RoomServerNIDsCache contains the subset of functions needed for
|
||||
|
|
@ -19,9 +20,9 @@ type RoomServerNIDsCache interface {
|
|||
}
|
||||
|
||||
func (c Caches) GetRoomServerRoomID(roomNID types.RoomNID) (string, bool) {
|
||||
return c.RoomServerRoomIDs.Get(int64(roomNID))
|
||||
return c.RoomServerRoomIDs.Get(roomNID)
|
||||
}
|
||||
|
||||
func (c Caches) StoreRoomServerRoomID(roomNID types.RoomNID, roomID string) {
|
||||
c.RoomServerRoomIDs.Set(int64(roomNID), roomID)
|
||||
c.RoomServerRoomIDs.Set(roomNID, roomID)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -23,16 +23,17 @@ import (
|
|||
// different implementations as long as they satisfy the Cache
|
||||
// interface.
|
||||
type Caches struct {
|
||||
RoomVersions Cache[string, gomatrixserverlib.RoomVersion] // room ID -> room version
|
||||
ServerKeys Cache[string, gomatrixserverlib.PublicKeyLookupResult] // server name -> server keys
|
||||
RoomServerRoomNIDs Cache[string, types.RoomNID] // room ID -> room NID
|
||||
RoomServerRoomIDs Cache[int64, string] // room NID -> room ID
|
||||
RoomServerEvents Cache[int64, *gomatrixserverlib.Event] // event NID -> event
|
||||
RoomInfos Cache[string, *types.RoomInfo] // room ID -> room info
|
||||
FederationPDUs Cache[int64, *gomatrixserverlib.HeaderedEvent] // queue NID -> PDU
|
||||
FederationEDUs Cache[int64, *gomatrixserverlib.EDU] // queue NID -> EDU
|
||||
SpaceSummaryRooms Cache[string, gomatrixserverlib.MSC2946SpacesResponse] // room ID -> space response
|
||||
LazyLoading Cache[lazyLoadingCacheKey, string] // composite key -> event ID
|
||||
RoomVersions Cache[string, gomatrixserverlib.RoomVersion] // room ID -> room version
|
||||
ServerKeys Cache[string, gomatrixserverlib.PublicKeyLookupResult] // server name -> server keys
|
||||
RoomServerRoomNIDs Cache[string, types.RoomNID] // room ID -> room NID
|
||||
RoomServerRoomIDs Cache[types.RoomNID, string] // room NID -> room ID
|
||||
RoomServerEvents Cache[int64, *gomatrixserverlib.Event] // event NID -> event
|
||||
RoomServerStateKeys Cache[types.EventStateKeyNID, string] // event NID -> event state key
|
||||
RoomInfos Cache[string, *types.RoomInfo] // room ID -> room info
|
||||
FederationPDUs Cache[int64, *gomatrixserverlib.HeaderedEvent] // queue NID -> PDU
|
||||
FederationEDUs Cache[int64, *gomatrixserverlib.EDU] // queue NID -> EDU
|
||||
SpaceSummaryRooms Cache[string, gomatrixserverlib.MSC2946SpacesResponse] // room ID -> space response
|
||||
LazyLoading Cache[lazyLoadingCacheKey, string] // composite key -> event ID
|
||||
}
|
||||
|
||||
// Cache is the interface that an implementation must satisfy.
|
||||
|
|
@ -44,7 +45,7 @@ type Cache[K keyable, T any] interface {
|
|||
|
||||
type keyable interface {
|
||||
// from https://github.com/dgraph-io/ristretto/blob/8e850b710d6df0383c375ec6a7beae4ce48fc8d5/z/z.go#L34
|
||||
uint64 | string | []byte | byte | int | int32 | uint32 | int64 | lazyLoadingCacheKey
|
||||
~uint64 | ~string | []byte | byte | ~int | ~int32 | ~uint32 | ~int64 | lazyLoadingCacheKey
|
||||
}
|
||||
|
||||
type costable interface {
|
||||
|
|
|
|||
|
|
@ -40,13 +40,14 @@ const (
|
|||
federationEDUsCache
|
||||
spaceSummaryRoomsCache
|
||||
lazyLoadingCache
|
||||
eventStateKeyCache
|
||||
)
|
||||
|
||||
func NewRistrettoCache(maxCost config.DataUnit, maxAge time.Duration, enablePrometheus bool) *Caches {
|
||||
cache, err := ristretto.NewCache(&ristretto.Config{
|
||||
NumCounters: 1e5, // 10x number of expected cache items, affects bloom filter size, gives us room for 10,000 currently
|
||||
BufferItems: 64, // recommended by the ristretto godocs as a sane buffer size value
|
||||
MaxCost: int64(maxCost),
|
||||
NumCounters: int64((maxCost / 1024) * 10), // 10 counters per 1KB data, affects bloom filter size
|
||||
BufferItems: 64, // recommended by the ristretto godocs as a sane buffer size value
|
||||
MaxCost: int64(maxCost), // max cost is in bytes, as per the Dendrite config
|
||||
Metrics: true,
|
||||
KeyToHash: func(key interface{}) (uint64, uint64) {
|
||||
return z.KeyToHash(key)
|
||||
|
|
@ -88,7 +89,7 @@ func NewRistrettoCache(maxCost config.DataUnit, maxAge time.Duration, enableProm
|
|||
Prefix: roomNIDsCache,
|
||||
MaxAge: maxAge,
|
||||
},
|
||||
RoomServerRoomIDs: &RistrettoCachePartition[int64, string]{ // room NID -> room ID
|
||||
RoomServerRoomIDs: &RistrettoCachePartition[types.RoomNID, string]{ // room NID -> room ID
|
||||
cache: cache,
|
||||
Prefix: roomIDsCache,
|
||||
MaxAge: maxAge,
|
||||
|
|
@ -100,6 +101,11 @@ func NewRistrettoCache(maxCost config.DataUnit, maxAge time.Duration, enableProm
|
|||
MaxAge: maxAge,
|
||||
},
|
||||
},
|
||||
RoomServerStateKeys: &RistrettoCachePartition[types.EventStateKeyNID, string]{ // event NID -> event state key
|
||||
cache: cache,
|
||||
Prefix: eventStateKeyCache,
|
||||
MaxAge: maxAge,
|
||||
},
|
||||
RoomInfos: &RistrettoCachePartition[string, *types.RoomInfo]{ // room ID -> room info
|
||||
cache: cache,
|
||||
Prefix: roomInfosCache,
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ import (
|
|||
"github.com/matrix-org/dendrite/roomserver/state"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage/shared"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage/tables"
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
|
|
@ -21,14 +22,14 @@ import (
|
|||
// Move these to a more sensible place.
|
||||
|
||||
func UpdateToInviteMembership(
|
||||
mu *shared.MembershipUpdater, add *gomatrixserverlib.Event, updates []api.OutputEvent,
|
||||
mu *shared.MembershipUpdater, add *types.Event, updates []api.OutputEvent,
|
||||
roomVersion gomatrixserverlib.RoomVersion,
|
||||
) ([]api.OutputEvent, error) {
|
||||
// We may have already sent the invite to the user, either because we are
|
||||
// reprocessing this event, or because the we received this invite from a
|
||||
// remote server via the federation invite API. In those cases we don't need
|
||||
// to send the event.
|
||||
needsSending, err := mu.SetToInvite(add)
|
||||
needsSending, retired, err := mu.Update(tables.MembershipStateInvite, add)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -38,13 +39,23 @@ func UpdateToInviteMembership(
|
|||
// room event stream. This ensures that the consumers only have to
|
||||
// consider a single stream of events when determining whether a user
|
||||
// is invited, rather than having to combine multiple streams themselves.
|
||||
onie := api.OutputNewInviteEvent{
|
||||
Event: add.Headered(roomVersion),
|
||||
RoomVersion: roomVersion,
|
||||
}
|
||||
updates = append(updates, api.OutputEvent{
|
||||
Type: api.OutputTypeNewInviteEvent,
|
||||
NewInviteEvent: &onie,
|
||||
Type: api.OutputTypeNewInviteEvent,
|
||||
NewInviteEvent: &api.OutputNewInviteEvent{
|
||||
Event: add.Headered(roomVersion),
|
||||
RoomVersion: roomVersion,
|
||||
},
|
||||
})
|
||||
}
|
||||
for _, eventID := range retired {
|
||||
updates = append(updates, api.OutputEvent{
|
||||
Type: api.OutputTypeRetireInviteEvent,
|
||||
RetireInviteEvent: &api.OutputRetireInviteEvent{
|
||||
EventID: eventID,
|
||||
Membership: gomatrixserverlib.Join,
|
||||
RetiredByEventID: add.EventID(),
|
||||
TargetUserID: *add.StateKey(),
|
||||
},
|
||||
})
|
||||
}
|
||||
return updates, nil
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ import (
|
|||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/roomserver/internal/helpers"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage/shared"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage/tables"
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
|
|
@ -60,20 +61,14 @@ func (r *Inputer) updateMemberships(
|
|||
var updates []api.OutputEvent
|
||||
|
||||
for _, change := range changes {
|
||||
var ae *gomatrixserverlib.Event
|
||||
var re *gomatrixserverlib.Event
|
||||
var ae *types.Event
|
||||
var re *types.Event
|
||||
targetUserNID := change.EventStateKeyNID
|
||||
if change.removedEventNID != 0 {
|
||||
ev, _ := helpers.EventMap(events).Lookup(change.removedEventNID)
|
||||
if ev != nil {
|
||||
re = ev.Event
|
||||
}
|
||||
re, _ = helpers.EventMap(events).Lookup(change.removedEventNID)
|
||||
}
|
||||
if change.addedEventNID != 0 {
|
||||
ev, _ := helpers.EventMap(events).Lookup(change.addedEventNID)
|
||||
if ev != nil {
|
||||
ae = ev.Event
|
||||
}
|
||||
ae, _ = helpers.EventMap(events).Lookup(change.addedEventNID)
|
||||
}
|
||||
if updates, err = r.updateMembership(updater, targetUserNID, re, ae, updates); err != nil {
|
||||
return nil, err
|
||||
|
|
@ -85,30 +80,27 @@ func (r *Inputer) updateMemberships(
|
|||
func (r *Inputer) updateMembership(
|
||||
updater *shared.RoomUpdater,
|
||||
targetUserNID types.EventStateKeyNID,
|
||||
remove, add *gomatrixserverlib.Event,
|
||||
remove, add *types.Event,
|
||||
updates []api.OutputEvent,
|
||||
) ([]api.OutputEvent, error) {
|
||||
var err error
|
||||
// Default the membership to Leave if no event was added or removed.
|
||||
oldMembership := gomatrixserverlib.Leave
|
||||
newMembership := gomatrixserverlib.Leave
|
||||
|
||||
if remove != nil {
|
||||
oldMembership, err = remove.Membership()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if add != nil {
|
||||
newMembership, err = add.Membership()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if oldMembership == newMembership && newMembership != gomatrixserverlib.Join {
|
||||
// If the membership is the same then nothing changed and we can return
|
||||
// immediately, unless it's a Join update (e.g. profile update).
|
||||
return updates, nil
|
||||
|
||||
var targetLocal bool
|
||||
if add != nil {
|
||||
targetLocal = r.isLocalTarget(add)
|
||||
}
|
||||
|
||||
mu, err := updater.MembershipUpdater(targetUserNID, targetLocal)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// In an ideal world, we shouldn't ever have "add" be nil and "remove" be
|
||||
|
|
@ -120,17 +112,10 @@ func (r *Inputer) updateMembership(
|
|||
// after a state reset, often thinking that the user was still joined to
|
||||
// the room even though the room state said otherwise, and this would prevent
|
||||
// the user from being able to attempt to rejoin the room without modifying
|
||||
// the database. So instead what we'll do is we'll just update the membership
|
||||
// table to say that the user is "leave" and we'll use the old event to
|
||||
// avoid nil pointer exceptions on the code path that follows.
|
||||
if add == nil {
|
||||
add = remove
|
||||
newMembership = gomatrixserverlib.Leave
|
||||
}
|
||||
|
||||
mu, err := updater.MembershipUpdater(targetUserNID, r.isLocalTarget(add))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
// the database. So instead we're going to remove the membership from the
|
||||
// database altogether, so that it doesn't create future problems.
|
||||
if add == nil && remove != nil {
|
||||
return nil, mu.Delete()
|
||||
}
|
||||
|
||||
switch newMembership {
|
||||
|
|
@ -149,7 +134,7 @@ func (r *Inputer) updateMembership(
|
|||
}
|
||||
}
|
||||
|
||||
func (r *Inputer) isLocalTarget(event *gomatrixserverlib.Event) bool {
|
||||
func (r *Inputer) isLocalTarget(event *types.Event) bool {
|
||||
isTargetLocalUser := false
|
||||
if statekey := event.StateKey(); statekey != nil {
|
||||
_, domain, _ := gomatrixserverlib.SplitID('@', *statekey)
|
||||
|
|
@ -159,81 +144,61 @@ func (r *Inputer) isLocalTarget(event *gomatrixserverlib.Event) bool {
|
|||
}
|
||||
|
||||
func updateToJoinMembership(
|
||||
mu *shared.MembershipUpdater, add *gomatrixserverlib.Event, updates []api.OutputEvent,
|
||||
mu *shared.MembershipUpdater, add *types.Event, updates []api.OutputEvent,
|
||||
) ([]api.OutputEvent, error) {
|
||||
// If the user is already marked as being joined, we call SetToJoin to update
|
||||
// the event ID then we can return immediately. Retired is ignored as there
|
||||
// is no invite event to retire.
|
||||
if mu.IsJoin() {
|
||||
_, err := mu.SetToJoin(add.Sender(), add.EventID(), true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return updates, nil
|
||||
}
|
||||
// When we mark a user as being joined we will invalidate any invites that
|
||||
// are active for that user. We notify the consumers that the invites have
|
||||
// been retired using a special event, even though they could infer this
|
||||
// by studying the state changes in the room event stream.
|
||||
retired, err := mu.SetToJoin(add.Sender(), add.EventID(), false)
|
||||
_, retired, err := mu.Update(tables.MembershipStateJoin, add)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, eventID := range retired {
|
||||
orie := api.OutputRetireInviteEvent{
|
||||
EventID: eventID,
|
||||
Membership: gomatrixserverlib.Join,
|
||||
RetiredByEventID: add.EventID(),
|
||||
TargetUserID: *add.StateKey(),
|
||||
}
|
||||
updates = append(updates, api.OutputEvent{
|
||||
Type: api.OutputTypeRetireInviteEvent,
|
||||
RetireInviteEvent: &orie,
|
||||
Type: api.OutputTypeRetireInviteEvent,
|
||||
RetireInviteEvent: &api.OutputRetireInviteEvent{
|
||||
EventID: eventID,
|
||||
Membership: gomatrixserverlib.Join,
|
||||
RetiredByEventID: add.EventID(),
|
||||
TargetUserID: *add.StateKey(),
|
||||
},
|
||||
})
|
||||
}
|
||||
return updates, nil
|
||||
}
|
||||
|
||||
func updateToLeaveMembership(
|
||||
mu *shared.MembershipUpdater, add *gomatrixserverlib.Event,
|
||||
mu *shared.MembershipUpdater, add *types.Event,
|
||||
newMembership string, updates []api.OutputEvent,
|
||||
) ([]api.OutputEvent, error) {
|
||||
// If the user is already neither joined, nor invited to the room then we
|
||||
// can return immediately.
|
||||
if mu.IsLeave() {
|
||||
return updates, nil
|
||||
}
|
||||
// When we mark a user as having left we will invalidate any invites that
|
||||
// are active for that user. We notify the consumers that the invites have
|
||||
// been retired using a special event, even though they could infer this
|
||||
// by studying the state changes in the room event stream.
|
||||
retired, err := mu.SetToLeave(add.Sender(), add.EventID())
|
||||
_, retired, err := mu.Update(tables.MembershipStateLeaveOrBan, add)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, eventID := range retired {
|
||||
orie := api.OutputRetireInviteEvent{
|
||||
EventID: eventID,
|
||||
Membership: newMembership,
|
||||
RetiredByEventID: add.EventID(),
|
||||
TargetUserID: *add.StateKey(),
|
||||
}
|
||||
updates = append(updates, api.OutputEvent{
|
||||
Type: api.OutputTypeRetireInviteEvent,
|
||||
RetireInviteEvent: &orie,
|
||||
Type: api.OutputTypeRetireInviteEvent,
|
||||
RetireInviteEvent: &api.OutputRetireInviteEvent{
|
||||
EventID: eventID,
|
||||
Membership: newMembership,
|
||||
RetiredByEventID: add.EventID(),
|
||||
TargetUserID: *add.StateKey(),
|
||||
},
|
||||
})
|
||||
}
|
||||
return updates, nil
|
||||
}
|
||||
|
||||
func updateToKnockMembership(
|
||||
mu *shared.MembershipUpdater, add *gomatrixserverlib.Event, updates []api.OutputEvent,
|
||||
mu *shared.MembershipUpdater, add *types.Event, updates []api.OutputEvent,
|
||||
) ([]api.OutputEvent, error) {
|
||||
if mu.IsLeave() {
|
||||
_, err := mu.SetToKnock(add)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if _, _, err := mu.Update(tables.MembershipStateKnock, add); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return updates, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -39,11 +39,13 @@ type Inviter struct {
|
|||
Inputer *input.Inputer
|
||||
}
|
||||
|
||||
// nolint:gocyclo
|
||||
func (r *Inviter) PerformInvite(
|
||||
ctx context.Context,
|
||||
req *api.PerformInviteRequest,
|
||||
res *api.PerformInviteResponse,
|
||||
) ([]api.OutputEvent, error) {
|
||||
var outputUpdates []api.OutputEvent
|
||||
event := req.Event
|
||||
if event.StateKey() == nil {
|
||||
return nil, fmt.Errorf("invite must be a state event")
|
||||
|
|
@ -66,6 +68,13 @@ func (r *Inviter) PerformInvite(
|
|||
}
|
||||
isTargetLocal := domain == r.Cfg.Matrix.ServerName
|
||||
isOriginLocal := event.Origin() == r.Cfg.Matrix.ServerName
|
||||
if !isOriginLocal && !isTargetLocal {
|
||||
res.Error = &api.PerformError{
|
||||
Code: api.PerformErrorBadRequest,
|
||||
Msg: "The invite must be either from or to a local user",
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
logger := util.GetLogger(ctx).WithFields(map[string]interface{}{
|
||||
"inviter": event.Sender(),
|
||||
|
|
@ -97,6 +106,34 @@ func (r *Inviter) PerformInvite(
|
|||
}
|
||||
}
|
||||
|
||||
updateMembershipTableManually := func() ([]api.OutputEvent, error) {
|
||||
var updater *shared.MembershipUpdater
|
||||
if updater, err = r.DB.MembershipUpdater(ctx, roomID, targetUserID, isTargetLocal, req.RoomVersion); err != nil {
|
||||
return nil, fmt.Errorf("r.DB.MembershipUpdater: %w", err)
|
||||
}
|
||||
outputUpdates, err = helpers.UpdateToInviteMembership(updater, &types.Event{
|
||||
EventNID: 0,
|
||||
Event: event.Unwrap(),
|
||||
}, outputUpdates, req.Event.RoomVersion)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("updateToInviteMembership: %w", err)
|
||||
}
|
||||
if err = updater.Commit(); err != nil {
|
||||
return nil, fmt.Errorf("updater.Commit: %w", err)
|
||||
}
|
||||
logger.Debugf("updated membership to invite and sending invite OutputEvent")
|
||||
return outputUpdates, nil
|
||||
}
|
||||
|
||||
if (info == nil || info.IsStub) && !isOriginLocal && isTargetLocal {
|
||||
// The invite came in over federation for a room that we don't know about
|
||||
// yet. We need to handle this a bit differently to most invites because
|
||||
// we don't know the room state, therefore the roomserver can't process
|
||||
// an input event. Instead we will update the membership table with the
|
||||
// new invite and generate an output event.
|
||||
return updateMembershipTableManually()
|
||||
}
|
||||
|
||||
var isAlreadyJoined bool
|
||||
if info != nil {
|
||||
_, isAlreadyJoined, _, err = r.DB.GetMembership(ctx, info.RoomNID, *event.StateKey())
|
||||
|
|
@ -140,31 +177,13 @@ func (r *Inviter) PerformInvite(
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
// If the invite originated remotely then we can't send an
|
||||
// InputRoomEvent for the invite as it will never pass auth checks
|
||||
// due to lacking room state, but we still need to tell the client
|
||||
// about the invite so we can accept it, hence we return an output
|
||||
// event to send to the Sync API.
|
||||
if !isOriginLocal {
|
||||
// The invite originated over federation. Process the membership
|
||||
// update, which will notify the sync API etc about the incoming
|
||||
// invite. We do NOT send an InputRoomEvent for the invite as it
|
||||
// will never pass auth checks due to lacking room state, but we
|
||||
// still need to tell the client about the invite so we can accept
|
||||
// it, hence we return an output event to send to the sync api.
|
||||
var updater *shared.MembershipUpdater
|
||||
updater, err = r.DB.MembershipUpdater(ctx, roomID, targetUserID, isTargetLocal, req.RoomVersion)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("r.DB.MembershipUpdater: %w", err)
|
||||
}
|
||||
|
||||
unwrapped := event.Unwrap()
|
||||
var outputUpdates []api.OutputEvent
|
||||
outputUpdates, err = helpers.UpdateToInviteMembership(updater, unwrapped, nil, req.Event.RoomVersion)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("updateToInviteMembership: %w", err)
|
||||
}
|
||||
|
||||
if err = updater.Commit(); err != nil {
|
||||
return nil, fmt.Errorf("updater.Commit: %w", err)
|
||||
}
|
||||
logger.Debugf("updated membership to invite and sending invite OutputEvent")
|
||||
return outputUpdates, nil
|
||||
return updateMembershipTableManually()
|
||||
}
|
||||
|
||||
// The invite originated locally. Therefore we have a responsibility to
|
||||
|
|
@ -229,12 +248,11 @@ func (r *Inviter) PerformInvite(
|
|||
Code: api.PerformErrorNotAllowed,
|
||||
}
|
||||
logger.WithError(err).WithField("event_id", event.EventID()).Error("r.InputRoomEvents failed")
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Don't notify the sync api of this event in the same way as a federated invite so the invitee
|
||||
// gets the invite, as the roomserver will do this when it processes the m.room.member invite.
|
||||
return nil, nil
|
||||
return outputUpdates, nil
|
||||
}
|
||||
|
||||
func buildInviteStrippedState(
|
||||
|
|
|
|||
|
|
@ -268,21 +268,19 @@ func (r *Joiner) performJoinRoomByID(
|
|||
case nil:
|
||||
// The room join is local. Send the new join event into the
|
||||
// roomserver. First of all check that the user isn't already
|
||||
// a member of the room.
|
||||
alreadyJoined := false
|
||||
for _, se := range buildRes.StateEvents {
|
||||
if !se.StateKeyEquals(userID) {
|
||||
continue
|
||||
}
|
||||
if membership, merr := se.Membership(); merr == nil {
|
||||
alreadyJoined = (membership == gomatrixserverlib.Join)
|
||||
break
|
||||
}
|
||||
// a member of the room. This is best-effort (as in we won't
|
||||
// fail if we can't find the existing membership) because there
|
||||
// is really no harm in just sending another membership event.
|
||||
membershipReq := &api.QueryMembershipForUserRequest{
|
||||
RoomID: req.RoomIDOrAlias,
|
||||
UserID: userID,
|
||||
}
|
||||
membershipRes := &api.QueryMembershipForUserResponse{}
|
||||
_ = r.Queryer.QueryMembershipForUser(ctx, membershipReq, membershipRes)
|
||||
|
||||
// If we haven't already joined the room then send an event
|
||||
// into the room changing our membership status.
|
||||
if !alreadyJoined {
|
||||
if !membershipRes.RoomExists || !membershipRes.IsInRoom {
|
||||
inputReq := rsAPI.InputRoomEventsRequest{
|
||||
InputRoomEvents: []rsAPI.InputRoomEvent{
|
||||
{
|
||||
|
|
|
|||
|
|
@ -228,14 +228,14 @@ func (r *Leaver) performFederatedRejectInvite(
|
|||
util.GetLogger(ctx).WithError(err).Errorf("failed to get MembershipUpdater, still retiring invite event")
|
||||
}
|
||||
if updater != nil {
|
||||
if _, err = updater.SetToLeave(req.UserID, eventID); err != nil {
|
||||
util.GetLogger(ctx).WithError(err).Errorf("failed to set membership to leave, still retiring invite event")
|
||||
if err = updater.Delete(); err != nil {
|
||||
util.GetLogger(ctx).WithError(err).Errorf("failed to delete membership, still retiring invite event")
|
||||
if err = updater.Rollback(); err != nil {
|
||||
util.GetLogger(ctx).WithError(err).Errorf("failed to rollback membership leave, still retiring invite event")
|
||||
util.GetLogger(ctx).WithError(err).Errorf("failed to rollback deleting membership, still retiring invite event")
|
||||
}
|
||||
} else {
|
||||
if err = updater.Commit(); err != nil {
|
||||
util.GetLogger(ctx).WithError(err).Errorf("failed to commit membership update, still retiring invite event")
|
||||
util.GetLogger(ctx).WithError(err).Errorf("failed to commit deleting membership, still retiring invite event")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ package query
|
|||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
|
@ -225,6 +226,9 @@ func (r *Queryer) QueryMembershipsForRoom(
|
|||
var eventNIDs []types.EventNID
|
||||
eventNIDs, err = r.DB.GetMembershipEventNIDsForRoom(ctx, info.RoomNID, request.JoinedOnly, request.LocalOnly)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("r.DB.GetMembershipEventNIDsForRoom: %w", err)
|
||||
}
|
||||
events, err = r.DB.Events(ctx, eventNIDs)
|
||||
|
|
@ -260,6 +264,9 @@ func (r *Queryer) QueryMembershipsForRoom(
|
|||
var eventNIDs []types.EventNID
|
||||
eventNIDs, err = r.DB.GetMembershipEventNIDsForRoom(ctx, info.RoomNID, request.JoinedOnly, false)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -86,24 +86,24 @@ const insertMembershipSQL = "" +
|
|||
|
||||
const selectMembershipFromRoomAndTargetSQL = "" +
|
||||
"SELECT membership_nid, event_nid, forgotten FROM roomserver_membership" +
|
||||
" WHERE room_nid = $1 AND target_nid = $2"
|
||||
" WHERE room_nid = $1 AND event_nid != 0 AND target_nid = $2"
|
||||
|
||||
const selectMembershipsFromRoomAndMembershipSQL = "" +
|
||||
"SELECT event_nid FROM roomserver_membership" +
|
||||
" WHERE room_nid = $1 AND membership_nid = $2 and forgotten = false"
|
||||
" WHERE room_nid = $1 AND event_nid != 0 AND membership_nid = $2 and forgotten = false"
|
||||
|
||||
const selectLocalMembershipsFromRoomAndMembershipSQL = "" +
|
||||
"SELECT event_nid FROM roomserver_membership" +
|
||||
" WHERE room_nid = $1 AND membership_nid = $2" +
|
||||
" WHERE room_nid = $1 AND event_nid != 0 AND membership_nid = $2" +
|
||||
" AND target_local = true and forgotten = false"
|
||||
|
||||
const selectMembershipsFromRoomSQL = "" +
|
||||
"SELECT event_nid FROM roomserver_membership" +
|
||||
" WHERE room_nid = $1 and forgotten = false"
|
||||
" WHERE room_nid = $1 AND event_nid != 0 and forgotten = false"
|
||||
|
||||
const selectLocalMembershipsFromRoomSQL = "" +
|
||||
"SELECT event_nid FROM roomserver_membership" +
|
||||
" WHERE room_nid = $1" +
|
||||
" WHERE room_nid = $1 AND event_nid != 0" +
|
||||
" AND target_local = true and forgotten = false"
|
||||
|
||||
const selectMembershipForUpdateSQL = "" +
|
||||
|
|
@ -118,6 +118,9 @@ const updateMembershipForgetRoom = "" +
|
|||
"UPDATE roomserver_membership SET forgotten = $3" +
|
||||
" WHERE room_nid = $1 AND target_nid = $2"
|
||||
|
||||
const deleteMembershipSQL = "" +
|
||||
"DELETE FROM roomserver_membership WHERE room_nid = $1 AND target_nid = $2"
|
||||
|
||||
const selectRoomsWithMembershipSQL = "" +
|
||||
"SELECT room_nid FROM roomserver_membership WHERE membership_nid = $1 AND target_nid = $2 and forgotten = false"
|
||||
|
||||
|
|
@ -165,6 +168,7 @@ type membershipStatements struct {
|
|||
updateMembershipForgetRoomStmt *sql.Stmt
|
||||
selectLocalServerInRoomStmt *sql.Stmt
|
||||
selectServerInRoomStmt *sql.Stmt
|
||||
deleteMembershipStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func CreateMembershipTable(db *sql.DB) error {
|
||||
|
|
@ -191,6 +195,7 @@ func PrepareMembershipTable(db *sql.DB) (tables.Membership, error) {
|
|||
{&s.updateMembershipForgetRoomStmt, updateMembershipForgetRoom},
|
||||
{&s.selectLocalServerInRoomStmt, selectLocalServerInRoomSQL},
|
||||
{&s.selectServerInRoomStmt, selectServerInRoomSQL},
|
||||
{&s.deleteMembershipStmt, deleteMembershipSQL},
|
||||
}.Prepare(db)
|
||||
}
|
||||
|
||||
|
|
@ -412,3 +417,13 @@ func (s *membershipStatements) SelectServerInRoom(
|
|||
}
|
||||
return roomNID == nid, nil
|
||||
}
|
||||
|
||||
func (s *membershipStatements) DeleteMembership(
|
||||
ctx context.Context, txn *sql.Tx,
|
||||
roomNID types.RoomNID, targetUserNID types.EventStateKeyNID,
|
||||
) error {
|
||||
_, err := sqlutil.TxStmt(txn, s.deleteMembershipStmt).ExecContext(
|
||||
ctx, roomNID, targetUserNID,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ type MembershipUpdater struct {
|
|||
d *Database
|
||||
roomNID types.RoomNID
|
||||
targetUserNID types.EventStateKeyNID
|
||||
membership tables.MembershipState
|
||||
oldMembership tables.MembershipState
|
||||
}
|
||||
|
||||
func NewMembershipUpdater(
|
||||
|
|
@ -30,7 +30,6 @@ func NewMembershipUpdater(
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
targetUserNID, err = d.assignStateKeyNID(ctx, targetUserID)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
@ -73,139 +72,62 @@ func (d *Database) membershipUpdaterTxn(
|
|||
|
||||
// IsInvite implements types.MembershipUpdater
|
||||
func (u *MembershipUpdater) IsInvite() bool {
|
||||
return u.membership == tables.MembershipStateInvite
|
||||
return u.oldMembership == tables.MembershipStateInvite
|
||||
}
|
||||
|
||||
// IsJoin implements types.MembershipUpdater
|
||||
func (u *MembershipUpdater) IsJoin() bool {
|
||||
return u.membership == tables.MembershipStateJoin
|
||||
return u.oldMembership == tables.MembershipStateJoin
|
||||
}
|
||||
|
||||
// IsLeave implements types.MembershipUpdater
|
||||
func (u *MembershipUpdater) IsLeave() bool {
|
||||
return u.membership == tables.MembershipStateLeaveOrBan
|
||||
return u.oldMembership == tables.MembershipStateLeaveOrBan
|
||||
}
|
||||
|
||||
// IsKnock implements types.MembershipUpdater
|
||||
func (u *MembershipUpdater) IsKnock() bool {
|
||||
return u.membership == tables.MembershipStateKnock
|
||||
return u.oldMembership == tables.MembershipStateKnock
|
||||
}
|
||||
|
||||
// SetToInvite implements types.MembershipUpdater
|
||||
func (u *MembershipUpdater) SetToInvite(event *gomatrixserverlib.Event) (bool, error) {
|
||||
var inserted bool
|
||||
err := u.d.Writer.Do(u.d.DB, u.txn, func(txn *sql.Tx) error {
|
||||
func (u *MembershipUpdater) Delete() error {
|
||||
if _, err := u.d.InvitesTable.UpdateInviteRetired(u.ctx, u.txn, u.roomNID, u.targetUserNID); err != nil {
|
||||
return err
|
||||
}
|
||||
return u.d.MembershipTable.DeleteMembership(u.ctx, u.txn, u.roomNID, u.targetUserNID)
|
||||
}
|
||||
|
||||
func (u *MembershipUpdater) Update(newMembership tables.MembershipState, event *types.Event) (bool, []string, error) {
|
||||
var inserted bool // Did the query result in a membership change?
|
||||
var retired []string // Did we retire any updates in the process?
|
||||
return inserted, retired, u.d.Writer.Do(u.d.DB, u.txn, func(txn *sql.Tx) error {
|
||||
senderUserNID, err := u.d.assignStateKeyNID(u.ctx, event.Sender())
|
||||
if err != nil {
|
||||
return fmt.Errorf("u.d.AssignStateKeyNID: %w", err)
|
||||
}
|
||||
inserted, err = u.d.InvitesTable.InsertInviteEvent(
|
||||
u.ctx, u.txn, event.EventID(), u.roomNID, u.targetUserNID, senderUserNID, event.JSON(),
|
||||
)
|
||||
inserted, err = u.d.MembershipTable.UpdateMembership(u.ctx, u.txn, u.roomNID, u.targetUserNID, senderUserNID, newMembership, event.EventNID, false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("u.d.InvitesTable.InsertInviteEvent: %w", err)
|
||||
return fmt.Errorf("u.d.MembershipTable.UpdateMembership: %w", err)
|
||||
}
|
||||
if u.membership != tables.MembershipStateInvite {
|
||||
if inserted, err = u.d.MembershipTable.UpdateMembership(u.ctx, u.txn, u.roomNID, u.targetUserNID, senderUserNID, tables.MembershipStateInvite, 0, false); err != nil {
|
||||
return fmt.Errorf("u.d.MembershipTable.UpdateMembership: %w", err)
|
||||
if !inserted {
|
||||
return nil
|
||||
}
|
||||
switch {
|
||||
case u.oldMembership != tables.MembershipStateInvite && newMembership == tables.MembershipStateInvite:
|
||||
inserted, err = u.d.InvitesTable.InsertInviteEvent(
|
||||
u.ctx, u.txn, event.EventID(), u.roomNID, u.targetUserNID, senderUserNID, event.JSON(),
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("u.d.InvitesTable.InsertInviteEvent: %w", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return inserted, err
|
||||
}
|
||||
|
||||
// SetToJoin implements types.MembershipUpdater
|
||||
func (u *MembershipUpdater) SetToJoin(senderUserID string, eventID string, isUpdate bool) ([]string, error) {
|
||||
var inviteEventIDs []string
|
||||
|
||||
err := u.d.Writer.Do(u.d.DB, u.txn, func(txn *sql.Tx) error {
|
||||
senderUserNID, err := u.d.assignStateKeyNID(u.ctx, senderUserID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("u.d.AssignStateKeyNID: %w", err)
|
||||
}
|
||||
|
||||
// If this is a join event update, there is no invite to update
|
||||
if !isUpdate {
|
||||
inviteEventIDs, err = u.d.InvitesTable.UpdateInviteRetired(
|
||||
case u.oldMembership == tables.MembershipStateInvite && newMembership != tables.MembershipStateInvite:
|
||||
retired, err = u.d.InvitesTable.UpdateInviteRetired(
|
||||
u.ctx, u.txn, u.roomNID, u.targetUserNID,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("u.d.InvitesTables.UpdateInviteRetired: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Look up the NID of the new join event
|
||||
nIDs, err := u.d.eventNIDs(u.ctx, u.txn, []string{eventID}, false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("u.d.EventNIDs: %w", err)
|
||||
}
|
||||
|
||||
if u.membership != tables.MembershipStateJoin || isUpdate {
|
||||
if _, err = u.d.MembershipTable.UpdateMembership(u.ctx, u.txn, u.roomNID, u.targetUserNID, senderUserNID, tables.MembershipStateJoin, nIDs[eventID], false); err != nil {
|
||||
return fmt.Errorf("u.d.MembershipTable.UpdateMembership: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
return inviteEventIDs, err
|
||||
}
|
||||
|
||||
// SetToLeave implements types.MembershipUpdater
|
||||
func (u *MembershipUpdater) SetToLeave(senderUserID string, eventID string) ([]string, error) {
|
||||
var inviteEventIDs []string
|
||||
|
||||
err := u.d.Writer.Do(u.d.DB, u.txn, func(txn *sql.Tx) error {
|
||||
senderUserNID, err := u.d.assignStateKeyNID(u.ctx, senderUserID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("u.d.AssignStateKeyNID: %w", err)
|
||||
}
|
||||
inviteEventIDs, err = u.d.InvitesTable.UpdateInviteRetired(
|
||||
u.ctx, u.txn, u.roomNID, u.targetUserNID,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("u.d.InvitesTable.updateInviteRetired: %w", err)
|
||||
}
|
||||
|
||||
// Look up the NID of the new leave event
|
||||
nIDs, err := u.d.eventNIDs(u.ctx, u.txn, []string{eventID}, false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("u.d.EventNIDs: %w", err)
|
||||
}
|
||||
|
||||
if u.membership != tables.MembershipStateLeaveOrBan {
|
||||
if _, err = u.d.MembershipTable.UpdateMembership(u.ctx, u.txn, u.roomNID, u.targetUserNID, senderUserNID, tables.MembershipStateLeaveOrBan, nIDs[eventID], false); err != nil {
|
||||
return fmt.Errorf("u.d.MembershipTable.UpdateMembership: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
return inviteEventIDs, err
|
||||
}
|
||||
|
||||
// SetToKnock implements types.MembershipUpdater
|
||||
func (u *MembershipUpdater) SetToKnock(event *gomatrixserverlib.Event) (bool, error) {
|
||||
var inserted bool
|
||||
err := u.d.Writer.Do(u.d.DB, u.txn, func(txn *sql.Tx) error {
|
||||
senderUserNID, err := u.d.assignStateKeyNID(u.ctx, event.Sender())
|
||||
if err != nil {
|
||||
return fmt.Errorf("u.d.AssignStateKeyNID: %w", err)
|
||||
}
|
||||
if u.membership != tables.MembershipStateKnock {
|
||||
// Look up the NID of the new knock event
|
||||
nIDs, err := u.d.eventNIDs(u.ctx, u.txn, []string{event.EventID()}, false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("u.d.EventNIDs: %w", err)
|
||||
}
|
||||
|
||||
if inserted, err = u.d.MembershipTable.UpdateMembership(u.ctx, u.txn, u.roomNID, u.targetUserNID, senderUserNID, tables.MembershipStateKnock, nIDs[event.EventID()], false); err != nil {
|
||||
return fmt.Errorf("u.d.MembershipTable.UpdateMembership: %w", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return inserted, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -72,7 +72,24 @@ func (d *Database) eventTypeNIDs(
|
|||
func (d *Database) EventStateKeys(
|
||||
ctx context.Context, eventStateKeyNIDs []types.EventStateKeyNID,
|
||||
) (map[types.EventStateKeyNID]string, error) {
|
||||
return d.EventStateKeysTable.BulkSelectEventStateKey(ctx, nil, eventStateKeyNIDs)
|
||||
result := make(map[types.EventStateKeyNID]string, len(eventStateKeyNIDs))
|
||||
fetch := make([]types.EventStateKeyNID, 0, len(eventStateKeyNIDs))
|
||||
for _, nid := range eventStateKeyNIDs {
|
||||
if key, ok := d.Cache.GetEventStateKey(nid); ok {
|
||||
result[nid] = key
|
||||
} else {
|
||||
fetch = append(fetch, nid)
|
||||
}
|
||||
}
|
||||
fromDB, err := d.EventStateKeysTable.BulkSelectEventStateKey(ctx, nil, fetch)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for nid, key := range fromDB {
|
||||
result[nid] = key
|
||||
d.Cache.StoreEventStateKey(nid, key)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (d *Database) EventStateKeyNIDs(
|
||||
|
|
|
|||
|
|
@ -62,24 +62,24 @@ const insertMembershipSQL = "" +
|
|||
|
||||
const selectMembershipFromRoomAndTargetSQL = "" +
|
||||
"SELECT membership_nid, event_nid, forgotten FROM roomserver_membership" +
|
||||
" WHERE room_nid = $1 AND target_nid = $2"
|
||||
" WHERE room_nid = $1 AND event_nid != 0 AND target_nid = $2"
|
||||
|
||||
const selectMembershipsFromRoomAndMembershipSQL = "" +
|
||||
"SELECT event_nid FROM roomserver_membership" +
|
||||
" WHERE room_nid = $1 AND membership_nid = $2 and forgotten = false"
|
||||
" WHERE room_nid = $1 AND event_nid != 0 AND membership_nid = $2 and forgotten = false"
|
||||
|
||||
const selectLocalMembershipsFromRoomAndMembershipSQL = "" +
|
||||
"SELECT event_nid FROM roomserver_membership" +
|
||||
" WHERE room_nid = $1 AND membership_nid = $2" +
|
||||
" WHERE room_nid = $1 AND event_nid != 0 AND membership_nid = $2" +
|
||||
" AND target_local = true and forgotten = false"
|
||||
|
||||
const selectMembershipsFromRoomSQL = "" +
|
||||
"SELECT event_nid FROM roomserver_membership" +
|
||||
" WHERE room_nid = $1 and forgotten = false"
|
||||
" WHERE room_nid = $1 AND event_nid != 0 and forgotten = false"
|
||||
|
||||
const selectLocalMembershipsFromRoomSQL = "" +
|
||||
"SELECT event_nid FROM roomserver_membership" +
|
||||
" WHERE room_nid = $1" +
|
||||
" WHERE room_nid = $1 AND event_nid != 0" +
|
||||
" AND target_local = true and forgotten = false"
|
||||
|
||||
const selectMembershipForUpdateSQL = "" +
|
||||
|
|
@ -125,6 +125,9 @@ const selectServerInRoomSQL = "" +
|
|||
" JOIN roomserver_event_state_keys ON roomserver_membership.target_nid = roomserver_event_state_keys.event_state_key_nid" +
|
||||
" WHERE membership_nid = $1 AND room_nid = $2 AND event_state_key LIKE '%:' || $3 LIMIT 1"
|
||||
|
||||
const deleteMembershipSQL = "" +
|
||||
"DELETE FROM roomserver_membership WHERE room_nid = $1 AND target_nid = $2"
|
||||
|
||||
type membershipStatements struct {
|
||||
db *sql.DB
|
||||
insertMembershipStmt *sql.Stmt
|
||||
|
|
@ -140,6 +143,7 @@ type membershipStatements struct {
|
|||
updateMembershipForgetRoomStmt *sql.Stmt
|
||||
selectLocalServerInRoomStmt *sql.Stmt
|
||||
selectServerInRoomStmt *sql.Stmt
|
||||
deleteMembershipStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func CreateMembershipTable(db *sql.DB) error {
|
||||
|
|
@ -166,6 +170,7 @@ func PrepareMembershipTable(db *sql.DB) (tables.Membership, error) {
|
|||
{&s.updateMembershipForgetRoomStmt, updateMembershipForgetRoom},
|
||||
{&s.selectLocalServerInRoomStmt, selectLocalServerInRoomSQL},
|
||||
{&s.selectServerInRoomStmt, selectServerInRoomSQL},
|
||||
{&s.deleteMembershipStmt, deleteMembershipSQL},
|
||||
}.Prepare(db)
|
||||
}
|
||||
|
||||
|
|
@ -383,3 +388,13 @@ func (s *membershipStatements) SelectServerInRoom(ctx context.Context, txn *sql.
|
|||
}
|
||||
return roomNID == nid, nil
|
||||
}
|
||||
|
||||
func (s *membershipStatements) DeleteMembership(
|
||||
ctx context.Context, txn *sql.Tx,
|
||||
roomNID types.RoomNID, targetUserNID types.EventStateKeyNID,
|
||||
) error {
|
||||
_, err := sqlutil.TxStmt(txn, s.deleteMembershipStmt).ExecContext(
|
||||
ctx, roomNID, targetUserNID,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -133,6 +133,7 @@ type Membership interface {
|
|||
UpdateForgetMembership(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID, forget bool) error
|
||||
SelectLocalServerInRoom(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) (bool, error)
|
||||
SelectServerInRoom(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, serverName gomatrixserverlib.ServerName) (bool, error)
|
||||
DeleteMembership(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID) error
|
||||
}
|
||||
|
||||
type Published interface {
|
||||
|
|
|
|||
|
|
@ -60,6 +60,9 @@ func TestMembershipTable(t *testing.T) {
|
|||
// This inserts a left user to the room
|
||||
err = tab.InsertMembership(ctx, nil, 1, stateKeyNID, true)
|
||||
assert.NoError(t, err)
|
||||
// We must update the membership with a non-zero event NID or it will get filtered out in later queries
|
||||
_, err = tab.UpdateMembership(ctx, nil, 1, stateKeyNID, userNIDs[0], tables.MembershipStateLeaveOrBan, 1, false)
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
// ... so this should be false
|
||||
|
|
|
|||
|
|
@ -76,7 +76,7 @@ type Global struct {
|
|||
// ServerNotices configuration used for sending server notices
|
||||
ServerNotices ServerNotices `yaml:"server_notices"`
|
||||
|
||||
// ReportStats configures opt-in anonymous stats reporting.
|
||||
// ReportStats configures opt-in phone-home statistics reporting.
|
||||
ReportStats ReportStats `yaml:"report_stats"`
|
||||
|
||||
// Configuration for the caches.
|
||||
|
|
@ -192,9 +192,9 @@ func (c *Cache) Verify(errors *ConfigErrors, isMonolith bool) {
|
|||
checkPositive(errors, "max_size_estimated", int64(c.EstimatedMaxSize))
|
||||
}
|
||||
|
||||
// ReportStats configures opt-in anonymous stats reporting.
|
||||
// ReportStats configures opt-in phone-home statistics reporting.
|
||||
type ReportStats struct {
|
||||
// Enabled configures anonymous usage stats of the server
|
||||
// Enabled configures phone-home statistics of the server
|
||||
Enabled bool `yaml:"enabled"`
|
||||
|
||||
// Endpoint the endpoint to report stats to
|
||||
|
|
|
|||
|
|
@ -240,6 +240,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
|
|||
msg.RemovesStateEventIDs,
|
||||
msg.TransactionID,
|
||||
false,
|
||||
msg.HistoryVisibility,
|
||||
)
|
||||
if err != nil {
|
||||
// panic rather than continue with an inconsistent database
|
||||
|
|
@ -289,7 +290,8 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent(
|
|||
[]string{}, // adds no state
|
||||
[]string{}, // removes no state
|
||||
nil, // no transaction
|
||||
ev.StateKey() != nil, // exclude from sync?
|
||||
ev.StateKey() != nil, // exclude from sync?,
|
||||
msg.HistoryVisibility,
|
||||
)
|
||||
if err != nil {
|
||||
// panic rather than continue with an inconsistent database
|
||||
|
|
@ -363,7 +365,7 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
|
|||
"event": string(msg.Event.JSON()),
|
||||
"pdupos": pduPos,
|
||||
log.ErrorKey: err,
|
||||
}).Panicf("roomserver output log: write invite failure")
|
||||
}).Errorf("roomserver output log: write invite failure")
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -383,7 +385,7 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent(
|
|||
log.WithFields(log.Fields{
|
||||
"event_id": msg.EventID,
|
||||
log.ErrorKey: err,
|
||||
}).Panicf("roomserver output log: remove invite failure")
|
||||
}).Errorf("roomserver output log: remove invite failure")
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -401,7 +403,7 @@ func (s *OutputRoomEventConsumer) onNewPeek(
|
|||
// panic rather than continue with an inconsistent database
|
||||
log.WithFields(log.Fields{
|
||||
log.ErrorKey: err,
|
||||
}).Panicf("roomserver output log: write peek failure")
|
||||
}).Errorf("roomserver output log: write peek failure")
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -420,7 +422,7 @@ func (s *OutputRoomEventConsumer) onRetirePeek(
|
|||
// panic rather than continue with an inconsistent database
|
||||
log.WithFields(log.Fields{
|
||||
log.ErrorKey: err,
|
||||
}).Panicf("roomserver output log: write peek failure")
|
||||
}).Errorf("roomserver output log: write peek failure")
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -594,6 +594,7 @@ func (r *messagesReq) backfill(roomID string, backwardsExtremities map[string][]
|
|||
[]string{},
|
||||
[]string{},
|
||||
nil, true,
|
||||
gomatrixserverlib.HistoryVisibilityShared,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
|||
|
|
@ -69,7 +69,9 @@ type Database interface {
|
|||
// when generating the sync stream position for this event. Returns the sync stream position for the inserted event.
|
||||
// Returns an error if there was a problem inserting this event.
|
||||
WriteEvent(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent, addStateEvents []*gomatrixserverlib.HeaderedEvent,
|
||||
addStateEventIDs []string, removeStateEventIDs []string, transactionID *api.TransactionID, excludeFromSync bool) (types.StreamPosition, error)
|
||||
addStateEventIDs []string, removeStateEventIDs []string, transactionID *api.TransactionID, excludeFromSync bool,
|
||||
historyVisibility gomatrixserverlib.HistoryVisibility,
|
||||
) (types.StreamPosition, error)
|
||||
// PurgeRoomState completely purges room state from the sync API. This is done when
|
||||
// receiving an output event that completely resets the state.
|
||||
PurgeRoomState(ctx context.Context, roomID string) error
|
||||
|
|
|
|||
|
|
@ -51,6 +51,7 @@ CREATE TABLE IF NOT EXISTS syncapi_current_room_state (
|
|||
-- The serial ID of the output_room_events table when this event became
|
||||
-- part of the current state of the room.
|
||||
added_at BIGINT,
|
||||
history_visibility SMALLINT NOT NULL DEFAULT 2,
|
||||
-- Clobber based on 3-uple of room_id, type and state_key
|
||||
CONSTRAINT syncapi_room_state_unique UNIQUE (room_id, type, state_key)
|
||||
);
|
||||
|
|
@ -63,8 +64,8 @@ CREATE UNIQUE INDEX IF NOT EXISTS syncapi_current_room_state_eventid_idx ON sync
|
|||
`
|
||||
|
||||
const upsertRoomStateSQL = "" +
|
||||
"INSERT INTO syncapi_current_room_state (room_id, event_id, type, sender, contains_url, state_key, headered_event_json, membership, added_at)" +
|
||||
" VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)" +
|
||||
"INSERT INTO syncapi_current_room_state (room_id, event_id, type, sender, contains_url, state_key, headered_event_json, membership, added_at, history_visibility)" +
|
||||
" VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)" +
|
||||
" ON CONFLICT ON CONSTRAINT syncapi_room_state_unique" +
|
||||
" DO UPDATE SET event_id = $2, sender=$4, contains_url=$5, headered_event_json = $7, membership = $8, added_at = $9"
|
||||
|
||||
|
|
@ -100,11 +101,11 @@ const selectStateEventSQL = "" +
|
|||
"SELECT headered_event_json FROM syncapi_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3"
|
||||
|
||||
const selectEventsWithEventIDsSQL = "" +
|
||||
// TODO: The session_id and transaction_id blanks are here because otherwise
|
||||
// the rowsToStreamEvents expects there to be exactly six columns. We need to
|
||||
// TODO: The session_id and transaction_id blanks are here because
|
||||
// the rowsToStreamEvents expects there to be exactly seven columns. We need to
|
||||
// figure out if these really need to be in the DB, and if so, we need a
|
||||
// better permanent fix for this. - neilalexander, 2 Jan 2020
|
||||
"SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id" +
|
||||
"SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id, history_visibility" +
|
||||
" FROM syncapi_current_room_state WHERE event_id = ANY($1)"
|
||||
|
||||
const selectSharedUsersSQL = "" +
|
||||
|
|
@ -336,6 +337,7 @@ func (s *currentRoomStateStatements) UpsertRoomState(
|
|||
headeredJSON,
|
||||
membership,
|
||||
addedAt,
|
||||
event.Visibility,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,50 @@
|
|||
// Copyright 2022 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package deltas
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
)
|
||||
|
||||
func LoadAddHistoryVisibilityColumn(m *sqlutil.Migrations) {
|
||||
m.AddMigration(UpAddHistoryVisibilityColumn, DownAddHistoryVisibilityColumn)
|
||||
}
|
||||
|
||||
func UpAddHistoryVisibilityColumn(tx *sql.Tx) error {
|
||||
_, err := tx.Exec(`
|
||||
ALTER TABLE syncapi_output_room_events ADD COLUMN IF NOT EXISTS history_visibility SMALLINT NOT NULL DEFAULT 2;
|
||||
UPDATE syncapi_output_room_events SET history_visibility = 4 WHERE type IN ('m.room.message', 'm.room.encrypted');
|
||||
ALTER TABLE syncapi_current_room_state ADD COLUMN IF NOT EXISTS history_visibility SMALLINT NOT NULL DEFAULT 2;
|
||||
UPDATE syncapi_current_room_state SET history_visibility = 4 WHERE type IN ('m.room.message', 'm.room.encrypted');
|
||||
`)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to execute upgrade: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func DownAddHistoryVisibilityColumn(tx *sql.Tx) error {
|
||||
_, err := tx.Exec(`
|
||||
ALTER TABLE syncapi_output_room_events DROP COLUMN IF EXISTS history_visibility;
|
||||
ALTER TABLE syncapi_current_room_state DROP COLUMN IF EXISTS history_visibility;
|
||||
`)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to execute downgrade: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
@ -67,7 +67,9 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events (
|
|||
-- events retrieved through backfilling that have a position in the stream
|
||||
-- that relates to the moment these were retrieved rather than the moment these
|
||||
-- were emitted.
|
||||
exclude_from_sync BOOL DEFAULT FALSE
|
||||
exclude_from_sync BOOL DEFAULT FALSE,
|
||||
-- The history visibility before this event (1 - world_readable; 2 - shared; 3 - invited; 4 - joined)
|
||||
history_visibility SMALLINT NOT NULL DEFAULT 2
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS syncapi_output_room_events_type_idx ON syncapi_output_room_events (type);
|
||||
|
|
@ -78,16 +80,16 @@ CREATE INDEX IF NOT EXISTS syncapi_output_room_events_exclude_from_sync_idx ON s
|
|||
|
||||
const insertEventSQL = "" +
|
||||
"INSERT INTO syncapi_output_room_events (" +
|
||||
"room_id, event_id, headered_event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync" +
|
||||
") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) " +
|
||||
"room_id, event_id, headered_event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync, history_visibility" +
|
||||
") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) " +
|
||||
"ON CONFLICT ON CONSTRAINT syncapi_event_id_idx DO UPDATE SET exclude_from_sync = (excluded.exclude_from_sync AND $11) " +
|
||||
"RETURNING id"
|
||||
|
||||
const selectEventsSQL = "" +
|
||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events WHERE event_id = ANY($1)"
|
||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events WHERE event_id = ANY($1)"
|
||||
|
||||
const selectEventsWithFilterSQL = "" +
|
||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events WHERE event_id = ANY($1)" +
|
||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events WHERE event_id = ANY($1)" +
|
||||
" AND ( $2::text[] IS NULL OR sender = ANY($2) )" +
|
||||
" AND ( $3::text[] IS NULL OR NOT(sender = ANY($3)) )" +
|
||||
" AND ( $4::text[] IS NULL OR type LIKE ANY($4) )" +
|
||||
|
|
@ -96,7 +98,7 @@ const selectEventsWithFilterSQL = "" +
|
|||
" LIMIT $7"
|
||||
|
||||
const selectRecentEventsSQL = "" +
|
||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
|
||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" +
|
||||
" WHERE room_id = $1 AND id > $2 AND id <= $3" +
|
||||
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
|
||||
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
|
||||
|
|
@ -105,7 +107,7 @@ const selectRecentEventsSQL = "" +
|
|||
" ORDER BY id DESC LIMIT $8"
|
||||
|
||||
const selectRecentEventsForSyncSQL = "" +
|
||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
|
||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" +
|
||||
" WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE" +
|
||||
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
|
||||
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
|
||||
|
|
@ -114,7 +116,7 @@ const selectRecentEventsForSyncSQL = "" +
|
|||
" ORDER BY id DESC LIMIT $8"
|
||||
|
||||
const selectEarlyEventsSQL = "" +
|
||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
|
||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" +
|
||||
" WHERE room_id = $1 AND id > $2 AND id <= $3" +
|
||||
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
|
||||
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
|
||||
|
|
@ -130,7 +132,7 @@ const updateEventJSONSQL = "" +
|
|||
|
||||
// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
|
||||
const selectStateInRangeSQL = "" +
|
||||
"SELECT event_id, id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" +
|
||||
"SELECT event_id, id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids, history_visibility" +
|
||||
" FROM syncapi_output_room_events" +
|
||||
" WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" +
|
||||
" AND room_id = ANY($3)" +
|
||||
|
|
@ -146,10 +148,10 @@ const deleteEventsForRoomSQL = "" +
|
|||
"DELETE FROM syncapi_output_room_events WHERE room_id = $1"
|
||||
|
||||
const selectContextEventSQL = "" +
|
||||
"SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND event_id = $2"
|
||||
"SELECT id, headered_event_json, history_visibility FROM syncapi_output_room_events WHERE room_id = $1 AND event_id = $2"
|
||||
|
||||
const selectContextBeforeEventSQL = "" +
|
||||
"SELECT headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id < $2" +
|
||||
"SELECT headered_event_json, history_visibility FROM syncapi_output_room_events WHERE room_id = $1 AND id < $2" +
|
||||
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
|
||||
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
|
||||
" AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
|
||||
|
|
@ -157,7 +159,7 @@ const selectContextBeforeEventSQL = "" +
|
|||
" ORDER BY id DESC LIMIT $3"
|
||||
|
||||
const selectContextAfterEventSQL = "" +
|
||||
"SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2" +
|
||||
"SELECT id, headered_event_json, history_visibility FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2" +
|
||||
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
|
||||
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
|
||||
" AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
|
||||
|
|
@ -246,14 +248,15 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
|
|||
|
||||
for rows.Next() {
|
||||
var (
|
||||
eventID string
|
||||
streamPos types.StreamPosition
|
||||
eventBytes []byte
|
||||
excludeFromSync bool
|
||||
addIDs pq.StringArray
|
||||
delIDs pq.StringArray
|
||||
eventID string
|
||||
streamPos types.StreamPosition
|
||||
eventBytes []byte
|
||||
excludeFromSync bool
|
||||
addIDs pq.StringArray
|
||||
delIDs pq.StringArray
|
||||
historyVisibility gomatrixserverlib.HistoryVisibility
|
||||
)
|
||||
if err := rows.Scan(&eventID, &streamPos, &eventBytes, &excludeFromSync, &addIDs, &delIDs); err != nil {
|
||||
if err := rows.Scan(&eventID, &streamPos, &eventBytes, &excludeFromSync, &addIDs, &delIDs, &historyVisibility); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
// Sanity check for deleted state and whine if we see it. We don't need to do anything
|
||||
|
|
@ -283,6 +286,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
|
|||
needSet[id] = true
|
||||
}
|
||||
stateNeeded[ev.RoomID()] = needSet
|
||||
ev.Visibility = historyVisibility
|
||||
|
||||
eventIDToEvent[eventID] = types.StreamEvent{
|
||||
HeaderedEvent: &ev,
|
||||
|
|
@ -314,7 +318,7 @@ func (s *outputRoomEventsStatements) SelectMaxEventID(
|
|||
func (s *outputRoomEventsStatements) InsertEvent(
|
||||
ctx context.Context, txn *sql.Tx,
|
||||
event *gomatrixserverlib.HeaderedEvent, addState, removeState []string,
|
||||
transactionID *api.TransactionID, excludeFromSync bool,
|
||||
transactionID *api.TransactionID, excludeFromSync bool, historyVisibility gomatrixserverlib.HistoryVisibility,
|
||||
) (streamPos types.StreamPosition, err error) {
|
||||
var txnID *string
|
||||
var sessionID *int64
|
||||
|
|
@ -351,6 +355,7 @@ func (s *outputRoomEventsStatements) InsertEvent(
|
|||
sessionID,
|
||||
txnID,
|
||||
excludeFromSync,
|
||||
historyVisibility,
|
||||
).Scan(&streamPos)
|
||||
return
|
||||
}
|
||||
|
|
@ -504,13 +509,15 @@ func (s *outputRoomEventsStatements) SelectContextEvent(ctx context.Context, txn
|
|||
row := sqlutil.TxStmt(txn, s.selectContextEventStmt).QueryRowContext(ctx, roomID, eventID)
|
||||
|
||||
var eventAsString string
|
||||
if err = row.Scan(&id, &eventAsString); err != nil {
|
||||
var historyVisibility gomatrixserverlib.HistoryVisibility
|
||||
if err = row.Scan(&id, &eventAsString, &historyVisibility); err != nil {
|
||||
return 0, evt, err
|
||||
}
|
||||
|
||||
if err = json.Unmarshal([]byte(eventAsString), &evt); err != nil {
|
||||
return 0, evt, err
|
||||
}
|
||||
evt.Visibility = historyVisibility
|
||||
return id, evt, nil
|
||||
}
|
||||
|
||||
|
|
@ -532,15 +539,17 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
|
|||
|
||||
for rows.Next() {
|
||||
var (
|
||||
eventBytes []byte
|
||||
evt *gomatrixserverlib.HeaderedEvent
|
||||
eventBytes []byte
|
||||
evt *gomatrixserverlib.HeaderedEvent
|
||||
historyVisibility gomatrixserverlib.HistoryVisibility
|
||||
)
|
||||
if err = rows.Scan(&eventBytes); err != nil {
|
||||
if err = rows.Scan(&eventBytes, &historyVisibility); err != nil {
|
||||
return evts, err
|
||||
}
|
||||
if err = json.Unmarshal(eventBytes, &evt); err != nil {
|
||||
return evts, err
|
||||
}
|
||||
evt.Visibility = historyVisibility
|
||||
evts = append(evts, evt)
|
||||
}
|
||||
|
||||
|
|
@ -565,15 +574,17 @@ func (s *outputRoomEventsStatements) SelectContextAfterEvent(
|
|||
|
||||
for rows.Next() {
|
||||
var (
|
||||
eventBytes []byte
|
||||
evt *gomatrixserverlib.HeaderedEvent
|
||||
eventBytes []byte
|
||||
evt *gomatrixserverlib.HeaderedEvent
|
||||
historyVisibility gomatrixserverlib.HistoryVisibility
|
||||
)
|
||||
if err = rows.Scan(&lastID, &eventBytes); err != nil {
|
||||
if err = rows.Scan(&lastID, &eventBytes, &historyVisibility); err != nil {
|
||||
return 0, evts, err
|
||||
}
|
||||
if err = json.Unmarshal(eventBytes, &evt); err != nil {
|
||||
return 0, evts, err
|
||||
}
|
||||
evt.Visibility = historyVisibility
|
||||
evts = append(evts, evt)
|
||||
}
|
||||
|
||||
|
|
@ -584,15 +595,16 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
|
|||
var result []types.StreamEvent
|
||||
for rows.Next() {
|
||||
var (
|
||||
eventID string
|
||||
streamPos types.StreamPosition
|
||||
eventBytes []byte
|
||||
excludeFromSync bool
|
||||
sessionID *int64
|
||||
txnID *string
|
||||
transactionID *api.TransactionID
|
||||
eventID string
|
||||
streamPos types.StreamPosition
|
||||
eventBytes []byte
|
||||
excludeFromSync bool
|
||||
sessionID *int64
|
||||
txnID *string
|
||||
transactionID *api.TransactionID
|
||||
historyVisibility gomatrixserverlib.HistoryVisibility
|
||||
)
|
||||
if err := rows.Scan(&eventID, &streamPos, &eventBytes, &sessionID, &excludeFromSync, &txnID); err != nil {
|
||||
if err := rows.Scan(&eventID, &streamPos, &eventBytes, &sessionID, &excludeFromSync, &txnID, &historyVisibility); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// TODO: Handle redacted events
|
||||
|
|
@ -607,7 +619,7 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
|
|||
TransactionID: *txnID,
|
||||
}
|
||||
}
|
||||
|
||||
ev.Visibility = historyVisibility
|
||||
result = append(result, types.StreamEvent{
|
||||
HeaderedEvent: &ev,
|
||||
StreamPosition: streamPos,
|
||||
|
|
|
|||
|
|
@ -42,18 +42,16 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions)
|
|||
if d.db, d.writer, err = base.DatabaseConnection(dbProperties, sqlutil.NewDummyWriter()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if _, err = d.db.Exec(outputRoomEventsSchema); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if _, err = d.db.Exec(currentRoomStateSchema); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
accountData, err := NewPostgresAccountDataTable(d.db)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
events, err := NewPostgresEventsTable(d.db)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
currState, err := NewPostgresCurrentRoomStateTable(d.db)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
invites, err := NewPostgresInvitesTable(d.db)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
@ -101,9 +99,19 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions)
|
|||
m := sqlutil.NewMigrations()
|
||||
deltas.LoadFixSequences(m)
|
||||
deltas.LoadRemoveSendToDeviceSentColumn(m)
|
||||
deltas.LoadAddHistoryVisibilityColumn(m)
|
||||
if err = m.RunDeltas(d.db, dbProperties); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// prepare statements after the migrations have run
|
||||
events, err := NewPostgresEventsTable(d.db)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
currState, err := NewPostgresCurrentRoomStateTable(d.db)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
d.Database = shared.Database{
|
||||
DB: d.db,
|
||||
Writer: d.writer,
|
||||
|
|
|
|||
|
|
@ -368,11 +368,12 @@ func (d *Database) WriteEvent(
|
|||
addStateEvents []*gomatrixserverlib.HeaderedEvent,
|
||||
addStateEventIDs, removeStateEventIDs []string,
|
||||
transactionID *api.TransactionID, excludeFromSync bool,
|
||||
historyVisibility gomatrixserverlib.HistoryVisibility,
|
||||
) (pduPosition types.StreamPosition, returnErr error) {
|
||||
returnErr = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
var err error
|
||||
pos, err := d.OutputEvents.InsertEvent(
|
||||
ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID, excludeFromSync,
|
||||
ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID, excludeFromSync, historyVisibility,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("d.OutputEvents.InsertEvent: %w", err)
|
||||
|
|
@ -391,7 +392,9 @@ func (d *Database) WriteEvent(
|
|||
// Nothing to do, the event may have just been a message event.
|
||||
return nil
|
||||
}
|
||||
|
||||
for i := range addStateEvents {
|
||||
addStateEvents[i].Visibility = historyVisibility
|
||||
}
|
||||
return d.updateRoomState(ctx, txn, removeStateEventIDs, addStateEvents, pduPosition, topoPosition)
|
||||
})
|
||||
|
||||
|
|
|
|||
|
|
@ -41,6 +41,7 @@ CREATE TABLE IF NOT EXISTS syncapi_current_room_state (
|
|||
headered_event_json TEXT NOT NULL,
|
||||
membership TEXT,
|
||||
added_at BIGINT,
|
||||
history_visibility SMALLINT NOT NULL DEFAULT 2, -- The history visibility before this event (1 - world_readable; 2 - shared; 3 - invited; 4 - joined)
|
||||
UNIQUE (room_id, type, state_key)
|
||||
);
|
||||
-- for event deletion
|
||||
|
|
@ -52,8 +53,8 @@ CREATE UNIQUE INDEX IF NOT EXISTS syncapi_current_room_state_eventid_idx ON sync
|
|||
`
|
||||
|
||||
const upsertRoomStateSQL = "" +
|
||||
"INSERT INTO syncapi_current_room_state (room_id, event_id, type, sender, contains_url, state_key, headered_event_json, membership, added_at)" +
|
||||
" VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)" +
|
||||
"INSERT INTO syncapi_current_room_state (room_id, event_id, type, sender, contains_url, state_key, headered_event_json, membership, added_at, history_visibility)" +
|
||||
" VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)" +
|
||||
" ON CONFLICT (room_id, type, state_key)" +
|
||||
" DO UPDATE SET event_id = $2, sender=$4, contains_url=$5, headered_event_json = $7, membership = $8, added_at = $9"
|
||||
|
||||
|
|
@ -84,11 +85,11 @@ const selectStateEventSQL = "" +
|
|||
"SELECT headered_event_json FROM syncapi_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3"
|
||||
|
||||
const selectEventsWithEventIDsSQL = "" +
|
||||
// TODO: The session_id and transaction_id blanks are here because otherwise
|
||||
// the rowsToStreamEvents expects there to be exactly six columns. We need to
|
||||
// TODO: The session_id and transaction_id blanks are here because
|
||||
// the rowsToStreamEvents expects there to be exactly seven columns. We need to
|
||||
// figure out if these really need to be in the DB, and if so, we need a
|
||||
// better permanent fix for this. - neilalexander, 2 Jan 2020
|
||||
"SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id" +
|
||||
"SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id, history_visibility" +
|
||||
" FROM syncapi_current_room_state WHERE event_id IN ($1)"
|
||||
|
||||
const selectSharedUsersSQL = "" +
|
||||
|
|
@ -328,6 +329,7 @@ func (s *currentRoomStateStatements) UpsertRoomState(
|
|||
headeredJSON,
|
||||
membership,
|
||||
addedAt,
|
||||
event.Visibility,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,82 @@
|
|||
// Copyright 2022 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package deltas
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
)
|
||||
|
||||
func LoadAddHistoryVisibilityColumn(m *sqlutil.Migrations) {
|
||||
m.AddMigration(UpAddHistoryVisibilityColumn, DownAddHistoryVisibilityColumn)
|
||||
}
|
||||
|
||||
func UpAddHistoryVisibilityColumn(tx *sql.Tx) error {
|
||||
// SQLite doesn't have "if exists", so check if the column exists. If the query doesn't return an error, it already exists.
|
||||
// Required for unit tests, as otherwise a duplicate column error will show up.
|
||||
_, err := tx.Query("SELECT history_visibility FROM syncapi_output_room_events LIMIT 1")
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
_, err = tx.Exec(`
|
||||
ALTER TABLE syncapi_output_room_events ADD COLUMN history_visibility SMALLINT NOT NULL DEFAULT 2;
|
||||
UPDATE syncapi_output_room_events SET history_visibility = 4 WHERE type IN ('m.room.message', 'm.room.encrypted');
|
||||
`)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to execute upgrade: %w", err)
|
||||
}
|
||||
|
||||
_, err = tx.Query("SELECT history_visibility FROM syncapi_current_room_state LIMIT 1")
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
_, err = tx.Exec(`
|
||||
ALTER TABLE syncapi_current_room_state ADD COLUMN history_visibility SMALLINT NOT NULL DEFAULT 2;
|
||||
UPDATE syncapi_current_room_state SET history_visibility = 4 WHERE type IN ('m.room.message', 'm.room.encrypted');
|
||||
`)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to execute upgrade: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func DownAddHistoryVisibilityColumn(tx *sql.Tx) error {
|
||||
// SQLite doesn't have "if exists", so check if the column exists.
|
||||
_, err := tx.Query("SELECT history_visibility FROM syncapi_output_room_events LIMIT 1")
|
||||
if err != nil {
|
||||
// The column probably doesn't exist
|
||||
return nil
|
||||
}
|
||||
_, err = tx.Exec(`
|
||||
ALTER TABLE syncapi_output_room_events DROP COLUMN history_visibility;
|
||||
`)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to execute downgrade: %w", err)
|
||||
}
|
||||
_, err = tx.Query("SELECT history_visibility FROM syncapi_current_room_state LIMIT 1")
|
||||
if err != nil {
|
||||
// The column probably doesn't exist
|
||||
return nil
|
||||
}
|
||||
_, err = tx.Exec(`
|
||||
ALTER TABLE syncapi_current_room_state DROP COLUMN history_visibility;
|
||||
`)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to execute downgrade: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
@ -47,7 +47,8 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events (
|
|||
remove_state_ids TEXT, -- JSON encoded string array
|
||||
session_id BIGINT,
|
||||
transaction_id TEXT,
|
||||
exclude_from_sync BOOL NOT NULL DEFAULT FALSE
|
||||
exclude_from_sync BOOL NOT NULL DEFAULT FALSE,
|
||||
history_visibility SMALLINT NOT NULL DEFAULT 2 -- The history visibility before this event (1 - world_readable; 2 - shared; 3 - invited; 4 - joined)
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS syncapi_output_room_events_type_idx ON syncapi_output_room_events (type);
|
||||
|
|
@ -58,27 +59,27 @@ CREATE INDEX IF NOT EXISTS syncapi_output_room_events_exclude_from_sync_idx ON s
|
|||
|
||||
const insertEventSQL = "" +
|
||||
"INSERT INTO syncapi_output_room_events (" +
|
||||
"id, room_id, event_id, headered_event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync" +
|
||||
") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) " +
|
||||
"ON CONFLICT (event_id) DO UPDATE SET exclude_from_sync = (excluded.exclude_from_sync AND $13)"
|
||||
"id, room_id, event_id, headered_event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync, history_visibility" +
|
||||
") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) " +
|
||||
"ON CONFLICT (event_id) DO UPDATE SET exclude_from_sync = (excluded.exclude_from_sync AND $14)"
|
||||
|
||||
const selectEventsSQL = "" +
|
||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events WHERE event_id IN ($1)"
|
||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events WHERE event_id IN ($1)"
|
||||
|
||||
const selectRecentEventsSQL = "" +
|
||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
|
||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" +
|
||||
" WHERE room_id = $1 AND id > $2 AND id <= $3"
|
||||
|
||||
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
|
||||
|
||||
const selectRecentEventsForSyncSQL = "" +
|
||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
|
||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" +
|
||||
" WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE"
|
||||
|
||||
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
|
||||
|
||||
const selectEarlyEventsSQL = "" +
|
||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
|
||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" +
|
||||
" WHERE room_id = $1 AND id > $2 AND id <= $3"
|
||||
|
||||
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
|
||||
|
|
@ -90,7 +91,7 @@ const updateEventJSONSQL = "" +
|
|||
"UPDATE syncapi_output_room_events SET headered_event_json=$1 WHERE event_id=$2"
|
||||
|
||||
const selectStateInRangeSQL = "" +
|
||||
"SELECT event_id, id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" +
|
||||
"SELECT event_id, id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids, history_visibility" +
|
||||
" FROM syncapi_output_room_events" +
|
||||
" WHERE (id > $1 AND id <= $2)" +
|
||||
" AND room_id IN ($3)" +
|
||||
|
|
@ -102,15 +103,15 @@ const deleteEventsForRoomSQL = "" +
|
|||
"DELETE FROM syncapi_output_room_events WHERE room_id = $1"
|
||||
|
||||
const selectContextEventSQL = "" +
|
||||
"SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND event_id = $2"
|
||||
"SELECT id, headered_event_json, history_visibility FROM syncapi_output_room_events WHERE room_id = $1 AND event_id = $2"
|
||||
|
||||
const selectContextBeforeEventSQL = "" +
|
||||
"SELECT headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id < $2"
|
||||
"SELECT headered_event_json, history_visibility FROM syncapi_output_room_events WHERE room_id = $1 AND id < $2"
|
||||
|
||||
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
|
||||
|
||||
const selectContextAfterEventSQL = "" +
|
||||
"SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2"
|
||||
"SELECT id, headered_event_json, history_visibility FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2"
|
||||
|
||||
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
|
||||
|
||||
|
|
@ -196,14 +197,15 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
|
|||
|
||||
for rows.Next() {
|
||||
var (
|
||||
eventID string
|
||||
streamPos types.StreamPosition
|
||||
eventBytes []byte
|
||||
excludeFromSync bool
|
||||
addIDsJSON string
|
||||
delIDsJSON string
|
||||
eventID string
|
||||
streamPos types.StreamPosition
|
||||
eventBytes []byte
|
||||
excludeFromSync bool
|
||||
addIDsJSON string
|
||||
delIDsJSON string
|
||||
historyVisibility gomatrixserverlib.HistoryVisibility
|
||||
)
|
||||
if err := rows.Scan(&eventID, &streamPos, &eventBytes, &excludeFromSync, &addIDsJSON, &delIDsJSON); err != nil {
|
||||
if err := rows.Scan(&eventID, &streamPos, &eventBytes, &excludeFromSync, &addIDsJSON, &delIDsJSON, &historyVisibility); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
|
|
@ -239,6 +241,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
|
|||
needSet[id] = true
|
||||
}
|
||||
stateNeeded[ev.RoomID()] = needSet
|
||||
ev.Visibility = historyVisibility
|
||||
|
||||
eventIDToEvent[eventID] = types.StreamEvent{
|
||||
HeaderedEvent: &ev,
|
||||
|
|
@ -270,7 +273,7 @@ func (s *outputRoomEventsStatements) SelectMaxEventID(
|
|||
func (s *outputRoomEventsStatements) InsertEvent(
|
||||
ctx context.Context, txn *sql.Tx,
|
||||
event *gomatrixserverlib.HeaderedEvent, addState, removeState []string,
|
||||
transactionID *api.TransactionID, excludeFromSync bool,
|
||||
transactionID *api.TransactionID, excludeFromSync bool, historyVisibility gomatrixserverlib.HistoryVisibility,
|
||||
) (types.StreamPosition, error) {
|
||||
var txnID *string
|
||||
var sessionID *int64
|
||||
|
|
@ -326,6 +329,7 @@ func (s *outputRoomEventsStatements) InsertEvent(
|
|||
sessionID,
|
||||
txnID,
|
||||
excludeFromSync,
|
||||
historyVisibility,
|
||||
excludeFromSync,
|
||||
)
|
||||
return streamPos, err
|
||||
|
|
@ -481,15 +485,16 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
|
|||
var result []types.StreamEvent
|
||||
for rows.Next() {
|
||||
var (
|
||||
eventID string
|
||||
streamPos types.StreamPosition
|
||||
eventBytes []byte
|
||||
excludeFromSync bool
|
||||
sessionID *int64
|
||||
txnID *string
|
||||
transactionID *api.TransactionID
|
||||
eventID string
|
||||
streamPos types.StreamPosition
|
||||
eventBytes []byte
|
||||
excludeFromSync bool
|
||||
sessionID *int64
|
||||
txnID *string
|
||||
transactionID *api.TransactionID
|
||||
historyVisibility gomatrixserverlib.HistoryVisibility
|
||||
)
|
||||
if err := rows.Scan(&eventID, &streamPos, &eventBytes, &sessionID, &excludeFromSync, &txnID); err != nil {
|
||||
if err := rows.Scan(&eventID, &streamPos, &eventBytes, &sessionID, &excludeFromSync, &txnID, &historyVisibility); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// TODO: Handle redacted events
|
||||
|
|
@ -505,6 +510,8 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
|
|||
}
|
||||
}
|
||||
|
||||
ev.Visibility = historyVisibility
|
||||
|
||||
result = append(result, types.StreamEvent{
|
||||
HeaderedEvent: &ev,
|
||||
StreamPosition: streamPos,
|
||||
|
|
@ -519,13 +526,15 @@ func (s *outputRoomEventsStatements) SelectContextEvent(
|
|||
) (id int, evt gomatrixserverlib.HeaderedEvent, err error) {
|
||||
row := sqlutil.TxStmt(txn, s.selectContextEventStmt).QueryRowContext(ctx, roomID, eventID)
|
||||
var eventAsString string
|
||||
if err = row.Scan(&id, &eventAsString); err != nil {
|
||||
var historyVisibility gomatrixserverlib.HistoryVisibility
|
||||
if err = row.Scan(&id, &eventAsString, &historyVisibility); err != nil {
|
||||
return 0, evt, err
|
||||
}
|
||||
|
||||
if err = json.Unmarshal([]byte(eventAsString), &evt); err != nil {
|
||||
return 0, evt, err
|
||||
}
|
||||
evt.Visibility = historyVisibility
|
||||
return id, evt, nil
|
||||
}
|
||||
|
||||
|
|
@ -550,15 +559,17 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
|
|||
|
||||
for rows.Next() {
|
||||
var (
|
||||
eventBytes []byte
|
||||
evt *gomatrixserverlib.HeaderedEvent
|
||||
eventBytes []byte
|
||||
evt *gomatrixserverlib.HeaderedEvent
|
||||
historyVisibility gomatrixserverlib.HistoryVisibility
|
||||
)
|
||||
if err = rows.Scan(&eventBytes); err != nil {
|
||||
if err = rows.Scan(&eventBytes, &historyVisibility); err != nil {
|
||||
return evts, err
|
||||
}
|
||||
if err = json.Unmarshal(eventBytes, &evt); err != nil {
|
||||
return evts, err
|
||||
}
|
||||
evt.Visibility = historyVisibility
|
||||
evts = append(evts, evt)
|
||||
}
|
||||
|
||||
|
|
@ -586,15 +597,17 @@ func (s *outputRoomEventsStatements) SelectContextAfterEvent(
|
|||
|
||||
for rows.Next() {
|
||||
var (
|
||||
eventBytes []byte
|
||||
evt *gomatrixserverlib.HeaderedEvent
|
||||
eventBytes []byte
|
||||
evt *gomatrixserverlib.HeaderedEvent
|
||||
historyVisibility gomatrixserverlib.HistoryVisibility
|
||||
)
|
||||
if err = rows.Scan(&lastID, &eventBytes); err != nil {
|
||||
if err = rows.Scan(&lastID, &eventBytes, &historyVisibility); err != nil {
|
||||
return 0, evts, err
|
||||
}
|
||||
if err = json.Unmarshal(eventBytes, &evt); err != nil {
|
||||
return 0, evts, err
|
||||
}
|
||||
evt.Visibility = historyVisibility
|
||||
evts = append(evts, evt)
|
||||
}
|
||||
return lastID, evts, rows.Err()
|
||||
|
|
|
|||
|
|
@ -52,18 +52,16 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
|
|||
if err = d.streamID.Prepare(d.db); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err = d.db.Exec(outputRoomEventsSchema); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err = d.db.Exec(currentRoomStateSchema); err != nil {
|
||||
return err
|
||||
}
|
||||
accountData, err := NewSqliteAccountDataTable(d.db, &d.streamID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
events, err := NewSqliteEventsTable(d.db, &d.streamID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
roomState, err := NewSqliteCurrentRoomStateTable(d.db, &d.streamID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
invites, err := NewSqliteInvitesTable(d.db, &d.streamID)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
@ -111,9 +109,19 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
|
|||
m := sqlutil.NewMigrations()
|
||||
deltas.LoadFixSequences(m)
|
||||
deltas.LoadRemoveSendToDeviceSentColumn(m)
|
||||
deltas.LoadAddHistoryVisibilityColumn(m)
|
||||
if err = m.RunDeltas(d.db, dbProperties); err != nil {
|
||||
return err
|
||||
}
|
||||
// prepare statements after the migrations have run
|
||||
events, err := NewSqliteEventsTable(d.db, &d.streamID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
roomState, err := NewSqliteCurrentRoomStateTable(d.db, &d.streamID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
d.Database = shared.Database{
|
||||
DB: d.db,
|
||||
Writer: d.writer,
|
||||
|
|
|
|||
|
|
@ -37,7 +37,7 @@ func MustWriteEvents(t *testing.T, db storage.Database, events []*gomatrixserver
|
|||
addStateEvents = append(addStateEvents, ev)
|
||||
addStateEventIDs = append(addStateEventIDs, ev.EventID())
|
||||
}
|
||||
pos, err := db.WriteEvent(ctx, ev, addStateEvents, addStateEventIDs, removeStateEventIDs, nil, false)
|
||||
pos, err := db.WriteEvent(ctx, ev, addStateEvents, addStateEventIDs, removeStateEventIDs, nil, false, gomatrixserverlib.HistoryVisibilityShared)
|
||||
if err != nil {
|
||||
t.Fatalf("WriteEvent failed: %s", err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -52,7 +52,14 @@ type Peeks interface {
|
|||
type Events interface {
|
||||
SelectStateInRange(ctx context.Context, txn *sql.Tx, r types.Range, stateFilter *gomatrixserverlib.StateFilter, roomIDs []string) (map[string]map[string]bool, map[string]types.StreamEvent, error)
|
||||
SelectMaxEventID(ctx context.Context, txn *sql.Tx) (id int64, err error)
|
||||
InsertEvent(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, addState, removeState []string, transactionID *api.TransactionID, excludeFromSync bool) (streamPos types.StreamPosition, err error)
|
||||
InsertEvent(
|
||||
ctx context.Context, txn *sql.Tx,
|
||||
event *gomatrixserverlib.HeaderedEvent,
|
||||
addState, removeState []string,
|
||||
transactionID *api.TransactionID,
|
||||
excludeFromSync bool,
|
||||
historyVisibility gomatrixserverlib.HistoryVisibility,
|
||||
) (streamPos types.StreamPosition, err error)
|
||||
// SelectRecentEvents returns events between the two stream positions: exclusive of low and inclusive of high.
|
||||
// If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude from sync.
|
||||
// Returns up to `limit` events. Returns `limited=true` if there are more events in this range but we hit the `limit`.
|
||||
|
|
|
|||
|
|
@ -53,7 +53,7 @@ func TestOutputRoomEventsTable(t *testing.T) {
|
|||
events := room.Events()
|
||||
err := sqlutil.WithTransaction(db, func(txn *sql.Tx) error {
|
||||
for _, ev := range events {
|
||||
_, err := tab.InsertEvent(ctx, txn, ev, nil, nil, nil, false)
|
||||
_, err := tab.InsertEvent(ctx, txn, ev, nil, nil, nil, false, gomatrixserverlib.HistoryVisibilityShared)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to InsertEvent: %s", err)
|
||||
}
|
||||
|
|
@ -79,7 +79,7 @@ func TestOutputRoomEventsTable(t *testing.T) {
|
|||
"body": "test.txt",
|
||||
"url": "mxc://test.txt",
|
||||
})
|
||||
if _, err = tab.InsertEvent(ctx, txn, urlEv, nil, nil, nil, false); err != nil {
|
||||
if _, err = tab.InsertEvent(ctx, txn, urlEv, nil, nil, nil, false, gomatrixserverlib.HistoryVisibilityShared); err != nil {
|
||||
return fmt.Errorf("failed to InsertEvent: %s", err)
|
||||
}
|
||||
wantEventID := []string{urlEv.EventID()}
|
||||
|
|
|
|||
|
|
@ -139,7 +139,7 @@ func (p *phoneHomeStats) collect() {
|
|||
|
||||
output := bytes.Buffer{}
|
||||
if err = json.NewEncoder(&output).Encode(p.stats); err != nil {
|
||||
logrus.WithError(err).Error("unable to encode anonymous stats")
|
||||
logrus.WithError(err).Error("Unable to encode phone-home statistics")
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -147,14 +147,14 @@ func (p *phoneHomeStats) collect() {
|
|||
|
||||
request, err := http.NewRequestWithContext(ctx, http.MethodPost, p.cfg.Global.ReportStats.Endpoint, &output)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("unable to create anonymous stats request")
|
||||
logrus.WithError(err).Error("Unable to create phone-home statistics request")
|
||||
return
|
||||
}
|
||||
request.Header.Set("User-Agent", "Dendrite/"+internal.VersionString())
|
||||
|
||||
_, err = p.client.Do(request)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("unable to send anonymous stats")
|
||||
logrus.WithError(err).Error("Unable to send phone-home statistics")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue