Merge branch 'main' into s7evink/createaccfix

This commit is contained in:
Neil Alexander 2022-03-24 14:12:33 +00:00 committed by GitHub
commit a5f72665cf
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
70 changed files with 887 additions and 633 deletions

View file

@ -2,6 +2,7 @@
<!-- Please read docs/CONTRIBUTING.md before submitting your pull request -->
* [ ] I have added added tests for PR _or_ I have justified why this PR doesn't need tests.
* [ ] Pull request includes a [sign off](https://github.com/matrix-org/dendrite/blob/main/docs/CONTRIBUTING.md#sign-off)
Signed-off-by: `Your Name <your@email.example.org>`

View file

@ -1,34 +0,0 @@
name: "CodeQL"
on:
push:
branches: [main]
pull_request:
branches: [main]
jobs:
analyze:
name: Analyze
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
language: ["go"]
steps:
- name: Checkout repository
uses: actions/checkout@v2
with:
fetch-depth: 2
- run: git checkout HEAD^2
if: ${{ github.event_name == 'pull_request' }}
- name: Initialize CodeQL
uses: github/codeql-action/init@v1
with:
languages: ${{ matrix.language }}
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v1

324
.github/workflows/dendrite.yml vendored Normal file
View file

@ -0,0 +1,324 @@
name: Dendrite
on:
push:
branches:
- main
pull_request:
release:
types: [published]
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
jobs:
wasm:
name: WASM build test
timeout-minutes: 5
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Install Go
uses: actions/setup-go@v2
with:
go-version: 1.16
- uses: actions/cache@v2
with:
path: |
~/.cache/go-build
~/go/pkg/mod
key: ${{ runner.os }}-go-wasm-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-wasm
- name: Install Node
uses: actions/setup-node@v2
with:
node-version: 14
- uses: actions/cache@v2
with:
path: ~/.npm
key: ${{ runner.os }}-node-${{ hashFiles('**/package-lock.json') }}
restore-keys: |
${{ runner.os }}-node-
- name: Reconfigure Git to use HTTPS auth for repo packages
run: >
git config --global url."https://github.com/".insteadOf
ssh://git@github.com/
- name: Install test dependencies
working-directory: ./test/wasm
run: npm ci
- name: Test
run: ./test-dendritejs.sh
# Run golangci-lint
lint:
timeout-minutes: 5
name: Linting
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: golangci-lint
uses: golangci/golangci-lint-action@v2
# run go test with different go versions
test:
timeout-minutes: 5
name: Unit tests (Go ${{ matrix.go }})
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
go: [ '1.16', '1.17', '1.18' ]
steps:
- uses: actions/checkout@v3
- name: Setup go
uses: actions/setup-go@v2
with:
go-version: ${{ matrix.go }}
- uses: actions/cache@v3
with:
path: |
~/.cache/go-build
~/go/pkg/mod
key: ${{ runner.os }}-go${{ matrix.go }}-test-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go${{ matrix.go }}-test-
- run: go test ./...
# build Dendrite for linux with different architectures and go versions
build:
name: Build for Linux
timeout-minutes: 10
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
go: [ '1.16', '1.17', '1.18' ]
goos: [ 'linux' ]
goarch: [ 'amd64', '386' ]
steps:
- uses: actions/checkout@v3
- name: Setup go
uses: actions/setup-go@v2
with:
go-version: ${{ matrix.go }}
- name: Install dependencies x86
if: ${{ matrix.goarch == '386' }}
run: sudo apt update && sudo apt-get install -y gcc-multilib
- uses: actions/cache@v3
with:
path: |
~/.cache/go-build
~/go/pkg/mod
key: ${{ runner.os }}-go${{ matrix.go }}-${{ matrix.goarch }}-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go${{ matrix.go }}-${{ matrix.goarch }}-
- env:
GOOS: ${{ matrix.goos }}
GOARCH: ${{ matrix.goarch }}
CGO_ENABLED: 1
run: go build -trimpath -v -o "bin/" ./cmd/...
# build for Windows 64-bit
build_windows:
name: Build for Windows
timeout-minutes: 10
runs-on: ubuntu-latest
strategy:
matrix:
go: [ '1.16', '1.17', '1.18' ]
goos: [ 'windows' ]
goarch: [ 'amd64' ]
steps:
- uses: actions/checkout@v3
- name: Setup Go ${{ matrix.go }}
uses: actions/setup-go@v2
with:
go-version: ${{ matrix.go }}
- name: Install dependencies
run: sudo apt update && sudo apt install -y gcc-mingw-w64-x86-64 # install required gcc
- uses: actions/cache@v3
with:
path: |
~/.cache/go-build
~/go/pkg/mod
key: ${{ runner.os }}-go${{ matrix.go }}-${{ matrix.goos }}-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go${{ matrix.go }}-${{ matrix.goos }}
- env:
GOOS: ${{ matrix.goos }}
GOARCH: ${{ matrix.goarch }}
CGO_ENABLED: 1
CC: "/usr/bin/x86_64-w64-mingw32-gcc"
run: go build -trimpath -v -o "bin/" ./cmd/...
# Dummy step to gate other tests on without repeating the whole list
initial-tests-done:
name: Initial tests passed
needs: [ lint, test, build, build_windows ]
runs-on: ubuntu-latest
if: ${{ !cancelled() }} # Run this even if prior jobs were skipped
steps:
- name: Check initial tests passed
uses: re-actors/alls-green@release/v1
with:
jobs: ${{ toJSON(needs) }}
# run database upgrade tests
upgrade_test:
name: Upgrade tests
timeout-minutes: 20
needs: initial-tests-done
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Setup go
uses: actions/setup-go@v2
with:
go-version: '1.16'
- uses: actions/cache@v3
with:
path: |
~/.cache/go-build
~/go/pkg/mod
key: ${{ runner.os }}-go-upgrade-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-upgrade
- name: Build upgrade-tests
run: go build ./cmd/dendrite-upgrade-tests
- name: Test upgrade
run: ./dendrite-upgrade-tests --head .
# run Sytest in different variations
sytest:
timeout-minutes: 20
needs: initial-tests-done
name: "Sytest (${{ matrix.label }})"
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
include:
- label: SQLite
- label: SQLite, full HTTP APIs
api: full-http
- label: PostgreSQL
postgres: postgres
- label: PostgreSQL, full HTTP APIs
postgres: postgres
api: full-http
container:
image: matrixdotorg/sytest-dendrite:latest
volumes:
- ${{ github.workspace }}:/src
env:
POSTGRES: ${{ matrix.postgres && 1}}
API: ${{ matrix.api && 1 }}
steps:
- uses: actions/checkout@v2
- name: Run Sytest
run: /bootstrap.sh dendrite
working-directory: /src
- name: Summarise results.tap
if: ${{ always() }}
run: /sytest/scripts/tap_to_gha.pl /logs/results.tap
- name: Upload Sytest logs
uses: actions/upload-artifact@v2
if: ${{ always() }}
with:
name: Sytest Logs - ${{ job.status }} - (Dendrite, ${{ join(matrix.*, ', ') }})
path: |
/logs/results.tap
/logs/**/*.log*
# run Complement
complement:
name: "Complement (${{ matrix.label }})"
timeout-minutes: 20
needs: initial-tests-done
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
include:
- label: SQLite
- label: SQLite, full HTTP APIs
api: full-http
- label: PostgreSQL
postgres: Postgres
- label: PostgreSQL, full HTTP APIs
postgres: Postgres
api: full-http
steps:
# Env vars are set file a file given by $GITHUB_PATH. We need both Go 1.17 and GOPATH on env to run Complement.
# See https://docs.github.com/en/actions/using-workflows/workflow-commands-for-github-actions#adding-a-system-path
- name: "Set Go Version"
run: |
echo "$GOROOT_1_17_X64/bin" >> $GITHUB_PATH
echo "~/go/bin" >> $GITHUB_PATH
- name: "Install Complement Dependencies"
# We don't need to install Go because it is included on the Ubuntu 20.04 image:
# See https://github.com/actions/virtual-environments/blob/main/images/linux/Ubuntu2004-Readme.md specifically GOROOT_1_17_X64
run: |
sudo apt-get update && sudo apt-get install -y libolm3 libolm-dev
go get -v github.com/haveyoudebuggedit/gotestfmt/v2/cmd/gotestfmt@latest
- name: Run actions/checkout@v2 for dendrite
uses: actions/checkout@v2
with:
path: dendrite
# Attempt to check out the same branch of Complement as the PR. If it
# doesn't exist, fallback to main.
- name: Checkout complement
shell: bash
run: |
mkdir -p complement
# Attempt to use the version of complement which best matches the current
# build. Depending on whether this is a PR or release, etc. we need to
# use different fallbacks.
#
# 1. First check if there's a similarly named branch (GITHUB_HEAD_REF
# for pull requests, otherwise GITHUB_REF).
# 2. Attempt to use the base branch, e.g. when merging into release-vX.Y
# (GITHUB_BASE_REF for pull requests).
# 3. Use the default complement branch ("master").
for BRANCH_NAME in "$GITHUB_HEAD_REF" "$GITHUB_BASE_REF" "${GITHUB_REF#refs/heads/}" "master"; do
# Skip empty branch names and merge commits.
if [[ -z "$BRANCH_NAME" || $BRANCH_NAME =~ ^refs/pull/.* ]]; then
continue
fi
(wget -O - "https://github.com/matrix-org/complement/archive/$BRANCH_NAME.tar.gz" | tar -xz --strip-components=1 -C complement) && break
done
# Build initial Dendrite image
- run: docker build -t complement-dendrite -f build/scripts/Complement${{ matrix.postgres }}.Dockerfile .
working-directory: dendrite
# Run Complement
- run: |
set -o pipefail &&
go test -v -json -tags dendrite_blacklist ./tests/... 2>&1 | gotestfmt
shell: bash
name: Run Complement Tests
env:
COMPLEMENT_BASE_IMAGE: complement-dendrite:latest
API: ${{ matrix.api && 1 }}
working-directory: complement

View file

@ -1,71 +0,0 @@
name: Tests
on:
push:
branches: ["main"]
pull_request:
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
jobs:
complement:
runs-on: ubuntu-latest
steps:
# Env vars are set file a file given by $GITHUB_PATH. We need both Go 1.17 and GOPATH on env to run Complement.
# See https://docs.github.com/en/actions/using-workflows/workflow-commands-for-github-actions#adding-a-system-path
- name: "Set Go Version"
run: |
echo "$GOROOT_1_17_X64/bin" >> $GITHUB_PATH
echo "~/go/bin" >> $GITHUB_PATH
- name: "Install Complement Dependencies"
# We don't need to install Go because it is included on the Ubuntu 20.04 image:
# See https://github.com/actions/virtual-environments/blob/main/images/linux/Ubuntu2004-Readme.md specifically GOROOT_1_17_X64
run: |
sudo apt-get update && sudo apt-get install -y libolm3 libolm-dev
go get -v github.com/haveyoudebuggedit/gotestfmt/v2/cmd/gotestfmt@latest
- name: Run actions/checkout@v2 for dendrite
uses: actions/checkout@v2
with:
path: dendrite
# Attempt to check out the same branch of Complement as the PR. If it
# doesn't exist, fallback to main.
- name: Checkout complement
shell: bash
run: |
mkdir -p complement
# Attempt to use the version of complement which best matches the current
# build. Depending on whether this is a PR or release, etc. we need to
# use different fallbacks.
#
# 1. First check if there's a similarly named branch (GITHUB_HEAD_REF
# for pull requests, otherwise GITHUB_REF).
# 2. Attempt to use the base branch, e.g. when merging into release-vX.Y
# (GITHUB_BASE_REF for pull requests).
# 3. Use the default complement branch ("master").
for BRANCH_NAME in "$GITHUB_HEAD_REF" "$GITHUB_BASE_REF" "${GITHUB_REF#refs/heads/}" "master"; do
# Skip empty branch names and merge commits.
if [[ -z "$BRANCH_NAME" || $BRANCH_NAME =~ ^refs/pull/.* ]]; then
continue
fi
(wget -O - "https://github.com/matrix-org/complement/archive/$BRANCH_NAME.tar.gz" | tar -xz --strip-components=1 -C complement) && break
done
# Build initial Dendrite image
- run: docker build -t complement-dendrite -f build/scripts/Complement.Dockerfile .
working-directory: dendrite
# Run Complement
- run: |
set -o pipefail &&
go test -v -json -tags dendrite_blacklist ./tests/... 2>&1 | gotestfmt
shell: bash
name: Run Complement Tests
env:
COMPLEMENT_BASE_IMAGE: complement-dendrite:latest
working-directory: complement

View file

@ -1,49 +0,0 @@
name: WebAssembly
on:
push:
pull_request:
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Install Go
uses: actions/setup-go@v2
with:
go-version: 1.16.5
- uses: actions/cache@v2
with:
path: |
~/.cache/go-build
~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-
- name: Install Node
uses: actions/setup-node@v2
with:
node-version: 14
- uses: actions/cache@v2
with:
path: ~/.npm
key: ${{ runner.os }}-node-${{ hashFiles('**/package-lock.json') }}
restore-keys: |
${{ runner.os }}-node-
- name: Reconfigure Git to use HTTPS auth for repo packages
run: >
git config --global url."https://github.com/".insteadOf
ssh://git@github.com/
- name: Install test dependencies
working-directory: ./test/wasm
run: npm ci
- name: Test
run: ./test-dendritejs.sh

View file

@ -1,4 +1,5 @@
# Dendrite [![Build Status](https://badge.buildkite.com/4be40938ab19f2bbc4a6c6724517353ee3ec1422e279faf374.svg?branch=master)](https://buildkite.com/matrix-dot-org/dendrite) [![Dendrite](https://img.shields.io/matrix/dendrite:matrix.org.svg?label=%23dendrite%3Amatrix.org&logo=matrix&server_fqdn=matrix.org)](https://matrix.to/#/#dendrite:matrix.org) [![Dendrite Dev](https://img.shields.io/matrix/dendrite-dev:matrix.org.svg?label=%23dendrite-dev%3Amatrix.org&logo=matrix&server_fqdn=matrix.org)](https://matrix.to/#/#dendrite-dev:matrix.org)
# Dendrite
[![Build status](https://github.com/matrix-org/dendrite/actions/workflows/dendrite.yml/badge.svg?event=push)](https://github.com/matrix-org/dendrite/actions/workflows/dendrite.yml) [![Dendrite](https://img.shields.io/matrix/dendrite:matrix.org.svg?label=%23dendrite%3Amatrix.org&logo=matrix&server_fqdn=matrix.org)](https://matrix.to/#/#dendrite:matrix.org) [![Dendrite Dev](https://img.shields.io/matrix/dendrite-dev:matrix.org.svg?label=%23dendrite-dev%3Amatrix.org&logo=matrix&server_fqdn=matrix.org)](https://matrix.to/#/#dendrite-dev:matrix.org)
Dendrite is a second-generation Matrix homeserver written in Go.
It intends to provide an **efficient**, **reliable** and **scalable** alternative to [Synapse](https://github.com/matrix-org/synapse):

View file

@ -59,7 +59,7 @@ func NewInternalAPI(
},
},
}
js, _ := jetstream.Prepare(&base.Cfg.Global.JetStream)
js, _ := jetstream.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
// Create a connection to the appservice postgres DB
appserviceDB, err := storage.NewDatabase(&base.Cfg.AppServiceAPI.Database)

View file

@ -56,7 +56,7 @@ func NewOutputRoomEventConsumer(
ctx: process.Context(),
jetstream: js,
durable: cfg.Global.JetStream.Durable("AppserviceRoomserverConsumer"),
topic: cfg.Global.JetStream.TopicFor(jetstream.OutputRoomEvent),
topic: cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent),
asDB: appserviceDB,
rsAPI: rsAPI,
serverName: string(cfg.Global.ServerName),

View file

@ -28,7 +28,6 @@ import (
// Database stores events intended to be later sent to application services
type Database struct {
sqlutil.PartitionOffsetStatements
events eventsStatements
txnID txnStatements
db *sql.DB
@ -46,9 +45,6 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
if err = result.prepare(); err != nil {
return nil, err
}
if err = result.PartitionOffsetStatements.Prepare(result.db, result.writer, "appservice"); err != nil {
return nil, err
}
return &result, nil
}

View file

@ -27,7 +27,6 @@ import (
// Database stores events intended to be later sent to application services
type Database struct {
sqlutil.PartitionOffsetStatements
events eventsStatements
txnID txnStatements
db *sql.DB
@ -45,9 +44,6 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
if err = result.prepare(); err != nil {
return nil, err
}
if err = result.PartitionOffsetStatements.Prepare(result.db, result.writer, "appservice"); err != nil {
return nil, err
}
return &result, nil
}

View file

@ -408,6 +408,8 @@ func (m *DendriteMonolith) Stop() {
_ = m.PineconeRouter.Close()
}
const MaxFrameSize = types.MaxFrameSize
type Conduit struct {
conn net.Conn
port types.SwitchPortID

View file

@ -21,6 +21,7 @@ WORKDIR /dendrite
RUN ./generate-keys --private-key matrix_key.pem
ENV SERVER_NAME=localhost
ENV API=0
EXPOSE 8008 8448
# At runtime, generate TLS cert based on the CA now mounted at /ca
@ -28,4 +29,4 @@ EXPOSE 8008 8448
CMD ./generate-keys --server $SERVER_NAME --tls-cert server.crt --tls-key server.key --tls-authority-cert /complement/ca/ca.crt --tls-authority-key /complement/ca/ca.key && \
./generate-config -server $SERVER_NAME --ci > dendrite.yaml && \
cp /complement/ca/ca.crt /usr/local/share/ca-certificates/ && update-ca-certificates && \
./dendrite-monolith-server --tls-cert server.crt --tls-key server.key --config dendrite.yaml
./dendrite-monolith-server --tls-cert server.crt --tls-key server.key --config dendrite.yaml -api=${API:-0}

View file

@ -39,6 +39,7 @@ WORKDIR /dendrite
RUN ./generate-keys --private-key matrix_key.pem
ENV SERVER_NAME=localhost
ENV API=0
EXPOSE 8008 8448
@ -50,4 +51,4 @@ CMD /build/run_postgres.sh && ./generate-keys --server $SERVER_NAME --tls-cert s
sed -i "s%connection_string:.*$%connection_string: postgresql://postgres@localhost/postgres?sslmode=disable%g" dendrite.yaml && \
sed -i 's/max_open_conns:.*$/max_open_conns: 100/g' dendrite.yaml && \
cp /complement/ca/ca.crt /usr/local/share/ca-certificates/ && update-ca-certificates && \
./dendrite-monolith-server --tls-cert server.crt --tls-key server.key --config dendrite.yaml
./dendrite-monolith-server --tls-cert server.crt --tls-key server.key --config dendrite.yaml -api=${API:-0}

View file

@ -27,6 +27,7 @@ import (
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
userapi "github.com/matrix-org/dendrite/userapi/api"
userdb "github.com/matrix-org/dendrite/userapi/storage"
"github.com/matrix-org/gomatrixserverlib"
@ -34,6 +35,7 @@ import (
// AddPublicRoutes sets up and registers HTTP handlers for the ClientAPI component.
func AddPublicRoutes(
process *process.ProcessContext,
router *mux.Router,
synapseAdminRouter *mux.Router,
cfg *config.ClientAPI,
@ -49,11 +51,11 @@ func AddPublicRoutes(
extRoomsProvider api.ExtraPublicRoomsProvider,
mscCfg *config.MSCs,
) {
js, _ := jetstream.Prepare(&cfg.Matrix.JetStream)
js, _ := jetstream.Prepare(process, &cfg.Matrix.JetStream)
syncProducer := &producers.SyncAPIProducer{
JetStream: js,
Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData),
Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData),
}
routing.Setup(

View file

@ -33,9 +33,9 @@ func ClientAPI(base *basepkg.BaseDendrite, cfg *config.Dendrite) {
keyAPI := base.KeyServerHTTPClient()
clientapi.AddPublicRoutes(
base.PublicClientAPIMux, base.SynapseAdminMux, &base.Cfg.ClientAPI, accountDB, federation,
rsAPI, eduInputAPI, asQuery, transactions.New(), fsAPI, userAPI, keyAPI, nil,
&cfg.MSCs,
base.ProcessContext, base.PublicClientAPIMux, base.SynapseAdminMux, &base.Cfg.ClientAPI,
accountDB, federation, rsAPI, eduInputAPI, asQuery, transactions.New(), fsAPI, userAPI,
keyAPI, nil, &cfg.MSCs,
)
base.SetupAndServeHTTP(

View file

@ -12,6 +12,10 @@ No, although a good portion of the Matrix specification has been implemented. Mo
No, not at present. There will be in the future when Dendrite reaches version 1.0.
### Can I use Dendrite with an existing Synapse database?
No, Dendrite has a very different database schema to Synapse and the two are not interchangeable.
### Should I run a monolith or a polylith deployment?
Monolith deployments are always preferred where possible, and at this time, are far better tested than polylith deployments are. The only reason to consider a polylith deployment is if you wish to run different Dendrite components on separate physical machines.
@ -33,7 +37,7 @@ It should do, although we are aware of some minor issues:
### Does Dendrite support push notifications?
No, not yet. This is a planned feature.
Yes, we have experimental support for push notifications. Configure them in the usual way in your Matrix client.
### Does Dendrite support application services/bridges?

View file

@ -42,15 +42,15 @@ func NewInternalAPI(
) api.EDUServerInputAPI {
cfg := &base.Cfg.EDUServer
js, _ := jetstream.Prepare(&cfg.Matrix.JetStream)
js, _ := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
return &input.EDUServerInputAPI{
Cache: eduCache,
UserAPI: userAPI,
JetStream: js,
OutputTypingEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent),
OutputSendToDeviceEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent),
OutputReceiptEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent),
OutputTypingEventTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
OutputSendToDeviceEventTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
OutputReceiptEventTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
ServerName: cfg.Matrix.ServerName,
}
}

View file

@ -58,9 +58,9 @@ func NewOutputEDUConsumer(
db: store,
ServerName: cfg.Matrix.ServerName,
durable: cfg.Matrix.JetStream.Durable("FederationAPIEDUServerConsumer"),
typingTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent),
sendToDeviceTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent),
receiptTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent),
typingTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
sendToDeviceTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
receiptTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
}
}

View file

@ -55,8 +55,8 @@ func NewKeyChangeConsumer(
return &KeyChangeConsumer{
ctx: process.Context(),
jetstream: js,
durable: cfg.Matrix.JetStream.TopicFor("FederationAPIKeyChangeConsumer"),
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent),
durable: cfg.Matrix.JetStream.Prefixed("FederationAPIKeyChangeConsumer"),
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent),
queues: queues,
db: store,
serverName: cfg.Matrix.ServerName,

View file

@ -61,7 +61,7 @@ func NewOutputRoomEventConsumer(
queues: queues,
rsAPI: rsAPI,
durable: cfg.Matrix.JetStream.Durable("FederationAPIRoomServerConsumer"),
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent),
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent),
}
}

