diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md index 8014e9414..9bfb01667 100644 --- a/.github/PULL_REQUEST_TEMPLATE.md +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -2,6 +2,7 @@ +* [ ] I have added added tests for PR _or_ I have justified why this PR doesn't need tests. * [ ] Pull request includes a [sign off](https://github.com/matrix-org/dendrite/blob/main/docs/CONTRIBUTING.md#sign-off) Signed-off-by: `Your Name ` diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml deleted file mode 100644 index de6c79ddc..000000000 --- a/.github/workflows/codeql-analysis.yml +++ /dev/null @@ -1,34 +0,0 @@ -name: "CodeQL" - -on: - push: - branches: [main] - pull_request: - branches: [main] - -jobs: - analyze: - name: Analyze - runs-on: ubuntu-latest - - strategy: - fail-fast: false - matrix: - language: ["go"] - - steps: - - name: Checkout repository - uses: actions/checkout@v2 - with: - fetch-depth: 2 - - - run: git checkout HEAD^2 - if: ${{ github.event_name == 'pull_request' }} - - - name: Initialize CodeQL - uses: github/codeql-action/init@v1 - with: - languages: ${{ matrix.language }} - - - name: Perform CodeQL Analysis - uses: github/codeql-action/analyze@v1 diff --git a/.github/workflows/dendrite.yml b/.github/workflows/dendrite.yml new file mode 100644 index 000000000..d337865f2 --- /dev/null +++ b/.github/workflows/dendrite.yml @@ -0,0 +1,324 @@ +name: Dendrite + +on: + push: + branches: + - main + pull_request: + release: + types: [published] + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + wasm: + name: WASM build test + timeout-minutes: 5 + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + + - name: Install Go + uses: actions/setup-go@v2 + with: + go-version: 1.16 + + - uses: actions/cache@v2 + with: + path: | + ~/.cache/go-build + ~/go/pkg/mod + key: ${{ runner.os }}-go-wasm-${{ hashFiles('**/go.sum') }} + restore-keys: | + ${{ runner.os }}-go-wasm + + - name: Install Node + uses: actions/setup-node@v2 + with: + node-version: 14 + + - uses: actions/cache@v2 + with: + path: ~/.npm + key: ${{ runner.os }}-node-${{ hashFiles('**/package-lock.json') }} + restore-keys: | + ${{ runner.os }}-node- + + - name: Reconfigure Git to use HTTPS auth for repo packages + run: > + git config --global url."https://github.com/".insteadOf + ssh://git@github.com/ + + - name: Install test dependencies + working-directory: ./test/wasm + run: npm ci + + - name: Test + run: ./test-dendritejs.sh + + # Run golangci-lint + lint: + timeout-minutes: 5 + name: Linting + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: golangci-lint + uses: golangci/golangci-lint-action@v2 + + # run go test with different go versions + test: + timeout-minutes: 5 + name: Unit tests (Go ${{ matrix.go }}) + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + go: [ '1.16', '1.17', '1.18' ] + steps: + - uses: actions/checkout@v3 + - name: Setup go + uses: actions/setup-go@v2 + with: + go-version: ${{ matrix.go }} + - uses: actions/cache@v3 + with: + path: | + ~/.cache/go-build + ~/go/pkg/mod + key: ${{ runner.os }}-go${{ matrix.go }}-test-${{ hashFiles('**/go.sum') }} + restore-keys: | + ${{ runner.os }}-go${{ matrix.go }}-test- + - run: go test ./... + + # build Dendrite for linux with different architectures and go versions + build: + name: Build for Linux + timeout-minutes: 10 + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + go: [ '1.16', '1.17', '1.18' ] + goos: [ 'linux' ] + goarch: [ 'amd64', '386' ] + steps: + - uses: actions/checkout@v3 + - name: Setup go + uses: actions/setup-go@v2 + with: + go-version: ${{ matrix.go }} + - name: Install dependencies x86 + if: ${{ matrix.goarch == '386' }} + run: sudo apt update && sudo apt-get install -y gcc-multilib + - uses: actions/cache@v3 + with: + path: | + ~/.cache/go-build + ~/go/pkg/mod + key: ${{ runner.os }}-go${{ matrix.go }}-${{ matrix.goarch }}-${{ hashFiles('**/go.sum') }} + restore-keys: | + ${{ runner.os }}-go${{ matrix.go }}-${{ matrix.goarch }}- + - env: + GOOS: ${{ matrix.goos }} + GOARCH: ${{ matrix.goarch }} + CGO_ENABLED: 1 + run: go build -trimpath -v -o "bin/" ./cmd/... + + # build for Windows 64-bit + build_windows: + name: Build for Windows + timeout-minutes: 10 + runs-on: ubuntu-latest + strategy: + matrix: + go: [ '1.16', '1.17', '1.18' ] + goos: [ 'windows' ] + goarch: [ 'amd64' ] + steps: + - uses: actions/checkout@v3 + - name: Setup Go ${{ matrix.go }} + uses: actions/setup-go@v2 + with: + go-version: ${{ matrix.go }} + - name: Install dependencies + run: sudo apt update && sudo apt install -y gcc-mingw-w64-x86-64 # install required gcc + - uses: actions/cache@v3 + with: + path: | + ~/.cache/go-build + ~/go/pkg/mod + key: ${{ runner.os }}-go${{ matrix.go }}-${{ matrix.goos }}-${{ hashFiles('**/go.sum') }} + restore-keys: | + ${{ runner.os }}-go${{ matrix.go }}-${{ matrix.goos }} + - env: + GOOS: ${{ matrix.goos }} + GOARCH: ${{ matrix.goarch }} + CGO_ENABLED: 1 + CC: "/usr/bin/x86_64-w64-mingw32-gcc" + run: go build -trimpath -v -o "bin/" ./cmd/... + + # Dummy step to gate other tests on without repeating the whole list + initial-tests-done: + name: Initial tests passed + needs: [ lint, test, build, build_windows ] + runs-on: ubuntu-latest + if: ${{ !cancelled() }} # Run this even if prior jobs were skipped + steps: + - name: Check initial tests passed + uses: re-actors/alls-green@release/v1 + with: + jobs: ${{ toJSON(needs) }} + + # run database upgrade tests + upgrade_test: + name: Upgrade tests + timeout-minutes: 20 + needs: initial-tests-done + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: Setup go + uses: actions/setup-go@v2 + with: + go-version: '1.16' + - uses: actions/cache@v3 + with: + path: | + ~/.cache/go-build + ~/go/pkg/mod + key: ${{ runner.os }}-go-upgrade-${{ hashFiles('**/go.sum') }} + restore-keys: | + ${{ runner.os }}-go-upgrade + - name: Build upgrade-tests + run: go build ./cmd/dendrite-upgrade-tests + - name: Test upgrade + run: ./dendrite-upgrade-tests --head . + + # run Sytest in different variations + sytest: + timeout-minutes: 20 + needs: initial-tests-done + name: "Sytest (${{ matrix.label }})" + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + include: + - label: SQLite + + - label: SQLite, full HTTP APIs + api: full-http + + - label: PostgreSQL + postgres: postgres + + - label: PostgreSQL, full HTTP APIs + postgres: postgres + api: full-http + container: + image: matrixdotorg/sytest-dendrite:latest + volumes: + - ${{ github.workspace }}:/src + env: + POSTGRES: ${{ matrix.postgres && 1}} + API: ${{ matrix.api && 1 }} + steps: + - uses: actions/checkout@v2 + - name: Run Sytest + run: /bootstrap.sh dendrite + working-directory: /src + - name: Summarise results.tap + if: ${{ always() }} + run: /sytest/scripts/tap_to_gha.pl /logs/results.tap + + - name: Upload Sytest logs + uses: actions/upload-artifact@v2 + if: ${{ always() }} + with: + name: Sytest Logs - ${{ job.status }} - (Dendrite, ${{ join(matrix.*, ', ') }}) + path: | + /logs/results.tap + /logs/**/*.log* + + # run Complement + complement: + name: "Complement (${{ matrix.label }})" + timeout-minutes: 20 + needs: initial-tests-done + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + include: + - label: SQLite + + - label: SQLite, full HTTP APIs + api: full-http + + - label: PostgreSQL + postgres: Postgres + + - label: PostgreSQL, full HTTP APIs + postgres: Postgres + api: full-http + steps: + # Env vars are set file a file given by $GITHUB_PATH. We need both Go 1.17 and GOPATH on env to run Complement. + # See https://docs.github.com/en/actions/using-workflows/workflow-commands-for-github-actions#adding-a-system-path + - name: "Set Go Version" + run: | + echo "$GOROOT_1_17_X64/bin" >> $GITHUB_PATH + echo "~/go/bin" >> $GITHUB_PATH + + - name: "Install Complement Dependencies" + # We don't need to install Go because it is included on the Ubuntu 20.04 image: + # See https://github.com/actions/virtual-environments/blob/main/images/linux/Ubuntu2004-Readme.md specifically GOROOT_1_17_X64 + run: | + sudo apt-get update && sudo apt-get install -y libolm3 libolm-dev + go get -v github.com/haveyoudebuggedit/gotestfmt/v2/cmd/gotestfmt@latest + + - name: Run actions/checkout@v2 for dendrite + uses: actions/checkout@v2 + with: + path: dendrite + + # Attempt to check out the same branch of Complement as the PR. If it + # doesn't exist, fallback to main. + - name: Checkout complement + shell: bash + run: | + mkdir -p complement + # Attempt to use the version of complement which best matches the current + # build. Depending on whether this is a PR or release, etc. we need to + # use different fallbacks. + # + # 1. First check if there's a similarly named branch (GITHUB_HEAD_REF + # for pull requests, otherwise GITHUB_REF). + # 2. Attempt to use the base branch, e.g. when merging into release-vX.Y + # (GITHUB_BASE_REF for pull requests). + # 3. Use the default complement branch ("master"). + for BRANCH_NAME in "$GITHUB_HEAD_REF" "$GITHUB_BASE_REF" "${GITHUB_REF#refs/heads/}" "master"; do + # Skip empty branch names and merge commits. + if [[ -z "$BRANCH_NAME" || $BRANCH_NAME =~ ^refs/pull/.* ]]; then + continue + fi + + (wget -O - "https://github.com/matrix-org/complement/archive/$BRANCH_NAME.tar.gz" | tar -xz --strip-components=1 -C complement) && break + done + + # Build initial Dendrite image + - run: docker build -t complement-dendrite -f build/scripts/Complement${{ matrix.postgres }}.Dockerfile . + working-directory: dendrite + + # Run Complement + - run: | + set -o pipefail && + go test -v -json -tags dendrite_blacklist ./tests/... 2>&1 | gotestfmt + shell: bash + name: Run Complement Tests + env: + COMPLEMENT_BASE_IMAGE: complement-dendrite:latest + API: ${{ matrix.api && 1 }} + working-directory: complement diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml deleted file mode 100644 index 4a1720295..000000000 --- a/.github/workflows/tests.yml +++ /dev/null @@ -1,71 +0,0 @@ -name: Tests - -on: - push: - branches: ["main"] - pull_request: - -concurrency: - group: ${{ github.workflow }}-${{ github.ref }} - cancel-in-progress: true - -jobs: - complement: - runs-on: ubuntu-latest - steps: - # Env vars are set file a file given by $GITHUB_PATH. We need both Go 1.17 and GOPATH on env to run Complement. - # See https://docs.github.com/en/actions/using-workflows/workflow-commands-for-github-actions#adding-a-system-path - - name: "Set Go Version" - run: | - echo "$GOROOT_1_17_X64/bin" >> $GITHUB_PATH - echo "~/go/bin" >> $GITHUB_PATH - - - name: "Install Complement Dependencies" - # We don't need to install Go because it is included on the Ubuntu 20.04 image: - # See https://github.com/actions/virtual-environments/blob/main/images/linux/Ubuntu2004-Readme.md specifically GOROOT_1_17_X64 - run: | - sudo apt-get update && sudo apt-get install -y libolm3 libolm-dev - go get -v github.com/haveyoudebuggedit/gotestfmt/v2/cmd/gotestfmt@latest - - - name: Run actions/checkout@v2 for dendrite - uses: actions/checkout@v2 - with: - path: dendrite - - # Attempt to check out the same branch of Complement as the PR. If it - # doesn't exist, fallback to main. - - name: Checkout complement - shell: bash - run: | - mkdir -p complement - # Attempt to use the version of complement which best matches the current - # build. Depending on whether this is a PR or release, etc. we need to - # use different fallbacks. - # - # 1. First check if there's a similarly named branch (GITHUB_HEAD_REF - # for pull requests, otherwise GITHUB_REF). - # 2. Attempt to use the base branch, e.g. when merging into release-vX.Y - # (GITHUB_BASE_REF for pull requests). - # 3. Use the default complement branch ("master"). - for BRANCH_NAME in "$GITHUB_HEAD_REF" "$GITHUB_BASE_REF" "${GITHUB_REF#refs/heads/}" "master"; do - # Skip empty branch names and merge commits. - if [[ -z "$BRANCH_NAME" || $BRANCH_NAME =~ ^refs/pull/.* ]]; then - continue - fi - - (wget -O - "https://github.com/matrix-org/complement/archive/$BRANCH_NAME.tar.gz" | tar -xz --strip-components=1 -C complement) && break - done - - # Build initial Dendrite image - - run: docker build -t complement-dendrite -f build/scripts/Complement.Dockerfile . - working-directory: dendrite - - # Run Complement - - run: | - set -o pipefail && - go test -v -json -tags dendrite_blacklist ./tests/... 2>&1 | gotestfmt - shell: bash - name: Run Complement Tests - env: - COMPLEMENT_BASE_IMAGE: complement-dendrite:latest - working-directory: complement diff --git a/.github/workflows/wasm.yml b/.github/workflows/wasm.yml deleted file mode 100644 index 4889283af..000000000 --- a/.github/workflows/wasm.yml +++ /dev/null @@ -1,49 +0,0 @@ -name: WebAssembly - -on: - push: - pull_request: - -jobs: - test: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v2 - - - name: Install Go - uses: actions/setup-go@v2 - with: - go-version: 1.16.5 - - - uses: actions/cache@v2 - with: - path: | - ~/.cache/go-build - ~/go/pkg/mod - key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }} - restore-keys: | - ${{ runner.os }}-go- - - - name: Install Node - uses: actions/setup-node@v2 - with: - node-version: 14 - - - uses: actions/cache@v2 - with: - path: ~/.npm - key: ${{ runner.os }}-node-${{ hashFiles('**/package-lock.json') }} - restore-keys: | - ${{ runner.os }}-node- - - - name: Reconfigure Git to use HTTPS auth for repo packages - run: > - git config --global url."https://github.com/".insteadOf - ssh://git@github.com/ - - - name: Install test dependencies - working-directory: ./test/wasm - run: npm ci - - - name: Test - run: ./test-dendritejs.sh diff --git a/README.md b/README.md index d3a862587..b4a9a614a 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,5 @@ -# Dendrite [![Build Status](https://badge.buildkite.com/4be40938ab19f2bbc4a6c6724517353ee3ec1422e279faf374.svg?branch=master)](https://buildkite.com/matrix-dot-org/dendrite) [![Dendrite](https://img.shields.io/matrix/dendrite:matrix.org.svg?label=%23dendrite%3Amatrix.org&logo=matrix&server_fqdn=matrix.org)](https://matrix.to/#/#dendrite:matrix.org) [![Dendrite Dev](https://img.shields.io/matrix/dendrite-dev:matrix.org.svg?label=%23dendrite-dev%3Amatrix.org&logo=matrix&server_fqdn=matrix.org)](https://matrix.to/#/#dendrite-dev:matrix.org) +# Dendrite +[![Build status](https://github.com/matrix-org/dendrite/actions/workflows/dendrite.yml/badge.svg?event=push)](https://github.com/matrix-org/dendrite/actions/workflows/dendrite.yml) [![Dendrite](https://img.shields.io/matrix/dendrite:matrix.org.svg?label=%23dendrite%3Amatrix.org&logo=matrix&server_fqdn=matrix.org)](https://matrix.to/#/#dendrite:matrix.org) [![Dendrite Dev](https://img.shields.io/matrix/dendrite-dev:matrix.org.svg?label=%23dendrite-dev%3Amatrix.org&logo=matrix&server_fqdn=matrix.org)](https://matrix.to/#/#dendrite-dev:matrix.org) Dendrite is a second-generation Matrix homeserver written in Go. It intends to provide an **efficient**, **reliable** and **scalable** alternative to [Synapse](https://github.com/matrix-org/synapse): diff --git a/appservice/appservice.go b/appservice/appservice.go index 3e19e09b2..b99091866 100644 --- a/appservice/appservice.go +++ b/appservice/appservice.go @@ -59,7 +59,7 @@ func NewInternalAPI( }, }, } - js, _ := jetstream.Prepare(&base.Cfg.Global.JetStream) + js, _ := jetstream.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) // Create a connection to the appservice postgres DB appserviceDB, err := storage.NewDatabase(&base.Cfg.AppServiceAPI.Database) diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index 9d723bed1..01790722a 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -56,7 +56,7 @@ func NewOutputRoomEventConsumer( ctx: process.Context(), jetstream: js, durable: cfg.Global.JetStream.Durable("AppserviceRoomserverConsumer"), - topic: cfg.Global.JetStream.TopicFor(jetstream.OutputRoomEvent), + topic: cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent), asDB: appserviceDB, rsAPI: rsAPI, serverName: string(cfg.Global.ServerName), diff --git a/appservice/storage/postgres/storage.go b/appservice/storage/postgres/storage.go index d2c3e261e..eaf947ff3 100644 --- a/appservice/storage/postgres/storage.go +++ b/appservice/storage/postgres/storage.go @@ -28,7 +28,6 @@ import ( // Database stores events intended to be later sent to application services type Database struct { - sqlutil.PartitionOffsetStatements events eventsStatements txnID txnStatements db *sql.DB @@ -46,9 +45,6 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) { if err = result.prepare(); err != nil { return nil, err } - if err = result.PartitionOffsetStatements.Prepare(result.db, result.writer, "appservice"); err != nil { - return nil, err - } return &result, nil } diff --git a/appservice/storage/sqlite3/storage.go b/appservice/storage/sqlite3/storage.go index 51bfe7109..9260c7fe7 100644 --- a/appservice/storage/sqlite3/storage.go +++ b/appservice/storage/sqlite3/storage.go @@ -27,7 +27,6 @@ import ( // Database stores events intended to be later sent to application services type Database struct { - sqlutil.PartitionOffsetStatements events eventsStatements txnID txnStatements db *sql.DB @@ -45,9 +44,6 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) { if err = result.prepare(); err != nil { return nil, err } - if err = result.PartitionOffsetStatements.Prepare(result.db, result.writer, "appservice"); err != nil { - return nil, err - } return &result, nil } diff --git a/build/gobind-pinecone/monolith.go b/build/gobind-pinecone/monolith.go index 5ab90adaf..865457010 100644 --- a/build/gobind-pinecone/monolith.go +++ b/build/gobind-pinecone/monolith.go @@ -408,6 +408,8 @@ func (m *DendriteMonolith) Stop() { _ = m.PineconeRouter.Close() } +const MaxFrameSize = types.MaxFrameSize + type Conduit struct { conn net.Conn port types.SwitchPortID diff --git a/build/scripts/Complement.Dockerfile b/build/scripts/Complement.Dockerfile index 1d520b4e7..6b2942d97 100644 --- a/build/scripts/Complement.Dockerfile +++ b/build/scripts/Complement.Dockerfile @@ -21,6 +21,7 @@ WORKDIR /dendrite RUN ./generate-keys --private-key matrix_key.pem ENV SERVER_NAME=localhost +ENV API=0 EXPOSE 8008 8448 # At runtime, generate TLS cert based on the CA now mounted at /ca @@ -28,4 +29,4 @@ EXPOSE 8008 8448 CMD ./generate-keys --server $SERVER_NAME --tls-cert server.crt --tls-key server.key --tls-authority-cert /complement/ca/ca.crt --tls-authority-key /complement/ca/ca.key && \ ./generate-config -server $SERVER_NAME --ci > dendrite.yaml && \ cp /complement/ca/ca.crt /usr/local/share/ca-certificates/ && update-ca-certificates && \ - ./dendrite-monolith-server --tls-cert server.crt --tls-key server.key --config dendrite.yaml + ./dendrite-monolith-server --tls-cert server.crt --tls-key server.key --config dendrite.yaml -api=${API:-0} diff --git a/build/scripts/ComplementPostgres.Dockerfile b/build/scripts/ComplementPostgres.Dockerfile index 6024ae8da..b98f4671c 100644 --- a/build/scripts/ComplementPostgres.Dockerfile +++ b/build/scripts/ComplementPostgres.Dockerfile @@ -39,6 +39,7 @@ WORKDIR /dendrite RUN ./generate-keys --private-key matrix_key.pem ENV SERVER_NAME=localhost +ENV API=0 EXPOSE 8008 8448 @@ -50,4 +51,4 @@ CMD /build/run_postgres.sh && ./generate-keys --server $SERVER_NAME --tls-cert s sed -i "s%connection_string:.*$%connection_string: postgresql://postgres@localhost/postgres?sslmode=disable%g" dendrite.yaml && \ sed -i 's/max_open_conns:.*$/max_open_conns: 100/g' dendrite.yaml && \ cp /complement/ca/ca.crt /usr/local/share/ca-certificates/ && update-ca-certificates && \ - ./dendrite-monolith-server --tls-cert server.crt --tls-key server.key --config dendrite.yaml \ No newline at end of file + ./dendrite-monolith-server --tls-cert server.crt --tls-key server.key --config dendrite.yaml -api=${API:-0} \ No newline at end of file diff --git a/clientapi/auth/password.go b/clientapi/auth/password.go index 18cf94979..046b36f0b 100644 --- a/clientapi/auth/password.go +++ b/clientapi/auth/password.go @@ -62,7 +62,7 @@ func (t *LoginTypePassword) LoginFromJSON(ctx context.Context, reqBytes []byte) func (t *LoginTypePassword) Login(ctx context.Context, req interface{}) (*Login, *util.JSONResponse) { r := req.(*PasswordRequest) - username := strings.ToLower(r.Username()) + username := strings.ToLower(r.Username()) if username == "" { return nil, &util.JSONResponse{ Code: http.StatusUnauthorized, diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go index e4279c220..4550343c7 100644 --- a/clientapi/clientapi.go +++ b/clientapi/clientapi.go @@ -27,6 +27,7 @@ import ( roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/matrix-org/dendrite/setup/process" userapi "github.com/matrix-org/dendrite/userapi/api" userdb "github.com/matrix-org/dendrite/userapi/storage" "github.com/matrix-org/gomatrixserverlib" @@ -34,6 +35,7 @@ import ( // AddPublicRoutes sets up and registers HTTP handlers for the ClientAPI component. func AddPublicRoutes( + process *process.ProcessContext, router *mux.Router, synapseAdminRouter *mux.Router, cfg *config.ClientAPI, @@ -49,11 +51,11 @@ func AddPublicRoutes( extRoomsProvider api.ExtraPublicRoomsProvider, mscCfg *config.MSCs, ) { - js, _ := jetstream.Prepare(&cfg.Matrix.JetStream) + js, _ := jetstream.Prepare(process, &cfg.Matrix.JetStream) syncProducer := &producers.SyncAPIProducer{ JetStream: js, - Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData), + Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData), } routing.Setup( diff --git a/cmd/dendrite-polylith-multi/personalities/clientapi.go b/cmd/dendrite-polylith-multi/personalities/clientapi.go index bd9f7a109..5e67acd03 100644 --- a/cmd/dendrite-polylith-multi/personalities/clientapi.go +++ b/cmd/dendrite-polylith-multi/personalities/clientapi.go @@ -33,9 +33,9 @@ func ClientAPI(base *basepkg.BaseDendrite, cfg *config.Dendrite) { keyAPI := base.KeyServerHTTPClient() clientapi.AddPublicRoutes( - base.PublicClientAPIMux, base.SynapseAdminMux, &base.Cfg.ClientAPI, accountDB, federation, - rsAPI, eduInputAPI, asQuery, transactions.New(), fsAPI, userAPI, keyAPI, nil, - &cfg.MSCs, + base.ProcessContext, base.PublicClientAPIMux, base.SynapseAdminMux, &base.Cfg.ClientAPI, + accountDB, federation, rsAPI, eduInputAPI, asQuery, transactions.New(), fsAPI, userAPI, + keyAPI, nil, &cfg.MSCs, ) base.SetupAndServeHTTP( diff --git a/docs/FAQ.md b/docs/FAQ.md index 149efe619..978212cce 100644 --- a/docs/FAQ.md +++ b/docs/FAQ.md @@ -12,6 +12,10 @@ No, although a good portion of the Matrix specification has been implemented. Mo No, not at present. There will be in the future when Dendrite reaches version 1.0. +### Can I use Dendrite with an existing Synapse database? + +No, Dendrite has a very different database schema to Synapse and the two are not interchangeable. + ### Should I run a monolith or a polylith deployment? Monolith deployments are always preferred where possible, and at this time, are far better tested than polylith deployments are. The only reason to consider a polylith deployment is if you wish to run different Dendrite components on separate physical machines. @@ -33,7 +37,7 @@ It should do, although we are aware of some minor issues: ### Does Dendrite support push notifications? -No, not yet. This is a planned feature. +Yes, we have experimental support for push notifications. Configure them in the usual way in your Matrix client. ### Does Dendrite support application services/bridges? diff --git a/eduserver/eduserver.go b/eduserver/eduserver.go index 6882399da..91208a400 100644 --- a/eduserver/eduserver.go +++ b/eduserver/eduserver.go @@ -42,15 +42,15 @@ func NewInternalAPI( ) api.EDUServerInputAPI { cfg := &base.Cfg.EDUServer - js, _ := jetstream.Prepare(&cfg.Matrix.JetStream) + js, _ := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) return &input.EDUServerInputAPI{ Cache: eduCache, UserAPI: userAPI, JetStream: js, - OutputTypingEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent), - OutputSendToDeviceEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent), - OutputReceiptEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), + OutputTypingEventTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent), + OutputSendToDeviceEventTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), + OutputReceiptEventTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent), ServerName: cfg.Matrix.ServerName, } } diff --git a/federationapi/consumers/eduserver.go b/federationapi/consumers/eduserver.go index 1f81fa258..e14e60f47 100644 --- a/federationapi/consumers/eduserver.go +++ b/federationapi/consumers/eduserver.go @@ -58,9 +58,9 @@ func NewOutputEDUConsumer( db: store, ServerName: cfg.Matrix.ServerName, durable: cfg.Matrix.JetStream.Durable("FederationAPIEDUServerConsumer"), - typingTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent), - sendToDeviceTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent), - receiptTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), + typingTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent), + sendToDeviceTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), + receiptTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent), } } diff --git a/federationapi/consumers/keychange.go b/federationapi/consumers/keychange.go index 33d716d25..94e454359 100644 --- a/federationapi/consumers/keychange.go +++ b/federationapi/consumers/keychange.go @@ -55,8 +55,8 @@ func NewKeyChangeConsumer( return &KeyChangeConsumer{ ctx: process.Context(), jetstream: js, - durable: cfg.Matrix.JetStream.TopicFor("FederationAPIKeyChangeConsumer"), - topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent), + durable: cfg.Matrix.JetStream.Prefixed("FederationAPIKeyChangeConsumer"), + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent), queues: queues, db: store, serverName: cfg.Matrix.ServerName, diff --git a/federationapi/consumers/roomserver.go b/federationapi/consumers/roomserver.go index 989f7cf49..ff2c8e5d4 100644 --- a/federationapi/consumers/roomserver.go +++ b/federationapi/consumers/roomserver.go @@ -61,7 +61,7 @@ func NewOutputRoomEventConsumer( queues: queues, rsAPI: rsAPI, durable: cfg.Matrix.JetStream.Durable("FederationAPIRoomServerConsumer"), - topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent), + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent), } } diff --git a/federationapi/federationapi.go b/federationapi/federationapi.go index 9f149d973..b7f93ecb9 100644 --- a/federationapi/federationapi.go +++ b/federationapi/federationapi.go @@ -92,7 +92,7 @@ func NewInternalAPI( FailuresUntilBlacklist: cfg.FederationMaxRetries, } - js, _ := jetstream.Prepare(&cfg.Matrix.JetStream) + js, _ := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) queues := queue.NewOutgoingQueues( federationDB, base.ProcessContext, diff --git a/federationapi/storage/postgres/storage.go b/federationapi/storage/postgres/storage.go index 2e2c08911..b2aea6929 100644 --- a/federationapi/storage/postgres/storage.go +++ b/federationapi/storage/postgres/storage.go @@ -30,7 +30,6 @@ import ( // Database stores information needed by the federation sender type Database struct { shared.Database - sqlutil.PartitionOffsetStatements db *sql.DB writer sqlutil.Writer } @@ -104,8 +103,5 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationC NotaryServerKeysMetadata: notaryMetadata, ServerSigningKeys: serverSigningKeys, } - if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "federationsender"); err != nil { - return nil, err - } return &d, nil } diff --git a/federationapi/storage/sqlite3/storage.go b/federationapi/storage/sqlite3/storage.go index 978dd7136..c2e83211e 100644 --- a/federationapi/storage/sqlite3/storage.go +++ b/federationapi/storage/sqlite3/storage.go @@ -29,7 +29,6 @@ import ( // Database stores information needed by the federation sender type Database struct { shared.Database - sqlutil.PartitionOffsetStatements db *sql.DB writer sqlutil.Writer } @@ -103,8 +102,5 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationC NotaryServerKeysMetadata: notaryKeysMetadata, ServerSigningKeys: serverSigningKeys, } - if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "federationsender"); err != nil { - return nil, err - } return &d, nil } diff --git a/go.mod b/go.mod index d316f4b66..ae925c719 100644 --- a/go.mod +++ b/go.mod @@ -39,7 +39,7 @@ require ( github.com/matrix-org/go-sqlite3-js v0.0.0-20210709140738-b0d1ba599a6d github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 github.com/matrix-org/gomatrixserverlib v0.0.0-20220317164600-0980b7f341e0 - github.com/matrix-org/pinecone v0.0.0-20220308124038-cfde1f8054c5 + github.com/matrix-org/pinecone v0.0.0-20220323142759-6fb077377278 github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 github.com/mattn/go-sqlite3 v1.14.10 github.com/morikuni/aec v1.0.0 // indirect diff --git a/go.sum b/go.sum index 27fe34daf..25174f468 100644 --- a/go.sum +++ b/go.sum @@ -943,8 +943,8 @@ github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5d github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= github.com/matrix-org/gomatrixserverlib v0.0.0-20220317164600-0980b7f341e0 h1:IINbE/0jSYGb7M31StazufyIQdYWSivRlhuns3JYPOM= github.com/matrix-org/gomatrixserverlib v0.0.0-20220317164600-0980b7f341e0/go.mod h1:+WF5InseAMgi1fTnU46JH39IDpEvLep0fDzx9LDf2Bo= -github.com/matrix-org/pinecone v0.0.0-20220308124038-cfde1f8054c5 h1:7viLTiLAA2MtGKY+uf14j6TjfKvvGLAMj/qdm70jJuQ= -github.com/matrix-org/pinecone v0.0.0-20220308124038-cfde1f8054c5/go.mod h1:r6dsL+ylE0yXe/7zh8y/Bdh6aBYI1r+u4yZni9A4iyk= +github.com/matrix-org/pinecone v0.0.0-20220323142759-6fb077377278 h1:lRrvMMv7x1FIVW1mcBdU89lvbgAXKz6RyYR0VQTAr3E= +github.com/matrix-org/pinecone v0.0.0-20220323142759-6fb077377278/go.mod h1:r6dsL+ylE0yXe/7zh8y/Bdh6aBYI1r+u4yZni9A4iyk= github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U= github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk= github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U= diff --git a/internal/sqlutil/partition_offset_table.go b/internal/sqlutil/partition_offset_table.go deleted file mode 100644 index e19a092f9..000000000 --- a/internal/sqlutil/partition_offset_table.go +++ /dev/null @@ -1,133 +0,0 @@ -// Copyright 2020 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package sqlutil - -import ( - "context" - "database/sql" - "strings" -) - -// A PartitionOffset is the offset into a partition of the input log. -type PartitionOffset struct { - // The ID of the partition. - Partition int32 - // The offset into the partition. - Offset int64 -} - -const partitionOffsetsSchema = ` --- The offsets that the server has processed up to. -CREATE TABLE IF NOT EXISTS ${prefix}_partition_offsets ( - -- The name of the topic. - topic TEXT NOT NULL, - -- The 32-bit partition ID - partition INTEGER NOT NULL, - -- The 64-bit offset. - partition_offset BIGINT NOT NULL, - UNIQUE (topic, partition) -); -` - -const selectPartitionOffsetsSQL = "" + - "SELECT partition, partition_offset FROM ${prefix}_partition_offsets WHERE topic = $1" - -const upsertPartitionOffsetsSQL = "" + - "INSERT INTO ${prefix}_partition_offsets (topic, partition, partition_offset) VALUES ($1, $2, $3)" + - " ON CONFLICT (topic, partition)" + - " DO UPDATE SET partition_offset = $3" - -// PartitionOffsetStatements represents a set of statements that can be run on a partition_offsets table. -type PartitionOffsetStatements struct { - db *sql.DB - writer Writer - selectPartitionOffsetsStmt *sql.Stmt - upsertPartitionOffsetStmt *sql.Stmt -} - -// Prepare converts the raw SQL statements into prepared statements. -// Takes a prefix to prepend to the table name used to store the partition offsets. -// This allows multiple components to share the same database schema. -func (s *PartitionOffsetStatements) Prepare(db *sql.DB, writer Writer, prefix string) (err error) { - s.db = db - s.writer = writer - _, err = db.Exec(strings.Replace(partitionOffsetsSchema, "${prefix}", prefix, -1)) - if err != nil { - return - } - if s.selectPartitionOffsetsStmt, err = db.Prepare( - strings.Replace(selectPartitionOffsetsSQL, "${prefix}", prefix, -1), - ); err != nil { - return - } - if s.upsertPartitionOffsetStmt, err = db.Prepare( - strings.Replace(upsertPartitionOffsetsSQL, "${prefix}", prefix, -1), - ); err != nil { - return - } - return -} - -// PartitionOffsets implements PartitionStorer -func (s *PartitionOffsetStatements) PartitionOffsets( - ctx context.Context, topic string, -) ([]PartitionOffset, error) { - return s.selectPartitionOffsets(ctx, topic) -} - -// SetPartitionOffset implements PartitionStorer -func (s *PartitionOffsetStatements) SetPartitionOffset( - ctx context.Context, topic string, partition int32, offset int64, -) error { - return s.upsertPartitionOffset(ctx, topic, partition, offset) -} - -// selectPartitionOffsets returns all the partition offsets for the given topic. -func (s *PartitionOffsetStatements) selectPartitionOffsets( - ctx context.Context, topic string, -) (results []PartitionOffset, err error) { - rows, err := s.selectPartitionOffsetsStmt.QueryContext(ctx, topic) - if err != nil { - return nil, err - } - defer checkNamedErr(rows.Close, &err) - for rows.Next() { - var offset PartitionOffset - if err = rows.Scan(&offset.Partition, &offset.Offset); err != nil { - return nil, err - } - results = append(results, offset) - } - err = rows.Err() - return results, err -} - -// checkNamedErr calls fn and overwrite err if it was nil and fn returned non-nil -func checkNamedErr(fn func() error, err *error) { - if e := fn(); e != nil && *err == nil { - *err = e - } -} - -// UpsertPartitionOffset updates or inserts the partition offset for the given topic. -func (s *PartitionOffsetStatements) upsertPartitionOffset( - ctx context.Context, topic string, partition int32, offset int64, -) error { - return s.writer.Do(s.db, nil, func(txn *sql.Tx) error { - stmt := TxStmt(txn, s.upsertPartitionOffsetStmt) - _, err := stmt.ExecContext(ctx, topic, partition, offset) - return err - }) -} diff --git a/keyserver/internal/internal.go b/keyserver/internal/internal.go index cc9d3a616..a05476f5f 100644 --- a/keyserver/internal/internal.go +++ b/keyserver/internal/internal.go @@ -223,6 +223,7 @@ func (a *KeyInternalAPI) QueryDeviceMessages(ctx context.Context, req *api.Query res.StreamID = maxStreamID } +// nolint:gocyclo func (a *KeyInternalAPI) QueryKeys(ctx context.Context, req *api.QueryKeysRequest, res *api.QueryKeysResponse) { res.DeviceKeys = make(map[string]map[string]json.RawMessage) res.MasterKeys = make(map[string]gomatrixserverlib.CrossSigningKey) diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go index 8a0ce6178..c557dfbaa 100644 --- a/keyserver/keyserver.go +++ b/keyserver/keyserver.go @@ -39,14 +39,14 @@ func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) { func NewInternalAPI( base *base.BaseDendrite, cfg *config.KeyServer, fedClient fedsenderapi.FederationClient, ) api.KeyInternalAPI { - js, _ := jetstream.Prepare(&cfg.Matrix.JetStream) + js, _ := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) db, err := storage.NewDatabase(&cfg.Database) if err != nil { logrus.WithError(err).Panicf("failed to connect to key server database") } keyChangeProducer := &producers.KeyChange{ - Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent)), + Topic: string(cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent)), JetStream: js, DB: db, } diff --git a/keyserver/storage/postgres/storage.go b/keyserver/storage/postgres/storage.go index b71cc1a7a..136986885 100644 --- a/keyserver/storage/postgres/storage.go +++ b/keyserver/storage/postgres/storage.go @@ -70,8 +70,5 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*shared.Database, error) CrossSigningKeysTable: csk, CrossSigningSigsTable: css, } - if err = d.PartitionOffsetStatements.Prepare(db, d.Writer, "keyserver"); err != nil { - return nil, err - } return d, nil } diff --git a/keyserver/storage/shared/storage.go b/keyserver/storage/shared/storage.go index 03215b93b..7ba0b3ea1 100644 --- a/keyserver/storage/shared/storage.go +++ b/keyserver/storage/shared/storage.go @@ -36,7 +36,6 @@ type Database struct { StaleDeviceListsTable tables.StaleDeviceLists CrossSigningKeysTable tables.CrossSigningKeys CrossSigningSigsTable tables.CrossSigningSigs - sqlutil.PartitionOffsetStatements } func (d *Database) ExistingOneTimeKeys(ctx context.Context, userID, deviceID string, keyIDsWithAlgorithms []string) (map[string]json.RawMessage, error) { diff --git a/keyserver/storage/sqlite3/storage.go b/keyserver/storage/sqlite3/storage.go index 50ce00d05..0e0adceef 100644 --- a/keyserver/storage/sqlite3/storage.go +++ b/keyserver/storage/sqlite3/storage.go @@ -69,8 +69,5 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*shared.Database, error) CrossSigningKeysTable: csk, CrossSigningSigsTable: css, } - if err = d.PartitionOffsetStatements.Prepare(db, d.Writer, "keyserver"); err != nil { - return nil, err - } return d, nil } diff --git a/mediaapi/routing/download.go b/mediaapi/routing/download.go index 95eab5124..5f22a9461 100644 --- a/mediaapi/routing/download.go +++ b/mediaapi/routing/download.go @@ -722,8 +722,8 @@ func (r *downloadRequest) fetchRemoteFile( // create request for remote file resp, err := client.CreateMediaDownloadRequest(ctx, r.MediaMetadata.Origin, string(r.MediaMetadata.MediaID)) - if err != nil || resp.StatusCode != http.StatusOK { - if resp.StatusCode == http.StatusNotFound { + if err != nil || (resp != nil && resp.StatusCode != http.StatusOK) { + if resp != nil && resp.StatusCode == http.StatusNotFound { return "", false, fmt.Errorf("File with media ID %q does not exist on %s", r.MediaMetadata.MediaID, r.MediaMetadata.Origin) } return "", false, fmt.Errorf("file with media ID %q could not be downloaded from %s", r.MediaMetadata.MediaID, r.MediaMetadata.Origin) diff --git a/roomserver/api/alias.go b/roomserver/api/alias.go index be37333b6..baab27751 100644 --- a/roomserver/api/alias.go +++ b/roomserver/api/alias.go @@ -102,4 +102,3 @@ func (a AliasEvent) Valid() bool { } return a.Alias == "" || validateAliasRegex.MatchString(a.Alias) } - diff --git a/roomserver/api/alias_test.go b/roomserver/api/alias_test.go index 680493b7b..686f064bb 100644 --- a/roomserver/api/alias_test.go +++ b/roomserver/api/alias_test.go @@ -22,29 +22,29 @@ func TestAliasEvent_Valid(t *testing.T) { { name: "empty alias, invalid alt aliases", fields: fields{ - Alias: "", - AltAliases: []string{ "%not:valid.local"}, + Alias: "", + AltAliases: []string{"%not:valid.local"}, }, }, { name: "valid alias, invalid alt aliases", fields: fields{ - Alias: "#valid:test.local", - AltAliases: []string{ "%not:valid.local"}, + Alias: "#valid:test.local", + AltAliases: []string{"%not:valid.local"}, }, }, { name: "empty alias, invalid alt aliases", fields: fields{ - Alias: "", - AltAliases: []string{ "%not:valid.local"}, + Alias: "", + AltAliases: []string{"%not:valid.local"}, }, }, { name: "invalid alias", fields: fields{ - Alias: "%not:valid.local", - AltAliases: []string{ }, + Alias: "%not:valid.local", + AltAliases: []string{}, }, }, } diff --git a/roomserver/internal/alias.go b/roomserver/internal/alias.go index 5c1c04f01..02fc4a5a7 100644 --- a/roomserver/internal/alias.go +++ b/roomserver/internal/alias.go @@ -173,12 +173,15 @@ func (r *RoomserverInternalAPI) RemoveRoomAlias( } if creatorID != request.UserID { - plEvent, err := r.DB.GetStateEvent(ctx, roomID, gomatrixserverlib.MRoomPowerLevels, "") + var plEvent *gomatrixserverlib.HeaderedEvent + var pls *gomatrixserverlib.PowerLevelContent + + plEvent, err = r.DB.GetStateEvent(ctx, roomID, gomatrixserverlib.MRoomPowerLevels, "") if err != nil { return fmt.Errorf("r.DB.GetStateEvent: %w", err) } - pls, err := plEvent.PowerLevels() + pls, err = plEvent.PowerLevels() if err != nil { return fmt.Errorf("plEvent.PowerLevels: %w", err) } @@ -223,7 +226,7 @@ func (r *RoomserverInternalAPI) RemoveRoomAlias( } stateRes := &api.QueryLatestEventsAndStateResponse{} - if err := helpers.QueryLatestEventsAndState(ctx, r.DB, &api.QueryLatestEventsAndStateRequest{RoomID: roomID, StateToFetch: eventsNeeded.Tuples()}, stateRes); err != nil { + if err = helpers.QueryLatestEventsAndState(ctx, r.DB, &api.QueryLatestEventsAndStateRequest{RoomID: roomID, StateToFetch: eventsNeeded.Tuples()}, stateRes); err != nil { return err } diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go index 91001e418..f96cefcb3 100644 --- a/roomserver/internal/api.go +++ b/roomserver/internal/api.go @@ -90,6 +90,7 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.FederationInternalA r.KeyRing = keyRing r.Inputer = &input.Inputer{ + Cfg: r.Cfg, ProcessContext: r.ProcessContext, DB: r.DB, InputRoomEventTopic: r.InputRoomEventTopic, diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index c6e354611..6a8ae6d00 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -19,6 +19,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "sync" "time" @@ -29,6 +30,7 @@ import ( "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/internal/query" "github.com/matrix-org/dendrite/roomserver/storage" + "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/gomatrixserverlib" @@ -45,7 +47,35 @@ var keyContentFields = map[string]string{ "m.room.member": "membership", } +// Inputer is responsible for consuming from the roomserver input +// streams and processing the events. All input events are queued +// into a single NATS stream and the order is preserved strictly. +// The `room_id` message header will contain the room ID which will +// be used to assign the pending event to a per-room worker. +// +// The input API maintains an ephemeral headers-only consumer. It +// will speed through the stream working out which room IDs are +// pending and create durable consumers for them. The durable +// consumer will then be used for each room worker goroutine to +// fetch events one by one and process them. Each room having a +// durable consumer of its own means there is no head-of-line +// blocking between rooms. Filtering ensures that each durable +// consumer only receives events for the room it is interested in. +// +// The ephemeral consumer closely tracks the newest events. The +// per-room durable consumers will only progress through the stream +// as events are processed. +// +// A BC * -> positions of each consumer (* = ephemeral) +// ⌄ ⌄⌄ ⌄ +// ABAABCAABCAA -> newest (letter = subject for each message) +// +// In this example, A is still processing an event but has two +// pending events to process afterwards. Both B and C are caught +// up, so they will do nothing until a new event comes in for B +// or C. type Inputer struct { + Cfg *config.RoomServer ProcessContext *process.ProcessContext DB storage.Database NATSClient *nats.Conn @@ -57,147 +87,275 @@ type Inputer struct { ACLs *acls.ServerACLs InputRoomEventTopic string OutputRoomEventTopic string - workers sync.Map // room ID -> *phony.Inbox + workers sync.Map // room ID -> *worker Queryer *query.Queryer } -func (r *Inputer) workerForRoom(roomID string) *phony.Inbox { - inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{}) - return inbox.(*phony.Inbox) +type worker struct { + phony.Inbox + sync.Mutex + r *Inputer + roomID string + subscription *nats.Subscription } -// eventsInProgress is an in-memory map to keep a track of which events we have -// queued up for processing. If we get a redelivery from NATS and we still have -// the queued up item then we won't do anything with the redelivered message. If -// we've restarted Dendrite and now this map is empty then it means that we will -// reload pending work from NATS. -var eventsInProgress sync.Map +func (r *Inputer) startWorkerForRoom(roomID string) { + v, loaded := r.workers.LoadOrStore(roomID, &worker{ + r: r, + roomID: roomID, + }) + w := v.(*worker) + w.Lock() + defer w.Unlock() + if !loaded || w.subscription == nil { + consumer := r.Cfg.Matrix.JetStream.Prefixed("RoomInput" + jetstream.Tokenise(w.roomID)) + subject := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEventSubj(w.roomID)) -// onMessage is called when a new event arrives in the roomserver input stream. + // Create the consumer. We do this as a specific step rather than + // letting PullSubscribe create it for us because we need the consumer + // to outlive the subscription. If we do it this way, we can Bind in the + // next step, and when we Unsubscribe, the consumer continues to live. If + // we leave PullSubscribe to create the durable consumer, Unsubscribe will + // delete it because it thinks it "owns" it, which in turn breaks the + // interest-based retention storage policy. + // If the durable consumer already exists, this is effectively a no-op. + // Another interesting tid-bit here: the ACK policy is set to "all" so that + // if we acknowledge a message, we also acknowledge everything that comes + // before it. This is necessary because otherwise our consumer will never + // acknowledge things we filtered out for other subjects and therefore they + // will linger around forever. + if _, err := w.r.JetStream.AddConsumer( + r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent), + &nats.ConsumerConfig{ + Durable: consumer, + AckPolicy: nats.AckAllPolicy, + DeliverPolicy: nats.DeliverAllPolicy, + FilterSubject: subject, + AckWait: MaximumMissingProcessingTime + (time.Second * 10), + }, + ); err != nil { + logrus.WithError(err).Errorf("Failed to create consumer for room %q", w.roomID) + return + } + + // Bind to our durable consumer. We want to receive all messages waiting + // for this subject and we want to manually acknowledge them, so that we + // can ensure they are only cleaned up when we are done processing them. + sub, err := w.r.JetStream.PullSubscribe( + subject, consumer, + nats.ManualAck(), + nats.DeliverAll(), + nats.AckWait(MaximumMissingProcessingTime+(time.Second*10)), + nats.Bind(r.InputRoomEventTopic, consumer), + ) + if err != nil { + logrus.WithError(err).Errorf("Failed to subscribe to stream for room %q", w.roomID) + return + } + + // Go and start pulling messages off the queue. + w.subscription = sub + w.Act(nil, w._next) + } +} + +// Start creates an ephemeral non-durable consumer on the roomserver +// input topic. It is configured to deliver us headers only because we +// don't actually care about the contents of the message at this point, +// we only care about the `room_id` field. Once a message arrives, we +// will look to see if we have a worker for that room which has its +// own consumer. If we don't, we'll start one. func (r *Inputer) Start() error { _, err := r.JetStream.Subscribe( - r.InputRoomEventTopic, - // We specifically don't use jetstream.WithJetStreamMessage here because we - // queue the task off to a room-specific queue and the ACK needs to be sent - // later, possibly with an error response to the inputter if synchronous. - func(msg *nats.Msg) { - roomID := msg.Header.Get("room_id") - var inputRoomEvent api.InputRoomEvent - if err := json.Unmarshal(msg.Data, &inputRoomEvent); err != nil { - _ = msg.Term() - return - } - - _ = msg.InProgress() - index := roomID + "\000" + inputRoomEvent.Event.EventID() - if _, ok := eventsInProgress.LoadOrStore(index, struct{}{}); ok { - // We're already waiting to deal with this event, so there's no - // point in queuing it up again. We've notified NATS that we're - // working on the message still, so that will have deferred the - // redelivery by a bit. - return - } - - roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc() - r.workerForRoom(roomID).Act(nil, func() { - _ = msg.InProgress() // resets the acknowledgement wait timer - defer eventsInProgress.Delete(index) - defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec() - var errString string - if err := r.processRoomEvent(r.ProcessContext.Context(), &inputRoomEvent); err != nil { - if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) { - sentry.CaptureException(err) - } - logrus.WithError(err).WithFields(logrus.Fields{ - "room_id": roomID, - "event_id": inputRoomEvent.Event.EventID(), - "type": inputRoomEvent.Event.Type(), - }).Warn("Roomserver failed to process async event") - _ = msg.Term() - errString = err.Error() - } else { - _ = msg.Ack() - } - if replyTo := msg.Header.Get("sync"); replyTo != "" { - if err := r.NATSClient.Publish(replyTo, []byte(errString)); err != nil { - logrus.WithError(err).WithFields(logrus.Fields{ - "room_id": roomID, - "event_id": inputRoomEvent.Event.EventID(), - "type": inputRoomEvent.Event.Type(), - }).Warn("Roomserver failed to respond for sync event") - } - } - }) + "", // This is blank because we specified it in BindStream. + func(m *nats.Msg) { + roomID := m.Header.Get(jetstream.RoomID) + r.startWorkerForRoom(roomID) + _ = m.Ack() }, - // NATS wants to acknowledge automatically by default when the message is - // read from the stream, but we want to override that behaviour by making - // sure that we only acknowledge when we're happy we've done everything we - // can. This ensures we retry things when it makes sense to do so. - nats.ManualAck(), - // Use a durable named consumer. - r.Durable, - // If we've missed things in the stream, e.g. we restarted, then replay - // all of the queued messages that were waiting for us. + nats.HeadersOnly(), nats.DeliverAll(), - // Ensure that NATS doesn't try to resend us something that wasn't done - // within the period of time that we might still be processing it. - nats.AckWait(MaximumMissingProcessingTime+(time.Second*10)), - // It is recommended to disable this for pull consumers as per the docs: - // https://docs.nats.io/nats-concepts/jetstream/consumers#note-about-push-and-pull-consumers - nats.MaxAckPending(-1), + nats.AckAll(), + nats.BindStream(r.InputRoomEventTopic), ) return err } +// _next is called by the worker for the room. It must only be called +// by the actor embedded into the worker. +func (w *worker) _next() { + // Look up what the next event is that's waiting to be processed. + ctx, cancel := context.WithTimeout(w.r.ProcessContext.Context(), time.Minute) + defer cancel() + msgs, err := w.subscription.Fetch(1, nats.Context(ctx)) + switch err { + case nil: + // Make sure that once we're done here, we queue up another call + // to _next in the inbox. + defer w.Act(nil, w._next) + + // If no error was reported, but we didn't get exactly one message, + // then skip over this and try again on the next iteration. + if len(msgs) != 1 { + return + } + + case context.DeadlineExceeded: + // The context exceeded, so we've been waiting for more than a + // minute for activity in this room. At this point we will shut + // down the subscriber to free up resources. It'll get started + // again if new activity happens. + if err = w.subscription.Unsubscribe(); err != nil { + logrus.WithError(err).Errorf("Failed to unsubscribe to stream for room %q", w.roomID) + } + w.Lock() + w.subscription = nil + w.Unlock() + return + + default: + // Something went wrong while trying to fetch the next event + // from the queue. In which case, we'll shut down the subscriber + // and wait to be notified about new room activity again. Maybe + // the problem will be corrected by then. + logrus.WithError(err).Errorf("Failed to get next stream message for room %q", w.roomID) + if err = w.subscription.Unsubscribe(); err != nil { + logrus.WithError(err).Errorf("Failed to unsubscribe to stream for room %q", w.roomID) + } + w.Lock() + w.subscription = nil + w.Unlock() + return + } + + // Try to unmarshal the input room event. If the JSON unmarshalling + // fails then we'll terminate the message — this notifies NATS that + // we are done with the message and never want to see it again. + msg := msgs[0] + var inputRoomEvent api.InputRoomEvent + if err = json.Unmarshal(msg.Data, &inputRoomEvent); err != nil { + _ = msg.Term() + return + } + + roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Inc() + defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Dec() + + // Process the room event. If something goes wrong then we'll tell + // NATS to terminate the message. We'll store the error result as + // a string, because we might want to return that to the caller if + // it was a synchronous request. + var errString string + if err = w.r.processRoomEvent(w.r.ProcessContext.Context(), &inputRoomEvent); err != nil { + if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) { + sentry.CaptureException(err) + } + logrus.WithError(err).WithFields(logrus.Fields{ + "room_id": w.roomID, + "event_id": inputRoomEvent.Event.EventID(), + "type": inputRoomEvent.Event.Type(), + }).Warn("Roomserver failed to process async event") + _ = msg.Term() + errString = err.Error() + } else { + _ = msg.Ack() + } + + // If it was a synchronous input request then the "sync" field + // will be present in the message. That means that someone is + // waiting for a response. The temporary inbox name is present in + // that field, so send back the error string (if any). If there + // was no error then we'll return a blank message, which means + // that everything was OK. + if replyTo := msg.Header.Get("sync"); replyTo != "" { + if err = w.r.NATSClient.Publish(replyTo, []byte(errString)); err != nil { + logrus.WithError(err).WithFields(logrus.Fields{ + "room_id": w.roomID, + "event_id": inputRoomEvent.Event.EventID(), + "type": inputRoomEvent.Event.Type(), + }).Warn("Roomserver failed to respond for sync event") + } + } +} + +// queueInputRoomEvents queues events into the roomserver input +// stream in NATS. +func (r *Inputer) queueInputRoomEvents( + ctx context.Context, + request *api.InputRoomEventsRequest, +) (replySub *nats.Subscription, err error) { + // If the request is synchronous then we need to create a + // temporary inbox to wait for responses on, and then create + // a subscription to it. If it's asynchronous then we won't + // bother, so these values will remain empty. + var replyTo string + if !request.Asynchronous { + replyTo = nats.NewInbox() + replySub, err = r.NATSClient.SubscribeSync(replyTo) + if err != nil { + return nil, fmt.Errorf("r.NATSClient.SubscribeSync: %w", err) + } + if replySub == nil { + // This shouldn't ever happen, but it doesn't hurt to check + // because we can potentially avoid a nil pointer panic later + // if it did for some reason. + return nil, fmt.Errorf("expected a subscription to the temporary inbox") + } + } + + // For each event, marshal the input room event and then + // send it into the input queue. + for _, e := range request.InputRoomEvents { + roomID := e.Event.RoomID() + subj := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEventSubj(roomID)) + msg := &nats.Msg{ + Subject: subj, + Header: nats.Header{}, + } + msg.Header.Set("room_id", roomID) + if replyTo != "" { + msg.Header.Set("sync", replyTo) + } + msg.Data, err = json.Marshal(e) + if err != nil { + return nil, fmt.Errorf("json.Marshal: %w", err) + } + if _, err = r.JetStream.PublishMsg(msg, nats.Context(ctx)); err != nil { + logrus.WithError(err).WithFields(logrus.Fields{ + "room_id": roomID, + "event_id": e.Event.EventID(), + "subj": subj, + }).Error("Roomserver failed to queue async event") + return nil, fmt.Errorf("r.JetStream.PublishMsg: %w", err) + } + } + return +} + // InputRoomEvents implements api.RoomserverInternalAPI func (r *Inputer) InputRoomEvents( ctx context.Context, request *api.InputRoomEventsRequest, response *api.InputRoomEventsResponse, ) { - var replyTo string - var replySub *nats.Subscription - if !request.Asynchronous { - var err error - replyTo = nats.NewInbox() - replySub, err = r.NATSClient.SubscribeSync(replyTo) - if err != nil { - response.ErrMsg = err.Error() - return - } - } - - var err error - for _, e := range request.InputRoomEvents { - msg := &nats.Msg{ - Subject: r.InputRoomEventTopic, - Header: nats.Header{}, - Reply: replyTo, - } - roomID := e.Event.RoomID() - msg.Header.Set("room_id", roomID) - if replyTo != "" { - msg.Header.Set("sync", replyTo) - } - msg.Data, err = json.Marshal(e) - if err != nil { - response.ErrMsg = err.Error() - return - } - if _, err = r.JetStream.PublishMsg(msg); err != nil { - logrus.WithError(err).WithFields(logrus.Fields{ - "room_id": roomID, - "event_id": e.Event.EventID(), - }).Error("Roomserver failed to queue async event") - return - } - } - - if request.Asynchronous || replySub == nil { + // Queue up the event into the roomserver. + replySub, err := r.queueInputRoomEvents(ctx, request) + if err != nil { + response.ErrMsg = err.Error() return } + // If we aren't waiting for synchronous responses then we can + // give up here, there is nothing further to do. + if replySub == nil { + return + } + + // Otherwise, we'll want to sit and wait for the responses + // from the roomserver. There will be one response for every + // input we submitted. The last error value we receive will + // be the one returned as the error string. defer replySub.Drain() // nolint:errcheck for i := 0; i < len(request.InputRoomEvents); i++ { msg, err := replySub.NextMsgWithContext(ctx) @@ -207,7 +365,6 @@ func (r *Inputer) InputRoomEvents( } if len(msg.Data) > 0 { response.ErrMsg = string(msg.Data) - return } } } diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go index 1992ac335..36e3c5269 100644 --- a/roomserver/roomserver.go +++ b/roomserver/roomserver.go @@ -50,12 +50,12 @@ func NewInternalAPI( logrus.WithError(err).Panicf("failed to connect to room server db") } - js, nc := jetstream.Prepare(&cfg.Matrix.JetStream) + js, nc := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) return internal.NewRoomserverAPI( base.ProcessContext, cfg, roomserverDB, js, nc, - cfg.Matrix.JetStream.TopicFor(jetstream.InputRoomEvent), - cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent), + cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent), + cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent), base.Caches, perspectiveServerNames, ) } diff --git a/roomserver/storage/sqlite3/event_state_keys_table.go b/roomserver/storage/sqlite3/event_state_keys_table.go index 6ae3ab0c4..f97541f4a 100644 --- a/roomserver/storage/sqlite3/event_state_keys_table.go +++ b/roomserver/storage/sqlite3/event_state_keys_table.go @@ -151,7 +151,7 @@ func (s *eventStateKeyStatements) BulkSelectEventStateKey( if err != nil { return nil, err } - defer selectPrep.Close() + defer internal.CloseAndLogIfError(ctx, selectPrep, "selectPrep.close() failed") stmt := sqlutil.TxStmt(txn, selectPrep) rows, err := stmt.QueryContext(ctx, iEventStateKeyNIDs...) if err != nil { diff --git a/roomserver/storage/sqlite3/event_types_table.go b/roomserver/storage/sqlite3/event_types_table.go index 1fe4c91c2..c49cc509a 100644 --- a/roomserver/storage/sqlite3/event_types_table.go +++ b/roomserver/storage/sqlite3/event_types_table.go @@ -128,7 +128,7 @@ func (s *eventTypeStatements) BulkSelectEventTypeNID( if err != nil { return nil, err } - defer selectPrep.Close() + defer internal.CloseAndLogIfError(ctx, selectPrep, "selectPrep.close() failed") stmt := sqlutil.TxStmt(txn, selectPrep) /////////////// diff --git a/roomserver/storage/sqlite3/events_table.go b/roomserver/storage/sqlite3/events_table.go index 2ab1151d5..45b49e5cb 100644 --- a/roomserver/storage/sqlite3/events_table.go +++ b/roomserver/storage/sqlite3/events_table.go @@ -567,7 +567,7 @@ func (s *eventStatements) SelectMaxEventDepth(ctx context.Context, txn *sql.Tx, if err != nil { return 0, err } - defer sqlPrep.Close() + defer internal.CloseAndLogIfError(ctx, sqlPrep, "sqlPrep.close() failed") err = sqlutil.TxStmt(txn, sqlPrep).QueryRowContext(ctx, iEventIDs...).Scan(&result) if err != nil { return 0, fmt.Errorf("sqlutil.TxStmt.QueryRowContext: %w", err) @@ -583,7 +583,7 @@ func (s *eventStatements) SelectRoomNIDsForEventNIDs( if err != nil { return nil, err } - defer sqlPrep.Close() + defer internal.CloseAndLogIfError(ctx, sqlPrep, "sqlPrep.close() failed") sqlStmt := sqlutil.TxStmt(txn, sqlPrep) iEventNIDs := make([]interface{}, len(eventNIDs)) for i, v := range eventNIDs { diff --git a/setup/config/config_appservice.go b/setup/config/config_appservice.go index 4f6553f10..3f4e1c917 100644 --- a/setup/config/config_appservice.go +++ b/setup/config/config_appservice.go @@ -209,13 +209,14 @@ func setupRegexps(asAPI *AppServiceAPI, derived *Derived) (err error) { for _, appservice := range derived.ApplicationServices { // The sender_localpart can be considered an exclusive regex for a single user, so let's do that // to simplify the code - var senderUserIDSlice = []string{fmt.Sprintf("@%s:%s", appservice.SenderLocalpart, asAPI.Matrix.ServerName)} - usersSlice, found := appservice.NamespaceMap["users"] + users, found := appservice.NamespaceMap["users"] if !found { - usersSlice = []ApplicationServiceNamespace{} - appservice.NamespaceMap["users"] = usersSlice + users = []ApplicationServiceNamespace{} } - appendExclusiveNamespaceRegexs(&senderUserIDSlice, usersSlice) + appservice.NamespaceMap["users"] = append(users, ApplicationServiceNamespace{ + Exclusive: true, + Regex: regexp.QuoteMeta(fmt.Sprintf("@%s:%s", appservice.SenderLocalpart, asAPI.Matrix.ServerName)), + }) for key, namespaceSlice := range appservice.NamespaceMap { switch key { diff --git a/setup/config/config_jetstream.go b/setup/config/config_jetstream.go index 9271cd8b4..b6a93d398 100644 --- a/setup/config/config_jetstream.go +++ b/setup/config/config_jetstream.go @@ -19,12 +19,12 @@ type JetStream struct { InMemory bool `yaml:"in_memory"` } -func (c *JetStream) TopicFor(name string) string { +func (c *JetStream) Prefixed(name string) string { return fmt.Sprintf("%s%s", c.TopicPrefix, name) } func (c *JetStream) Durable(name string) string { - return c.TopicFor(name) + return c.Prefixed(name) } func (c *JetStream) Defaults(generate bool) { diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go index 37597d584..748c191b0 100644 --- a/setup/jetstream/nats.go +++ b/setup/jetstream/nats.go @@ -1,11 +1,13 @@ package jetstream import ( + "reflect" "strings" "sync" "time" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/process" "github.com/sirupsen/logrus" natsserver "github.com/nats-io/nats-server/v2/server" @@ -15,7 +17,7 @@ import ( var natsServer *natsserver.Server var natsServerMutex sync.Mutex -func Prepare(cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) { +func Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) { // check if we need an in-process NATS Server if len(cfg.Addresses) != 0 { return setupNATS(cfg, nil) @@ -35,7 +37,16 @@ func Prepare(cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Co panic(err) } natsServer.ConfigureLogger() - go natsServer.Start() + go func() { + process.ComponentStarted() + natsServer.Start() + }() + go func() { + <-process.WaitForShutdown() + natsServer.Shutdown() + natsServer.WaitForShutdown() + process.ComponentFinished() + }() } natsServerMutex.Unlock() if !natsServer.ReadyForConnections(time.Second * 10) { @@ -65,14 +76,35 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStream } for _, stream := range streams { // streams are defined in streams.go - name := cfg.TopicFor(stream.Name) + name := cfg.Prefixed(stream.Name) info, err := s.StreamInfo(name) if err != nil && err != natsclient.ErrStreamNotFound { logrus.WithError(err).Fatal("Unable to get stream info") } + subjects := stream.Subjects + if len(subjects) == 0 { + // By default we want each stream to listen for the subjects + // that are either an exact match for the stream name, or where + // the first part of the subject is the stream name. ">" is a + // wildcard in NATS for one or more subject tokens. In the case + // that the stream is called "Foo", this will match any message + // with the subject "Foo", "Foo.Bar" or "Foo.Bar.Baz" etc. + subjects = []string{name, name + ".>"} + } + if info != nil { + switch { + case !reflect.DeepEqual(info.Config.Subjects, subjects): + fallthrough + case info.Config.Retention != stream.Retention: + fallthrough + case info.Config.Storage != stream.Storage: + if err = s.DeleteStream(name); err != nil { + logrus.WithError(err).Fatal("Unable to delete stream") + } + info = nil + } + } if info == nil { - stream.Subjects = []string{name} - // If we're trying to keep everything in memory (e.g. unit tests) // then overwrite the storage policy. if cfg.InMemory { @@ -83,8 +115,9 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStream // array, otherwise we end up with namespaces on namespaces. namespaced := *stream namespaced.Name = name + namespaced.Subjects = subjects if _, err = s.AddStream(&namespaced); err != nil { - logrus.WithError(err).WithField("stream", name).Fatal("Unable to add stream") + logrus.WithError(err).WithField("stream", name).WithField("subjects", subjects).Fatal("Unable to add stream") } } } diff --git a/setup/jetstream/streams.go b/setup/jetstream/streams.go index aa3e95cb8..aa979924b 100644 --- a/setup/jetstream/streams.go +++ b/setup/jetstream/streams.go @@ -1,6 +1,8 @@ package jetstream import ( + "fmt" + "regexp" "time" "github.com/nats-io/nats.go" @@ -24,10 +26,20 @@ var ( OutputReadUpdate = "OutputReadUpdate" ) +var safeCharacters = regexp.MustCompile("[^A-Za-z0-9$]+") + +func Tokenise(str string) string { + return safeCharacters.ReplaceAllString(str, "_") +} + +func InputRoomEventSubj(roomID string) string { + return fmt.Sprintf("%s.%s", InputRoomEvent, Tokenise(roomID)) +} + var streams = []*nats.StreamConfig{ { Name: InputRoomEvent, - Retention: nats.WorkQueuePolicy, + Retention: nats.InterestPolicy, Storage: nats.FileStorage, }, { diff --git a/setup/monolith.go b/setup/monolith.go index 7dbd2eeaa..fa6d962c4 100644 --- a/setup/monolith.go +++ b/setup/monolith.go @@ -57,7 +57,7 @@ type Monolith struct { // AddAllPublicRoutes attaches all public paths to the given router func (m *Monolith) AddAllPublicRoutes(process *process.ProcessContext, csMux, ssMux, keyMux, wkMux, mediaMux, synapseMux *mux.Router) { clientapi.AddPublicRoutes( - csMux, synapseMux, &m.Config.ClientAPI, m.AccountDB, + process, csMux, synapseMux, &m.Config.ClientAPI, m.AccountDB, m.FedClient, m.RoomserverAPI, m.EDUInternalAPI, m.AppserviceAPI, transactions.New(), m.FederationAPI, m.UserAPI, m.KeyAPI, diff --git a/setup/mscs/msc2946/msc2946.go b/setup/mscs/msc2946/msc2946.go index 7fb043366..61520d50e 100644 --- a/setup/mscs/msc2946/msc2946.go +++ b/setup/mscs/msc2946/msc2946.go @@ -283,11 +283,7 @@ func (w *walker) walk() util.JSONResponse { if !roomExists { // attempt to query this room over federation, as either we've never heard of it before // or we've left it and hence are not authorised (but info may be exposed regardless) - fedRes, err := w.federatedRoomInfo(rv.roomID, rv.vias) - if err != nil { - util.GetLogger(w.ctx).WithError(err).WithField("room_id", rv.roomID).Errorf("failed to query federated spaces") - continue - } + fedRes := w.federatedRoomInfo(rv.roomID, rv.vias) if fedRes != nil { discoveredChildEvents = fedRes.Room.ChildrenState discoveredRooms = append(discoveredRooms, fedRes.Room) @@ -420,15 +416,15 @@ func (w *walker) publicRoomsChunk(roomID string) *gomatrixserverlib.PublicRoom { // federatedRoomInfo returns more of the spaces graph from another server. Returns nil if this was // unsuccessful. -func (w *walker) federatedRoomInfo(roomID string, vias []string) (*gomatrixserverlib.MSC2946SpacesResponse, error) { +func (w *walker) federatedRoomInfo(roomID string, vias []string) *gomatrixserverlib.MSC2946SpacesResponse { // only do federated requests for client requests if w.caller == nil { - return nil, nil + return nil } resp, ok := w.cache.GetSpaceSummary(roomID) if ok { util.GetLogger(w.ctx).Debugf("Returning cached response for %s", roomID) - return &resp, nil + return &resp } util.GetLogger(w.ctx).Debugf("Querying %s via %+v", roomID, vias) ctx := context.Background() @@ -455,9 +451,9 @@ func (w *walker) federatedRoomInfo(roomID string, vias []string) (*gomatrixserve } w.cache.StoreSpaceSummary(roomID, res) - return &res, nil + return &res } - return nil, nil + return nil } func (w *walker) roomExists(roomID string) bool { @@ -717,23 +713,6 @@ func stripped(ev *gomatrixserverlib.Event) *gomatrixserverlib.MSC2946StrippedEve } } -func eventKey(event *gomatrixserverlib.MSC2946StrippedEvent) string { - return event.RoomID + "|" + event.Type + "|" + event.StateKey -} - -func spaceTargetStripped(event *gomatrixserverlib.MSC2946StrippedEvent) string { - if event.StateKey == "" { - return "" // no-op - } - switch event.Type { - case ConstSpaceParentEventType: - return event.StateKey - case ConstSpaceChildEventType: - return event.StateKey - } - return "" -} - func parseInt(intstr string, defaultVal int) int { i, err := strconv.ParseInt(intstr, 10, 32) if err != nil { diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go index fcb7b5b1c..40c1cd3d6 100644 --- a/syncapi/consumers/clientapi.go +++ b/syncapi/consumers/clientapi.go @@ -61,7 +61,7 @@ func NewOutputClientDataConsumer( return &OutputClientDataConsumer{ ctx: process.Context(), jetstream: js, - topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData), + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData), durable: cfg.Matrix.JetStream.Durable("SyncAPIClientAPIConsumer"), db: store, notifier: notifier, diff --git a/syncapi/consumers/eduserver_receipts.go b/syncapi/consumers/eduserver_receipts.go index 4e4c61c67..ab79998ea 100644 --- a/syncapi/consumers/eduserver_receipts.go +++ b/syncapi/consumers/eduserver_receipts.go @@ -62,7 +62,7 @@ func NewOutputReceiptEventConsumer( return &OutputReceiptEventConsumer{ ctx: process.Context(), jetstream: js, - topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent), durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerReceiptConsumer"), db: store, notifier: notifier, diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/eduserver_sendtodevice.go index b0beef063..bdbe77352 100644 --- a/syncapi/consumers/eduserver_sendtodevice.go +++ b/syncapi/consumers/eduserver_sendtodevice.go @@ -57,7 +57,7 @@ func NewOutputSendToDeviceEventConsumer( return &OutputSendToDeviceEventConsumer{ ctx: process.Context(), jetstream: js, - topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent), + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerSendToDeviceConsumer"), db: store, serverName: cfg.Matrix.ServerName, diff --git a/syncapi/consumers/eduserver_typing.go b/syncapi/consumers/eduserver_typing.go index cae5df8a8..c2828c7fc 100644 --- a/syncapi/consumers/eduserver_typing.go +++ b/syncapi/consumers/eduserver_typing.go @@ -56,7 +56,7 @@ func NewOutputTypingEventConsumer( return &OutputTypingEventConsumer{ ctx: process.Context(), jetstream: js, - topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent), + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent), durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerTypingConsumer"), eduCache: eduCache, notifier: notifier, diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 640c505c2..5bdc0fad7 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -65,7 +65,7 @@ func NewOutputRoomEventConsumer( ctx: process.Context(), cfg: cfg, jetstream: js, - topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent), + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent), durable: cfg.Matrix.JetStream.Durable("SyncAPIRoomServerConsumer"), db: store, notifier: notifier, diff --git a/syncapi/consumers/userapi.go b/syncapi/consumers/userapi.go index a3b2dd53d..010fa7c8e 100644 --- a/syncapi/consumers/userapi.go +++ b/syncapi/consumers/userapi.go @@ -56,7 +56,7 @@ func NewOutputNotificationDataConsumer( ctx: process.Context(), jetstream: js, durable: cfg.Matrix.JetStream.Durable("SyncAPINotificationDataConsumer"), - topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputNotificationData), + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputNotificationData), db: store, notifier: notifier, stream: stream, diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index 9aef5db14..36ba3a3e6 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -41,7 +41,6 @@ type messagesReq struct { roomID string from *types.TopologyToken to *types.TopologyToken - fromStream *types.StreamingToken device *userapi.Device wasToProvided bool backwardOrdering bool @@ -50,7 +49,7 @@ type messagesReq struct { type messagesResp struct { Start string `json:"start"` - StartStream string `json:"start_stream,omitempty"` // NOTSPEC: so clients can hit /messages then immediately /sync with a latest sync token + StartStream string `json:"start_stream,omitempty"` // NOTSPEC: used by Cerulean, so clients can hit /messages then immediately /sync with a latest sync token End string `json:"end"` Chunk []gomatrixserverlib.ClientEvent `json:"chunk"` State []gomatrixserverlib.ClientEvent `json:"state"` @@ -93,6 +92,7 @@ func OnIncomingMessagesRequest( // Pagination tokens. var fromStream *types.StreamingToken fromQuery := req.URL.Query().Get("from") + toQuery := req.URL.Query().Get("to") emptyFromSupplied := fromQuery == "" if emptyFromSupplied { // NOTSPEC: We will pretend they used the latest sync token if no ?from= was provided. @@ -101,18 +101,6 @@ func OnIncomingMessagesRequest( fromQuery = currPos.String() } - from, err := types.NewTopologyTokenFromString(fromQuery) - if err != nil { - fs, err2 := types.NewStreamTokenFromString(fromQuery) - fromStream = &fs - if err2 != nil { - return util.JSONResponse{ - Code: http.StatusBadRequest, - JSON: jsonerror.InvalidArgumentValue("Invalid from parameter: " + err2.Error()), - } - } - } - // Direction to return events from. dir := req.URL.Query().Get("dir") if dir != "b" && dir != "f" { @@ -125,16 +113,43 @@ func OnIncomingMessagesRequest( // to have one of the two accepted values (so dir == "f" <=> !backwardOrdering). backwardOrdering := (dir == "b") + from, err := types.NewTopologyTokenFromString(fromQuery) + if err != nil { + var streamToken types.StreamingToken + if streamToken, err = types.NewStreamTokenFromString(fromQuery); err != nil { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.InvalidArgumentValue("Invalid from parameter: " + err.Error()), + } + } else { + fromStream = &streamToken + from, err = db.StreamToTopologicalPosition(req.Context(), roomID, streamToken.PDUPosition, backwardOrdering) + if err != nil { + logrus.WithError(err).Errorf("Failed to get topological position for streaming token %v", streamToken) + return jsonerror.InternalServerError() + } + } + } + // Pagination tokens. To is optional, and its default value depends on the // direction ("b" or "f"). var to types.TopologyToken wasToProvided := true - if s := req.URL.Query().Get("to"); len(s) > 0 { - to, err = types.NewTopologyTokenFromString(s) + if len(toQuery) > 0 { + to, err = types.NewTopologyTokenFromString(toQuery) if err != nil { - return util.JSONResponse{ - Code: http.StatusBadRequest, - JSON: jsonerror.InvalidArgumentValue("Invalid to parameter: " + err.Error()), + var streamToken types.StreamingToken + if streamToken, err = types.NewStreamTokenFromString(toQuery); err != nil { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.InvalidArgumentValue("Invalid to parameter: " + err.Error()), + } + } else { + to, err = db.StreamToTopologicalPosition(req.Context(), roomID, streamToken.PDUPosition, !backwardOrdering) + if err != nil { + logrus.WithError(err).Errorf("Failed to get topological position for streaming token %v", streamToken) + return jsonerror.InternalServerError() + } } } } else { @@ -168,7 +183,6 @@ func OnIncomingMessagesRequest( roomID: roomID, from: &from, to: &to, - fromStream: fromStream, wasToProvided: wasToProvided, filter: filter, backwardOrdering: backwardOrdering, @@ -215,7 +229,7 @@ func OnIncomingMessagesRequest( End: end.String(), State: state, } - if emptyFromSupplied { + if fromStream != nil { res.StartStream = fromStream.String() } @@ -251,17 +265,9 @@ func (r *messagesReq) retrieveEvents() ( eventFilter := r.filter // Retrieve the events from the local database. - var streamEvents []types.StreamEvent - if r.fromStream != nil { - toStream := r.to.StreamToken() - streamEvents, err = r.db.GetEventsInStreamingRange( - r.ctx, r.fromStream, &toStream, r.roomID, eventFilter, r.backwardOrdering, - ) - } else { - streamEvents, err = r.db.GetEventsInTopologicalRange( - r.ctx, r.from, r.to, r.roomID, eventFilter.Limit, r.backwardOrdering, - ) - } + streamEvents, err := r.db.GetEventsInTopologicalRange( + r.ctx, r.from, r.to, r.roomID, eventFilter.Limit, r.backwardOrdering, + ) if err != nil { err = fmt.Errorf("GetEventsInRange: %w", err) return diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index e44766338..b6ac5be19 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -103,8 +103,6 @@ type Database interface { // DeletePeek deletes all peeks for a given room by a given user // Returns an error if there was a problem communicating with the database. DeletePeeks(ctx context.Context, RoomID, UserID string) (types.StreamPosition, error) - // GetEventsInStreamingRange retrieves all of the events on a given ordering using the given extremities and limit. - GetEventsInStreamingRange(ctx context.Context, from, to *types.StreamingToken, roomID string, eventFilter *gomatrixserverlib.RoomEventFilter, backwardOrdering bool) (events []types.StreamEvent, err error) // GetEventsInTopologicalRange retrieves all of the events on a given ordering using the given extremities and limit. GetEventsInTopologicalRange(ctx context.Context, from, to *types.TopologyToken, roomID string, limit int, backwardOrdering bool) (events []types.StreamEvent, err error) // EventPositionInTopology returns the depth and stream position of the given event. @@ -149,4 +147,6 @@ type Database interface { SelectContextEvent(ctx context.Context, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error) SelectContextBeforeEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error) SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error) + + StreamToTopologicalPosition(ctx context.Context, roomID string, streamPos types.StreamPosition, backwardOrdering bool) (types.TopologyToken, error) } diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index 26689f447..14af6a949 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -472,7 +472,7 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent( if err != nil { return } - defer rows.Close() + defer internal.CloseAndLogIfError(ctx, rows, "rows.close() failed") for rows.Next() { var ( @@ -504,7 +504,7 @@ func (s *outputRoomEventsStatements) SelectContextAfterEvent( if err != nil { return } - defer rows.Close() + defer internal.CloseAndLogIfError(ctx, rows, "rows.close() failed") for rows.Next() { var ( diff --git a/syncapi/storage/postgres/output_room_events_topology_table.go b/syncapi/storage/postgres/output_room_events_topology_table.go index 57774453c..626386ba0 100644 --- a/syncapi/storage/postgres/output_room_events_topology_table.go +++ b/syncapi/storage/postgres/output_room_events_topology_table.go @@ -51,7 +51,7 @@ const selectEventIDsInRangeASCSQL = "" + "SELECT event_id FROM syncapi_output_room_events_topology" + " WHERE room_id = $1 AND (" + "(topological_position > $2 AND topological_position < $3) OR" + - "(topological_position = $4 AND stream_position <= $5)" + + "(topological_position = $4 AND stream_position >= $5)" + ") ORDER BY topological_position ASC, stream_position ASC LIMIT $6" const selectEventIDsInRangeDESCSQL = "" + @@ -76,13 +76,21 @@ const selectMaxPositionInTopologySQL = "" + const deleteTopologyForRoomSQL = "" + "DELETE FROM syncapi_output_room_events_topology WHERE room_id = $1" +const selectStreamToTopologicalPositionAscSQL = "" + + "SELECT topological_position FROM syncapi_output_room_events_topology WHERE room_id = $1 AND stream_position >= $2 ORDER BY topological_position ASC LIMIT 1;" + +const selectStreamToTopologicalPositionDescSQL = "" + + "SELECT topological_position FROM syncapi_output_room_events_topology WHERE room_id = $1 AND stream_position <= $2 ORDER BY topological_position DESC LIMIT 1;" + type outputRoomEventsTopologyStatements struct { - insertEventInTopologyStmt *sql.Stmt - selectEventIDsInRangeASCStmt *sql.Stmt - selectEventIDsInRangeDESCStmt *sql.Stmt - selectPositionInTopologyStmt *sql.Stmt - selectMaxPositionInTopologyStmt *sql.Stmt - deleteTopologyForRoomStmt *sql.Stmt + insertEventInTopologyStmt *sql.Stmt + selectEventIDsInRangeASCStmt *sql.Stmt + selectEventIDsInRangeDESCStmt *sql.Stmt + selectPositionInTopologyStmt *sql.Stmt + selectMaxPositionInTopologyStmt *sql.Stmt + deleteTopologyForRoomStmt *sql.Stmt + selectStreamToTopologicalPositionAscStmt *sql.Stmt + selectStreamToTopologicalPositionDescStmt *sql.Stmt } func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) { @@ -109,6 +117,12 @@ func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) { if s.deleteTopologyForRoomStmt, err = db.Prepare(deleteTopologyForRoomSQL); err != nil { return nil, err } + if s.selectStreamToTopologicalPositionAscStmt, err = db.Prepare(selectStreamToTopologicalPositionAscSQL); err != nil { + return nil, err + } + if s.selectStreamToTopologicalPositionDescStmt, err = db.Prepare(selectStreamToTopologicalPositionDescSQL); err != nil { + return nil, err + } return s, nil } @@ -170,6 +184,19 @@ func (s *outputRoomEventsTopologyStatements) SelectPositionInTopology( return } +// SelectStreamToTopologicalPosition returns the closest position of a given event +// in the topology of the room it belongs to from the given stream position. +func (s *outputRoomEventsTopologyStatements) SelectStreamToTopologicalPosition( + ctx context.Context, txn *sql.Tx, roomID string, streamPos types.StreamPosition, backwardOrdering bool, +) (topoPos types.StreamPosition, err error) { + if backwardOrdering { + err = s.selectStreamToTopologicalPositionDescStmt.QueryRowContext(ctx, roomID, streamPos).Scan(&topoPos) + } else { + err = s.selectStreamToTopologicalPositionAscStmt.QueryRowContext(ctx, roomID, streamPos).Scan(&topoPos) + } + return +} + func (s *outputRoomEventsTopologyStatements) SelectMaxPositionInTopology( ctx context.Context, txn *sql.Tx, roomID string, ) (pos types.StreamPosition, spos types.StreamPosition, err error) { diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index 60fe5b54d..4e4b5c0bb 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -32,7 +32,6 @@ type SyncServerDatasource struct { shared.Database db *sql.DB writer sqlutil.Writer - sqlutil.PartitionOffsetStatements } // NewDatabase creates a new sync server database @@ -43,9 +42,6 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e return nil, err } d.writer = sqlutil.NewDummyWriter() - if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "syncapi"); err != nil { - return nil, err - } accountData, err := NewPostgresAccountDataTable(d.db) if err != nil { return nil, err diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 2c166eef7..9a2dc0d44 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -155,37 +155,6 @@ func (d *Database) Events(ctx context.Context, eventIDs []string) ([]*gomatrixse return d.StreamEventsToEvents(nil, streamEvents), nil } -// GetEventsInStreamingRange retrieves all of the events on a given ordering using the -// given extremities and limit. -func (d *Database) GetEventsInStreamingRange( - ctx context.Context, - from, to *types.StreamingToken, - roomID string, eventFilter *gomatrixserverlib.RoomEventFilter, - backwardOrdering bool, -) (events []types.StreamEvent, err error) { - r := types.Range{ - From: from.PDUPosition, - To: to.PDUPosition, - Backwards: backwardOrdering, - } - if backwardOrdering { - // When using backward ordering, we want the most recent events first. - if events, _, err = d.OutputEvents.SelectRecentEvents( - ctx, nil, roomID, r, eventFilter, false, false, - ); err != nil { - return - } - } else { - // When using forward ordering, we want the least recent events first. - if events, err = d.OutputEvents.SelectEarlyEvents( - ctx, nil, roomID, r, eventFilter, - ); err != nil { - return - } - } - return events, err -} - func (d *Database) AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error) { return d.CurrentRoomState.SelectJoinedUsers(ctx) } @@ -513,6 +482,26 @@ func (d *Database) EventPositionInTopology( return types.TopologyToken{Depth: depth, PDUPosition: stream}, nil } +func (d *Database) StreamToTopologicalPosition( + ctx context.Context, roomID string, streamPos types.StreamPosition, backwardOrdering bool, +) (types.TopologyToken, error) { + topoPos, err := d.Topology.SelectStreamToTopologicalPosition(ctx, nil, roomID, streamPos, backwardOrdering) + switch { + case err == sql.ErrNoRows && backwardOrdering: // no events in range, going backward + return types.TopologyToken{PDUPosition: streamPos}, nil + case err == sql.ErrNoRows && !backwardOrdering: // no events in range, going forward + topoPos, streamPos, err = d.Topology.SelectMaxPositionInTopology(ctx, nil, roomID) + if err != nil { + return types.TopologyToken{}, fmt.Errorf("d.Topology.SelectMaxPositionInTopology: %w", err) + } + return types.TopologyToken{Depth: topoPos, PDUPosition: streamPos}, nil + case err != nil: // some other error happened + return types.TopologyToken{}, fmt.Errorf("d.Topology.SelectStreamToTopologicalPosition: %w", err) + default: + return types.TopologyToken{Depth: topoPos, PDUPosition: streamPos}, nil + } +} + func (d *Database) GetFilter( ctx context.Context, localpart string, filterID string, ) (*gomatrixserverlib.Filter, error) { diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go index b9115262e..acd959696 100644 --- a/syncapi/storage/sqlite3/output_room_events_table.go +++ b/syncapi/storage/sqlite3/output_room_events_table.go @@ -514,7 +514,7 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent( if err != nil { return } - defer rows.Close() + defer internal.CloseAndLogIfError(ctx, rows, "rows.close() failed") for rows.Next() { var ( @@ -550,7 +550,7 @@ func (s *outputRoomEventsStatements) SelectContextAfterEvent( if err != nil { return } - defer rows.Close() + defer internal.CloseAndLogIfError(ctx, rows, "rows.close() failed") for rows.Next() { var ( diff --git a/syncapi/storage/sqlite3/output_room_events_topology_table.go b/syncapi/storage/sqlite3/output_room_events_topology_table.go index d34b90500..b972ae285 100644 --- a/syncapi/storage/sqlite3/output_room_events_topology_table.go +++ b/syncapi/storage/sqlite3/output_room_events_topology_table.go @@ -47,7 +47,7 @@ const selectEventIDsInRangeASCSQL = "" + "SELECT event_id FROM syncapi_output_room_events_topology" + " WHERE room_id = $1 AND (" + "(topological_position > $2 AND topological_position < $3) OR" + - "(topological_position = $4 AND stream_position <= $5)" + + "(topological_position = $4 AND stream_position >= $5)" + ") ORDER BY topological_position ASC, stream_position ASC LIMIT $6" const selectEventIDsInRangeDESCSQL = "" + @@ -65,17 +65,22 @@ const selectMaxPositionInTopologySQL = "" + "SELECT MAX(topological_position), stream_position FROM syncapi_output_room_events_topology" + " WHERE room_id = $1 ORDER BY stream_position DESC" -const deleteTopologyForRoomSQL = "" + - "DELETE FROM syncapi_output_room_events_topology WHERE room_id = $1" +const selectStreamToTopologicalPositionAscSQL = "" + + "SELECT topological_position FROM syncapi_output_room_events_topology WHERE room_id = $1 AND stream_position >= $2 ORDER BY topological_position ASC LIMIT 1;" + +const selectStreamToTopologicalPositionDescSQL = "" + + "SELECT topological_position FROM syncapi_output_room_events_topology WHERE room_id = $1 AND stream_position <= $2 ORDER BY topological_position DESC LIMIT 1;" type outputRoomEventsTopologyStatements struct { - db *sql.DB - insertEventInTopologyStmt *sql.Stmt - selectEventIDsInRangeASCStmt *sql.Stmt - selectEventIDsInRangeDESCStmt *sql.Stmt - selectPositionInTopologyStmt *sql.Stmt - selectMaxPositionInTopologyStmt *sql.Stmt - deleteTopologyForRoomStmt *sql.Stmt + db *sql.DB + insertEventInTopologyStmt *sql.Stmt + selectEventIDsInRangeASCStmt *sql.Stmt + selectEventIDsInRangeDESCStmt *sql.Stmt + selectPositionInTopologyStmt *sql.Stmt + selectMaxPositionInTopologyStmt *sql.Stmt + deleteTopologyForRoomStmt *sql.Stmt + selectStreamToTopologicalPositionAscStmt *sql.Stmt + selectStreamToTopologicalPositionDescStmt *sql.Stmt } func NewSqliteTopologyTable(db *sql.DB) (tables.Topology, error) { @@ -101,7 +106,10 @@ func NewSqliteTopologyTable(db *sql.DB) (tables.Topology, error) { if s.selectMaxPositionInTopologyStmt, err = db.Prepare(selectMaxPositionInTopologySQL); err != nil { return nil, err } - if s.deleteTopologyForRoomStmt, err = db.Prepare(deleteTopologyForRoomSQL); err != nil { + if s.selectStreamToTopologicalPositionAscStmt, err = db.Prepare(selectStreamToTopologicalPositionAscSQL); err != nil { + return nil, err + } + if s.selectStreamToTopologicalPositionDescStmt, err = db.Prepare(selectStreamToTopologicalPositionDescSQL); err != nil { return nil, err } return s, nil @@ -163,6 +171,19 @@ func (s *outputRoomEventsTopologyStatements) SelectPositionInTopology( return } +// SelectStreamToTopologicalPosition returns the closest position of a given event +// in the topology of the room it belongs to from the given stream position. +func (s *outputRoomEventsTopologyStatements) SelectStreamToTopologicalPosition( + ctx context.Context, txn *sql.Tx, roomID string, streamPos types.StreamPosition, backwardOrdering bool, +) (topoPos types.StreamPosition, err error) { + if backwardOrdering { + err = s.selectStreamToTopologicalPositionDescStmt.QueryRowContext(ctx, roomID, streamPos).Scan(&topoPos) + } else { + err = s.selectStreamToTopologicalPositionAscStmt.QueryRowContext(ctx, roomID, streamPos).Scan(&topoPos) + } + return +} + func (s *outputRoomEventsTopologyStatements) SelectMaxPositionInTopology( ctx context.Context, txn *sql.Tx, roomID string, ) (pos types.StreamPosition, spos types.StreamPosition, err error) { diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index f5ae9fdd7..cb7e3b46f 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -28,9 +28,8 @@ import ( // both the database for PDUs and caches for EDUs. type SyncServerDatasource struct { shared.Database - db *sql.DB - writer sqlutil.Writer - sqlutil.PartitionOffsetStatements + db *sql.DB + writer sqlutil.Writer streamID streamIDStatements } @@ -50,9 +49,6 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e } func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (err error) { - if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "syncapi"); err != nil { - return err - } if err = d.streamID.prepare(d.db); err != nil { return err } diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index 9d1078f5f..640b7dc31 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -87,6 +87,8 @@ type Topology interface { SelectMaxPositionInTopology(ctx context.Context, txn *sql.Tx, roomID string) (depth types.StreamPosition, spos types.StreamPosition, err error) // DeleteTopologyForRoom removes all topological information for a room. This should only be done when removing the room entirely. DeleteTopologyForRoom(ctx context.Context, txn *sql.Tx, roomID string) (err error) + // SelectStreamToTopologicalPosition converts a stream position to a topological position by finding the nearest topological position in the room. + SelectStreamToTopologicalPosition(ctx context.Context, txn *sql.Tx, roomID string, streamPos types.StreamPosition, forward bool) (topoPos types.StreamPosition, err error) } type CurrentRoomState interface { diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 41635c911..ed8118bfc 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -49,7 +49,7 @@ func AddPublicRoutes( federation *gomatrixserverlib.FederationClient, cfg *config.SyncAPI, ) { - js, _ := jetstream.Prepare(&cfg.Matrix.JetStream) + js, _ := jetstream.Prepare(process, &cfg.Matrix.JetStream) syncDB, err := storage.NewSyncServerDatasource(&cfg.Database) if err != nil { @@ -67,18 +67,18 @@ func AddPublicRoutes( userAPIStreamEventProducer := &producers.UserAPIStreamEventProducer{ JetStream: js, - Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputStreamEvent), + Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputStreamEvent), } userAPIReadUpdateProducer := &producers.UserAPIReadProducer{ JetStream: js, - Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReadUpdate), + Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReadUpdate), } _ = userAPIReadUpdateProducer keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer( - process, cfg, cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent), + process, cfg, cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent), js, keyAPI, rsAPI, syncDB, notifier, streams.DeviceListStreamProvider, ) diff --git a/sytest-whitelist b/sytest-whitelist index c3a4ad92f..40bf5afac 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -239,7 +239,6 @@ Inbound federation can query room alias directory Outbound federation can query v2 /send_join Inbound federation can receive v2 /send_join Message history can be paginated -Getting messages going forward is limited for a departed room (SPEC-216) Backfill works correctly with history visibility set to joined Guest user cannot call /events globally Guest users can join guest_access rooms diff --git a/userapi/consumers/syncapi_readupdate.go b/userapi/consumers/syncapi_readupdate.go index 2e58020b4..067f93330 100644 --- a/userapi/consumers/syncapi_readupdate.go +++ b/userapi/consumers/syncapi_readupdate.go @@ -47,7 +47,7 @@ func NewOutputReadUpdateConsumer( db: store, ServerName: cfg.Matrix.ServerName, durable: cfg.Matrix.JetStream.Durable("UserAPISyncAPIReadUpdateConsumer"), - topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReadUpdate), + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReadUpdate), pgClient: pgClient, userAPI: userAPI, syncProducer: syncProducer, diff --git a/userapi/consumers/syncapi_streamevent.go b/userapi/consumers/syncapi_streamevent.go index 110813274..da3cd3937 100644 --- a/userapi/consumers/syncapi_streamevent.go +++ b/userapi/consumers/syncapi_streamevent.go @@ -54,7 +54,7 @@ func NewOutputStreamEventConsumer( jetstream: js, db: store, durable: cfg.Matrix.JetStream.Durable("UserAPISyncAPIStreamEventConsumer"), - topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputStreamEvent), + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputStreamEvent), pgClient: pgClient, userAPI: userAPI, rsAPI: rsAPI, diff --git a/userapi/storage/shared/storage.go b/userapi/storage/shared/storage.go index febf03221..72ae96ecc 100644 --- a/userapi/storage/shared/storage.go +++ b/userapi/storage/shared/storage.go @@ -23,6 +23,7 @@ import ( "errors" "fmt" "strconv" + "strings" "time" "github.com/matrix-org/gomatrixserverlib" @@ -298,7 +299,12 @@ func (d *Database) CheckAccountAvailability(ctx context.Context, localpart strin // Returns sql.ErrNoRows if no account exists which matches the given localpart. func (d *Database) GetAccountByLocalpart(ctx context.Context, localpart string, ) (*api.Account, error) { - return d.Accounts.SelectAccountByLocalpart(ctx, localpart) + // try to get the account with lowercase localpart (majority) + acc, err := d.Accounts.SelectAccountByLocalpart(ctx, strings.ToLower(localpart)) + if err == sql.ErrNoRows { + acc, err = d.Accounts.SelectAccountByLocalpart(ctx, localpart) // try with localpart as passed by the request + } + return acc, err } // SearchProfiles returns all profiles where the provided localpart or display name diff --git a/userapi/userapi.go b/userapi/userapi.go index 1e4ebcb2e..e91ce3a7a 100644 --- a/userapi/userapi.go +++ b/userapi/userapi.go @@ -46,7 +46,7 @@ func NewInternalAPI( appServices []config.ApplicationService, keyAPI keyapi.KeyInternalAPI, rsAPI rsapi.RoomserverInternalAPI, pgClient pushgateway.Client, ) api.UserInternalAPI { - js, _ := jetstream.Prepare(&cfg.Matrix.JetStream) + js, _ := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) syncProducer := producers.NewSyncAPI( db, js, @@ -54,8 +54,8 @@ func NewInternalAPI( // it's handled by clientapi, and hence uses its topic. When user // API handles it for all account data, we can remove it from // here. - cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData), - cfg.Matrix.JetStream.TopicFor(jetstream.OutputNotificationData), + cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData), + cfg.Matrix.JetStream.Prefixed(jetstream.OutputNotificationData), ) userAPI := &internal.UserInternalAPI{