Merge branch 'matrix-org:main' into main

This commit is contained in:
Brian Meek 2022-08-30 13:02:17 -07:00 committed by GitHub
commit 4906127528
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
59 changed files with 704 additions and 291 deletions

View file

@ -7,6 +7,7 @@ on:
pull_request:
release:
types: [published]
workflow_dispatch:
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
@ -375,6 +376,8 @@ jobs:
# Build initial Dendrite image
- run: docker build -t complement-dendrite -f build/scripts/Complement${{ matrix.postgres }}.Dockerfile .
working-directory: dendrite
env:
DOCKER_BUILDKIT: 1
# Run Complement
- run: |

View file

@ -1,5 +1,40 @@
# Changelog
## Dendrite 0.9.5 (2022-08-25)
### Fixes
* The roomserver will now correctly unreject previously rejected events if necessary when reprocessing
* The handling of event soft-failure has been improved on the roomserver input by no longer applying rejection rules and still calculating state before the event if possible
* The federation `/state` and `/state_ids` endpoints should now return the correct error code when the state isn't known instead of returning a HTTP 500
* The federation `/event` should now return outlier events correctly instead of returning a HTTP 500
* A bug in the federation backoff allowing zero intervals has been corrected
* The `create-account` utility will no longer error if the homeserver URL ends in a trailing slash
* A regression in `/sync` introduced in 0.9.4 should be fixed
## Dendrite 0.9.4 (2022-08-19)
### Fixes
* A bug in the roomserver around handling rejected outliers has been fixed
* Backfilled events will now use the correct history visibility where possible
* The device list updater backoff has been fixed, which should reduce the number of outbound HTTP requests and `Failed to query device keys for some users` log entries for dead servers
* The `/sync` endpoint will no longer incorrectly return room entries for retired invites which could cause some rooms to show up in the client "Historical" section
* The `/createRoom` endpoint will now correctly populate `is_direct` in invite membership events, which may help clients to classify direct messages correctly
* The `create-account` tool will now log an error if the shared secret is not set in the Dendrite config
* A couple of minor bugs have been fixed in the membership lazy-loading
* Queued EDUs in the federation API are now cached properly
## Dendrite 0.9.3 (2022-08-15)
### Important
* This is a **security release** to fix a vulnerability within event auth, affecting all versions of Dendrite before 0.9.3. Upgrading to this version is highly recommended. For more information, [see here](https://github.com/matrix-org/gomatrixserverlib/security/advisories/GHSA-grvv-h2f9-7v9c).
### Fixes
* Dendrite will now correctly parse the `"events_default"` power level value for event auth.
## Dendrite 0.9.2 (2022-08-12)
### Features

View file

@ -18,14 +18,15 @@ import (
"context"
"encoding/json"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
"github.com/matrix-org/dendrite/appservice/storage"
"github.com/matrix-org/dendrite/appservice/types"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
)
@ -103,6 +104,7 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msg *nats.Msg)
}
if len(eventsReq.EventIDs) > 0 {
if err := s.rsAPI.QueryEventsByID(s.ctx, eventsReq, eventsRes); err != nil {
log.WithError(err).Errorf("s.rsAPI.QueryEventsByID failed")
return false
}
events = append(events, eventsRes.Events...)

View file

@ -1,3 +1,5 @@
#syntax=docker/dockerfile:1.2
FROM golang:1.18-stretch as build
RUN apt-get update && apt-get install -y sqlite3
WORKDIR /build
@ -8,14 +10,12 @@ RUN mkdir /dendrite
# Utilise Docker caching when downloading dependencies, this stops us needlessly
# downloading dependencies every time.
COPY go.mod .
COPY go.sum .
RUN go mod download
COPY . .
RUN go build -o /dendrite ./cmd/dendrite-monolith-server
RUN go build -o /dendrite ./cmd/generate-keys
RUN go build -o /dendrite ./cmd/generate-config
RUN --mount=target=. \
--mount=type=cache,target=/go/pkg/mod \
--mount=type=cache,target=/root/.cache/go-build \
go build -o /dendrite ./cmd/generate-config && \
go build -o /dendrite ./cmd/generate-keys && \
go build -o /dendrite ./cmd/dendrite-monolith-server
WORKDIR /dendrite
RUN ./generate-keys --private-key matrix_key.pem
@ -26,7 +26,7 @@ EXPOSE 8008 8448
# At runtime, generate TLS cert based on the CA now mounted at /ca
# At runtime, replace the SERVER_NAME with what we are told
CMD ./generate-keys --server $SERVER_NAME --tls-cert server.crt --tls-key server.key --tls-authority-cert /complement/ca/ca.crt --tls-authority-key /complement/ca/ca.key && \
CMD ./generate-keys -keysize 1024 --server $SERVER_NAME --tls-cert server.crt --tls-key server.key --tls-authority-cert /complement/ca/ca.crt --tls-authority-key /complement/ca/ca.key && \
./generate-config -server $SERVER_NAME --ci > dendrite.yaml && \
cp /complement/ca/ca.crt /usr/local/share/ca-certificates/ && update-ca-certificates && \
./dendrite-monolith-server --really-enable-open-registration --tls-cert server.crt --tls-key server.key --config dendrite.yaml -api=${API:-0}
exec ./dendrite-monolith-server --really-enable-open-registration --tls-cert server.crt --tls-key server.key --config dendrite.yaml -api=${API:-0}

View file

@ -1,3 +1,5 @@
#syntax=docker/dockerfile:1.2
# A local development Complement dockerfile, to be used with host mounts
# /cache -> Contains the entire dendrite code at Dockerfile build time. Builds binaries but only keeps the generate-* ones. Pre-compilation saves time.
# /dendrite -> Host-mounted sources
@ -9,11 +11,10 @@
FROM golang:1.18-stretch
RUN apt-get update && apt-get install -y sqlite3
WORKDIR /runtime
ENV SERVER_NAME=localhost
EXPOSE 8008 8448
WORKDIR /runtime
# This script compiles Dendrite for us.
RUN echo '\
#!/bin/bash -eux \n\
@ -29,25 +30,23 @@ RUN echo '\
RUN echo '\
#!/bin/bash -eu \n\
./generate-keys --private-key matrix_key.pem \n\
./generate-keys --server $SERVER_NAME --tls-cert server.crt --tls-key server.key --tls-authority-cert /complement/ca/ca.crt --tls-authority-key /complement/ca/ca.key \n\
./generate-keys -keysize 1024 --server $SERVER_NAME --tls-cert server.crt --tls-key server.key --tls-authority-cert /complement/ca/ca.crt --tls-authority-key /complement/ca/ca.key \n\
./generate-config -server $SERVER_NAME --ci > dendrite.yaml \n\
cp /complement/ca/ca.crt /usr/local/share/ca-certificates/ && update-ca-certificates \n\
./dendrite-monolith-server --really-enable-open-registration --tls-cert server.crt --tls-key server.key --config dendrite.yaml \n\
exec ./dendrite-monolith-server --really-enable-open-registration --tls-cert server.crt --tls-key server.key --config dendrite.yaml \n\
' > run.sh && chmod +x run.sh
WORKDIR /cache
# Pre-download deps; we don't need to do this if the GOPATH is mounted.
COPY go.mod .
COPY go.sum .
RUN go mod download
# Build the monolith in /cache - we won't actually use this but will rely on build artifacts to speed
# up the real compilation. Build the generate-* binaries in the true /runtime locations.
# If the generate-* source is changed, this dockerfile needs re-running.
COPY . .
RUN go build ./cmd/dendrite-monolith-server && go build -o /runtime ./cmd/generate-keys && go build -o /runtime ./cmd/generate-config
RUN --mount=target=. \
--mount=type=cache,target=/go/pkg/mod \
--mount=type=cache,target=/root/.cache/go-build \
go build -o /runtime ./cmd/generate-config && \
go build -o /runtime ./cmd/generate-keys
WORKDIR /runtime
CMD /runtime/compile.sh && /runtime/run.sh
CMD /runtime/compile.sh && exec /runtime/run.sh

View file

@ -1,3 +1,5 @@
#syntax=docker/dockerfile:1.2
FROM golang:1.18-stretch as build
RUN apt-get update && apt-get install -y postgresql
WORKDIR /build
@ -26,14 +28,12 @@ RUN mkdir /dendrite
# Utilise Docker caching when downloading dependencies, this stops us needlessly
# downloading dependencies every time.
COPY go.mod .
COPY go.sum .
RUN go mod download
COPY . .
RUN go build -o /dendrite ./cmd/dendrite-monolith-server
RUN go build -o /dendrite ./cmd/generate-keys
RUN go build -o /dendrite ./cmd/generate-config
RUN --mount=target=. \
--mount=type=cache,target=/go/pkg/mod \
--mount=type=cache,target=/root/.cache/go-build \
go build -o /dendrite ./cmd/generate-config && \
go build -o /dendrite ./cmd/generate-keys && \
go build -o /dendrite ./cmd/dendrite-monolith-server
WORKDIR /dendrite
RUN ./generate-keys --private-key matrix_key.pem
@ -45,10 +45,10 @@ EXPOSE 8008 8448
# At runtime, generate TLS cert based on the CA now mounted at /ca
# At runtime, replace the SERVER_NAME with what we are told
CMD /build/run_postgres.sh && ./generate-keys --server $SERVER_NAME --tls-cert server.crt --tls-key server.key --tls-authority-cert /complement/ca/ca.crt --tls-authority-key /complement/ca/ca.key && \
CMD /build/run_postgres.sh && ./generate-keys --keysize 1024 --server $SERVER_NAME --tls-cert server.crt --tls-key server.key --tls-authority-cert /complement/ca/ca.crt --tls-authority-key /complement/ca/ca.key && \
./generate-config -server $SERVER_NAME --ci > dendrite.yaml && \
# Replace the connection string with a single postgres DB, using user/db = 'postgres' and no password, bump max_conns
sed -i "s%connection_string:.*$%connection_string: postgresql://postgres@localhost/postgres?sslmode=disable%g" dendrite.yaml && \
sed -i 's/max_open_conns:.*$/max_open_conns: 100/g' dendrite.yaml && \
cp /complement/ca/ca.crt /usr/local/share/ca-certificates/ && update-ca-certificates && \
./dendrite-monolith-server --really-enable-open-registration --tls-cert server.crt --tls-key server.key --config dendrite.yaml -api=${API:-0}
exec ./dendrite-monolith-server --really-enable-open-registration --tls-cert server.crt --tls-key server.key --config dendrite.yaml -api=${API:-0}

View file

@ -49,6 +49,7 @@ type createRoomRequest struct {
GuestCanJoin bool `json:"guest_can_join"`
RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"`
PowerLevelContentOverride json.RawMessage `json:"power_level_content_override"`
IsDirect bool `json:"is_direct"`
}
const (
@ -499,9 +500,17 @@ func createRoom(
// Build some stripped state for the invite.
var globalStrippedState []gomatrixserverlib.InviteV2StrippedState
for _, event := range builtEvents {
// Chosen events from the spec:
// https://spec.matrix.org/v1.3/client-server-api/#stripped-state
switch event.Type() {
case gomatrixserverlib.MRoomCreate:
fallthrough
case gomatrixserverlib.MRoomName:
fallthrough
case gomatrixserverlib.MRoomAvatar:
fallthrough
case gomatrixserverlib.MRoomTopic:
fallthrough
case gomatrixserverlib.MRoomCanonicalAlias:
fallthrough
case gomatrixserverlib.MRoomEncryption:
@ -522,7 +531,7 @@ func createRoom(
// Build the invite event.
inviteEvent, err := buildMembershipEvent(
ctx, invitee, "", profileAPI, device, gomatrixserverlib.Invite,
roomID, true, cfg, evTime, rsAPI, asAPI,
roomID, r.IsDirect, cfg, evTime, rsAPI, asAPI,
)
if err != nil {
util.GetLogger(ctx).WithError(err).Error("buildMembershipEvent failed")

View file

@ -293,19 +293,19 @@ type recaptchaResponse struct {
}
// validateUsername returns an error response if the username is invalid
func validateUsername(username string) *util.JSONResponse {
func validateUsername(localpart string, domain gomatrixserverlib.ServerName) *util.JSONResponse {
// https://github.com/matrix-org/synapse/blob/v0.20.0/synapse/rest/client/v2_alpha/register.py#L161
if len(username) > maxUsernameLength {
if id := fmt.Sprintf("@%s:%s", localpart, domain); len(id) > maxUsernameLength {
return &util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.BadJSON(fmt.Sprintf("'username' >%d characters", maxUsernameLength)),
JSON: jsonerror.BadJSON(fmt.Sprintf("%q exceeds the maximum length of %d characters", id, maxUsernameLength)),
}
} else if !validUsernameRegex.MatchString(username) {
} else if !validUsernameRegex.MatchString(localpart) {
return &util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.InvalidUsername("Username can only contain characters a-z, 0-9, or '_-./='"),
}
} else if username[0] == '_' { // Regex checks its not a zero length string
} else if localpart[0] == '_' { // Regex checks its not a zero length string
return &util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.InvalidUsername("Username cannot start with a '_'"),
@ -315,13 +315,13 @@ func validateUsername(username string) *util.JSONResponse {
}
// validateApplicationServiceUsername returns an error response if the username is invalid for an application service
func validateApplicationServiceUsername(username string) *util.JSONResponse {
if len(username) > maxUsernameLength {
func validateApplicationServiceUsername(localpart string, domain gomatrixserverlib.ServerName) *util.JSONResponse {
if id := fmt.Sprintf("@%s:%s", localpart, domain); len(id) > maxUsernameLength {
return &util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.BadJSON(fmt.Sprintf("'username' >%d characters", maxUsernameLength)),
JSON: jsonerror.BadJSON(fmt.Sprintf("%q exceeds the maximum length of %d characters", id, maxUsernameLength)),
}
} else if !validUsernameRegex.MatchString(username) {
} else if !validUsernameRegex.MatchString(localpart) {
return &util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.InvalidUsername("Username can only contain characters a-z, 0-9, or '_-./='"),
@ -540,7 +540,7 @@ func validateApplicationService(
}
// Check username application service is trying to register is valid
if err := validateApplicationServiceUsername(username); err != nil {
if err := validateApplicationServiceUsername(username, cfg.Matrix.ServerName); err != nil {
return "", err
}
@ -621,7 +621,7 @@ func Register(
case r.Type == authtypes.LoginTypeApplicationService && accessTokenErr == nil:
// Spec-compliant case (the access_token is specified and the login type
// is correctly set, so it's an appservice registration)
if resErr := validateApplicationServiceUsername(r.Username); resErr != nil {
if resErr := validateApplicationServiceUsername(r.Username, cfg.Matrix.ServerName); resErr != nil {
return *resErr
}
case accessTokenErr == nil:
@ -638,7 +638,7 @@ func Register(
default:
// Spec-compliant case (neither the access_token nor the login type are
// specified, so it's a normal user registration)
if resErr := validateUsername(r.Username); resErr != nil {
if resErr := validateUsername(r.Username, cfg.Matrix.ServerName); resErr != nil {
return *resErr
}
}
@ -1050,7 +1050,7 @@ func RegisterAvailable(
// Squash username to all lowercase letters
username = strings.ToLower(username)
if err := validateUsername(username); err != nil {
if err := validateUsername(username, cfg.Matrix.ServerName); err != nil {
return *err
}
@ -1091,7 +1091,7 @@ func RegisterAvailable(
}
}
func handleSharedSecretRegistration(userAPI userapi.ClientUserAPI, sr *SharedSecretRegistration, req *http.Request) util.JSONResponse {
func handleSharedSecretRegistration(cfg *config.ClientAPI, userAPI userapi.ClientUserAPI, sr *SharedSecretRegistration, req *http.Request) util.JSONResponse {
ssrr, err := NewSharedSecretRegistrationRequest(req.Body)
if err != nil {
return util.JSONResponse{
@ -1112,7 +1112,7 @@ func handleSharedSecretRegistration(userAPI userapi.ClientUserAPI, sr *SharedSec
// downcase capitals
ssrr.User = strings.ToLower(ssrr.User)
if resErr := validateUsername(ssrr.User); resErr != nil {
if resErr := validateUsername(ssrr.User, cfg.Matrix.ServerName); resErr != nil {
return *resErr
}
if resErr := validatePassword(ssrr.Password); resErr != nil {

View file

@ -133,7 +133,7 @@ func Setup(
}
}
if req.Method == http.MethodPost {
return handleSharedSecretRegistration(userAPI, sr, req)
return handleSharedSecretRegistration(cfg, userAPI, sr, req)
}
return util.JSONResponse{
Code: http.StatusMethodNotAllowed,

View file

@ -66,10 +66,11 @@ var (
resetPassword = flag.Bool("reset-password", false, "Deprecated")
serverURL = flag.String("url", "https://localhost:8448", "The URL to connect to.")
validUsernameRegex = regexp.MustCompile(`^[0-9a-z_\-=./]+$`)
timeout = flag.Duration("timeout", time.Second*30, "Timeout for the http client when connecting to the server")
)
var cl = http.Client{
Timeout: time.Second * 10,
Timeout: time.Second * 30,
Transport: http.DefaultTransport,
}
@ -85,6 +86,10 @@ func main() {
logrus.Fatalf("The reset-password flag has been replaced by the POST /_dendrite/admin/resetPassword/{localpart} admin API.")
}
if cfg.ClientAPI.RegistrationSharedSecret == "" {
logrus.Fatalln("Shared secret registration is not enabled, enable it by setting a shared secret in the config: 'client_api.registration_shared_secret'")
}
if *username == "" {
flag.Usage()
os.Exit(1)
@ -104,6 +109,8 @@ func main() {
logrus.Fatalln(err)
}
cl.Timeout = *timeout
accessToken, err := sharedSecretRegister(cfg.ClientAPI.RegistrationSharedSecret, *serverURL, *username, pass, *isAdmin)
if err != nil {
logrus.Fatalln("Failed to create the account:", err.Error())
@ -120,8 +127,8 @@ type sharedSecretRegistrationRequest struct {
Admin bool `json:"admin"`
}
func sharedSecretRegister(sharedSecret, serverURL, localpart, password string, admin bool) (accesToken string, err error) {
registerURL := fmt.Sprintf("%s/_synapse/admin/v1/register", serverURL)
func sharedSecretRegister(sharedSecret, serverURL, localpart, password string, admin bool) (accessToken string, err error) {
registerURL := fmt.Sprintf("%s/_synapse/admin/v1/register", strings.Trim(serverURL, "/"))
nonceReq, err := http.NewRequest(http.MethodGet, registerURL, nil)
if err != nil {
return "", fmt.Errorf("unable to create http request: %w", err)

View file

@ -38,6 +38,7 @@ var (
authorityCertFile = flag.String("tls-authority-cert", "", "Optional: Create TLS certificate/keys based on this CA authority. Useful for integration testing.")
authorityKeyFile = flag.String("tls-authority-key", "", "Optional: Create TLS certificate/keys based on this CA authority. Useful for integration testing.")
serverName = flag.String("server", "", "Optional: Create TLS certificate/keys with this domain name set. Useful for integration testing.")
keySize = flag.Int("keysize", 4096, "Optional: Create TLS RSA private key with the given key size")
)
func main() {
@ -58,12 +59,12 @@ func main() {
log.Fatal("Zero or both of --tls-key and --tls-cert must be supplied")
}
if *authorityCertFile == "" && *authorityKeyFile == "" {
if err := test.NewTLSKey(*tlsKeyFile, *tlsCertFile); err != nil {
if err := test.NewTLSKey(*tlsKeyFile, *tlsCertFile, *keySize); err != nil {
panic(err)
}
} else {
// generate the TLS cert/key based on the authority given.
if err := test.NewTLSKeyWithAuthority(*serverName, *tlsKeyFile, *tlsCertFile, *authorityKeyFile, *authorityCertFile); err != nil {
if err := test.NewTLSKeyWithAuthority(*serverName, *tlsKeyFile, *tlsCertFile, *authorityKeyFile, *authorityCertFile, *keySize); err != nil {
panic(err)
}
}

View file

@ -12,7 +12,13 @@ Mostly, although there are still bugs and missing features. If you are a confide
## Is Dendrite feature-complete?
No, although a good portion of the Matrix specification has been implemented. Mostly missing are client features - see the readme at the root of the repository for more information.
No, although a good portion of the Matrix specification has been implemented. Mostly missing are client features - see the [readme](../README.md) at the root of the repository for more information.
## Why doesn't Dendrite have "x" yet?
Dendrite development is currently supported by a small team of developers and due to those limited resources, the majority of the effort is focused on getting Dendrite to be
specification complete. If there are major features you're requesting (e.g. new administration endpoints), we'd like to strongly encourage you to join the community in supporting
the development efforts through [contributing](https://matrix-org.github.io/dendrite/development/contributing).
## Is there a migration path from Synapse to Dendrite?
@ -43,6 +49,20 @@ It should do, although we are aware of some minor issues:
* **Element Android**: registration does not work, but logging in with an existing account does
* **Hydrogen**: occasionally sync can fail due to gaps in the `since` parameter, but clearing the cache fixes this
## Does Dendrite support Space Summaries?
Yes, [Space Summaries](https://github.com/matrix-org/matrix-spec-proposals/pull/2946) were merged into the Matrix Spec as of 2022-01-17 however, they are still treated as an MSC (Matrix Specification Change) in Dendrite. In order to enable Space Summaries in Dendrite, you must add the MSC to the MSC configuration section in the configuration YAML. If the MSC is not enabled, a user will typically see a perpetual loading icon on the summary page. See below for a demonstration of how to add to the Dendrite configuration:
```
mscs:
mscs:
- msc2946
```
Similarly, [msc2836](https://github.com/matrix-org/matrix-spec-proposals/pull/2836) would need to be added to mscs configuration in order to support Threading. Other MSCs are not currently supported.
Please note that MSCs should be considered experimental and can result in significant usability issues when enabled. If you'd like more details on how MSCs are ratified or the current status of MSCs, please see the [Matrix specification documentation](https://spec.matrix.org/proposals/) on the subject.
## Does Dendrite support push notifications?
Yes, we have experimental support for push notifications. Configure them in the usual way in your Matrix client.
@ -86,6 +106,10 @@ would be a huge help too, as that will help us to understand where the memory us
You may need to revisit the connection limit of your PostgreSQL server and/or make changes to the `max_connections` lines in your Dendrite configuration. Be aware that each Dendrite component opens its own database connections and has its own connection limit, even in monolith mode!
## VOIP and Video Calls don't appear to work on Dendrite
There is likely an issue with your STUN/TURN configuration on the server. If you believe your configuration to be correct, please see the [troubleshooting](administration/5_troubleshooting.md) for troubleshooting recommendations.
## What is being reported when enabling phone-home statistics?
Phone-home statistics contain your server's domain name, some configuration information about

View file

@ -13,6 +13,25 @@ without warning.
More endpoints will be added in the future.
Endpoints may be used directly through curl:
```
curl --header "Authorization: Bearer <access_token>" -X <POST|GET|PUT> <Endpoint URI> -d '<Request Body Contents>'
```
An `access_token` can be obtained through most Element-based matrix clients by going to `Settings` -> `Help & About` -> `Advanced` -> `Access Token`.
Be aware that an `access_token` allows a client to perform actions as an user and should be kept **secret**.
The user must be an administrator in the `account_accounts` table in order to use these endpoints.
Existing user accounts can be set to administrative accounts by changing `account_type` to `3` in `account_accounts`
```
UPDATE account_accounts SET account_type = 3 WHERE localpart = '$localpart';
```
Where `$localpart` is the username only (e.g. `alice`).
## GET `/_dendrite/admin/evacuateRoom/{roomID}`
This endpoint will instruct Dendrite to part all local users from the given `roomID`
@ -38,7 +57,34 @@ Request body format:
Reset the password of a local user. The `localpart` is the username only, i.e. if
the full user ID is `@alice:domain.com` then the local part is `alice`.
## POST `/_synapse/admin/v1/send_server_notice`
Request body format:
```
{
"user_id": "@target_user:server_name",
"content": {
"msgtype": "m.text",
"body": "This is my message"
}
}
```
Send a server notice to a specific user. See the [Matrix Spec](https://spec.matrix.org/v1.3/client-server-api/#server-notices) for additional details on server notice behaviour.
If successfully sent, the API will return the following response:
```
{
"event_id": "<event_id>"
}
```
## GET `/_synapse/admin/v1/register`
Shared secret registration — please see the [user creation page](createusers) for
guidance on configuring and using this endpoint.
## GET `/_matrix/client/v3/admin/whois/{userId}`
From the [Matrix Spec](https://spec.matrix.org/v1.3/client-server-api/#get_matrixclientv3adminwhoisuserid).
Gets information about a particular user. `userId` is the full user ID (e.g. `@alice:domain.com`)

View file

@ -77,5 +77,12 @@ If there aren't, you will see a log lines like this:
level=warning msg="IMPORTANT: Process file descriptor limit is currently 65535, it is recommended to raise the limit for Dendrite to at least 65535 to avoid issues"
```
Follow the [Optimisation](../installation/10_optimisation.md) instructions to correct the
Follow the [Optimisation](../installation/11_optimisation.md) instructions to correct the
available number of file descriptors.
## 6. STUN/TURN Server tester
If you are experiencing problems with VoIP or video calls, you should check that Dendrite
is able to successfully connect your TURN server using
[Matrix VoIP Tester](https://test.voip.librepush.net/). This can highlight any issues
that the server may encounter so that you can begin the troubleshooting process.

View file

@ -329,6 +329,12 @@ func SendJoin(
JSON: jsonerror.NotFound("Room does not exist"),
}
}
if !stateAndAuthChainResponse.StateKnown {
return util.JSONResponse{
Code: http.StatusForbidden,
JSON: jsonerror.Forbidden("State not known"),
}
}
// Check if the user is already in the room. If they're already in then
// there isn't much point in sending another join event into the room.

View file

@ -135,6 +135,12 @@ func getState(
return nil, nil, &resErr
}
if !response.StateKnown {
return nil, nil, &util.JSONResponse{
Code: http.StatusNotFound,
JSON: jsonerror.NotFound("State not known"),
}
}
if response.IsRejected {
return nil, nil, &util.JSONResponse{
Code: http.StatusNotFound,

View file

@ -5,10 +5,11 @@ import (
"sync"
"time"
"github.com/matrix-org/dendrite/federationapi/storage"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
"go.uber.org/atomic"
"github.com/matrix-org/dendrite/federationapi/storage"
)
// Statistics contains information about all of the remote federated
@ -126,13 +127,13 @@ func (s *ServerStatistics) Failure() (time.Time, bool) {
go func() {
until, ok := s.backoffUntil.Load().(time.Time)
if ok {
if ok && !until.IsZero() {
select {
case <-time.After(time.Until(until)):
case <-s.interrupt:
}
s.backoffStarted.Store(false)
}
s.backoffStarted.Store(false)
}()
}

View file

@ -110,6 +110,7 @@ func (d *Database) GetPendingEDUs(
return fmt.Errorf("json.Unmarshal: %w", err)
}
edus[&Receipt{nid}] = &event
d.Cache.StoreFederationQueuedEDU(nid, &event)
}
return nil
@ -177,20 +178,18 @@ func (d *Database) GetPendingEDUServerNames(
return d.FederationQueueEDUs.SelectQueueEDUServerNames(ctx, nil)
}
// DeleteExpiredEDUs deletes expired EDUs
// DeleteExpiredEDUs deletes expired EDUs and evicts them from the cache.
func (d *Database) DeleteExpiredEDUs(ctx context.Context) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
var jsonNIDs []int64
err := d.Writer.Do(d.DB, nil, func(txn *sql.Tx) (err error) {
expiredBefore := gomatrixserverlib.AsTimestamp(time.Now())
jsonNIDs, err := d.FederationQueueEDUs.SelectExpiredEDUs(ctx, txn, expiredBefore)
jsonNIDs, err = d.FederationQueueEDUs.SelectExpiredEDUs(ctx, txn, expiredBefore)
if err != nil {
return err
}
if len(jsonNIDs) == 0 {
return nil
}
for i := range jsonNIDs {
d.Cache.EvictFederationQueuedEDU(jsonNIDs[i])
}
if err = d.FederationQueueJSON.DeleteQueueJSON(ctx, txn, jsonNIDs); err != nil {
return err
@ -198,4 +197,14 @@ func (d *Database) DeleteExpiredEDUs(ctx context.Context) error {
return d.FederationQueueEDUs.DeleteExpiredEDUs(ctx, txn, expiredBefore)
})
if err != nil {
return err
}
for i := range jsonNIDs {
d.Cache.EvictFederationQueuedEDU(jsonNIDs[i])
}
return nil
}

View file

@ -31,7 +31,7 @@ func mustCreateFederationDatabase(t *testing.T, dbType test.DBType) (storage.Dat
func TestExpireEDUs(t *testing.T) {
var expireEDUTypes = map[string]time.Duration{
gomatrixserverlib.MReceipt: time.Millisecond,
gomatrixserverlib.MReceipt: 0,
}
ctx := context.Background()

2
go.mod
View file

@ -21,7 +21,7 @@ require (
github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e
github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16
github.com/matrix-org/gomatrixserverlib v0.0.0-20220801083850-5ff38e2c2839
github.com/matrix-org/gomatrixserverlib v0.0.0-20220830164018-c71e518537a2
github.com/matrix-org/pinecone v0.0.0-20220803093810-b7a830c08fb9
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
github.com/mattn/go-sqlite3 v1.14.13

4
go.sum
View file

@ -478,8 +478,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4=
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
github.com/matrix-org/gomatrixserverlib v0.0.0-20220801083850-5ff38e2c2839 h1:QEFxKWH8PlEt3ZQKl31yJNAm8lvpNUwT51IMNTl9v1k=
github.com/matrix-org/gomatrixserverlib v0.0.0-20220801083850-5ff38e2c2839/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk=
github.com/matrix-org/gomatrixserverlib v0.0.0-20220830164018-c71e518537a2 h1:esbNn9hg//tAStA6TogatAJAursw23A+yfVRQsdiv70=
github.com/matrix-org/gomatrixserverlib v0.0.0-20220830164018-c71e518537a2/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk=
github.com/matrix-org/pinecone v0.0.0-20220803093810-b7a830c08fb9 h1:ed8yvWhTLk7+sNeK/eOZRTvESFTOHDRevoRoyeqPtvY=
github.com/matrix-org/pinecone v0.0.0-20220803093810-b7a830c08fb9/go.mod h1:P4MqPf+u83OPulPJ+XTbSDbbWrdFYNY4LZ/B1PIduFE=
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=

View file

@ -17,7 +17,7 @@ var build string
const (
VersionMajor = 0
VersionMinor = 9
VersionPatch = 2
VersionPatch = 5
VersionTag = "" // example: "rc1"
)

View file

@ -335,8 +335,9 @@ func (u *DeviceListUpdater) worker(ch chan gomatrixserverlib.ServerName) {
retriesMu := &sync.Mutex{}
// restarter goroutine which will inject failed servers into ch when it is time
go func() {
var serversToRetry []gomatrixserverlib.ServerName
for {
var serversToRetry []gomatrixserverlib.ServerName
serversToRetry = serversToRetry[:0] // reuse memory
time.Sleep(time.Second)
retriesMu.Lock()
now := time.Now()
@ -355,11 +356,17 @@ func (u *DeviceListUpdater) worker(ch chan gomatrixserverlib.ServerName) {
}
}()
for serverName := range ch {
retriesMu.Lock()
_, exists := retries[serverName]
retriesMu.Unlock()
if exists {
// Don't retry a server that we're already waiting for.
continue
}
waitTime, shouldRetry := u.processServer(serverName)
if shouldRetry {
retriesMu.Lock()
_, exists := retries[serverName]
if !exists {
if _, exists = retries[serverName]; !exists {
retries[serverName] = time.Now().Add(waitTime)
}
retriesMu.Unlock()

View file

@ -60,7 +60,7 @@ func NewInternalAPI(
updater := internal.NewDeviceListUpdater(db, ap, keyChangeProducer, fedClient, 8) // 8 workers TODO: configurable
ap.Updater = updater
go func() {
if err = updater.Start(); err != nil {
if err := updater.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start device list updater")
}
}()
@ -68,7 +68,7 @@ func NewInternalAPI(
dlConsumer := consumers.NewDeviceListUpdateConsumer(
base.ProcessContext, cfg, js, updater,
)
if err = dlConsumer.Start(); err != nil {
if err := dlConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start device list consumer")
}

View file

@ -5,9 +5,10 @@ import (
"fmt"
"net/http"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
)
type PerformErrorCode int
@ -161,7 +162,8 @@ func (r *PerformBackfillRequest) PrevEventIDs() []string {
// PerformBackfillResponse is a response to PerformBackfill.
type PerformBackfillResponse struct {
// Missing events, arbritrary order.
Events []*gomatrixserverlib.HeaderedEvent `json:"events"`
Events []*gomatrixserverlib.HeaderedEvent `json:"events"`
HistoryVisibility gomatrixserverlib.HistoryVisibility `json:"history_visibility"`
}
type PerformPublishRequest struct {

View file

@ -227,6 +227,7 @@ type QueryStateAndAuthChainResponse struct {
// Do all the previous events exist on this roomserver?
// If some of previous events do not exist this will be false and StateEvents will be empty.
PrevEventsExist bool `json:"prev_events_exist"`
StateKnown bool `json:"state_known"`
// The state and auth chain events that were requested.
// The lists will be in an arbitrary order.
StateEvents []*gomatrixserverlib.HeaderedEvent `json:"state_events"`

View file

@ -19,6 +19,7 @@ import (
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
)
// SendEvents to the roomserver The events are written with KindNew.
@ -69,6 +70,13 @@ func SendEventWithState(
stateEventIDs[i] = stateEvents[i].EventID()
}
logrus.WithContext(ctx).WithFields(logrus.Fields{
"room_id": event.RoomID(),
"event_id": event.EventID(),
"outliers": len(ires),
"state_ids": len(stateEventIDs),
}).Infof("Submitting %q event to roomserver with state snapshot", event.Type())
ires = append(ires, InputRoomEvent{
Kind: kind,
Event: event,

View file

@ -39,7 +39,7 @@ func CheckForSoftFail(
var authStateEntries []types.StateEntry
var err error
if rewritesState {
authStateEntries, err = db.StateEntriesForEventIDs(ctx, stateEventIDs)
authStateEntries, err = db.StateEntriesForEventIDs(ctx, stateEventIDs, true)
if err != nil {
return true, fmt.Errorf("StateEntriesForEventIDs failed: %w", err)
}
@ -97,7 +97,7 @@ func CheckAuthEvents(
authEventIDs []string,
) ([]types.EventNID, error) {
// Grab the numeric IDs for the supplied auth state events from the database.
authStateEntries, err := db.StateEntriesForEventIDs(ctx, authEventIDs)
authStateEntries, err := db.StateEntriesForEventIDs(ctx, authEventIDs, true)
if err != nil {
return nil, fmt.Errorf("db.StateEntriesForEventIDs: %w", err)
}

View file

@ -254,8 +254,15 @@ func CheckServerAllowedToSeeEvent(
return false, err
}
default:
// Something else went wrong
return false, err
switch err.(type) {
case types.MissingStateError:
// If there's no state then we assume it's open visibility, as Synapse does:
// https://github.com/matrix-org/synapse/blob/aec87a0f9369a3015b2a53469f88d1de274e8b71/synapse/visibility.py#L654-L655
return true, nil
default:
// Something else went wrong
return false, err
}
}
return auth.IsServerAllowed(serverName, isServerInRoom, stateAtEvent), nil
}

View file

@ -36,6 +36,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/internal/query"
"github.com/matrix-org/dendrite/roomserver/producers"
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
@ -247,14 +248,24 @@ func (w *worker) _next() {
// it was a synchronous request.
var errString string
if err = w.r.processRoomEvent(w.r.ProcessContext.Context(), &inputRoomEvent); err != nil {
if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
sentry.CaptureException(err)
switch err.(type) {
case types.RejectedError:
// Don't send events that were rejected to Sentry
logrus.WithError(err).WithFields(logrus.Fields{
"room_id": w.roomID,
"event_id": inputRoomEvent.Event.EventID(),
"type": inputRoomEvent.Event.Type(),
}).Warn("Roomserver rejected event")
default:
if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
sentry.CaptureException(err)
}
logrus.WithError(err).WithFields(logrus.Fields{
"room_id": w.roomID,
"event_id": inputRoomEvent.Event.EventID(),
"type": inputRoomEvent.Event.Type(),
}).Warn("Roomserver failed to process event")
}
logrus.WithError(err).WithFields(logrus.Fields{
"room_id": w.roomID,
"event_id": inputRoomEvent.Event.EventID(),
"type": inputRoomEvent.Event.Type(),
}).Warn("Roomserver failed to process async event")
_ = msg.Term()
errString = err.Error()
} else {

View file

@ -17,8 +17,8 @@
package input
import (
"bytes"
"context"
"database/sql"
"fmt"
"time"
@ -107,28 +107,6 @@ func (r *Inputer) processRoomEvent(
})
}
// if we have already got this event then do not process it again, if the input kind is an outlier.
// Outliers contain no extra information which may warrant a re-processing.
if input.Kind == api.KindOutlier {
evs, err2 := r.DB.EventsFromIDs(ctx, []string{event.EventID()})
if err2 == nil && len(evs) == 1 {
// check hash matches if we're on early room versions where the event ID was a random string
idFormat, err2 := headered.RoomVersion.EventIDFormat()
if err2 == nil {
switch idFormat {
case gomatrixserverlib.EventIDFormatV1:
if bytes.Equal(event.EventReference().EventSHA256, evs[0].EventReference().EventSHA256) {
logger.Debugf("Already processed event; ignoring")
return nil
}
default:
logger.Debugf("Already processed event; ignoring")
return nil
}
}
}
}
// Don't waste time processing the event if the room doesn't exist.
// A room entry locally will only be created in response to a create
// event.
@ -141,6 +119,29 @@ func (r *Inputer) processRoomEvent(
return fmt.Errorf("room %s does not exist for event %s", event.RoomID(), event.EventID())
}
// If we already know about this outlier and it hasn't been rejected
// then we won't attempt to reprocess it. If it was rejected or has now
// arrived as a different kind of event, then we can attempt to reprocess,
// in case we have learned something new or need to weave the event into
// the DAG now.
if input.Kind == api.KindOutlier && roomInfo != nil {
wasRejected, werr := r.DB.IsEventRejected(ctx, roomInfo.RoomNID, event.EventID())
switch {
case werr == sql.ErrNoRows:
// We haven't seen this event before so continue.
case werr != nil:
// Something has gone wrong trying to find out if we rejected
// this event already.
logger.WithError(werr).Errorf("Failed to check if event %q is already seen", event.EventID())
return werr
case !wasRejected:
// We've seen this event before and it wasn't rejected so we
// should ignore it.
logger.Debugf("Already processed event %q, ignoring", event.EventID())
return nil
}
}
var missingAuth, missingPrev bool
serverRes := &fedapi.QueryJoinedHostServerNamesInRoomResponse{}
if !isCreateEvent {
@ -300,7 +301,7 @@ func (r *Inputer) processRoomEvent(
// bother doing this if the event was already rejected as it just ends up
// burning CPU time.
historyVisibility := gomatrixserverlib.HistoryVisibilityShared // Default to shared.
if rejectionErr == nil && !isRejected && !softfail {
if input.Kind != api.KindOutlier && rejectionErr == nil && !isRejected {
var err error
historyVisibility, rejectionErr, err = r.processStateBefore(ctx, input, missingPrev)
if err != nil {
@ -312,7 +313,7 @@ func (r *Inputer) processRoomEvent(
}
// Store the event.
_, _, stateAtEvent, redactionEvent, redactedEventID, err := r.DB.StoreEvent(ctx, event, authEventNIDs, isRejected || softfail)
_, _, stateAtEvent, redactionEvent, redactedEventID, err := r.DB.StoreEvent(ctx, event, authEventNIDs, isRejected)
if err != nil {
return fmt.Errorf("updater.StoreEvent: %w", err)
}
@ -352,12 +353,18 @@ func (r *Inputer) processRoomEvent(
}
}
// We stop here if the event is rejected: We've stored it but won't update forward extremities or notify anyone about it.
if isRejected || softfail {
logger.WithError(rejectionErr).WithFields(logrus.Fields{
"soft_fail": softfail,
"missing_prev": missingPrev,
}).Warn("Stored rejected event")
// We stop here if the event is rejected: We've stored it but won't update
// forward extremities or notify downstream components about it.
switch {
case isRejected:
logger.WithError(rejectionErr).Warn("Stored rejected event")
if rejectionErr != nil {
return types.RejectedError(rejectionErr.Error())
}
return nil
case softfail:
logger.WithError(rejectionErr).Warn("Stored soft-failed event")
if rejectionErr != nil {
return types.RejectedError(rejectionErr.Error())
}
@ -660,7 +667,7 @@ func (r *Inputer) calculateAndSetState(
// We've been told what the state at the event is so we don't need to calculate it.
// Check that those state events are in the database and store the state.
var entries []types.StateEntry
if entries, err = r.DB.StateEntriesForEventIDs(ctx, input.StateEventIDs); err != nil {
if entries, err = r.DB.StateEntriesForEventIDs(ctx, input.StateEventIDs, true); err != nil {
return fmt.Errorf("updater.StateEntriesForEventIDs: %w", err)
}
entries = types.DeduplicateStateEntries(entries)

View file

@ -18,7 +18,6 @@ import (
"context"
"fmt"
"github.com/getsentry/sentry-go"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
@ -140,11 +139,11 @@ func (r *Backfiller) backfillViaFederation(ctx context.Context, req *api.Perform
continue
}
var entries []types.StateEntry
if entries, err = r.DB.StateEntriesForEventIDs(ctx, stateIDs); err != nil {
if entries, err = r.DB.StateEntriesForEventIDs(ctx, stateIDs, true); err != nil {
// attempt to fetch the missing events
r.fetchAndStoreMissingEvents(ctx, info.RoomVersion, requester, stateIDs)
// try again
entries, err = r.DB.StateEntriesForEventIDs(ctx, stateIDs)
entries, err = r.DB.StateEntriesForEventIDs(ctx, stateIDs, true)
if err != nil {
logrus.WithError(err).WithField("event_id", ev.EventID()).Error("backfillViaFederation: failed to get state entries for event")
return err
@ -164,6 +163,7 @@ func (r *Backfiller) backfillViaFederation(ctx context.Context, req *api.Perform
// TODO: update backwards extremities, as that should be moved from syncapi to roomserver at some point.
res.Events = events
res.HistoryVisibility = requester.historyVisiblity
return nil
}
@ -248,6 +248,7 @@ type backfillRequester struct {
servers []gomatrixserverlib.ServerName
eventIDToBeforeStateIDs map[string][]string
eventIDMap map[string]*gomatrixserverlib.Event
historyVisiblity gomatrixserverlib.HistoryVisibility
}
func newBackfillRequester(
@ -266,6 +267,7 @@ func newBackfillRequester(
eventIDMap: make(map[string]*gomatrixserverlib.Event),
bwExtrems: bwExtrems,
preferServer: preferServer,
historyVisiblity: gomatrixserverlib.HistoryVisibilityShared,
}
}
@ -317,7 +319,6 @@ FederationHit:
b.eventIDToBeforeStateIDs[targetEvent.EventID()] = res
return res, nil
}
sentry.CaptureException(lastErr) // temporary to see if we might need to raise the server limit
return nil, lastErr
}
@ -395,7 +396,6 @@ func (b *backfillRequester) StateBeforeEvent(ctx context.Context, roomVer gomatr
}
return result, nil
}
sentry.CaptureException(lastErr) // temporary to see if we might need to raise the server limit
return nil, lastErr
}
@ -447,7 +447,8 @@ FindSuccessor:
}
// possibly return all joined servers depending on history visiblity
memberEventsFromVis, err := joinEventsFromHistoryVisibility(ctx, b.db, roomID, stateEntries, b.thisServer)
memberEventsFromVis, visibility, err := joinEventsFromHistoryVisibility(ctx, b.db, roomID, stateEntries, b.thisServer)
b.historyVisiblity = visibility
if err != nil {
logrus.WithError(err).Error("ServersAtEvent: failed calculate servers from history visibility rules")
return nil
@ -528,7 +529,7 @@ func (b *backfillRequester) ProvideEvents(roomVer gomatrixserverlib.RoomVersion,
// pull all events and then filter by that table.
func joinEventsFromHistoryVisibility(
ctx context.Context, db storage.Database, roomID string, stateEntries []types.StateEntry,
thisServer gomatrixserverlib.ServerName) ([]types.Event, error) {
thisServer gomatrixserverlib.ServerName) ([]types.Event, gomatrixserverlib.HistoryVisibility, error) {
var eventNIDs []types.EventNID
for _, entry := range stateEntries {
@ -542,7 +543,9 @@ func joinEventsFromHistoryVisibility(
// Get all of the events in this state
stateEvents, err := db.Events(ctx, eventNIDs)
if err != nil {
return nil, err
// even though the default should be shared, restricting the visibility to joined
// feels more secure here.
return nil, gomatrixserverlib.HistoryVisibilityJoined, err
}
events := make([]*gomatrixserverlib.Event, len(stateEvents))
for i := range stateEvents {
@ -551,20 +554,22 @@ func joinEventsFromHistoryVisibility(
// Can we see events in the room?
canSeeEvents := auth.IsServerAllowed(thisServer, true, events)
visibility := gomatrixserverlib.HistoryVisibility(auth.HistoryVisibilityForRoom(events))
if !canSeeEvents {
logrus.Infof("ServersAtEvent history not visible to us: %s", auth.HistoryVisibilityForRoom(events))
return nil, nil
logrus.Infof("ServersAtEvent history not visible to us: %s", visibility)
return nil, visibility, nil
}
// get joined members
info, err := db.RoomInfo(ctx, roomID)
if err != nil {
return nil, err
return nil, visibility, nil
}
joinEventNIDs, err := db.GetMembershipEventNIDsForRoom(ctx, info.RoomNID, true, false)
if err != nil {
return nil, err
return nil, visibility, err
}
return db.Events(ctx, joinEventNIDs)
evs, err := db.Events(ctx, joinEventNIDs)
return evs, visibility, err
}
func persistEvents(ctx context.Context, db storage.Database, events []*gomatrixserverlib.HeaderedEvent) (types.RoomNID, map[string]types.Event) {

View file

@ -72,13 +72,10 @@ func (r *Queryer) QueryStateAfterEvents(
prevStates, err := r.DB.StateAtEventIDs(ctx, request.PrevEventIDs)
if err != nil {
switch err.(type) {
case types.MissingEventError:
util.GetLogger(ctx).Errorf("QueryStateAfterEvents: MissingEventError: %s", err)
if _, ok := err.(types.MissingEventError); ok {
return nil
default:
return err
}
return err
}
response.PrevEventsExist = true
@ -95,6 +92,12 @@ func (r *Queryer) QueryStateAfterEvents(
)
}
if err != nil {
if _, ok := err.(types.MissingEventError); ok {
return nil
}
if _, ok := err.(types.MissingStateError); ok {
return nil
}
return err
}
@ -500,10 +503,11 @@ func (r *Queryer) QueryStateAndAuthChain(
}
var stateEvents []*gomatrixserverlib.Event
stateEvents, rejected, err := r.loadStateAtEventIDs(ctx, info, request.PrevEventIDs)
stateEvents, rejected, stateMissing, err := r.loadStateAtEventIDs(ctx, info, request.PrevEventIDs)
if err != nil {
return err
}
response.StateKnown = !stateMissing
response.IsRejected = rejected
response.PrevEventsExist = true
@ -539,15 +543,18 @@ func (r *Queryer) QueryStateAndAuthChain(
return err
}
func (r *Queryer) loadStateAtEventIDs(ctx context.Context, roomInfo *types.RoomInfo, eventIDs []string) ([]*gomatrixserverlib.Event, bool, error) {
// first bool: is rejected, second bool: state missing
func (r *Queryer) loadStateAtEventIDs(ctx context.Context, roomInfo *types.RoomInfo, eventIDs []string) ([]*gomatrixserverlib.Event, bool, bool, error) {
roomState := state.NewStateResolution(r.DB, roomInfo)
prevStates, err := r.DB.StateAtEventIDs(ctx, eventIDs)
if err != nil {
switch err.(type) {
case types.MissingEventError:
return nil, false, nil
return nil, false, true, nil
case types.MissingStateError:
return nil, false, true, nil
default:
return nil, false, err
return nil, false, false, err
}
}
// Currently only used on /state and /state_ids
@ -564,12 +571,11 @@ func (r *Queryer) loadStateAtEventIDs(ctx context.Context, roomInfo *types.RoomI
ctx, prevStates,
)
if err != nil {
return nil, rejected, err
return nil, rejected, false, err
}
events, err := helpers.LoadStateEvents(ctx, r.DB, stateEntries)
return events, rejected, err
return events, rejected, false, err
}
type eventsFromIDs func(context.Context, []string) ([]types.Event, error)

View file

@ -79,7 +79,7 @@ type Database interface {
// Look up the state entries for a list of string event IDs
// Returns an error if the there is an error talking to the database
// Returns a types.MissingEventError if the event IDs aren't in the database.
StateEntriesForEventIDs(ctx context.Context, eventIDs []string) ([]types.StateEntry, error)
StateEntriesForEventIDs(ctx context.Context, eventIDs []string, excludeRejected bool) ([]types.StateEntry, error)
// Look up the string event state keys for a list of numeric event state keys
// Returns an error if there was a problem talking to the database.
EventStateKeys(ctx context.Context, eventStateKeyNIDs []types.EventStateKeyNID) (map[types.EventStateKeyNID]string, error)
@ -94,6 +94,8 @@ type Database interface {
// Opens and returns a room updater, which locks the room and opens a transaction.
// The GetRoomUpdater must have Commit or Rollback called on it if this doesn't return an error.
// If this returns an error then no further action is required.
// IsEventRejected returns true if the event is known and rejected.
IsEventRejected(ctx context.Context, roomNID types.RoomNID, eventID string) (rejected bool, err error)
GetRoomUpdater(ctx context.Context, roomInfo *types.RoomInfo) (*shared.RoomUpdater, error)
// Look up event references for the latest events in the room and the current state snapshot.
// Returns the latest events, the current state and the maximum depth of the latest events plus 1.

View file

@ -74,7 +74,7 @@ const insertEventSQL = "" +
"INSERT INTO roomserver_events AS e (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256, auth_event_nids, depth, is_rejected)" +
" VALUES ($1, $2, $3, $4, $5, $6, $7, $8)" +
" ON CONFLICT ON CONSTRAINT roomserver_event_id_unique DO UPDATE" +
" SET is_rejected = $8 WHERE e.event_id = $4 AND e.is_rejected = FALSE" +
" SET is_rejected = $8 WHERE e.event_id = $4 AND e.is_rejected = TRUE" +
" RETURNING event_nid, state_snapshot_nid"
const selectEventSQL = "" +
@ -88,6 +88,14 @@ const bulkSelectStateEventByIDSQL = "" +
" WHERE event_id = ANY($1)" +
" ORDER BY event_type_nid, event_state_key_nid ASC"
// Bulk lookup of events by string ID that aren't excluded.
// Sort by the numeric IDs for event type and state key.
// This means we can use binary search to lookup entries by type and state key.
const bulkSelectStateEventByIDExcludingRejectedSQL = "" +
"SELECT event_type_nid, event_state_key_nid, event_nid FROM roomserver_events" +
" WHERE event_id = ANY($1) AND is_rejected = FALSE" +
" ORDER BY event_type_nid, event_state_key_nid ASC"
// Bulk look up of events by event NID, optionally filtering by the event type
// or event state key NIDs if provided. (The CARDINALITY check will return true
// if the provided arrays are empty, ergo no filtering).
@ -136,23 +144,28 @@ const selectMaxEventDepthSQL = "" +
const selectRoomNIDsForEventNIDsSQL = "" +
"SELECT event_nid, room_nid FROM roomserver_events WHERE event_nid = ANY($1)"
const selectEventRejectedSQL = "" +
"SELECT is_rejected FROM roomserver_events WHERE room_nid = $1 AND event_id = $2"
type eventStatements struct {
insertEventStmt *sql.Stmt
selectEventStmt *sql.Stmt
bulkSelectStateEventByIDStmt *sql.Stmt
bulkSelectStateEventByNIDStmt *sql.Stmt
bulkSelectStateAtEventByIDStmt *sql.Stmt
updateEventStateStmt *sql.Stmt
selectEventSentToOutputStmt *sql.Stmt
updateEventSentToOutputStmt *sql.Stmt
selectEventIDStmt *sql.Stmt
bulkSelectStateAtEventAndReferenceStmt *sql.Stmt
bulkSelectEventReferenceStmt *sql.Stmt
bulkSelectEventIDStmt *sql.Stmt
bulkSelectEventNIDStmt *sql.Stmt
bulkSelectUnsentEventNIDStmt *sql.Stmt
selectMaxEventDepthStmt *sql.Stmt
selectRoomNIDsForEventNIDsStmt *sql.Stmt
insertEventStmt *sql.Stmt
selectEventStmt *sql.Stmt
bulkSelectStateEventByIDStmt *sql.Stmt
bulkSelectStateEventByIDExcludingRejectedStmt *sql.Stmt
bulkSelectStateEventByNIDStmt *sql.Stmt
bulkSelectStateAtEventByIDStmt *sql.Stmt
updateEventStateStmt *sql.Stmt
selectEventSentToOutputStmt *sql.Stmt
updateEventSentToOutputStmt *sql.Stmt
selectEventIDStmt *sql.Stmt
bulkSelectStateAtEventAndReferenceStmt *sql.Stmt
bulkSelectEventReferenceStmt *sql.Stmt
bulkSelectEventIDStmt *sql.Stmt
bulkSelectEventNIDStmt *sql.Stmt
bulkSelectUnsentEventNIDStmt *sql.Stmt
selectMaxEventDepthStmt *sql.Stmt
selectRoomNIDsForEventNIDsStmt *sql.Stmt
selectEventRejectedStmt *sql.Stmt
}
func CreateEventsTable(db *sql.DB) error {
@ -167,6 +180,7 @@ func PrepareEventsTable(db *sql.DB) (tables.Events, error) {
{&s.insertEventStmt, insertEventSQL},
{&s.selectEventStmt, selectEventSQL},
{&s.bulkSelectStateEventByIDStmt, bulkSelectStateEventByIDSQL},
{&s.bulkSelectStateEventByIDExcludingRejectedStmt, bulkSelectStateEventByIDExcludingRejectedSQL},
{&s.bulkSelectStateEventByNIDStmt, bulkSelectStateEventByNIDSQL},
{&s.bulkSelectStateAtEventByIDStmt, bulkSelectStateAtEventByIDSQL},
{&s.updateEventStateStmt, updateEventStateSQL},
@ -180,6 +194,7 @@ func PrepareEventsTable(db *sql.DB) (tables.Events, error) {
{&s.bulkSelectUnsentEventNIDStmt, bulkSelectUnsentEventNIDSQL},
{&s.selectMaxEventDepthStmt, selectMaxEventDepthSQL},
{&s.selectRoomNIDsForEventNIDsStmt, selectRoomNIDsForEventNIDsSQL},
{&s.selectEventRejectedStmt, selectEventRejectedSQL},
}.Prepare(db)
}
@ -216,11 +231,18 @@ func (s *eventStatements) SelectEvent(
}
// bulkSelectStateEventByID lookups a list of state events by event ID.
// If any of the requested events are missing from the database it returns a types.MissingEventError
// If not excluding rejected events, and any of the requested events are missing from
// the database it returns a types.MissingEventError. If excluding rejected events,
// the events will be silently omitted without error.
func (s *eventStatements) BulkSelectStateEventByID(
ctx context.Context, txn *sql.Tx, eventIDs []string,
ctx context.Context, txn *sql.Tx, eventIDs []string, excludeRejected bool,
) ([]types.StateEntry, error) {
stmt := sqlutil.TxStmt(txn, s.bulkSelectStateEventByIDStmt)
var stmt *sql.Stmt
if excludeRejected {
stmt = sqlutil.TxStmt(txn, s.bulkSelectStateEventByIDExcludingRejectedStmt)
} else {
stmt = sqlutil.TxStmt(txn, s.bulkSelectStateEventByIDStmt)
}
rows, err := stmt.QueryContext(ctx, pq.StringArray(eventIDs))
if err != nil {
return nil, err
@ -230,10 +252,10 @@ func (s *eventStatements) BulkSelectStateEventByID(
// because of the unique constraint on event IDs.
// So we can allocate an array of the correct size now.
// We might get fewer results than IDs so we adjust the length of the slice before returning it.
results := make([]types.StateEntry, len(eventIDs))
results := make([]types.StateEntry, 0, len(eventIDs))
i := 0
for ; rows.Next(); i++ {
result := &results[i]
var result types.StateEntry
if err = rows.Scan(
&result.EventTypeNID,
&result.EventStateKeyNID,
@ -241,11 +263,12 @@ func (s *eventStatements) BulkSelectStateEventByID(
); err != nil {
return nil, err
}
results = append(results, result)
}
if err = rows.Err(); err != nil {
return nil, err
}
if i != len(eventIDs) {
if !excludeRejected && i != len(eventIDs) {
// If there are fewer rows returned than IDs then we were asked to lookup event IDs we don't have.
// We don't know which ones were missing because we don't return the string IDs in the query.
// However it should be possible debug this by replaying queries or entries from the input kafka logs.
@ -323,7 +346,7 @@ func (s *eventStatements) BulkSelectStateAtEventByID(
// Genuine create events are the only case where it's OK to have no previous state.
isCreate := result.EventTypeNID == types.MRoomCreateNID && result.EventStateKeyNID == 1
if result.BeforeStateSnapshotNID == 0 && !isCreate {
return nil, types.MissingEventError(
return nil, types.MissingStateError(
fmt.Sprintf("storage: missing state for event NID %d", result.EventNID),
)
}
@ -540,3 +563,11 @@ func eventNIDsAsArray(eventNIDs []types.EventNID) pq.Int64Array {
}
return nids
}
func (s *eventStatements) SelectEventRejected(
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventID string,
) (rejected bool, err error) {
stmt := sqlutil.TxStmt(txn, s.selectEventRejectedStmt)
err = stmt.QueryRowContext(ctx, roomNID, eventID).Scan(&rejected)
return
}

View file

@ -113,9 +113,9 @@ func (d *Database) eventStateKeyNIDs(
}
func (d *Database) StateEntriesForEventIDs(
ctx context.Context, eventIDs []string,
ctx context.Context, eventIDs []string, excludeRejected bool,
) ([]types.StateEntry, error) {
return d.EventsTable.BulkSelectStateEventByID(ctx, nil, eventIDs)
return d.EventsTable.BulkSelectStateEventByID(ctx, nil, eventIDs, excludeRejected)
}
func (d *Database) StateEntriesForTuples(
@ -567,6 +567,10 @@ func (d *Database) GetRoomUpdater(
return updater, err
}
func (d *Database) IsEventRejected(ctx context.Context, roomNID types.RoomNID, eventID string) (bool, error) {
return d.EventsTable.SelectEventRejected(ctx, nil, roomNID, eventID)
}
func (d *Database) StoreEvent(
ctx context.Context, event *gomatrixserverlib.Event,
authEventNIDs []types.EventNID, isRejected bool,

View file

@ -50,7 +50,7 @@ const insertEventSQL = `
INSERT INTO roomserver_events (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256, auth_event_nids, depth, is_rejected)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT DO UPDATE
SET is_rejected = $8 WHERE is_rejected = 0
SET is_rejected = $8 WHERE is_rejected = 1
RETURNING event_nid, state_snapshot_nid;
`
@ -65,6 +65,14 @@ const bulkSelectStateEventByIDSQL = "" +
" WHERE event_id IN ($1)" +
" ORDER BY event_type_nid, event_state_key_nid ASC"
// Bulk lookup of events by string ID that aren't rejected.
// Sort by the numeric IDs for event type and state key.
// This means we can use binary search to lookup entries by type and state key.
const bulkSelectStateEventByIDExcludingRejectedSQL = "" +
"SELECT event_type_nid, event_state_key_nid, event_nid FROM roomserver_events" +
" WHERE event_id IN ($1) AND is_rejected = 0" +
" ORDER BY event_type_nid, event_state_key_nid ASC"
const bulkSelectStateEventByNIDSQL = "" +
"SELECT event_type_nid, event_state_key_nid, event_nid FROM roomserver_events" +
" WHERE event_nid IN ($1)"
@ -109,19 +117,24 @@ const selectMaxEventDepthSQL = "" +
const selectRoomNIDsForEventNIDsSQL = "" +
"SELECT event_nid, room_nid FROM roomserver_events WHERE event_nid IN ($1)"
const selectEventRejectedSQL = "" +
"SELECT is_rejected FROM roomserver_events WHERE room_nid = $1 AND event_id = $2"
type eventStatements struct {
db *sql.DB
insertEventStmt *sql.Stmt
selectEventStmt *sql.Stmt
bulkSelectStateEventByIDStmt *sql.Stmt
bulkSelectStateAtEventByIDStmt *sql.Stmt
updateEventStateStmt *sql.Stmt
selectEventSentToOutputStmt *sql.Stmt
updateEventSentToOutputStmt *sql.Stmt
selectEventIDStmt *sql.Stmt
bulkSelectStateAtEventAndReferenceStmt *sql.Stmt
bulkSelectEventReferenceStmt *sql.Stmt
bulkSelectEventIDStmt *sql.Stmt
db *sql.DB
insertEventStmt *sql.Stmt
selectEventStmt *sql.Stmt
bulkSelectStateEventByIDStmt *sql.Stmt
bulkSelectStateEventByIDExcludingRejectedStmt *sql.Stmt
bulkSelectStateAtEventByIDStmt *sql.Stmt
updateEventStateStmt *sql.Stmt
selectEventSentToOutputStmt *sql.Stmt
updateEventSentToOutputStmt *sql.Stmt
selectEventIDStmt *sql.Stmt
bulkSelectStateAtEventAndReferenceStmt *sql.Stmt
bulkSelectEventReferenceStmt *sql.Stmt
bulkSelectEventIDStmt *sql.Stmt
selectEventRejectedStmt *sql.Stmt
//bulkSelectEventNIDStmt *sql.Stmt
//bulkSelectUnsentEventNIDStmt *sql.Stmt
//selectRoomNIDsForEventNIDsStmt *sql.Stmt
@ -141,6 +154,7 @@ func PrepareEventsTable(db *sql.DB) (tables.Events, error) {
{&s.insertEventStmt, insertEventSQL},
{&s.selectEventStmt, selectEventSQL},
{&s.bulkSelectStateEventByIDStmt, bulkSelectStateEventByIDSQL},
{&s.bulkSelectStateEventByIDExcludingRejectedStmt, bulkSelectStateEventByIDExcludingRejectedSQL},
{&s.bulkSelectStateAtEventByIDStmt, bulkSelectStateAtEventByIDSQL},
{&s.updateEventStateStmt, updateEventStateSQL},
{&s.updateEventSentToOutputStmt, updateEventSentToOutputSQL},
@ -152,6 +166,7 @@ func PrepareEventsTable(db *sql.DB) (tables.Events, error) {
//{&s.bulkSelectEventNIDStmt, bulkSelectEventNIDSQL},
//{&s.bulkSelectUnsentEventNIDStmt, bulkSelectUnsentEventNIDSQL},
//{&s.selectRoomNIDForEventNIDStmt, selectRoomNIDForEventNIDSQL},
{&s.selectEventRejectedStmt, selectEventRejectedSQL},
}.Prepare(db)
}
@ -189,16 +204,24 @@ func (s *eventStatements) SelectEvent(
}
// bulkSelectStateEventByID lookups a list of state events by event ID.
// If any of the requested events are missing from the database it returns a types.MissingEventError
// If not excluding rejected events, and any of the requested events are missing from
// the database it returns a types.MissingEventError. If excluding rejected events,
// the events will be silently omitted without error.
func (s *eventStatements) BulkSelectStateEventByID(
ctx context.Context, txn *sql.Tx, eventIDs []string,
ctx context.Context, txn *sql.Tx, eventIDs []string, excludeRejected bool,
) ([]types.StateEntry, error) {
///////////////
var sql string
if excludeRejected {
sql = bulkSelectStateEventByIDExcludingRejectedSQL
} else {
sql = bulkSelectStateEventByIDSQL
}
iEventIDs := make([]interface{}, len(eventIDs))
for k, v := range eventIDs {
iEventIDs[k] = v
}
selectOrig := strings.Replace(bulkSelectStateEventByIDSQL, "($1)", sqlutil.QueryVariadic(len(iEventIDs)), 1)
selectOrig := strings.Replace(sql, "($1)", sqlutil.QueryVariadic(len(iEventIDs)), 1)
selectPrep, err := s.db.Prepare(selectOrig)
if err != nil {
return nil, err
@ -216,10 +239,10 @@ func (s *eventStatements) BulkSelectStateEventByID(
// because of the unique constraint on event IDs.
// So we can allocate an array of the correct size now.
// We might get fewer results than IDs so we adjust the length of the slice before returning it.
results := make([]types.StateEntry, len(eventIDs))
results := make([]types.StateEntry, 0, len(eventIDs))
i := 0
for ; rows.Next(); i++ {
result := &results[i]
var result types.StateEntry
if err = rows.Scan(
&result.EventTypeNID,
&result.EventStateKeyNID,
@ -227,8 +250,9 @@ func (s *eventStatements) BulkSelectStateEventByID(
); err != nil {
return nil, err
}
results = append(results, result)
}
if i != len(eventIDs) {
if !excludeRejected && i != len(eventIDs) {
// If there are fewer rows returned than IDs then we were asked to lookup event IDs we don't have.
// We don't know which ones were missing because we don't return the string IDs in the query.
// However it should be possible debug this by replaying queries or entries from the input kafka logs.
@ -338,7 +362,7 @@ func (s *eventStatements) BulkSelectStateAtEventByID(
// Genuine create events are the only case where it's OK to have no previous state.
isCreate := result.EventTypeNID == types.MRoomCreateNID && result.EventStateKeyNID == 1
if result.BeforeStateSnapshotNID == 0 && !isCreate {
return nil, types.MissingEventError(
return nil, types.MissingStateError(
fmt.Sprintf("storage: missing state for event NID %d", result.EventNID),
)
}
@ -614,3 +638,11 @@ func eventNIDsAsArray(eventNIDs []types.EventNID) string {
b, _ := json.Marshal(eventNIDs)
return string(b)
}
func (s *eventStatements) SelectEventRejected(
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventID string,
) (rejected bool, err error) {
stmt := sqlutil.TxStmt(txn, s.selectEventRejectedStmt)
err = stmt.QueryRowContext(ctx, roomNID, eventID).Scan(&rejected)
return
}

View file

@ -102,7 +102,7 @@ func Test_EventsTable(t *testing.T) {
})
}
stateEvents, err := tab.BulkSelectStateEventByID(ctx, nil, eventIDs)
stateEvents, err := tab.BulkSelectStateEventByID(ctx, nil, eventIDs, false)
assert.NoError(t, err)
assert.Equal(t, len(stateEvents), len(eventIDs))
nids := make([]types.EventNID, 0, len(stateEvents))

View file

@ -46,7 +46,7 @@ type Events interface {
SelectEvent(ctx context.Context, txn *sql.Tx, eventID string) (types.EventNID, types.StateSnapshotNID, error)
// bulkSelectStateEventByID lookups a list of state events by event ID.
// If any of the requested events are missing from the database it returns a types.MissingEventError
BulkSelectStateEventByID(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]types.StateEntry, error)
BulkSelectStateEventByID(ctx context.Context, txn *sql.Tx, eventIDs []string, excludeRejected bool) ([]types.StateEntry, error)
BulkSelectStateEventByNID(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID, stateKeyTuples []types.StateKeyTuple) ([]types.StateEntry, error)
// BulkSelectStateAtEventByID lookups the state at a list of events by event ID.
// If any of the requested events are missing from the database it returns a types.MissingEventError.
@ -66,6 +66,7 @@ type Events interface {
BulkSelectUnsentEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventNID, error)
SelectMaxEventDepth(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID) (int64, error)
SelectRoomNIDsForEventNIDs(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID) (roomNIDs map[types.EventNID]types.RoomNID, err error)
SelectEventRejected(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventID string) (rejected bool, err error)
}
type Rooms interface {

View file

@ -25,21 +25,23 @@ import (
_ "net/http/pprof"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/getsentry/sentry-go"
sentryhttp "github.com/getsentry/sentry-go/http"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/internal/pushgateway"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/atomic"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/internal/pushgateway"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
@ -47,6 +49,8 @@ import (
"github.com/gorilla/mux"
"github.com/kardianos/minwinsvc"
"github.com/sirupsen/logrus"
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
asinthttp "github.com/matrix-org/dendrite/appservice/inthttp"
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
@ -58,7 +62,6 @@ import (
"github.com/matrix-org/dendrite/setup/config"
userapi "github.com/matrix-org/dendrite/userapi/api"
userapiinthttp "github.com/matrix-org/dendrite/userapi/inthttp"
"github.com/sirupsen/logrus"
)
// BaseDendrite is a base for creating new instances of dendrite. It parses
@ -87,6 +90,7 @@ type BaseDendrite struct {
Database *sql.DB
DatabaseWriter sqlutil.Writer
EnableMetrics bool
startupLock sync.Mutex
}
const NoListener = ""
@ -394,6 +398,9 @@ func (b *BaseDendrite) SetupAndServeHTTP(
internalHTTPAddr, externalHTTPAddr config.HTTPAddress,
certFile, keyFile *string,
) {
// Manually unlocked right before actually serving requests,
// as we don't return from this method (defer doesn't work).
b.startupLock.Lock()
internalAddr, _ := internalHTTPAddr.Address()
externalAddr, _ := externalHTTPAddr.Address()
@ -472,6 +479,7 @@ func (b *BaseDendrite) SetupAndServeHTTP(
externalRouter.PathPrefix(httputil.PublicMediaPathPrefix).Handler(b.PublicMediaAPIMux)
externalRouter.PathPrefix(httputil.PublicWellKnownPrefix).Handler(b.PublicWellKnownAPIMux)
b.startupLock.Unlock()
if internalAddr != NoListener && internalAddr != externalAddr {
go func() {
var internalShutdown atomic.Bool // RegisterOnShutdown can be called more than once

View file

@ -18,14 +18,16 @@ import (
"context"
"strings"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
keytypes "github.com/matrix-org/dendrite/keyserver/types"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
)
// DeviceOTKCounts adds one-time key counts to the /sync response
@ -125,7 +127,7 @@ func DeviceListCatchup(
"from": offset,
"to": toOffset,
"response_offset": queryRes.Offset,
}).Debugf("QueryKeyChanges request result: %+v", res.DeviceLists)
}).Tracef("QueryKeyChanges request result: %+v", res.DeviceLists)
return types.StreamPosition(queryRes.Offset), hasNew, nil
}
@ -277,6 +279,10 @@ func membershipEventPresent(events []gomatrixserverlib.ClientEvent, userID strin
// it's enough to know that we have our member event here, don't need to check membership content
// as it's implied by being in the respective section of the sync response.
if ev.Type == gomatrixserverlib.MRoomMember && ev.StateKey != nil && *ev.StateKey == userID {
// ignore e.g. join -> join changes
if gjson.GetBytes(ev.Unsigned, "prev_content.membership").Str == gjson.GetBytes(ev.Content, "membership").Str {
continue
}
return true
}
}

View file

@ -350,8 +350,10 @@ func (r *messagesReq) retrieveEvents() (
startTime := time.Now()
filteredEvents, err := internal.ApplyHistoryVisibilityFilter(r.ctx, r.db, r.rsAPI, events, nil, r.device.UserID, "messages")
logrus.WithFields(logrus.Fields{
"duration": time.Since(startTime),
"room_id": r.roomID,
"duration": time.Since(startTime),
"room_id": r.roomID,
"events_before": len(events),
"events_after": len(filteredEvents),
}).Debug("applied history visibility (messages)")
return gomatrixserverlib.HeaderedToClientEvents(filteredEvents, gomatrixserverlib.FormatAll), start, end, err
}
@ -513,6 +515,9 @@ func (r *messagesReq) backfill(roomID string, backwardsExtremities map[string][]
// Store the events in the database, while marking them as unfit to show
// up in responses to sync requests.
if res.HistoryVisibility == "" {
res.HistoryVisibility = gomatrixserverlib.HistoryVisibilityShared
}
for i := range res.Events {
_, err = r.db.WriteEvent(
context.Background(),
@ -521,7 +526,7 @@ func (r *messagesReq) backfill(roomID string, backwardsExtremities map[string][]
[]string{},
[]string{},
nil, true,
gomatrixserverlib.HistoryVisibilityShared,
res.HistoryVisibility,
)
if err != nil {
return nil, err
@ -534,6 +539,9 @@ func (r *messagesReq) backfill(roomID string, backwardsExtremities map[string][]
// last `limit` events
events = events[len(events)-limit:]
}
for _, ev := range events {
ev.Visibility = res.HistoryVisibility
}
return events, nil
}

View file

@ -19,10 +19,11 @@ import (
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
)
type Database interface {

View file

@ -279,8 +279,8 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
log.WithFields(log.Fields{
"since": r.From,
"current": r.To,
"adds": addIDs,
"dels": delIDs,
"adds": len(addIDs),
"dels": len(delIDs),
}).Warn("StateBetween: ignoring deleted state")
}

View file

@ -41,6 +41,8 @@ CREATE TABLE IF NOT EXISTS syncapi_send_to_device (
-- The event content JSON.
content TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS syncapi_send_to_device_user_id_device_id_idx ON syncapi_send_to_device(user_id, device_id);
`
const insertSendToDeviceMessageSQL = `

View file

@ -20,15 +20,18 @@ import (
"encoding/json"
"fmt"
"github.com/tidwall/gjson"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
)
// Database is a temporary struct until we have made syncserver.go the same for both pq/sqlite
@ -683,7 +686,7 @@ func (d *Database) GetStateDeltas(
ctx context.Context, device *userapi.Device,
r types.Range, userID string,
stateFilter *gomatrixserverlib.StateFilter,
) ([]types.StateDelta, []string, error) {
) (deltas []types.StateDelta, joinedRoomsIDs []string, err error) {
// Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821
// - Get membership list changes for this user in this sync response
// - For each room which has membership list changes:
@ -718,8 +721,6 @@ func (d *Database) GetStateDeltas(
}
}
var deltas []types.StateDelta
// get all the state events ever (i.e. for all available rooms) between these two positions
stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, r, stateFilter, allRoomIDs)
if err != nil {
@ -767,15 +768,11 @@ func (d *Database) GetStateDeltas(
}
// handle newly joined rooms and non-joined rooms
newlyJoinedRooms := make(map[string]bool, len(state))
for roomID, stateStreamEvents := range state {
for _, ev := range stateStreamEvents {
// TODO: Currently this will incorrectly add rooms which were ALREADY joined but they sent another no-op join event.
// We should be checking if the user was already joined at fromPos and not proceed if so. As a result of this,
// dupe join events will result in the entire room state coming down to the client again. This is added in
// the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to
// the timeline.
if membership := getMembershipFromEvent(ev.Event, userID); membership != "" {
if membership == gomatrixserverlib.Join {
if membership, prevMembership := getMembershipFromEvent(ev.Event, userID); membership != "" {
if membership == gomatrixserverlib.Join && prevMembership != membership {
// send full room state down instead of a delta
var s []types.StreamEvent
s, err = d.currentStateStreamEventsForRoom(ctx, txn, roomID, stateFilter)
@ -786,6 +783,7 @@ func (d *Database) GetStateDeltas(
return nil, nil, err
}
state[roomID] = s
newlyJoinedRooms[roomID] = true
continue // we'll add this room in when we do joined rooms
}
@ -806,6 +804,7 @@ func (d *Database) GetStateDeltas(
Membership: gomatrixserverlib.Join,
StateEvents: d.StreamEventsToEvents(device, state[joinedRoomID]),
RoomID: joinedRoomID,
NewlyJoined: newlyJoinedRooms[joinedRoomID],
})
}
@ -892,7 +891,7 @@ func (d *Database) GetStateDeltasForFullStateSync(
for roomID, stateStreamEvents := range state {
for _, ev := range stateStreamEvents {
if membership := getMembershipFromEvent(ev.Event, userID); membership != "" {
if membership, _ := getMembershipFromEvent(ev.Event, userID); membership != "" {
if membership != gomatrixserverlib.Join { // We've already added full state for all joined rooms above.
deltas[roomID] = types.StateDelta{
Membership: membership,
@ -1003,15 +1002,16 @@ func (d *Database) CleanSendToDeviceUpdates(
// getMembershipFromEvent returns the value of content.membership iff the event is a state event
// with type 'm.room.member' and state_key of userID. Otherwise, an empty string is returned.
func getMembershipFromEvent(ev *gomatrixserverlib.Event, userID string) string {
func getMembershipFromEvent(ev *gomatrixserverlib.Event, userID string) (string, string) {
if ev.Type() != "m.room.member" || !ev.StateKeyEquals(userID) {
return ""
return "", ""
}
membership, err := ev.Membership()
if err != nil {
return ""
return "", ""
}
return membership
prevMembership := gjson.GetBytes(ev.Unsigned(), "prev_content.membership").Str
return membership, prevMembership
}
// StoreReceipt stores user receipts

View file

@ -234,8 +234,8 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
log.WithFields(log.Fields{
"since": r.From,
"current": r.To,
"adds": addIDsJSON,
"dels": delIDsJSON,
"adds": len(addIDsJSON),
"dels": len(delIDsJSON),
}).Warn("StateBetween: ignoring deleted state")
}

View file

@ -39,6 +39,8 @@ CREATE TABLE IF NOT EXISTS syncapi_send_to_device (
-- The event content JSON.
content TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS syncapi_send_to_device_user_id_device_id_idx ON syncapi_send_to_device(user_id, device_id);
`
const insertSendToDeviceMessageSQL = `

View file

@ -7,8 +7,9 @@ import (
"strconv"
"time"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/syncapi/types"
)
type InviteStreamProvider struct {
@ -62,6 +63,11 @@ func (p *InviteStreamProvider) IncrementalSync(
req.Response.Rooms.Invite[roomID] = *ir
}
// When doing an initial sync, we don't want to add retired invites, as this
// can add rooms we were invited to, but already left.
if from == 0 {
return to
}
for roomID := range retiredInvites {
if _, ok := req.Response.Rooms.Join[roomID]; !ok {
lr := types.NewLeaveResponse()

View file

@ -178,24 +178,24 @@ func (p *PDUStreamProvider) IncrementalSync(
var err error
var stateDeltas []types.StateDelta
var joinedRooms []string
var syncJoinedRooms []string
stateFilter := req.Filter.Room.State
eventFilter := req.Filter.Room.Timeline
if req.WantFullState {
if stateDeltas, joinedRooms, err = p.DB.GetStateDeltasForFullStateSync(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil {
if stateDeltas, syncJoinedRooms, err = p.DB.GetStateDeltasForFullStateSync(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil {
req.Log.WithError(err).Error("p.DB.GetStateDeltasForFullStateSync failed")
return
}
} else {
if stateDeltas, joinedRooms, err = p.DB.GetStateDeltas(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil {
if stateDeltas, syncJoinedRooms, err = p.DB.GetStateDeltas(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil {
req.Log.WithError(err).Error("p.DB.GetStateDeltas failed")
return
}
}
for _, roomID := range joinedRooms {
for _, roomID := range syncJoinedRooms {
req.Rooms[roomID] = gomatrixserverlib.Join
}
@ -209,11 +209,27 @@ func (p *PDUStreamProvider) IncrementalSync(
newPos = from
for _, delta := range stateDeltas {
newRange := r
// If this room was joined in this sync, try to fetch
// as much timeline events as allowed by the filter.
if delta.NewlyJoined {
// Reverse the range, so we get the most recent first.
// This will be limited by the eventFilter.
newRange = types.Range{
From: r.To,
To: 0,
Backwards: true,
}
}
var pos types.StreamPosition
if pos, err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, &eventFilter, &stateFilter, req.Response); err != nil {
if pos, err = p.addRoomDeltaToResponse(ctx, req.Device, newRange, delta, &eventFilter, &stateFilter, req.Response); err != nil {
req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed")
return to
}
// Reset the position, as it is only for the special case of newly joined rooms
if delta.NewlyJoined {
pos = newRange.From
}
switch {
case r.Backwards && pos < newPos:
fallthrough
@ -287,7 +303,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
if stateFilter.LazyLoadMembers {
delta.StateEvents, err = p.lazyLoadMembers(
ctx, delta.RoomID, true, limited, stateFilter.IncludeRedundantMembers,
ctx, delta.RoomID, true, limited, stateFilter,
device, recentEvents, delta.StateEvents,
)
if err != nil && err != sql.ErrNoRows {
@ -309,12 +325,12 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
logrus.WithError(err).Error("unable to apply history visibility filter")
}
if len(events) > 0 {
updateLatestPosition(events[len(events)-1].EventID())
}
if len(delta.StateEvents) > 0 {
updateLatestPosition(delta.StateEvents[len(delta.StateEvents)-1].EventID())
}
if len(events) > 0 {
updateLatestPosition(events[len(events)-1].EventID())
}
switch delta.Membership {
case gomatrixserverlib.Join:
@ -387,6 +403,8 @@ func applyHistoryVisibilityFilter(
logrus.WithFields(logrus.Fields{
"duration": time.Since(startTime),
"room_id": roomID,
"before": len(recentEvents),
"after": len(events),
}).Debug("applied history visibility (sync)")
return events, nil
}
@ -514,7 +532,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
return nil, err
}
stateEvents, err = p.lazyLoadMembers(ctx, roomID,
false, limited, stateFilter.IncludeRedundantMembers,
false, limited, stateFilter,
device, recentEvents, stateEvents,
)
if err != nil && err != sql.ErrNoRows {
@ -533,7 +551,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
func (p *PDUStreamProvider) lazyLoadMembers(
ctx context.Context, roomID string,
incremental, limited, includeRedundant bool,
incremental, limited bool, stateFilter *gomatrixserverlib.StateFilter,
device *userapi.Device,
timelineEvents, stateEvents []*gomatrixserverlib.HeaderedEvent,
) ([]*gomatrixserverlib.HeaderedEvent, error) {
@ -560,14 +578,13 @@ func (p *PDUStreamProvider) lazyLoadMembers(
// If this is a gapped incremental sync, we still want this membership
isGappedIncremental := limited && incremental
// We want this users membership event, keep it in the list
_, ok := timelineUsers[event.Sender()]
wantMembership := ok || isGappedIncremental
if wantMembership {
stateKey := *event.StateKey()
if _, ok := timelineUsers[stateKey]; ok || isGappedIncremental {
newStateEvents = append(newStateEvents, event)
if !includeRedundant {
p.lazyLoadCache.StoreLazyLoadedUser(device, roomID, event.Sender(), event.EventID())
if !stateFilter.IncludeRedundantMembers {
p.lazyLoadCache.StoreLazyLoadedUser(device, roomID, stateKey, event.EventID())
}
delete(timelineUsers, event.Sender())
delete(timelineUsers, stateKey)
}
} else {
newStateEvents = append(newStateEvents, event)
@ -578,17 +595,17 @@ func (p *PDUStreamProvider) lazyLoadMembers(
wantUsers = append(wantUsers, userID)
}
// Query missing membership events
memberships, err := p.DB.GetStateEventsForRoom(ctx, roomID, &gomatrixserverlib.StateFilter{
Limit: 100,
Senders: &wantUsers,
Types: &[]string{gomatrixserverlib.MRoomMember},
})
filter := gomatrixserverlib.DefaultStateFilter()
filter.Limit = stateFilter.Limit
filter.Senders = &wantUsers
filter.Types = &[]string{gomatrixserverlib.MRoomMember}
memberships, err := p.DB.GetStateEventsForRoom(ctx, roomID, &filter)
if err != nil {
return stateEvents, err
}
// cache the membership events
for _, membership := range memberships {
p.lazyLoadCache.StoreLazyLoadedUser(device, roomID, membership.Sender(), membership.EventID())
p.lazyLoadCache.StoreLazyLoadedUser(device, roomID, *membership.StateKey(), membership.EventID())
}
stateEvents = append(newStateEvents, memberships...)
return stateEvents, nil

View file

@ -19,9 +19,11 @@ import (
"encoding/json"
"sync"
"github.com/matrix-org/gomatrixserverlib"
"github.com/tidwall/gjson"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
)
type PresenceStreamProvider struct {
@ -175,6 +177,10 @@ func membershipEventPresent(events []gomatrixserverlib.ClientEvent, userID strin
// it's enough to know that we have our member event here, don't need to check membership content
// as it's implied by being in the respective section of the sync response.
if ev.Type == gomatrixserverlib.MRoomMember && ev.StateKey != nil && *ev.StateKey == userID {
// ignore e.g. join -> join changes
if gjson.GetBytes(ev.Unsigned, "prev_content.membership").Str == gjson.GetBytes(ev.Content, "membership").Str {
continue
}
return true
}
}

View file

@ -23,12 +23,13 @@ import (
"strconv"
"time"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
)
const defaultSyncTimeout = time.Duration(0)
@ -46,15 +47,9 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat
return nil, err
}
}
// TODO: read from stored filters too
// Create a default filter and apply a stored filter on top of it (if specified)
filter := gomatrixserverlib.DefaultFilter()
if since.IsEmpty() {
// Send as much account data down for complete syncs as possible
// by default, otherwise clients do weird things while waiting
// for the rest of the data to trickle down.
filter.AccountData.Limit = math.MaxInt32
filter.Room.AccountData.Limit = math.MaxInt32
}
filterQuery := req.URL.Query().Get("filter")
if filterQuery != "" {
if filterQuery[0] == '{' {
@ -76,6 +71,17 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat
}
}
// A loaded filter might have overwritten these values,
// so set them after loading the filter.
if since.IsEmpty() {
// Send as much account data down for complete syncs as possible
// by default, otherwise clients do weird things while waiting
// for the rest of the data to trickle down.
filter.AccountData.Limit = math.MaxInt32
filter.Room.AccountData.Limit = math.MaxInt32
filter.Room.State.Limit = math.MaxInt32
}
logger := util.GetLogger(req.Context()).WithFields(logrus.Fields{
"user_id": device.UserID,
"device_id": device.ID,

View file

@ -298,8 +298,8 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
return giveup()
case <-userStreamListener.GetNotifyChannel(syncReq.Since):
syncReq.Log.Debugln("Responding to sync after wake-up")
currentPos.ApplyUpdates(userStreamListener.GetSyncPosition())
syncReq.Log.WithField("currentPos", currentPos).Debugln("Responding to sync after wake-up")
}
} else {
syncReq.Log.WithField("currentPos", currentPos).Debugln("Responding to sync immediately")

View file

@ -10,6 +10,10 @@ import (
"testing"
"time"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
"github.com/tidwall/gjson"
"github.com/matrix-org/dendrite/clientapi/producers"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/roomserver"
@ -21,9 +25,6 @@ import (
"github.com/matrix-org/dendrite/test"
"github.com/matrix-org/dendrite/test/testrig"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
"github.com/tidwall/gjson"
)
type syncRoomserverAPI struct {
@ -153,8 +154,12 @@ func testSyncAccessTokens(t *testing.T, dbType test.DBType) {
wantJoinedRooms: []string{room.ID},
},
}
// TODO: find a better way
time.Sleep(500 * time.Millisecond)
syncUntil(t, base, alice.AccessToken, false, func(syncBody string) bool {
// wait for the last sent eventID to come down sync
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(event_id=="%s")`, room.ID, room.Events()[len(room.Events())-1].EventID())
return gjson.Get(syncBody, path).Exists()
})
for _, tc := range testCases {
w := httptest.NewRecorder()
@ -190,6 +195,7 @@ func TestSyncAPICreateRoomSyncEarly(t *testing.T) {
}
func testSyncAPICreateRoomSyncEarly(t *testing.T, dbType test.DBType) {
t.Skip("Skipped, possibly fixed")
user := test.NewUser(t)
room := test.NewRoom(t, user)
alice := userapi.Device{
@ -342,6 +348,13 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
// create the users
alice := test.NewUser(t)
aliceDev := userapi.Device{
ID: "ALICEID",
UserID: alice.ID,
AccessToken: "ALICE_BEARER_TOKEN",
DisplayName: "ALICE",
}
bob := test.NewUser(t)
bobDev := userapi.Device{
@ -408,7 +421,7 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
rsAPI := roomserver.NewInternalAPI(base)
rsAPI.SetFederationAPI(nil, nil)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{bobDev}}, rsAPI, &syncKeyAPI{})
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, &syncKeyAPI{})
for _, tc := range testCases {
testname := fmt.Sprintf("%s - %s", tc.historyVisibility, userType)
@ -417,11 +430,18 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
room := test.NewRoom(t, alice, test.RoomHistoryVisibility(tc.historyVisibility))
// send the events/messages to NATS to create the rooms
beforeJoinEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": fmt.Sprintf("Before invite in a %s room", tc.historyVisibility)})
beforeJoinBody := fmt.Sprintf("Before invite in a %s room", tc.historyVisibility)
beforeJoinEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": beforeJoinBody})
eventsToSend := append(room.Events(), beforeJoinEv)
if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", nil, false); err != nil {
t.Fatalf("failed to send events: %v", err)
}
syncUntil(t, base, aliceDev.AccessToken, false,
func(syncBody string) bool {
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(content.body=="%s")`, room.ID, beforeJoinBody)
return gjson.Get(syncBody, path).Exists()
},
)
// There is only one event, we expect only to be able to see this, if the room is world_readable
w := httptest.NewRecorder()
@ -447,13 +467,20 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
inviteEv := room.CreateAndInsert(t, alice, "m.room.member", map[string]interface{}{"membership": "invite"}, test.WithStateKey(bob.ID))
afterInviteEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": fmt.Sprintf("After invite in a %s room", tc.historyVisibility)})
joinEv := room.CreateAndInsert(t, bob, "m.room.member", map[string]interface{}{"membership": "join"}, test.WithStateKey(bob.ID))
msgEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": fmt.Sprintf("After join in a %s room", tc.historyVisibility)})
afterJoinBody := fmt.Sprintf("After join in a %s room", tc.historyVisibility)
msgEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": afterJoinBody})
eventsToSend = append([]*gomatrixserverlib.HeaderedEvent{}, inviteEv, afterInviteEv, joinEv, msgEv)
if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", nil, false); err != nil {
t.Fatalf("failed to send events: %v", err)
}
syncUntil(t, base, aliceDev.AccessToken, false,
func(syncBody string) bool {
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(content.body=="%s")`, room.ID, afterJoinBody)
return gjson.Get(syncBody, path).Exists()
},
)
// Verify the messages after/before invite are visible or not
w = httptest.NewRecorder()
@ -508,8 +535,8 @@ func testSendToDevice(t *testing.T, dbType test.DBType) {
AccountType: userapi.AccountTypeUser,
}
base, close := testrig.CreateBaseDendrite(t, dbType)
defer close()
base, baseClose := testrig.CreateBaseDendrite(t, dbType)
defer baseClose()
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
@ -604,7 +631,14 @@ func testSendToDevice(t *testing.T, dbType test.DBType) {
t.Fatalf("unable to send to device message: %v", err)
}
}
time.Sleep((time.Millisecond * 15) * time.Duration(tc.sendMessagesCount)) // wait a bit, so the messages can be processed
syncUntil(t, base, alice.AccessToken,
len(tc.want) == 0,
func(body string) bool {
return gjson.Get(body, fmt.Sprintf(`to_device.events.#(content.dummy=="message %d")`, msgCounter)).Exists()
},
)
// Execute a /sync request, recording the response
w := httptest.NewRecorder()
base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
@ -627,6 +661,42 @@ func testSendToDevice(t *testing.T, dbType test.DBType) {
}
}
func syncUntil(t *testing.T,
base *base.BaseDendrite, accessToken string,
skip bool,
checkFunc func(syncBody string) bool,
) {
if checkFunc == nil {
t.Fatalf("No checkFunc defined")
}
if skip {
return
}
// loop on /sync until we receive the last send message or timeout after 5 seconds, since we don't know if the message made it
// to the syncAPI when hitting /sync
done := make(chan bool)
defer close(done)
go func() {
for {
w := httptest.NewRecorder()
base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": accessToken,
"timeout": "1000",
})))
if checkFunc(w.Body.String()) {
done <- true
return
}
}
}()
select {
case <-done:
case <-time.After(time.Second * 5):
t.Fatalf("Timed out waiting for messages")
}
}
func toNATSMsgs(t *testing.T, base *base.BaseDendrite, input ...*gomatrixserverlib.HeaderedEvent) []*nats.Msg {
result := make([]*nats.Msg, len(input))
for i, ev := range input {

View file

@ -37,6 +37,7 @@ var (
type StateDelta struct {
RoomID string
StateEvents []*gomatrixserverlib.HeaderedEvent
NewlyJoined bool
Membership string
// The PDU stream position of the latest membership event for this user, if applicable.
// Can be 0 if there is no membership event in this delta.

View file

@ -144,7 +144,6 @@ Server correctly handles incoming m.device_list_update
If remote user leaves room, changes device and rejoins we see update in sync
If remote user leaves room, changes device and rejoins we see update in /keys/changes
If remote user leaves room we no longer receive device updates
If a device list update goes missing, the server resyncs on the next one
Server correctly resyncs when client query keys and there is no remote cache
Server correctly resyncs when server leaves and rejoins a room
Device list doesn't change if remote server is down
@ -633,7 +632,6 @@ Test that rejected pushers are removed.
Trying to add push rule with no scope fails with 400
Trying to add push rule with invalid scope fails with 400
Forward extremities remain so even after the next events are populated as outliers
If a device list update goes missing, the server resyncs on the next one
uploading self-signing key notifies over federation
uploading signed devices gets propagated over federation
Device list doesn't change if remote server is down

View file

@ -68,7 +68,7 @@ func ListenAndServe(t *testing.T, router http.Handler, withTLS bool) (apiURL str
if withTLS {
certFile := filepath.Join(t.TempDir(), "dendrite.cert")
keyFile := filepath.Join(t.TempDir(), "dendrite.key")
err = NewTLSKey(keyFile, certFile)
err = NewTLSKey(keyFile, certFile, 1024)
if err != nil {
t.Errorf("failed to make TLS key: %s", err)
return

View file

@ -69,8 +69,8 @@ func NewMatrixKey(matrixKeyPath string) (err error) {
const certificateDuration = time.Hour * 24 * 365 * 10
func generateTLSTemplate(dnsNames []string) (*rsa.PrivateKey, *x509.Certificate, error) {
priv, err := rsa.GenerateKey(rand.Reader, 4096)
func generateTLSTemplate(dnsNames []string, bitSize int) (*rsa.PrivateKey, *x509.Certificate, error) {
priv, err := rsa.GenerateKey(rand.Reader, bitSize)
if err != nil {
return nil, nil, err
}
@ -118,8 +118,8 @@ func writePrivateKey(tlsKeyPath string, priv *rsa.PrivateKey) error {
}
// NewTLSKey generates a new RSA TLS key and certificate and writes it to a file.
func NewTLSKey(tlsKeyPath, tlsCertPath string) error {
priv, template, err := generateTLSTemplate(nil)
func NewTLSKey(tlsKeyPath, tlsCertPath string, keySize int) error {
priv, template, err := generateTLSTemplate(nil, keySize)
if err != nil {
return err
}
@ -136,8 +136,8 @@ func NewTLSKey(tlsKeyPath, tlsCertPath string) error {
return writePrivateKey(tlsKeyPath, priv)
}
func NewTLSKeyWithAuthority(serverName, tlsKeyPath, tlsCertPath, authorityKeyPath, authorityCertPath string) error {
priv, template, err := generateTLSTemplate([]string{serverName})
func NewTLSKeyWithAuthority(serverName, tlsKeyPath, tlsCertPath, authorityKeyPath, authorityCertPath string, keySize int) error {
priv, template, err := generateTLSTemplate([]string{serverName}, keySize)
if err != nil {
return err
}

View file

@ -7,6 +7,10 @@ import (
"strings"
"time"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/internal/pushgateway"
"github.com/matrix-org/dendrite/internal/pushrules"
@ -20,9 +24,6 @@ import (
"github.com/matrix-org/dendrite/userapi/storage"
"github.com/matrix-org/dendrite/userapi/storage/tables"
"github.com/matrix-org/dendrite/userapi/util"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
)
type OutputStreamEventConsumer struct {
@ -529,7 +530,9 @@ func (s *OutputStreamEventConsumer) notifyHTTP(ctx context.Context, event *gomat
case "event_id_only":
req = pushgateway.NotifyRequest{
Notification: pushgateway.Notification{
Counts: &pushgateway.Counts{},
Counts: &pushgateway.Counts{
Unread: userNumUnreadNotifs,
},
Devices: devices,
EventID: event.EventID(),
RoomID: event.RoomID(),