View file

@ -92,7 +92,7 @@ func NewInternalAPI(
FailuresUntilBlacklist: cfg.FederationMaxRetries,
}
js, _ := jetstream.Prepare(&cfg.Matrix.JetStream)
js, _ := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
queues := queue.NewOutgoingQueues(
federationDB, base.ProcessContext,

View file

@ -30,7 +30,6 @@ import (
// Database stores information needed by the federation sender
type Database struct {
shared.Database
sqlutil.PartitionOffsetStatements
db *sql.DB
writer sqlutil.Writer
}
@ -104,8 +103,5 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationC
NotaryServerKeysMetadata: notaryMetadata,
ServerSigningKeys: serverSigningKeys,
}
if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "federationsender"); err != nil {
return nil, err
}
return &d, nil
}

View file

@ -29,7 +29,6 @@ import (
// Database stores information needed by the federation sender
type Database struct {
shared.Database
sqlutil.PartitionOffsetStatements
db *sql.DB
writer sqlutil.Writer
}
@ -103,8 +102,5 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationC
NotaryServerKeysMetadata: notaryKeysMetadata,
ServerSigningKeys: serverSigningKeys,
}
if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "federationsender"); err != nil {
return nil, err
}
return &d, nil
}

2
go.mod
View file

@ -39,7 +39,7 @@ require (
github.com/matrix-org/go-sqlite3-js v0.0.0-20210709140738-b0d1ba599a6d
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16
github.com/matrix-org/gomatrixserverlib v0.0.0-20220317164600-0980b7f341e0
github.com/matrix-org/pinecone v0.0.0-20220308124038-cfde1f8054c5
github.com/matrix-org/pinecone v0.0.0-20220323142759-6fb077377278
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
github.com/mattn/go-sqlite3 v1.14.10
github.com/morikuni/aec v1.0.0 // indirect

4
go.sum
View file

@ -943,8 +943,8 @@ github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5d
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
github.com/matrix-org/gomatrixserverlib v0.0.0-20220317164600-0980b7f341e0 h1:IINbE/0jSYGb7M31StazufyIQdYWSivRlhuns3JYPOM=
github.com/matrix-org/gomatrixserverlib v0.0.0-20220317164600-0980b7f341e0/go.mod h1:+WF5InseAMgi1fTnU46JH39IDpEvLep0fDzx9LDf2Bo=
github.com/matrix-org/pinecone v0.0.0-20220308124038-cfde1f8054c5 h1:7viLTiLAA2MtGKY+uf14j6TjfKvvGLAMj/qdm70jJuQ=
github.com/matrix-org/pinecone v0.0.0-20220308124038-cfde1f8054c5/go.mod h1:r6dsL+ylE0yXe/7zh8y/Bdh6aBYI1r+u4yZni9A4iyk=
github.com/matrix-org/pinecone v0.0.0-20220323142759-6fb077377278 h1:lRrvMMv7x1FIVW1mcBdU89lvbgAXKz6RyYR0VQTAr3E=
github.com/matrix-org/pinecone v0.0.0-20220323142759-6fb077377278/go.mod h1:r6dsL+ylE0yXe/7zh8y/Bdh6aBYI1r+u4yZni9A4iyk=
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk=
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=

View file

@ -1,133 +0,0 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sqlutil
import (
"context"
"database/sql"
"strings"
)
// A PartitionOffset is the offset into a partition of the input log.
type PartitionOffset struct {
// The ID of the partition.
Partition int32
// The offset into the partition.
Offset int64
}
const partitionOffsetsSchema = `
-- The offsets that the server has processed up to.
CREATE TABLE IF NOT EXISTS ${prefix}_partition_offsets (
-- The name of the topic.
topic TEXT NOT NULL,
-- The 32-bit partition ID
partition INTEGER NOT NULL,
-- The 64-bit offset.
partition_offset BIGINT NOT NULL,
UNIQUE (topic, partition)
);
`
const selectPartitionOffsetsSQL = "" +
"SELECT partition, partition_offset FROM ${prefix}_partition_offsets WHERE topic = $1"
const upsertPartitionOffsetsSQL = "" +
"INSERT INTO ${prefix}_partition_offsets (topic, partition, partition_offset) VALUES ($1, $2, $3)" +
" ON CONFLICT (topic, partition)" +
" DO UPDATE SET partition_offset = $3"
// PartitionOffsetStatements represents a set of statements that can be run on a partition_offsets table.
type PartitionOffsetStatements struct {
db *sql.DB
writer Writer
selectPartitionOffsetsStmt *sql.Stmt
upsertPartitionOffsetStmt *sql.Stmt
}
// Prepare converts the raw SQL statements into prepared statements.
// Takes a prefix to prepend to the table name used to store the partition offsets.
// This allows multiple components to share the same database schema.
func (s *PartitionOffsetStatements) Prepare(db *sql.DB, writer Writer, prefix string) (err error) {
s.db = db
s.writer = writer
_, err = db.Exec(strings.Replace(partitionOffsetsSchema, "${prefix}", prefix, -1))
if err != nil {
return
}
if s.selectPartitionOffsetsStmt, err = db.Prepare(
strings.Replace(selectPartitionOffsetsSQL, "${prefix}", prefix, -1),
); err != nil {
return
}
if s.upsertPartitionOffsetStmt, err = db.Prepare(
strings.Replace(upsertPartitionOffsetsSQL, "${prefix}", prefix, -1),
); err != nil {
return
}
return
}
// PartitionOffsets implements PartitionStorer
func (s *PartitionOffsetStatements) PartitionOffsets(
ctx context.Context, topic string,
) ([]PartitionOffset, error) {
return s.selectPartitionOffsets(ctx, topic)
}
// SetPartitionOffset implements PartitionStorer
func (s *PartitionOffsetStatements) SetPartitionOffset(
ctx context.Context, topic string, partition int32, offset int64,
) error {
return s.upsertPartitionOffset(ctx, topic, partition, offset)
}
// selectPartitionOffsets returns all the partition offsets for the given topic.
func (s *PartitionOffsetStatements) selectPartitionOffsets(
ctx context.Context, topic string,
) (results []PartitionOffset, err error) {
rows, err := s.selectPartitionOffsetsStmt.QueryContext(ctx, topic)
if err != nil {
return nil, err
}
defer checkNamedErr(rows.Close, &err)
for rows.Next() {
var offset PartitionOffset
if err = rows.Scan(&offset.Partition, &offset.Offset); err != nil {
return nil, err
}
results = append(results, offset)
}
err = rows.Err()
return results, err
}
// checkNamedErr calls fn and overwrite err if it was nil and fn returned non-nil
func checkNamedErr(fn func() error, err *error) {
if e := fn(); e != nil && *err == nil {
*err = e
}
}
// UpsertPartitionOffset updates or inserts the partition offset for the given topic.
func (s *PartitionOffsetStatements) upsertPartitionOffset(
ctx context.Context, topic string, partition int32, offset int64,
) error {
return s.writer.Do(s.db, nil, func(txn *sql.Tx) error {
stmt := TxStmt(txn, s.upsertPartitionOffsetStmt)
_, err := stmt.ExecContext(ctx, topic, partition, offset)
return err
})
}

View file

@ -223,6 +223,7 @@ func (a *KeyInternalAPI) QueryDeviceMessages(ctx context.Context, req *api.Query
res.StreamID = maxStreamID
}
// nolint:gocyclo
func (a *KeyInternalAPI) QueryKeys(ctx context.Context, req *api.QueryKeysRequest, res *api.QueryKeysResponse) {
res.DeviceKeys = make(map[string]map[string]json.RawMessage)
res.MasterKeys = make(map[string]gomatrixserverlib.CrossSigningKey)

View file

@ -39,14 +39,14 @@ func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) {
func NewInternalAPI(
base *base.BaseDendrite, cfg *config.KeyServer, fedClient fedsenderapi.FederationClient,
) api.KeyInternalAPI {
js, _ := jetstream.Prepare(&cfg.Matrix.JetStream)
js, _ := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
db, err := storage.NewDatabase(&cfg.Database)
if err != nil {
logrus.WithError(err).Panicf("failed to connect to key server database")
}
keyChangeProducer := &producers.KeyChange{
Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent)),
Topic: string(cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent)),
JetStream: js,
DB: db,
}

View file

@ -70,8 +70,5 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*shared.Database, error)
CrossSigningKeysTable: csk,
CrossSigningSigsTable: css,
}
if err = d.PartitionOffsetStatements.Prepare(db, d.Writer, "keyserver"); err != nil {
return nil, err
}
return d, nil
}

View file

@ -36,7 +36,6 @@ type Database struct {
StaleDeviceListsTable tables.StaleDeviceLists
CrossSigningKeysTable tables.CrossSigningKeys
CrossSigningSigsTable tables.CrossSigningSigs
sqlutil.PartitionOffsetStatements
}
func (d *Database) ExistingOneTimeKeys(ctx context.Context, userID, deviceID string, keyIDsWithAlgorithms []string) (map[string]json.RawMessage, error) {

View file

@ -69,8 +69,5 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*shared.Database, error)
CrossSigningKeysTable: csk,
CrossSigningSigsTable: css,
}
if err = d.PartitionOffsetStatements.Prepare(db, d.Writer, "keyserver"); err != nil {
return nil, err
}
return d, nil
}

View file

@ -722,8 +722,8 @@ func (r *downloadRequest) fetchRemoteFile(
// create request for remote file
resp, err := client.CreateMediaDownloadRequest(ctx, r.MediaMetadata.Origin, string(r.MediaMetadata.MediaID))
if err != nil || resp.StatusCode != http.StatusOK {
if resp.StatusCode == http.StatusNotFound {
if err != nil || (resp != nil && resp.StatusCode != http.StatusOK) {
if resp != nil && resp.StatusCode == http.StatusNotFound {
return "", false, fmt.Errorf("File with media ID %q does not exist on %s", r.MediaMetadata.MediaID, r.MediaMetadata.Origin)
}
return "", false, fmt.Errorf("file with media ID %q could not be downloaded from %s", r.MediaMetadata.MediaID, r.MediaMetadata.Origin)

View file

@ -102,4 +102,3 @@ func (a AliasEvent) Valid() bool {
}
return a.Alias == "" || validateAliasRegex.MatchString(a.Alias)
}

View file

@ -23,28 +23,28 @@ func TestAliasEvent_Valid(t *testing.T) {
name: "empty alias, invalid alt aliases",
fields: fields{
Alias: "",
AltAliases: []string{ "%not:valid.local"},
AltAliases: []string{"%not:valid.local"},
},
},
{
name: "valid alias, invalid alt aliases",
fields: fields{
Alias: "#valid:test.local",
AltAliases: []string{ "%not:valid.local"},
AltAliases: []string{"%not:valid.local"},
},
},
{
name: "empty alias, invalid alt aliases",
fields: fields{
Alias: "",
AltAliases: []string{ "%not:valid.local"},
AltAliases: []string{"%not:valid.local"},
},
},
{
name: "invalid alias",
fields: fields{
Alias: "%not:valid.local",
AltAliases: []string{ },
AltAliases: []string{},
},
},
}

View file

@ -173,12 +173,15 @@ func (r *RoomserverInternalAPI) RemoveRoomAlias(
}
if creatorID != request.UserID {
plEvent, err := r.DB.GetStateEvent(ctx, roomID, gomatrixserverlib.MRoomPowerLevels, "")
var plEvent *gomatrixserverlib.HeaderedEvent
var pls *gomatrixserverlib.PowerLevelContent
plEvent, err = r.DB.GetStateEvent(ctx, roomID, gomatrixserverlib.MRoomPowerLevels, "")
if err != nil {
return fmt.Errorf("r.DB.GetStateEvent: %w", err)
}
pls, err := plEvent.PowerLevels()
pls, err = plEvent.PowerLevels()
if err != nil {
return fmt.Errorf("plEvent.PowerLevels: %w", err)
}
@ -223,7 +226,7 @@ func (r *RoomserverInternalAPI) RemoveRoomAlias(
}
stateRes := &api.QueryLatestEventsAndStateResponse{}
if err := helpers.QueryLatestEventsAndState(ctx, r.DB, &api.QueryLatestEventsAndStateRequest{RoomID: roomID, StateToFetch: eventsNeeded.Tuples()}, stateRes); err != nil {
if err = helpers.QueryLatestEventsAndState(ctx, r.DB, &api.QueryLatestEventsAndStateRequest{RoomID: roomID, StateToFetch: eventsNeeded.Tuples()}, stateRes); err != nil {
return err
}

View file

@ -90,6 +90,7 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.FederationInternalA
r.KeyRing = keyRing
r.Inputer = &input.Inputer{
Cfg: r.Cfg,
ProcessContext: r.ProcessContext,
DB: r.DB,
InputRoomEventTopic: r.InputRoomEventTopic,

View file

@ -19,6 +19,7 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"sync"
"time"
@ -29,6 +30,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/internal/query"
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
@ -45,7 +47,35 @@ var keyContentFields = map[string]string{
"m.room.member": "membership",
}
// Inputer is responsible for consuming from the roomserver input
// streams and processing the events. All input events are queued
// into a single NATS stream and the order is preserved strictly.
// The `room_id` message header will contain the room ID which will
// be used to assign the pending event to a per-room worker.
//
// The input API maintains an ephemeral headers-only consumer. It
// will speed through the stream working out which room IDs are
// pending and create durable consumers for them. The durable
// consumer will then be used for each room worker goroutine to
// fetch events one by one and process them. Each room having a
// durable consumer of its own means there is no head-of-line
// blocking between rooms. Filtering ensures that each durable
// consumer only receives events for the room it is interested in.
//
// The ephemeral consumer closely tracks the newest events. The
// per-room durable consumers will only progress through the stream
// as events are processed.
//
// A BC * -> positions of each consumer (* = ephemeral)
// ⌄ ⌄⌄ ⌄
// ABAABCAABCAA -> newest (letter = subject for each message)
//
// In this example, A is still processing an event but has two
// pending events to process afterwards. Both B and C are caught
// up, so they will do nothing until a new event comes in for B
// or C.
type Inputer struct {
Cfg *config.RoomServer
ProcessContext *process.ProcessContext
DB storage.Database
NATSClient *nats.Conn
@ -57,60 +87,172 @@ type Inputer struct {
ACLs *acls.ServerACLs
InputRoomEventTopic string
OutputRoomEventTopic string
workers sync.Map // room ID -> *phony.Inbox
workers sync.Map // room ID -> *worker
Queryer *query.Queryer
}
func (r *Inputer) workerForRoom(roomID string) *phony.Inbox {
inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{})
return inbox.(*phony.Inbox)
type worker struct {
phony.Inbox
sync.Mutex
r *Inputer
roomID string
subscription *nats.Subscription
}
// eventsInProgress is an in-memory map to keep a track of which events we have
// queued up for processing. If we get a redelivery from NATS and we still have
// the queued up item then we won't do anything with the redelivered message. If
// we've restarted Dendrite and now this map is empty then it means that we will
// reload pending work from NATS.
var eventsInProgress sync.Map
func (r *Inputer) startWorkerForRoom(roomID string) {
v, loaded := r.workers.LoadOrStore(roomID, &worker{
r: r,
roomID: roomID,
})
w := v.(*worker)
w.Lock()
defer w.Unlock()
if !loaded || w.subscription == nil {
consumer := r.Cfg.Matrix.JetStream.Prefixed("RoomInput" + jetstream.Tokenise(w.roomID))
subject := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEventSubj(w.roomID))
// onMessage is called when a new event arrives in the roomserver input stream.
// Create the consumer. We do this as a specific step rather than
// letting PullSubscribe create it for us because we need the consumer
// to outlive the subscription. If we do it this way, we can Bind in the
// next step, and when we Unsubscribe, the consumer continues to live. If
// we leave PullSubscribe to create the durable consumer, Unsubscribe will
// delete it because it thinks it "owns" it, which in turn breaks the
// interest-based retention storage policy.
// If the durable consumer already exists, this is effectively a no-op.
// Another interesting tid-bit here: the ACK policy is set to "all" so that
// if we acknowledge a message, we also acknowledge everything that comes
// before it. This is necessary because otherwise our consumer will never
// acknowledge things we filtered out for other subjects and therefore they
// will linger around forever.
if _, err := w.r.JetStream.AddConsumer(
r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent),
&nats.ConsumerConfig{
Durable: consumer,
AckPolicy: nats.AckAllPolicy,
DeliverPolicy: nats.DeliverAllPolicy,
FilterSubject: subject,
AckWait: MaximumMissingProcessingTime + (time.Second * 10),
},
); err != nil {
logrus.WithError(err).Errorf("Failed to create consumer for room %q", w.roomID)
return
}
// Bind to our durable consumer. We want to receive all messages waiting
// for this subject and we want to manually acknowledge them, so that we
// can ensure they are only cleaned up when we are done processing them.
sub, err := w.r.JetStream.PullSubscribe(
subject, consumer,
nats.ManualAck(),
nats.DeliverAll(),
nats.AckWait(MaximumMissingProcessingTime+(time.Second*10)),
nats.Bind(r.InputRoomEventTopic, consumer),
)
if err != nil {
logrus.WithError(err).Errorf("Failed to subscribe to stream for room %q", w.roomID)
return
}
// Go and start pulling messages off the queue.
w.subscription = sub
w.Act(nil, w._next)
}
}
// Start creates an ephemeral non-durable consumer on the roomserver
// input topic. It is configured to deliver us headers only because we
// don't actually care about the contents of the message at this point,
// we only care about the `room_id` field. Once a message arrives, we
// will look to see if we have a worker for that room which has its
// own consumer. If we don't, we'll start one.
func (r *Inputer) Start() error {
_, err := r.JetStream.Subscribe(
r.InputRoomEventTopic,
// We specifically don't use jetstream.WithJetStreamMessage here because we
// queue the task off to a room-specific queue and the ACK needs to be sent
// later, possibly with an error response to the inputter if synchronous.
func(msg *nats.Msg) {
roomID := msg.Header.Get("room_id")
"", // This is blank because we specified it in BindStream.
func(m *nats.Msg) {
roomID := m.Header.Get(jetstream.RoomID)
r.startWorkerForRoom(roomID)
_ = m.Ack()
},
nats.HeadersOnly(),
nats.DeliverAll(),
nats.AckAll(),
nats.BindStream(r.InputRoomEventTopic),
)
return err
}
// _next is called by the worker for the room. It must only be called
// by the actor embedded into the worker.
func (w *worker) _next() {
// Look up what the next event is that's waiting to be processed.
ctx, cancel := context.WithTimeout(w.r.ProcessContext.Context(), time.Minute)
defer cancel()
msgs, err := w.subscription.Fetch(1, nats.Context(ctx))
switch err {
case nil:
// Make sure that once we're done here, we queue up another call
// to _next in the inbox.
defer w.Act(nil, w._next)
// If no error was reported, but we didn't get exactly one message,
// then skip over this and try again on the next iteration.
if len(msgs) != 1 {
return
}
case context.DeadlineExceeded:
// The context exceeded, so we've been waiting for more than a
// minute for activity in this room. At this point we will shut
// down the subscriber to free up resources. It'll get started
// again if new activity happens.
if err = w.subscription.Unsubscribe(); err != nil {
logrus.WithError(err).Errorf("Failed to unsubscribe to stream for room %q", w.roomID)
}
w.Lock()
w.subscription = nil
w.Unlock()
return
default:
// Something went wrong while trying to fetch the next event
// from the queue. In which case, we'll shut down the subscriber
// and wait to be notified about new room activity again. Maybe
// the problem will be corrected by then.
logrus.WithError(err).Errorf("Failed to get next stream message for room %q", w.roomID)
if err = w.subscription.Unsubscribe(); err != nil {
logrus.WithError(err).Errorf("Failed to unsubscribe to stream for room %q", w.roomID)
}
w.Lock()
w.subscription = nil
w.Unlock()
return
}
// Try to unmarshal the input room event. If the JSON unmarshalling
// fails then we'll terminate the message — this notifies NATS that
// we are done with the message and never want to see it again.
msg := msgs[0]
var inputRoomEvent api.InputRoomEvent
if err := json.Unmarshal(msg.Data, &inputRoomEvent); err != nil {
if err = json.Unmarshal(msg.Data, &inputRoomEvent); err != nil {
_ = msg.Term()
return
}
_ = msg.InProgress()
index := roomID + "\000" + inputRoomEvent.Event.EventID()
if _, ok := eventsInProgress.LoadOrStore(index, struct{}{}); ok {
// We're already waiting to deal with this event, so there's no
// point in queuing it up again. We've notified NATS that we're
// working on the message still, so that will have deferred the
// redelivery by a bit.
return
}
roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Inc()
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Dec()
roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc()
r.workerForRoom(roomID).Act(nil, func() {
_ = msg.InProgress() // resets the acknowledgement wait timer
defer eventsInProgress.Delete(index)
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
// Process the room event. If something goes wrong then we'll tell
// NATS to terminate the message. We'll store the error result as
// a string, because we might want to return that to the caller if
// it was a synchronous request.
var errString string
if err := r.processRoomEvent(r.ProcessContext.Context(), &inputRoomEvent); err != nil {
if err = w.r.processRoomEvent(w.r.ProcessContext.Context(), &inputRoomEvent); err != nil {
if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
sentry.CaptureException(err)
}
logrus.WithError(err).WithFields(logrus.Fields{
"room_id": roomID,
"room_id": w.roomID,
"event_id": inputRoomEvent.Event.EventID(),
"type": inputRoomEvent.Event.Type(),
}).Warn("Roomserver failed to process async event")
@ -119,35 +261,76 @@ func (r *Inputer) Start() error {
} else {
_ = msg.Ack()
}
// If it was a synchronous input request then the "sync" field
// will be present in the message. That means that someone is
// waiting for a response. The temporary inbox name is present in
// that field, so send back the error string (if any). If there
// was no error then we'll return a blank message, which means
// that everything was OK.
if replyTo := msg.Header.Get("sync"); replyTo != "" {
if err := r.NATSClient.Publish(replyTo, []byte(errString)); err != nil {
if err = w.r.NATSClient.Publish(replyTo, []byte(errString)); err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"room_id": roomID,
"room_id": w.roomID,
"event_id": inputRoomEvent.Event.EventID(),
"type": inputRoomEvent.Event.Type(),
}).Warn("Roomserver failed to respond for sync event")
}
}
})
},
// NATS wants to acknowledge automatically by default when the message is
// read from the stream, but we want to override that behaviour by making
// sure that we only acknowledge when we're happy we've done everything we
// can. This ensures we retry things when it makes sense to do so.
nats.ManualAck(),
// Use a durable named consumer.
r.Durable,
// If we've missed things in the stream, e.g. we restarted, then replay
// all of the queued messages that were waiting for us.
nats.DeliverAll(),
// Ensure that NATS doesn't try to resend us something that wasn't done
// within the period of time that we might still be processing it.
nats.AckWait(MaximumMissingProcessingTime+(time.Second*10)),
// It is recommended to disable this for pull consumers as per the docs:
// https://docs.nats.io/nats-concepts/jetstream/consumers#note-about-push-and-pull-consumers
nats.MaxAckPending(-1),
)
return err
}
// queueInputRoomEvents queues events into the roomserver input
// stream in NATS.
func (r *Inputer) queueInputRoomEvents(
ctx context.Context,
request *api.InputRoomEventsRequest,
) (replySub *nats.Subscription, err error) {
// If the request is synchronous then we need to create a
// temporary inbox to wait for responses on, and then create
// a subscription to it. If it's asynchronous then we won't
// bother, so these values will remain empty.
var replyTo string
if !request.Asynchronous {
replyTo = nats.NewInbox()
replySub, err = r.NATSClient.SubscribeSync(replyTo)
if err != nil {
return nil, fmt.Errorf("r.NATSClient.SubscribeSync: %w", err)
}
if replySub == nil {
// This shouldn't ever happen, but it doesn't hurt to check
// because we can potentially avoid a nil pointer panic later
// if it did for some reason.
return nil, fmt.Errorf("expected a subscription to the temporary inbox")
}
}
// For each event, marshal the input room event and then
// send it into the input queue.
for _, e := range request.InputRoomEvents {
roomID := e.Event.RoomID()
subj := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEventSubj(roomID))
msg := &nats.Msg{
Subject: subj,
Header: nats.Header{},
}
msg.Header.Set("room_id", roomID)
if replyTo != "" {
msg.Header.Set("sync", replyTo)
}
msg.Data, err = json.Marshal(e)
if err != nil {
return nil, fmt.Errorf("json.Marshal: %w", err)
}
if _, err = r.JetStream.PublishMsg(msg, nats.Context(ctx)); err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"room_id": roomID,
"event_id": e.Event.EventID(),
"subj": subj,
}).Error("Roomserver failed to queue async event")
return nil, fmt.Errorf("r.JetStream.PublishMsg: %w", err)
}
}
return
}
// InputRoomEvents implements api.RoomserverInternalAPI
@ -156,48 +339,23 @@ func (r *Inputer) InputRoomEvents(
request *api.InputRoomEventsRequest,
response *api.InputRoomEventsResponse,
) {
var replyTo string
var replySub *nats.Subscription
if !request.Asynchronous {
var err error
replyTo = nats.NewInbox()
replySub, err = r.NATSClient.SubscribeSync(replyTo)
// Queue up the event into the roomserver.
replySub, err := r.queueInputRoomEvents(ctx, request)
if err != nil {
response.ErrMsg = err.Error()
return
}
}
var err error
for _, e := range request.InputRoomEvents {
msg := &nats.Msg{
Subject: r.InputRoomEventTopic,
Header: nats.Header{},
Reply: replyTo,
}
roomID := e.Event.RoomID()
msg.Header.Set("room_id", roomID)
if replyTo != "" {
msg.Header.Set("sync", replyTo)
}
msg.Data, err = json.Marshal(e)
if err != nil {
response.ErrMsg = err.Error()
return
}
if _, err = r.JetStream.PublishMsg(msg); err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"room_id": roomID,
"event_id": e.Event.EventID(),
}).Error("Roomserver failed to queue async event")
return
}
}
if request.Asynchronous || replySub == nil {
// If we aren't waiting for synchronous responses then we can
// give up here, there is nothing further to do.
if replySub == nil {
return
}
// Otherwise, we'll want to sit and wait for the responses
// from the roomserver. There will be one response for every
// input we submitted. The last error value we receive will
// be the one returned as the error string.
defer replySub.Drain() // nolint:errcheck
for i := 0; i < len(request.InputRoomEvents); i++ {
msg, err := replySub.NextMsgWithContext(ctx)
@ -207,7 +365,6 @@ func (r *Inputer) InputRoomEvents(
}
if len(msg.Data) > 0 {
response.ErrMsg = string(msg.Data)
return
}
}
}

View file

@ -50,12 +50,12 @@ func NewInternalAPI(
logrus.WithError(err).Panicf("failed to connect to room server db")
}
js, nc := jetstream.Prepare(&cfg.Matrix.JetStream)
js, nc := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
return internal.NewRoomserverAPI(
base.ProcessContext, cfg, roomserverDB, js, nc,
cfg.Matrix.JetStream.TopicFor(jetstream.InputRoomEvent),
cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent),
cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent),
cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent),
base.Caches, perspectiveServerNames,
)
}

View file

@ -151,7 +151,7 @@ func (s *eventStateKeyStatements) BulkSelectEventStateKey(
if err != nil {
return nil, err
}
defer selectPrep.Close()
defer internal.CloseAndLogIfError(ctx, selectPrep, "selectPrep.close() failed")
stmt := sqlutil.TxStmt(txn, selectPrep)
rows, err := stmt.QueryContext(ctx, iEventStateKeyNIDs...)
if err != nil {

View file

@ -128,7 +128,7 @@ func (s *eventTypeStatements) BulkSelectEventTypeNID(
if err != nil {
return nil, err
}
defer selectPrep.Close()
defer internal.CloseAndLogIfError(ctx, selectPrep, "selectPrep.close() failed")
stmt := sqlutil.TxStmt(txn, selectPrep)
///////////////

View file

@ -567,7 +567,7 @@ func (s *eventStatements) SelectMaxEventDepth(ctx context.Context, txn *sql.Tx,
if err != nil {
return 0, err
}
defer sqlPrep.Close()
defer internal.CloseAndLogIfError(ctx, sqlPrep, "sqlPrep.close() failed")
err = sqlutil.TxStmt(txn, sqlPrep).QueryRowContext(ctx, iEventIDs...).Scan(&result)
if err != nil {
return 0, fmt.Errorf("sqlutil.TxStmt.QueryRowContext: %w", err)
@ -583,7 +583,7 @@ func (s *eventStatements) SelectRoomNIDsForEventNIDs(
if err != nil {
return nil, err
}
defer sqlPrep.Close()
defer internal.CloseAndLogIfError(ctx, sqlPrep, "sqlPrep.close() failed")
sqlStmt := sqlutil.TxStmt(txn, sqlPrep)
iEventNIDs := make([]interface{}, len(eventNIDs))
for i, v := range eventNIDs {

View file

@ -209,13 +209,14 @@ func setupRegexps(asAPI *AppServiceAPI, derived *Derived) (err error) {
for _, appservice := range derived.ApplicationServices {
// The sender_localpart can be considered an exclusive regex for a single user, so let's do that
// to simplify the code
var senderUserIDSlice = []string{fmt.Sprintf("@%s:%s", appservice.SenderLocalpart, asAPI.Matrix.ServerName)}
usersSlice, found := appservice.NamespaceMap["users"]
users, found := appservice.NamespaceMap["users"]
if !found {
usersSlice = []ApplicationServiceNamespace{}
appservice.NamespaceMap["users"] = usersSlice
users = []ApplicationServiceNamespace{}
}
appendExclusiveNamespaceRegexs(&senderUserIDSlice, usersSlice)
appservice.NamespaceMap["users"] = append(users, ApplicationServiceNamespace{
Exclusive: true,
Regex: regexp.QuoteMeta(fmt.Sprintf("@%s:%s", appservice.SenderLocalpart, asAPI.Matrix.ServerName)),
})
for key, namespaceSlice := range appservice.NamespaceMap {
switch key {

View file

@ -19,12 +19,12 @@ type JetStream struct {
InMemory bool `yaml:"in_memory"`
}
func (c *JetStream) TopicFor(name string) string {
func (c *JetStream) Prefixed(name string) string {
return fmt.Sprintf("%s%s", c.TopicPrefix, name)
}
func (c *JetStream) Durable(name string) string {
return c.TopicFor(name)
return c.Prefixed(name)
}
func (c *JetStream) Defaults(generate bool) {

View file

@ -1,11 +1,13 @@
package jetstream
import (
"reflect"
"strings"
"sync"
"time"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/process"
"github.com/sirupsen/logrus"
natsserver "github.com/nats-io/nats-server/v2/server"
@ -15,7 +17,7 @@ import (
var natsServer *natsserver.Server
var natsServerMutex sync.Mutex
func Prepare(cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) {
func Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) {
// check if we need an in-process NATS Server
if len(cfg.Addresses) != 0 {
return setupNATS(cfg, nil)
@ -35,7 +37,16 @@ func Prepare(cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Co
panic(err)
}
natsServer.ConfigureLogger()
go natsServer.Start()
go func() {
process.ComponentStarted()
natsServer.Start()
}()
go func() {
<-process.WaitForShutdown()
natsServer.Shutdown()
natsServer.WaitForShutdown()
process.ComponentFinished()
}()
}
natsServerMutex.Unlock()
if !natsServer.ReadyForConnections(time.Second * 10) {
@ -65,14 +76,35 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStream
}
for _, stream := range streams { // streams are defined in streams.go
name := cfg.TopicFor(stream.Name)
name := cfg.Prefixed(stream.Name)
info, err := s.StreamInfo(name)
if err != nil && err != natsclient.ErrStreamNotFound {
logrus.WithError(err).Fatal("Unable to get stream info")
}
subjects := stream.Subjects
if len(subjects) == 0 {
// By default we want each stream to listen for the subjects
// that are either an exact match for the stream name, or where
// the first part of the subject is the stream name. ">" is a
// wildcard in NATS for one or more subject tokens. In the case
// that the stream is called "Foo", this will match any message
// with the subject "Foo", "Foo.Bar" or "Foo.Bar.Baz" etc.
subjects = []string{name, name + ".>"}
}
if info != nil {
switch {
case !reflect.DeepEqual(info.Config.Subjects, subjects):
fallthrough
case info.Config.Retention != stream.Retention:
fallthrough
case info.Config.Storage != stream.Storage:
if err = s.DeleteStream(name); err != nil {
logrus.WithError(err).Fatal("Unable to delete stream")
}
info = nil
}
}
if info == nil {
stream.Subjects = []string{name}
// If we're trying to keep everything in memory (e.g. unit tests)
// then overwrite the storage policy.
if cfg.InMemory {
@ -83,8 +115,9 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStream
// array, otherwise we end up with namespaces on namespaces.
namespaced := *stream
namespaced.Name = name
namespaced.Subjects = subjects
if _, err = s.AddStream(&namespaced); err != nil {
logrus.WithError(err).WithField("stream", name).Fatal("Unable to add stream")
logrus.WithError(err).WithField("stream", name).WithField("subjects", subjects).Fatal("Unable to add stream")
}
}
}

View file

@ -1,6 +1,8 @@
package jetstream
import (
"fmt"
"regexp"
"time"
"github.com/nats-io/nats.go"
@ -24,10 +26,20 @@ var (
OutputReadUpdate = "OutputReadUpdate"
)
var safeCharacters = regexp.MustCompile("[^A-Za-z0-9$]+")
func Tokenise(str string) string {
return safeCharacters.ReplaceAllString(str, "_")
}
func InputRoomEventSubj(roomID string) string {
return fmt.Sprintf("%s.%s", InputRoomEvent, Tokenise(roomID))
}
var streams = []*nats.StreamConfig{
{
Name: InputRoomEvent,
Retention: nats.WorkQueuePolicy,
Retention: nats.InterestPolicy,
Storage: nats.FileStorage,
},
{

View file

@ -57,7 +57,7 @@ type Monolith struct {
// AddAllPublicRoutes attaches all public paths to the given router
func (m *Monolith) AddAllPublicRoutes(process *process.ProcessContext, csMux, ssMux, keyMux, wkMux, mediaMux, synapseMux *mux.Router) {
clientapi.AddPublicRoutes(
csMux, synapseMux, &m.Config.ClientAPI, m.AccountDB,
process, csMux, synapseMux, &m.Config.ClientAPI, m.AccountDB,
m.FedClient, m.RoomserverAPI,
m.EDUInternalAPI, m.AppserviceAPI, transactions.New(),
m.FederationAPI, m.UserAPI, m.KeyAPI,

View file

@ -283,11 +283,7 @@ func (w *walker) walk() util.JSONResponse {
if !roomExists {
// attempt to query this room over federation, as either we've never heard of it before
// or we've left it and hence are not authorised (but info may be exposed regardless)
fedRes, err := w.federatedRoomInfo(rv.roomID, rv.vias)
if err != nil {
util.GetLogger(w.ctx).WithError(err).WithField("room_id", rv.roomID).Errorf("failed to query federated spaces")
continue
}
fedRes := w.federatedRoomInfo(rv.roomID, rv.vias)
if fedRes != nil {
discoveredChildEvents = fedRes.Room.ChildrenState
discoveredRooms = append(discoveredRooms, fedRes.Room)
@ -420,15 +416,15 @@ func (w *walker) publicRoomsChunk(roomID string) *gomatrixserverlib.PublicRoom {
// federatedRoomInfo returns more of the spaces graph from another server. Returns nil if this was
// unsuccessful.
func (w *walker) federatedRoomInfo(roomID string, vias []string) (*gomatrixserverlib.MSC2946SpacesResponse, error) {
func (w *walker) federatedRoomInfo(roomID string, vias []string) *gomatrixserverlib.MSC2946SpacesResponse {
// only do federated requests for client requests
if w.caller == nil {
return nil, nil
return nil
}
resp, ok := w.cache.GetSpaceSummary(roomID)
if ok {
util.GetLogger(w.ctx).Debugf("Returning cached response for %s", roomID)
return &resp, nil
return &resp
}
util.GetLogger(w.ctx).Debugf("Querying %s via %+v", roomID, vias)
ctx := context.Background()
@ -455,9 +451,9 @@ func (w *walker) federatedRoomInfo(roomID string, vias []string) (*gomatrixserve
}
w.cache.StoreSpaceSummary(roomID, res)
return &res, nil
return &res
}
return nil, nil
return nil
}
func (w *walker) roomExists(roomID string) bool {
@ -717,23 +713,6 @@ func stripped(ev *gomatrixserverlib.Event) *gomatrixserverlib.MSC2946StrippedEve
}
}
func eventKey(event *gomatrixserverlib.MSC2946StrippedEvent) string {
return event.RoomID + "|" + event.Type + "|" + event.StateKey
}
func spaceTargetStripped(event *gomatrixserverlib.MSC2946StrippedEvent) string {
if event.StateKey == "" {
return "" // no-op
}
switch event.Type {
case ConstSpaceParentEventType:
return event.StateKey
case ConstSpaceChildEventType:
return event.StateKey
}
return ""
}
func parseInt(intstr string, defaultVal int) int {
i, err := strconv.ParseInt(intstr, 10, 32)
if err != nil {

View file

@ -61,7 +61,7 @@ func NewOutputClientDataConsumer(
return &OutputClientDataConsumer{
ctx: process.Context(),
jetstream: js,
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData),
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData),
durable: cfg.Matrix.JetStream.Durable("SyncAPIClientAPIConsumer"),
db: store,
notifier: notifier,

View file

@ -62,7 +62,7 @@ func NewOutputReceiptEventConsumer(
return &OutputReceiptEventConsumer{
ctx: process.Context(),
jetstream: js,
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent),
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerReceiptConsumer"),
db: store,
notifier: notifier,

View file

@ -57,7 +57,7 @@ func NewOutputSendToDeviceEventConsumer(
return &OutputSendToDeviceEventConsumer{
ctx: process.Context(),
jetstream: js,
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent),
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerSendToDeviceConsumer"),
db: store,
serverName: cfg.Matrix.ServerName,

View file

@ -56,7 +56,7 @@ func NewOutputTypingEventConsumer(
return &OutputTypingEventConsumer{
ctx: process.Context(),
jetstream: js,
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent),
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerTypingConsumer"),
eduCache: eduCache,
notifier: notifier,

View file

@ -65,7 +65,7 @@ func NewOutputRoomEventConsumer(
ctx: process.Context(),
cfg: cfg,
jetstream: js,
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent),
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent),
durable: cfg.Matrix.JetStream.Durable("SyncAPIRoomServerConsumer"),
db: store,
notifier: notifier,

View file

@ -56,7 +56,7 @@ func NewOutputNotificationDataConsumer(
ctx: process.Context(),
jetstream: js,
durable: cfg.Matrix.JetStream.Durable("SyncAPINotificationDataConsumer"),
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputNotificationData),
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputNotificationData),
db: store,
notifier: notifier,
stream: stream,

View file

@ -41,7 +41,6 @@ type messagesReq struct {
roomID string
from *types.TopologyToken
to *types.TopologyToken
fromStream *types.StreamingToken
device *userapi.Device
wasToProvided bool
backwardOrdering bool
@ -50,7 +49,7 @@ type messagesReq struct {
type messagesResp struct {
Start string `json:"start"`
StartStream string `json:"start_stream,omitempty"` // NOTSPEC: so clients can hit /messages then immediately /sync with a latest sync token
StartStream string `json:"start_stream,omitempty"` // NOTSPEC: used by Cerulean, so clients can hit /messages then immediately /sync with a latest sync token
End string `json:"end"`
Chunk []gomatrixserverlib.ClientEvent `json:"chunk"`
State []gomatrixserverlib.ClientEvent `json:"state"`
@ -93,6 +92,7 @@ func OnIncomingMessagesRequest(
// Pagination tokens.
var fromStream *types.StreamingToken
fromQuery := req.URL.Query().Get("from")
toQuery := req.URL.Query().Get("to")
emptyFromSupplied := fromQuery == ""
if emptyFromSupplied {
// NOTSPEC: We will pretend they used the latest sync token if no ?from= was provided.
@ -101,18 +101,6 @@ func OnIncomingMessagesRequest(
fromQuery = currPos.String()
}
from, err := types.NewTopologyTokenFromString(fromQuery)
if err != nil {
fs, err2 := types.NewStreamTokenFromString(fromQuery)
fromStream = &fs
if err2 != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.InvalidArgumentValue("Invalid from parameter: " + err2.Error()),
}
}
}
// Direction to return events from.
dir := req.URL.Query().Get("dir")
if dir != "b" && dir != "f" {
@ -125,17 +113,44 @@ func OnIncomingMessagesRequest(
// to have one of the two accepted values (so dir == "f" <=> !backwardOrdering).
backwardOrdering := (dir == "b")
from, err := types.NewTopologyTokenFromString(fromQuery)
if err != nil {
var streamToken types.StreamingToken
if streamToken, err = types.NewStreamTokenFromString(fromQuery); err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.InvalidArgumentValue("Invalid from parameter: " + err.Error()),
}
} else {
fromStream = &streamToken
from, err = db.StreamToTopologicalPosition(req.Context(), roomID, streamToken.PDUPosition, backwardOrdering)
if err != nil {
logrus.WithError(err).Errorf("Failed to get topological position for streaming token %v", streamToken)
return jsonerror.InternalServerError()
}
}
}
// Pagination tokens. To is optional, and its default value depends on the
// direction ("b" or "f").
var to types.TopologyToken
wasToProvided := true
if s := req.URL.Query().Get("to"); len(s) > 0 {
to, err = types.NewTopologyTokenFromString(s)
if len(toQuery) > 0 {
to, err = types.NewTopologyTokenFromString(toQuery)
if err != nil {
var streamToken types.StreamingToken
if streamToken, err = types.NewStreamTokenFromString(toQuery); err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.InvalidArgumentValue("Invalid to parameter: " + err.Error()),
}
} else {
to, err = db.StreamToTopologicalPosition(req.Context(), roomID, streamToken.PDUPosition, !backwardOrdering)
if err != nil {
logrus.WithError(err).Errorf("Failed to get topological position for streaming token %v", streamToken)
return jsonerror.InternalServerError()
}
}
}
} else {
// If "to" isn't provided, it defaults to either the earliest stream
@ -168,7 +183,6 @@ func OnIncomingMessagesRequest(
roomID: roomID,
from: &from,
to: &to,
fromStream: fromStream,
wasToProvided: wasToProvided,
filter: filter,
backwardOrdering: backwardOrdering,
@ -215,7 +229,7 @@ func OnIncomingMessagesRequest(
End: end.String(),
State: state,
}
if emptyFromSupplied {
if fromStream != nil {
res.StartStream = fromStream.String()
}
@ -251,17 +265,9 @@ func (r *messagesReq) retrieveEvents() (
eventFilter := r.filter
// Retrieve the events from the local database.
var streamEvents []types.StreamEvent
if r.fromStream != nil {
toStream := r.to.StreamToken()
streamEvents, err = r.db.GetEventsInStreamingRange(
r.ctx, r.fromStream, &toStream, r.roomID, eventFilter, r.backwardOrdering,
)
} else {
streamEvents, err = r.db.GetEventsInTopologicalRange(
streamEvents, err := r.db.GetEventsInTopologicalRange(
r.ctx, r.from, r.to, r.roomID, eventFilter.Limit, r.backwardOrdering,
)
}
if err != nil {
err = fmt.Errorf("GetEventsInRange: %w", err)
return

View file

@ -103,8 +103,6 @@ type Database interface {
// DeletePeek deletes all peeks for a given room by a given user
// Returns an error if there was a problem communicating with the database.
DeletePeeks(ctx context.Context, RoomID, UserID string) (types.StreamPosition, error)
// GetEventsInStreamingRange retrieves all of the events on a given ordering using the given extremities and limit.
GetEventsInStreamingRange(ctx context.Context, from, to *types.StreamingToken, roomID string, eventFilter *gomatrixserverlib.RoomEventFilter, backwardOrdering bool) (events []types.StreamEvent, err error)
// GetEventsInTopologicalRange retrieves all of the events on a given ordering using the given extremities and limit.
GetEventsInTopologicalRange(ctx context.Context, from, to *types.TopologyToken, roomID string, limit int, backwardOrdering bool) (events []types.StreamEvent, err error)
// EventPositionInTopology returns the depth and stream position of the given event.
@ -149,4 +147,6 @@ type Database interface {
SelectContextEvent(ctx context.Context, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error)
SelectContextBeforeEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error)
SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error)
StreamToTopologicalPosition(ctx context.Context, roomID string, streamPos types.StreamPosition, backwardOrdering bool) (types.TopologyToken, error)
}

View file

@ -472,7 +472,7 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
if err != nil {
return
}
defer rows.Close()
defer internal.CloseAndLogIfError(ctx, rows, "rows.close() failed")
for rows.Next() {
var (
@ -504,7 +504,7 @@ func (s *outputRoomEventsStatements) SelectContextAfterEvent(
if err != nil {
return
}
defer rows.Close()
defer internal.CloseAndLogIfError(ctx, rows, "rows.close() failed")
for rows.Next() {
var (

View file

@ -51,7 +51,7 @@ const selectEventIDsInRangeASCSQL = "" +
"SELECT event_id FROM syncapi_output_room_events_topology" +
" WHERE room_id = $1 AND (" +
"(topological_position > $2 AND topological_position < $3) OR" +
"(topological_position = $4 AND stream_position <= $5)" +
"(topological_position = $4 AND stream_position >= $5)" +
") ORDER BY topological_position ASC, stream_position ASC LIMIT $6"
const selectEventIDsInRangeDESCSQL = "" +
@ -76,6 +76,12 @@ const selectMaxPositionInTopologySQL = "" +
const deleteTopologyForRoomSQL = "" +
"DELETE FROM syncapi_output_room_events_topology WHERE room_id = $1"
const selectStreamToTopologicalPositionAscSQL = "" +
"SELECT topological_position FROM syncapi_output_room_events_topology WHERE room_id = $1 AND stream_position >= $2 ORDER BY topological_position ASC LIMIT 1;"
const selectStreamToTopologicalPositionDescSQL = "" +
"SELECT topological_position FROM syncapi_output_room_events_topology WHERE room_id = $1 AND stream_position <= $2 ORDER BY topological_position DESC LIMIT 1;"
type outputRoomEventsTopologyStatements struct {
insertEventInTopologyStmt *sql.Stmt
selectEventIDsInRangeASCStmt *sql.Stmt
@ -83,6 +89,8 @@ type outputRoomEventsTopologyStatements struct {
selectPositionInTopologyStmt *sql.Stmt
selectMaxPositionInTopologyStmt *sql.Stmt
deleteTopologyForRoomStmt *sql.Stmt
selectStreamToTopologicalPositionAscStmt *sql.Stmt
selectStreamToTopologicalPositionDescStmt *sql.Stmt
}
func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) {
@ -109,6 +117,12 @@ func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) {
if s.deleteTopologyForRoomStmt, err = db.Prepare(deleteTopologyForRoomSQL); err != nil {
return nil, err
}
if s.selectStreamToTopologicalPositionAscStmt, err = db.Prepare(selectStreamToTopologicalPositionAscSQL); err != nil {
return nil, err
}
if s.selectStreamToTopologicalPositionDescStmt, err = db.Prepare(selectStreamToTopologicalPositionDescSQL); err != nil {
return nil, err
}
return s, nil
}
@ -170,6 +184,19 @@ func (s *outputRoomEventsTopologyStatements) SelectPositionInTopology(
return
}
// SelectStreamToTopologicalPosition returns the closest position of a given event
// in the topology of the room it belongs to from the given stream position.
func (s *outputRoomEventsTopologyStatements) SelectStreamToTopologicalPosition(
ctx context.Context, txn *sql.Tx, roomID string, streamPos types.StreamPosition, backwardOrdering bool,
) (topoPos types.StreamPosition, err error) {
if backwardOrdering {
err = s.selectStreamToTopologicalPositionDescStmt.QueryRowContext(ctx, roomID, streamPos).Scan(&topoPos)
} else {
err = s.selectStreamToTopologicalPositionAscStmt.QueryRowContext(ctx, roomID, streamPos).Scan(&topoPos)
}
return
}
func (s *outputRoomEventsTopologyStatements) SelectMaxPositionInTopology(
ctx context.Context, txn *sql.Tx, roomID string,
) (pos types.StreamPosition, spos types.StreamPosition, err error) {

View file

@ -32,7 +32,6 @@ type SyncServerDatasource struct {
shared.Database
db *sql.DB
writer sqlutil.Writer
sqlutil.PartitionOffsetStatements
}
// NewDatabase creates a new sync server database
@ -43,9 +42,6 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
return nil, err
}
d.writer = sqlutil.NewDummyWriter()
if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "syncapi"); err != nil {
return nil, err
}
accountData, err := NewPostgresAccountDataTable(d.db)
if err != nil {
return nil, err

View file

@ -155,37 +155,6 @@ func (d *Database) Events(ctx context.Context, eventIDs []string) ([]*gomatrixse
return d.StreamEventsToEvents(nil, streamEvents), nil
}
// GetEventsInStreamingRange retrieves all of the events on a given ordering using the
// given extremities and limit.
func (d *Database) GetEventsInStreamingRange(
ctx context.Context,
from, to *types.StreamingToken,
roomID string, eventFilter *gomatrixserverlib.RoomEventFilter,
backwardOrdering bool,
) (events []types.StreamEvent, err error) {
r := types.Range{
From: from.PDUPosition,
To: to.PDUPosition,
Backwards: backwardOrdering,
}
if backwardOrdering {
// When using backward ordering, we want the most recent events first.
if events, _, err = d.OutputEvents.SelectRecentEvents(
ctx, nil, roomID, r, eventFilter, false, false,
); err != nil {
return
}
} else {
// When using forward ordering, we want the least recent events first.
if events, err = d.OutputEvents.SelectEarlyEvents(
ctx, nil, roomID, r, eventFilter,
); err != nil {
return
}
}
return events, err
}
func (d *Database) AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error) {
return d.CurrentRoomState.SelectJoinedUsers(ctx)
}
@ -513,6 +482,26 @@ func (d *Database) EventPositionInTopology(
return types.TopologyToken{Depth: depth, PDUPosition: stream}, nil
}
func (d *Database) StreamToTopologicalPosition(
ctx context.Context, roomID string, streamPos types.StreamPosition, backwardOrdering bool,
) (types.TopologyToken, error) {
topoPos, err := d.Topology.SelectStreamToTopologicalPosition(ctx, nil, roomID, streamPos, backwardOrdering)
switch {
case err == sql.ErrNoRows && backwardOrdering: // no events in range, going backward
return types.TopologyToken{PDUPosition: streamPos}, nil
case err == sql.ErrNoRows && !backwardOrdering: // no events in range, going forward
topoPos, streamPos, err = d.Topology.SelectMaxPositionInTopology(ctx, nil, roomID)
if err != nil {
return types.TopologyToken{}, fmt.Errorf("d.Topology.SelectMaxPositionInTopology: %w", err)
}
return types.TopologyToken{Depth: topoPos, PDUPosition: streamPos}, nil
case err != nil: // some other error happened
return types.TopologyToken{}, fmt.Errorf("d.Topology.SelectStreamToTopologicalPosition: %w", err)
default:
return types.TopologyToken{Depth: topoPos, PDUPosition: streamPos}, nil
}
}
func (d *Database) GetFilter(
ctx context.Context, localpart string, filterID string,
) (*gomatrixserverlib.Filter, error) {

View file

@ -514,7 +514,7 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
if err != nil {
return
}
defer rows.Close()
defer internal.CloseAndLogIfError(ctx, rows, "rows.close() failed")
for rows.Next() {
var (
@ -550,7 +550,7 @@ func (s *outputRoomEventsStatements) SelectContextAfterEvent(
if err != nil {
return
}
defer rows.Close()
defer internal.CloseAndLogIfError(ctx, rows, "rows.close() failed")
for rows.Next() {
var (

View file

@ -47,7 +47,7 @@ const selectEventIDsInRangeASCSQL = "" +
"SELECT event_id FROM syncapi_output_room_events_topology" +
" WHERE room_id = $1 AND (" +
"(topological_position > $2 AND topological_position < $3) OR" +
"(topological_position = $4 AND stream_position <= $5)" +
"(topological_position = $4 AND stream_position >= $5)" +
") ORDER BY topological_position ASC, stream_position ASC LIMIT $6"
const selectEventIDsInRangeDESCSQL = "" +
@ -65,8 +65,11 @@ const selectMaxPositionInTopologySQL = "" +
"SELECT MAX(topological_position), stream_position FROM syncapi_output_room_events_topology" +
" WHERE room_id = $1 ORDER BY stream_position DESC"
const deleteTopologyForRoomSQL = "" +
"DELETE FROM syncapi_output_room_events_topology WHERE room_id = $1"
const selectStreamToTopologicalPositionAscSQL = "" +
"SELECT topological_position FROM syncapi_output_room_events_topology WHERE room_id = $1 AND stream_position >= $2 ORDER BY topological_position ASC LIMIT 1;"
const selectStreamToTopologicalPositionDescSQL = "" +
"SELECT topological_position FROM syncapi_output_room_events_topology WHERE room_id = $1 AND stream_position <= $2 ORDER BY topological_position DESC LIMIT 1;"
type outputRoomEventsTopologyStatements struct {
db *sql.DB
@ -76,6 +79,8 @@ type outputRoomEventsTopologyStatements struct {
selectPositionInTopologyStmt *sql.Stmt
selectMaxPositionInTopologyStmt *sql.Stmt
deleteTopologyForRoomStmt *sql.Stmt
selectStreamToTopologicalPositionAscStmt *sql.Stmt
selectStreamToTopologicalPositionDescStmt *sql.Stmt
}
func NewSqliteTopologyTable(db *sql.DB) (tables.Topology, error) {
@ -101,7 +106,10 @@ func NewSqliteTopologyTable(db *sql.DB) (tables.Topology, error) {
if s.selectMaxPositionInTopologyStmt, err = db.Prepare(selectMaxPositionInTopologySQL); err != nil {
return nil, err
}
if s.deleteTopologyForRoomStmt, err = db.Prepare(deleteTopologyForRoomSQL); err != nil {
if s.selectStreamToTopologicalPositionAscStmt, err = db.Prepare(selectStreamToTopologicalPositionAscSQL); err != nil {
return nil, err
}
if s.selectStreamToTopologicalPositionDescStmt, err = db.Prepare(selectStreamToTopologicalPositionDescSQL); err != nil {
return nil, err
}
return s, nil
@ -163,6 +171,19 @@ func (s *outputRoomEventsTopologyStatements) SelectPositionInTopology(
return
}
// SelectStreamToTopologicalPosition returns the closest position of a given event
// in the topology of the room it belongs to from the given stream position.
func (s *outputRoomEventsTopologyStatements) SelectStreamToTopologicalPosition(
ctx context.Context, txn *sql.Tx, roomID string, streamPos types.StreamPosition, backwardOrdering bool,
) (topoPos types.StreamPosition, err error) {
if backwardOrdering {
err = s.selectStreamToTopologicalPositionDescStmt.QueryRowContext(ctx, roomID, streamPos).Scan(&topoPos)
} else {
err = s.selectStreamToTopologicalPositionAscStmt.QueryRowContext(ctx, roomID, streamPos).Scan(&topoPos)
}
return
}
func (s *outputRoomEventsTopologyStatements) SelectMaxPositionInTopology(
ctx context.Context, txn *sql.Tx, roomID string,
) (pos types.StreamPosition, spos types.StreamPosition, err error) {

View file

@ -30,7 +30,6 @@ type SyncServerDatasource struct {
shared.Database
db *sql.DB
writer sqlutil.Writer
sqlutil.PartitionOffsetStatements
streamID streamIDStatements
}
@ -50,9 +49,6 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
}
func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (err error) {
if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "syncapi"); err != nil {
return err
}
if err = d.streamID.prepare(d.db); err != nil {
return err
}

View file

@ -87,6 +87,8 @@ type Topology interface {
SelectMaxPositionInTopology(ctx context.Context, txn *sql.Tx, roomID string) (depth types.StreamPosition, spos types.StreamPosition, err error)
// DeleteTopologyForRoom removes all topological information for a room. This should only be done when removing the room entirely.
DeleteTopologyForRoom(ctx context.Context, txn *sql.Tx, roomID string) (err error)
// SelectStreamToTopologicalPosition converts a stream position to a topological position by finding the nearest topological position in the room.
SelectStreamToTopologicalPosition(ctx context.Context, txn *sql.Tx, roomID string, streamPos types.StreamPosition, forward bool) (topoPos types.StreamPosition, err error)
}
type CurrentRoomState interface {

View file

@ -49,7 +49,7 @@ func AddPublicRoutes(
federation *gomatrixserverlib.FederationClient,
cfg *config.SyncAPI,
) {
js, _ := jetstream.Prepare(&cfg.Matrix.JetStream)
js, _ := jetstream.Prepare(process, &cfg.Matrix.JetStream)
syncDB, err := storage.NewSyncServerDatasource(&cfg.Database)
if err != nil {
@ -67,18 +67,18 @@ func AddPublicRoutes(
userAPIStreamEventProducer := &producers.UserAPIStreamEventProducer{
JetStream: js,
Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputStreamEvent),
Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputStreamEvent),
}
userAPIReadUpdateProducer := &producers.UserAPIReadProducer{
JetStream: js,
Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReadUpdate),
Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReadUpdate),
}
_ = userAPIReadUpdateProducer
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
process, cfg, cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent),
process, cfg, cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent),
js, keyAPI, rsAPI, syncDB, notifier,
streams.DeviceListStreamProvider,
)

View file

@ -239,7 +239,6 @@ Inbound federation can query room alias directory
Outbound federation can query v2 /send_join
Inbound federation can receive v2 /send_join
Message history can be paginated
Getting messages going forward is limited for a departed room (SPEC-216)
Backfill works correctly with history visibility set to joined
Guest user cannot call /events globally
Guest users can join guest_access rooms

View file

@ -47,7 +47,7 @@ func NewOutputReadUpdateConsumer(
db: store,
ServerName: cfg.Matrix.ServerName,
durable: cfg.Matrix.JetStream.Durable("UserAPISyncAPIReadUpdateConsumer"),
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReadUpdate),
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReadUpdate),
pgClient: pgClient,
userAPI: userAPI,
syncProducer: syncProducer,

View file

@ -54,7 +54,7 @@ func NewOutputStreamEventConsumer(
jetstream: js,
db: store,
durable: cfg.Matrix.JetStream.Durable("UserAPISyncAPIStreamEventConsumer"),
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputStreamEvent),
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputStreamEvent),
pgClient: pgClient,
userAPI: userAPI,
rsAPI: rsAPI,

View file

@ -23,6 +23,7 @@ import (
"errors"
"fmt"
"strconv"
"strings"
"time"
"github.com/matrix-org/gomatrixserverlib"
@ -298,7 +299,12 @@ func (d *Database) CheckAccountAvailability(ctx context.Context, localpart strin
// Returns sql.ErrNoRows if no account exists which matches the given localpart.
func (d *Database) GetAccountByLocalpart(ctx context.Context, localpart string,
) (*api.Account, error) {
return d.Accounts.SelectAccountByLocalpart(ctx, localpart)
// try to get the account with lowercase localpart (majority)
acc, err := d.Accounts.SelectAccountByLocalpart(ctx, strings.ToLower(localpart))
if err == sql.ErrNoRows {
acc, err = d.Accounts.SelectAccountByLocalpart(ctx, localpart) // try with localpart as passed by the request
}
return acc, err
}
// SearchProfiles returns all profiles where the provided localpart or display name

View file

@ -46,7 +46,7 @@ func NewInternalAPI(
appServices []config.ApplicationService, keyAPI keyapi.KeyInternalAPI,
rsAPI rsapi.RoomserverInternalAPI, pgClient pushgateway.Client,
) api.UserInternalAPI {
js, _ := jetstream.Prepare(&cfg.Matrix.JetStream)
js, _ := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
syncProducer := producers.NewSyncAPI(
db, js,
@ -54,8 +54,8 @@ func NewInternalAPI(
// it's handled by clientapi, and hence uses its topic. When user
// API handles it for all account data, we can remove it from
// here.
cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData),
cfg.Matrix.JetStream.TopicFor(jetstream.OutputNotificationData),
cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData),
cfg.Matrix.JetStream.Prefixed(jetstream.OutputNotificationData),
)
userAPI := &internal.UserInternalAPI{