Merge branch 'main' of github.com:matrix-org/dendrite into s7evink/hisvismessages

This commit is contained in:
Till Faelligen 2022-08-05 14:30:15 +02:00
commit 33d964b95c
No known key found for this signature in database
GPG key ID: 3DF82D8AB9211D4E
57 changed files with 294 additions and 245 deletions

View file

@ -21,8 +21,7 @@ As of October 2020 (current [progress below](#progress)), Dendrite has now enter
This does not mean: This does not mean:
- Dendrite is bug-free. It has not yet been battle-tested in the real world and so will be error prone initially. - Dendrite is bug-free. It has not yet been battle-tested in the real world and so will be error prone initially.
- All of the CS/Federation APIs are implemented. We are tracking progress via a script called 'Are We Synapse Yet?'. In particular, - Dendrite is feature-complete. There may be client or federation APIs that are not implemented.
presence and push notifications are entirely missing from Dendrite. See [CHANGES.md](CHANGES.md) for updates.
- Dendrite is ready for massive homeserver deployments. You cannot shard each microservice, only run each one on a different machine. - Dendrite is ready for massive homeserver deployments. You cannot shard each microservice, only run each one on a different machine.
Currently, we expect Dendrite to function well for small (10s/100s of users) homeserver deployments as well as P2P Matrix nodes in-browser or on mobile devices. Currently, we expect Dendrite to function well for small (10s/100s of users) homeserver deployments as well as P2P Matrix nodes in-browser or on mobile devices.
@ -36,6 +35,9 @@ If you have further questions, please take a look at [our FAQ](docs/FAQ.md) or j
## Requirements ## Requirements
See the [Planning your Installation](https://matrix-org.github.io/dendrite/installation/planning) page for
more information on requirements.
To build Dendrite, you will need Go 1.18 or later. To build Dendrite, you will need Go 1.18 or later.
For a usable federating Dendrite deployment, you will also need: For a usable federating Dendrite deployment, you will also need:
@ -83,11 +85,11 @@ $ ./bin/create-account --config dendrite.yaml -username alice
Then point your favourite Matrix client at `http://localhost:8008` or `https://localhost:8448`. Then point your favourite Matrix client at `http://localhost:8008` or `https://localhost:8448`.
## <a id="progress"></a> Progress ## Progress
We use a script called Are We Synapse Yet which checks Sytest compliance rates. Sytest is a black-box homeserver We use a script called Are We Synapse Yet which checks Sytest compliance rates. Sytest is a black-box homeserver
test rig with around 900 tests. The script works out how many of these tests are passing on Dendrite and it test rig with around 900 tests. The script works out how many of these tests are passing on Dendrite and it
updates with CI. As of April 2022 we're at around 83% CS API coverage and 95% Federation coverage, though check updates with CI. As of August 2022 we're at around 83% CS API coverage and 95% Federation coverage, though check
CI for the latest numbers. In practice, this means you can communicate locally and via federation with Synapse CI for the latest numbers. In practice, this means you can communicate locally and via federation with Synapse
servers such as matrix.org reasonably well, although there are still some missing features (like Search). servers such as matrix.org reasonably well, although there are still some missing features (like Search).
@ -119,53 +121,8 @@ We would be grateful for any help on issues marked as
all have related Sytests which need to pass in order for the issue to be closed. Once you've written your all have related Sytests which need to pass in order for the issue to be closed. Once you've written your
code, you can quickly run Sytest to ensure that the test names are now passing. code, you can quickly run Sytest to ensure that the test names are now passing.
For example, if the test `Local device key changes get to remote servers` was marked as failing, find the If you're new to the project, see our
test file (e.g via `grep` or via the [Contributing page](https://matrix-org.github.io/dendrite/development/contributing) to get up to speed, then
[CI log output](https://buildkite.com/matrix-dot-org/dendrite/builds/2826#39cff5de-e032-4ad0-ad26-f819e6919c42)
it's `tests/50federation/40devicelists.pl` ) then to run Sytest:
```
docker run --rm --name sytest
-v "/Users/kegan/github/sytest:/sytest"
-v "/Users/kegan/github/dendrite:/src"
-v "/Users/kegan/logs:/logs"
-v "/Users/kegan/go/:/gopath"
-e "POSTGRES=1" -e "DENDRITE_TRACE_HTTP=1"
matrixdotorg/sytest-dendrite:latest tests/50federation/40devicelists.pl
```
See [sytest.md](docs/sytest.md) for the full description of these flags.
You can try running sytest outside of docker for faster runs, but the dependencies can be temperamental
and we recommend using docker where possible.
```
cd sytest
export PERL5LIB=$HOME/lib/perl5
export PERL_MB_OPT=--install_base=$HOME
export PERL_MM_OPT=INSTALL_BASE=$HOME
./install-deps.pl
./run-tests.pl -I Dendrite::Monolith -d $PATH_TO_DENDRITE_BINARIES
```
Sometimes Sytest is testing the wrong thing or is flakey, so it will need to be patched.
Ask on `#dendrite-dev:matrix.org` if you think this is the case for you and we'll be happy to help.
If you're new to the project, see [CONTRIBUTING.md](docs/CONTRIBUTING.md) to get up to speed then
look for [Good First Issues](https://github.com/matrix-org/dendrite/labels/good%20first%20issue). If you're look for [Good First Issues](https://github.com/matrix-org/dendrite/labels/good%20first%20issue). If you're
familiar with the project, look for [Help Wanted](https://github.com/matrix-org/dendrite/labels/help-wanted) familiar with the project, look for [Help Wanted](https://github.com/matrix-org/dendrite/labels/help-wanted)
issues. issues.
## Hardware requirements
Dendrite in Monolith + SQLite works in a range of environments including iOS and in-browser via WASM.
For small homeserver installations joined on ~10s rooms on matrix.org with ~100s of users in those rooms, including some
encrypted rooms:
- Memory: uses around 100MB of RAM, with peaks at around 200MB.
- Disk space: After a few months of usage, the database grew to around 2GB (in Monolith mode).
- CPU: Brief spikes when processing events, typically idles at 1% CPU.
This means Dendrite should comfortably work on things like Raspberry Pis.

View file

@ -13,4 +13,4 @@ go build ./cmd/...
./build/scripts/find-lint.sh ./build/scripts/find-lint.sh
echo "Testing..." echo "Testing..."
go test -v ./... go test --race -v ./...

View file

@ -18,7 +18,6 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"io" "io"
"io/ioutil"
"net/http" "net/http"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/auth/authtypes"
@ -34,7 +33,7 @@ import (
// If the final return value is non-nil, an error occurred and the cleanup function // If the final return value is non-nil, an error occurred and the cleanup function
// is nil. // is nil.
func LoginFromJSONReader(ctx context.Context, r io.Reader, useraccountAPI uapi.UserLoginAPI, userAPI UserInternalAPIForLogin, cfg *config.ClientAPI) (*Login, LoginCleanupFunc, *util.JSONResponse) { func LoginFromJSONReader(ctx context.Context, r io.Reader, useraccountAPI uapi.UserLoginAPI, userAPI UserInternalAPIForLogin, cfg *config.ClientAPI) (*Login, LoginCleanupFunc, *util.JSONResponse) {
reqBytes, err := ioutil.ReadAll(r) reqBytes, err := io.ReadAll(r)
if err != nil { if err != nil {
err := &util.JSONResponse{ err := &util.JSONResponse{
Code: http.StatusBadRequest, Code: http.StatusBadRequest,

View file

@ -16,7 +16,7 @@ package httputil
import ( import (
"encoding/json" "encoding/json"
"io/ioutil" "io"
"net/http" "net/http"
"unicode/utf8" "unicode/utf8"
@ -29,9 +29,9 @@ import (
func UnmarshalJSONRequest(req *http.Request, iface interface{}) *util.JSONResponse { func UnmarshalJSONRequest(req *http.Request, iface interface{}) *util.JSONResponse {
// encoding/json allows invalid utf-8, matrix does not // encoding/json allows invalid utf-8, matrix does not
// https://matrix.org/docs/spec/client_server/r0.6.1#api-standards // https://matrix.org/docs/spec/client_server/r0.6.1#api-standards
body, err := ioutil.ReadAll(req.Body) body, err := io.ReadAll(req.Body)
if err != nil { if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("ioutil.ReadAll failed") util.GetLogger(req.Context()).WithError(err).Error("io.ReadAll failed")
resp := jsonerror.InternalServerError() resp := jsonerror.InternalServerError()
return &resp return &resp
} }

View file

@ -17,7 +17,7 @@ package routing
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io/ioutil" "io"
"net/http" "net/http"
"github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/httputil"
@ -101,9 +101,9 @@ func SaveAccountData(
} }
} }
body, err := ioutil.ReadAll(req.Body) body, err := io.ReadAll(req.Body)
if err != nil { if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("ioutil.ReadAll failed") util.GetLogger(req.Context()).WithError(err).Error("io.ReadAll failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }

View file

@ -1,7 +1,7 @@
package routing package routing
import ( import (
"io/ioutil" "io"
"net/http" "net/http"
"github.com/matrix-org/dendrite/clientapi/auth" "github.com/matrix-org/dendrite/clientapi/auth"
@ -20,7 +20,7 @@ func Deactivate(
) util.JSONResponse { ) util.JSONResponse {
ctx := req.Context() ctx := req.Context()
defer req.Body.Close() // nolint:errcheck defer req.Body.Close() // nolint:errcheck
bodyBytes, err := ioutil.ReadAll(req.Body) bodyBytes, err := io.ReadAll(req.Body)
if err != nil { if err != nil {
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusBadRequest, Code: http.StatusBadRequest,

View file

@ -15,7 +15,7 @@
package routing package routing
import ( import (
"io/ioutil" "io"
"net" "net"
"net/http" "net/http"
@ -175,7 +175,7 @@ func DeleteDeviceById(
}() }()
ctx := req.Context() ctx := req.Context()
defer req.Body.Close() // nolint:errcheck defer req.Body.Close() // nolint:errcheck
bodyBytes, err := ioutil.ReadAll(req.Body) bodyBytes, err := io.ReadAll(req.Body)
if err != nil { if err != nil {
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusBadRequest, Code: http.StatusBadRequest,

View file

@ -23,13 +23,14 @@ import (
"strings" "strings"
"sync" "sync"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/matrix-org/dendrite/clientapi/api" "github.com/matrix-org/dendrite/clientapi/api"
"github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
) )
var ( var (
@ -196,14 +197,14 @@ func fillPublicRoomsReq(httpReq *http.Request, request *PublicRoomReq) *util.JSO
// sliceInto returns a subslice of `slice` which honours the since/limit values given. // sliceInto returns a subslice of `slice` which honours the since/limit values given.
// //
// 0 1 2 3 4 5 6 index // 0 1 2 3 4 5 6 index
// [A, B, C, D, E, F, G] slice // [A, B, C, D, E, F, G] slice
// //
// limit=3 => A,B,C (prev='', next='3') // limit=3 => A,B,C (prev='', next='3')
// limit=3&since=3 => D,E,F (prev='0', next='6') // limit=3&since=3 => D,E,F (prev='0', next='6')
// limit=3&since=6 => G (prev='3', next='') // limit=3&since=6 => G (prev='3', next='')
// //
// A value of '-1' for prev/next indicates no position. // A value of '-1' for prev/next indicates no position.
func sliceInto(slice []gomatrixserverlib.PublicRoom, since int64, limit int16) (subset []gomatrixserverlib.PublicRoom, prev, next int) { func sliceInto(slice []gomatrixserverlib.PublicRoom, since int64, limit int16) (subset []gomatrixserverlib.PublicRoom, prev, next int) {
prev = -1 prev = -1
next = -1 next = -1

View file

@ -19,7 +19,7 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io/ioutil" "io"
"net/http" "net/http"
"net/url" "net/url"
"regexp" "regexp"
@ -371,7 +371,7 @@ func validateRecaptcha(
// Grab the body of the response from the captcha server // Grab the body of the response from the captcha server
var r recaptchaResponse var r recaptchaResponse
body, err := ioutil.ReadAll(resp.Body) body, err := io.ReadAll(resp.Body)
if err != nil { if err != nil {
return &util.JSONResponse{ return &util.JSONResponse{
Code: http.StatusGatewayTimeout, Code: http.StatusGatewayTimeout,
@ -539,7 +539,7 @@ func Register(
cfg *config.ClientAPI, cfg *config.ClientAPI,
) util.JSONResponse { ) util.JSONResponse {
defer req.Body.Close() // nolint: errcheck defer req.Body.Close() // nolint: errcheck
reqBody, err := ioutil.ReadAll(req.Body) reqBody, err := io.ReadAll(req.Body)
if err != nil { if err != nil {
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusBadRequest, Code: http.StatusBadRequest,

View file

@ -2,7 +2,7 @@ package routing
import ( import (
"bytes" "bytes"
"io/ioutil" "io"
"testing" "testing"
"github.com/patrickmn/go-cache" "github.com/patrickmn/go-cache"
@ -13,7 +13,7 @@ func TestSharedSecretRegister(t *testing.T) {
jsonStr := []byte(`{"admin":false,"mac":"f1ba8d37123866fd659b40de4bad9b0f8965c565","nonce":"759f047f312b99ff428b21d581256f8592b8976e58bc1b543972dc6147e529a79657605b52d7becd160ff5137f3de11975684319187e06901955f79e5a6c5a79","password":"wonderland","username":"alice"}`) jsonStr := []byte(`{"admin":false,"mac":"f1ba8d37123866fd659b40de4bad9b0f8965c565","nonce":"759f047f312b99ff428b21d581256f8592b8976e58bc1b543972dc6147e529a79657605b52d7becd160ff5137f3de11975684319187e06901955f79e5a6c5a79","password":"wonderland","username":"alice"}`)
sharedSecret := "dendritetest" sharedSecret := "dendritetest"
req, err := NewSharedSecretRegistrationRequest(ioutil.NopCloser(bytes.NewBuffer(jsonStr))) req, err := NewSharedSecretRegistrationRequest(io.NopCloser(bytes.NewBuffer(jsonStr)))
if err != nil { if err != nil {
t.Fatalf("failed to read request: %s", err) t.Fatalf("failed to read request: %s", err)
} }

View file

@ -63,9 +63,10 @@ var sendEventDuration = prometheus.NewHistogramVec(
) )
// SendEvent implements: // SendEvent implements:
// /rooms/{roomID}/send/{eventType} //
// /rooms/{roomID}/send/{eventType}/{txnID} // /rooms/{roomID}/send/{eventType}
// /rooms/{roomID}/state/{eventType}/{stateKey} // /rooms/{roomID}/send/{eventType}/{txnID}
// /rooms/{roomID}/state/{eventType}/{stateKey}
func SendEvent( func SendEvent(
req *http.Request, req *http.Request,
device *userapi.Device, device *userapi.Device,

View file

@ -38,8 +38,9 @@ type threePIDsResponse struct {
} }
// RequestEmailToken implements: // RequestEmailToken implements:
// POST /account/3pid/email/requestToken //
// POST /register/email/requestToken // POST /account/3pid/email/requestToken
// POST /register/email/requestToken
func RequestEmailToken(req *http.Request, threePIDAPI api.ClientUserAPI, cfg *config.ClientAPI) util.JSONResponse { func RequestEmailToken(req *http.Request, threePIDAPI api.ClientUserAPI, cfg *config.ClientAPI) util.JSONResponse {
var body threepid.EmailAssociationRequest var body threepid.EmailAssociationRequest
if reqErr := httputil.UnmarshalJSONRequest(req, &body); reqErr != nil { if reqErr := httputil.UnmarshalJSONRequest(req, &body); reqErr != nil {

View file

@ -22,15 +22,17 @@ import (
"net/http" "net/http"
"time" "time"
"github.com/matrix-org/gomatrix"
"github.com/matrix-org/util"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrix"
"github.com/matrix-org/util"
) )
// RequestTurnServer implements: // RequestTurnServer implements:
// GET /voip/turnServer //
// GET /voip/turnServer
func RequestTurnServer(req *http.Request, device *api.Device, cfg *config.ClientAPI) util.JSONResponse { func RequestTurnServer(req *http.Request, device *api.Device, cfg *config.ClientAPI) util.JSONResponse {
turnConfig := cfg.TURN turnConfig := cfg.TURN

View file

@ -19,7 +19,6 @@ import (
"flag" "flag"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"os" "os"
"regexp" "regexp"
"strings" "strings"
@ -157,7 +156,7 @@ func main() {
func getPassword(password, pwdFile string, pwdStdin bool, r io.Reader) (string, error) { func getPassword(password, pwdFile string, pwdStdin bool, r io.Reader) (string, error) {
// read password from file // read password from file
if pwdFile != "" { if pwdFile != "" {
pw, err := ioutil.ReadFile(pwdFile) pw, err := os.ReadFile(pwdFile)
if err != nil { if err != nil {
return "", fmt.Errorf("Unable to read password from file: %v", err) return "", fmt.Errorf("Unable to read password from file: %v", err)
} }
@ -166,7 +165,7 @@ func getPassword(password, pwdFile string, pwdStdin bool, r io.Reader) (string,
// read password from stdin // read password from stdin
if pwdStdin { if pwdStdin {
data, err := ioutil.ReadAll(r) data, err := io.ReadAll(r)
if err != nil { if err != nil {
return "", fmt.Errorf("Unable to read password from stdin: %v", err) return "", fmt.Errorf("Unable to read password from stdin: %v", err)
} }

View file

@ -21,7 +21,6 @@ import (
"encoding/hex" "encoding/hex"
"flag" "flag"
"fmt" "fmt"
"io/ioutil"
"net" "net"
"net/http" "net/http"
"os" "os"
@ -76,11 +75,11 @@ func main() {
if pk, sk, err = ed25519.GenerateKey(nil); err != nil { if pk, sk, err = ed25519.GenerateKey(nil); err != nil {
panic(err) panic(err)
} }
if err = ioutil.WriteFile(keyfile, sk, 0644); err != nil { if err = os.WriteFile(keyfile, sk, 0644); err != nil {
panic(err) panic(err)
} }
} else if err == nil { } else if err == nil {
if sk, err = ioutil.ReadFile(keyfile); err != nil { if sk, err = os.ReadFile(keyfile); err != nil {
panic(err) panic(err)
} }
if len(sk) != ed25519.PrivateKeySize { if len(sk) != ed25519.PrivateKeySize {

View file

@ -20,7 +20,6 @@ import (
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io/ioutil"
"log" "log"
"net" "net"
"os" "os"
@ -69,7 +68,7 @@ func Setup(instanceName, storageDirectory, peerURI string) (*Node, error) {
yggfile := fmt.Sprintf("%s/%s-yggdrasil.conf", storageDirectory, instanceName) yggfile := fmt.Sprintf("%s/%s-yggdrasil.conf", storageDirectory, instanceName)
if _, err := os.Stat(yggfile); !os.IsNotExist(err) { if _, err := os.Stat(yggfile); !os.IsNotExist(err) {
yggconf, e := ioutil.ReadFile(yggfile) yggconf, e := os.ReadFile(yggfile)
if e != nil { if e != nil {
panic(err) panic(err)
} }
@ -88,7 +87,7 @@ func Setup(instanceName, storageDirectory, peerURI string) (*Node, error) {
if err != nil { if err != nil {
panic(err) panic(err)
} }
if e := ioutil.WriteFile(yggfile, j, 0600); e != nil { if e := os.WriteFile(yggfile, j, 0600); e != nil {
n.log.Printf("Couldn't write private key to file '%s': %s\n", yggfile, e) n.log.Printf("Couldn't write private key to file '%s': %s\n", yggfile, e)
} }

View file

@ -6,7 +6,7 @@ import (
"encoding/json" "encoding/json"
"flag" "flag"
"fmt" "fmt"
"io/ioutil" "io"
"log" "log"
"net/http" "net/http"
"os" "os"
@ -47,7 +47,7 @@ const HEAD = "HEAD"
// We cannot use the dockerfile associated with the repo with each version sadly due to changes in // We cannot use the dockerfile associated with the repo with each version sadly due to changes in
// Docker versions. Specifically, earlier Dendrite versions are incompatible with newer Docker clients // Docker versions. Specifically, earlier Dendrite versions are incompatible with newer Docker clients
// due to the error: // due to the error:
// When using COPY with more than one source file, the destination must be a directory and end with a / // When using COPY with more than one source file, the destination must be a directory and end with a /
// We need to run a postgres anyway, so use the dockerfile associated with Complement instead. // We need to run a postgres anyway, so use the dockerfile associated with Complement instead.
const Dockerfile = `FROM golang:1.18-stretch as build const Dockerfile = `FROM golang:1.18-stretch as build
RUN apt-get update && apt-get install -y postgresql RUN apt-get update && apt-get install -y postgresql
@ -95,7 +95,9 @@ CMD /build/run_dendrite.sh `
const dendriteUpgradeTestLabel = "dendrite_upgrade_test" const dendriteUpgradeTestLabel = "dendrite_upgrade_test"
// downloadArchive downloads an arbitrary github archive of the form: // downloadArchive downloads an arbitrary github archive of the form:
// https://github.com/matrix-org/dendrite/archive/v0.3.11.tar.gz //
// https://github.com/matrix-org/dendrite/archive/v0.3.11.tar.gz
//
// and re-tarballs it without the top-level directory which contains branch information. It inserts // and re-tarballs it without the top-level directory which contains branch information. It inserts
// the contents of `dockerfile` as a root file `Dockerfile` in the re-tarballed directory such that // the contents of `dockerfile` as a root file `Dockerfile` in the re-tarballed directory such that
// you can directly feed the retarballed archive to `ImageBuild` to have it run said dockerfile. // you can directly feed the retarballed archive to `ImageBuild` to have it run said dockerfile.
@ -126,7 +128,7 @@ func downloadArchive(cli *http.Client, tmpDir, archiveURL string, dockerfile []b
return nil, err return nil, err
} }
// add top level Dockerfile // add top level Dockerfile
err = ioutil.WriteFile(path.Join(tmpDir, "Dockerfile"), dockerfile, os.ModePerm) err = os.WriteFile(path.Join(tmpDir, "Dockerfile"), dockerfile, os.ModePerm)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to inject /Dockerfile: %w", err) return nil, fmt.Errorf("failed to inject /Dockerfile: %w", err)
} }
@ -148,7 +150,7 @@ func buildDendrite(httpClient *http.Client, dockerClient *client.Client, tmpDir,
if branchOrTagName == HEAD && *flagHead != "" { if branchOrTagName == HEAD && *flagHead != "" {
log.Printf("%s: Using %s as HEAD", branchOrTagName, *flagHead) log.Printf("%s: Using %s as HEAD", branchOrTagName, *flagHead)
// add top level Dockerfile // add top level Dockerfile
err = ioutil.WriteFile(path.Join(*flagHead, "Dockerfile"), []byte(Dockerfile), os.ModePerm) err = os.WriteFile(path.Join(*flagHead, "Dockerfile"), []byte(Dockerfile), os.ModePerm)
if err != nil { if err != nil {
return "", fmt.Errorf("custom HEAD: failed to inject /Dockerfile: %w", err) return "", fmt.Errorf("custom HEAD: failed to inject /Dockerfile: %w", err)
} }
@ -386,7 +388,7 @@ func runImage(dockerClient *client.Client, volumeName, version, imageID string)
}) })
// ignore errors when cannot get logs, it's just for debugging anyways // ignore errors when cannot get logs, it's just for debugging anyways
if err == nil { if err == nil {
logbody, err := ioutil.ReadAll(logs) logbody, err := io.ReadAll(logs)
if err == nil { if err == nil {
log.Printf("Container logs:\n\n%s\n\n", string(logbody)) log.Printf("Container logs:\n\n%s\n\n", string(logbody))
} }

View file

@ -18,9 +18,9 @@ type user struct {
} }
// runTests performs the following operations: // runTests performs the following operations:
// - register alice and bob with branch name muxed into the localpart // - register alice and bob with branch name muxed into the localpart
// - create a DM room for the 2 users and exchange messages // - create a DM room for the 2 users and exchange messages
// - create/join a public #global room and exchange messages // - create/join a public #global room and exchange messages
func runTests(baseURL, branchName string) error { func runTests(baseURL, branchName string) error {
// register 2 users // register 2 users
users := []user{ users := []user{

View file

@ -9,7 +9,6 @@ import (
"encoding/pem" "encoding/pem"
"flag" "flag"
"fmt" "fmt"
"io/ioutil"
"net/url" "net/url"
"os" "os"
@ -30,7 +29,7 @@ func main() {
os.Exit(1) os.Exit(1)
} }
data, err := ioutil.ReadFile(*requestKey) data, err := os.ReadFile(*requestKey)
if err != nil { if err != nil {
panic(err) panic(err)
} }

View file

@ -178,13 +178,16 @@ client_api:
# TURN server information that this homeserver should send to clients. # TURN server information that this homeserver should send to clients.
turn: turn:
turn_user_lifetime: "" turn_user_lifetime: "5m"
turn_uris: turn_uris:
# - turn:turn.server.org?transport=udp # - turn:turn.server.org?transport=udp
# - turn:turn.server.org?transport=tcp # - turn:turn.server.org?transport=tcp
turn_shared_secret: "" turn_shared_secret: ""
turn_username: "" # If your TURN server requires static credentials, then you will need to enter
turn_password: "" # them here instead of supplying a shared secret. Note that these credentials
# will be visible to clients!
# turn_username: ""
# turn_password: ""
# Settings for rate-limited endpoints. Rate limiting kicks in after the threshold # Settings for rate-limited endpoints. Rate limiting kicks in after the threshold
# number of "slots" have been taken by requests from a specific host. Each "slot" # number of "slots" have been taken by requests from a specific host. Each "slot"

View file

@ -181,13 +181,16 @@ client_api:
# TURN server information that this homeserver should send to clients. # TURN server information that this homeserver should send to clients.
turn: turn:
turn_user_lifetime: "" turn_user_lifetime: "5m"
turn_uris: turn_uris:
# - turn:turn.server.org?transport=udp # - turn:turn.server.org?transport=udp
# - turn:turn.server.org?transport=tcp # - turn:turn.server.org?transport=tcp
turn_shared_secret: "" turn_shared_secret: ""
turn_username: "" # If your TURN server requires static credentials, then you will need to enter
turn_password: "" # them here instead of supplying a shared secret. Note that these credentials
# will be visible to clients!
# turn_username: ""
# turn_password: ""
# Settings for rate-limited endpoints. Rate limiting kicks in after the threshold # Settings for rate-limited endpoints. Rate limiting kicks in after the threshold
# number of "slots" have been taken by requests from a specific host. Each "slot" # number of "slots" have been taken by requests from a specific host. Each "slot"

View file

@ -64,7 +64,7 @@ comment. Please avoid doing this if you can.
We also have unit tests which we run via: We also have unit tests which we run via:
```bash ```bash
go test ./... go test --race ./...
``` ```
In general, we like submissions that come with tests. Anything that proves that the In general, we like submissions that come with tests. Anything that proves that the

View file

@ -208,9 +208,11 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent, rew
// joinedHostsAtEvent works out a list of matrix servers that were joined to // joinedHostsAtEvent works out a list of matrix servers that were joined to
// the room at the event (including peeking ones) // the room at the event (including peeking ones)
// It is important to use the state at the event for sending messages because: // It is important to use the state at the event for sending messages because:
// 1) We shouldn't send messages to servers that weren't in the room. //
// 2) If a server is kicked from the rooms it should still be told about the // 1. We shouldn't send messages to servers that weren't in the room.
// kick event, // 2. If a server is kicked from the rooms it should still be told about the
// kick event.
//
// Usually the list can be calculated locally, but sometimes it will need fetch // Usually the list can be calculated locally, but sometimes it will need fetch
// events from the room server. // events from the room server.
// Returns an error if there was a problem talking to the room server. // Returns an error if there was a problem talking to the room server.

View file

@ -6,7 +6,7 @@ import (
"crypto/ed25519" "crypto/ed25519"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io/ioutil" "io"
"net/http" "net/http"
"os" "os"
"testing" "testing"
@ -66,7 +66,7 @@ func TestMain(m *testing.M) {
s.cache = caching.NewRistrettoCache(8*1024*1024, time.Hour, false) s.cache = caching.NewRistrettoCache(8*1024*1024, time.Hour, false)
// Create a temporary directory for JetStream. // Create a temporary directory for JetStream.
d, err := ioutil.TempDir("./", "jetstream*") d, err := os.MkdirTemp("./", "jetstream*")
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -136,7 +136,7 @@ func (m *MockRoundTripper) RoundTrip(req *http.Request) (res *http.Response, err
// And respond. // And respond.
res = &http.Response{ res = &http.Response{
StatusCode: 200, StatusCode: 200,
Body: ioutil.NopCloser(bytes.NewReader(body)), Body: io.NopCloser(bytes.NewReader(body)),
} }
return return
} }

View file

@ -6,6 +6,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"strings" "strings"
"sync"
"testing" "testing"
"time" "time"
@ -48,6 +49,7 @@ func (f *fedRoomserverAPI) QueryRoomsForUser(ctx context.Context, req *rsapi.Que
// TODO: This struct isn't generic, only works for TestFederationAPIJoinThenKeyUpdate // TODO: This struct isn't generic, only works for TestFederationAPIJoinThenKeyUpdate
type fedClient struct { type fedClient struct {
fedClientMutex sync.Mutex
api.FederationClient api.FederationClient
allowJoins []*test.Room allowJoins []*test.Room
keys map[gomatrixserverlib.ServerName]struct { keys map[gomatrixserverlib.ServerName]struct {
@ -59,6 +61,8 @@ type fedClient struct {
} }
func (f *fedClient) GetServerKeys(ctx context.Context, matrixServer gomatrixserverlib.ServerName) (gomatrixserverlib.ServerKeys, error) { func (f *fedClient) GetServerKeys(ctx context.Context, matrixServer gomatrixserverlib.ServerName) (gomatrixserverlib.ServerKeys, error) {
f.fedClientMutex.Lock()
defer f.fedClientMutex.Unlock()
fmt.Println("GetServerKeys:", matrixServer) fmt.Println("GetServerKeys:", matrixServer)
var keys gomatrixserverlib.ServerKeys var keys gomatrixserverlib.ServerKeys
var keyID gomatrixserverlib.KeyID var keyID gomatrixserverlib.KeyID
@ -122,6 +126,8 @@ func (f *fedClient) MakeJoin(ctx context.Context, s gomatrixserverlib.ServerName
return return
} }
func (f *fedClient) SendJoin(ctx context.Context, s gomatrixserverlib.ServerName, event *gomatrixserverlib.Event) (res gomatrixserverlib.RespSendJoin, err error) { func (f *fedClient) SendJoin(ctx context.Context, s gomatrixserverlib.ServerName, event *gomatrixserverlib.Event) (res gomatrixserverlib.RespSendJoin, err error) {
f.fedClientMutex.Lock()
defer f.fedClientMutex.Unlock()
for _, r := range f.allowJoins { for _, r := range f.allowJoins {
if r.ID == event.RoomID() { if r.ID == event.RoomID() {
r.InsertEvent(f.t, event.Headered(r.Version)) r.InsertEvent(f.t, event.Headered(r.Version))
@ -134,6 +140,8 @@ func (f *fedClient) SendJoin(ctx context.Context, s gomatrixserverlib.ServerName
} }
func (f *fedClient) SendTransaction(ctx context.Context, t gomatrixserverlib.Transaction) (res gomatrixserverlib.RespSend, err error) { func (f *fedClient) SendTransaction(ctx context.Context, t gomatrixserverlib.Transaction) (res gomatrixserverlib.RespSend, err error) {
f.fedClientMutex.Lock()
defer f.fedClientMutex.Unlock()
for _, edu := range t.EDUs { for _, edu := range t.EDUs {
if edu.Type == gomatrixserverlib.MDeviceListUpdate { if edu.Type == gomatrixserverlib.MDeviceListUpdate {
f.sentTxn = true f.sentTxn = true
@ -242,6 +250,8 @@ func testFederationAPIJoinThenKeyUpdate(t *testing.T, dbType test.DBType) {
testrig.MustPublishMsgs(t, jsctx, msg) testrig.MustPublishMsgs(t, jsctx, msg)
time.Sleep(500 * time.Millisecond) time.Sleep(500 * time.Millisecond)
fc.fedClientMutex.Lock()
defer fc.fedClientMutex.Unlock()
if !fc.sentTxn { if !fc.sentTxn {
t.Fatalf("did not send device list update") t.Fatalf("did not send device list update")
} }

View file

@ -158,7 +158,7 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d
oqs.queuesMutex.Lock() oqs.queuesMutex.Lock()
defer oqs.queuesMutex.Unlock() defer oqs.queuesMutex.Unlock()
oq, ok := oqs.queues[destination] oq, ok := oqs.queues[destination]
if !ok || oq != nil { if !ok || oq == nil {
destinationQueueTotal.Inc() destinationQueueTotal.Inc()
oq = &destinationQueue{ oq = &destinationQueue{
queues: oqs, queues: oqs,

View file

@ -21,13 +21,14 @@ import (
"sort" "sort"
"time" "time"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
) )
// MakeJoin implements the /make_join API // MakeJoin implements the /make_join API
@ -435,13 +436,13 @@ func SendJoin(
// a restricted room join. If the room version does not support restricted // a restricted room join. If the room version does not support restricted
// joins then this function returns with no side effects. This returns three // joins then this function returns with no side effects. This returns three
// values: // values:
// * an optional JSON response body (i.e. M_UNABLE_TO_AUTHORISE_JOIN) which // - an optional JSON response body (i.e. M_UNABLE_TO_AUTHORISE_JOIN) which
// should always be sent back to the client if one is specified // should always be sent back to the client if one is specified
// * a user ID of an authorising user, typically a user that has power to // - a user ID of an authorising user, typically a user that has power to
// issue invites in the room, if one has been found // issue invites in the room, if one has been found
// * an error if there was a problem finding out if this was allowable, // - an error if there was a problem finding out if this was allowable,
// like if the room version isn't known or a problem happened talking to // like if the room version isn't known or a problem happened talking to
// the roomserver // the roomserver
func checkRestrictedJoin( func checkRestrictedJoin(
httpReq *http.Request, httpReq *http.Request,
rsAPI api.FederationRoomserverAPI, rsAPI api.FederationRoomserverAPI,

View file

@ -14,6 +14,7 @@ type lazyLoadingCacheKey struct {
type LazyLoadCache interface { type LazyLoadCache interface {
StoreLazyLoadedUser(device *userapi.Device, roomID, userID, eventID string) StoreLazyLoadedUser(device *userapi.Device, roomID, userID, eventID string)
IsLazyLoadedUserCached(device *userapi.Device, roomID, userID string) (string, bool) IsLazyLoadedUserCached(device *userapi.Device, roomID, userID string) (string, bool)
InvalidateLazyLoadedUser(device *userapi.Device, roomID, userID string)
} }
func (c Caches) StoreLazyLoadedUser(device *userapi.Device, roomID, userID, eventID string) { func (c Caches) StoreLazyLoadedUser(device *userapi.Device, roomID, userID, eventID string) {
@ -33,3 +34,12 @@ func (c Caches) IsLazyLoadedUserCached(device *userapi.Device, roomID, userID st
TargetUserID: userID, TargetUserID: userID,
}) })
} }
func (c Caches) InvalidateLazyLoadedUser(device *userapi.Device, roomID, userID string) {
c.LazyLoading.Unset(lazyLoadingCacheKey{
UserID: device.UserID,
DeviceID: device.ID,
RoomID: roomID,
TargetUserID: userID,
})
}

View file

@ -27,9 +27,10 @@ import (
"github.com/matrix-org/util" "github.com/matrix-org/util"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dugong" "github.com/matrix-org/dugong"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/setup/config"
) )
type utcFormatter struct { type utcFormatter struct {
@ -145,7 +146,7 @@ func setupFileHook(hook config.LogrusHook, level logrus.Level, componentName str
}) })
} }
//CloseAndLogIfError Closes io.Closer and logs the error if any // CloseAndLogIfError Closes io.Closer and logs the error if any
func CloseAndLogIfError(ctx context.Context, closer io.Closer, message string) { func CloseAndLogIfError(ctx context.Context, closer io.Closer, message string) {
if closer == nil { if closer == nil {
return return

View file

@ -18,7 +18,7 @@
package internal package internal
import ( import (
"io/ioutil" "io"
"log/syslog" "log/syslog"
"github.com/MFAshby/stdemuxerhook" "github.com/MFAshby/stdemuxerhook"
@ -63,7 +63,7 @@ func SetupHookLogging(hooks []config.LogrusHook, componentName string) {
setupStdLogHook(logrus.InfoLevel) setupStdLogHook(logrus.InfoLevel)
} }
// Hooks are now configured for stdout/err, so throw away the default logger output // Hooks are now configured for stdout/err, so throw away the default logger output
logrus.SetOutput(ioutil.Discard) logrus.SetOutput(io.Discard)
} }
func checkSyslogHookParams(params map[string]interface{}) { func checkSyslogHookParams(params map[string]interface{}) {

View file

@ -22,12 +22,13 @@ import (
"sync" "sync"
"time" "time"
fedsenderapi "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
fedsenderapi "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/keyserver/api"
) )
var ( var (
@ -66,12 +67,14 @@ func init() {
// - We don't have unbounded growth in proportion to the number of servers (this is more important in a P2P world where // - We don't have unbounded growth in proportion to the number of servers (this is more important in a P2P world where
// we have many many servers) // we have many many servers)
// - We can adjust concurrency (at the cost of memory usage) by tuning N, to accommodate mobile devices vs servers. // - We can adjust concurrency (at the cost of memory usage) by tuning N, to accommodate mobile devices vs servers.
//
// The downsides are that: // The downsides are that:
// - Query requests can get queued behind other servers if they hash to the same worker, even if there are other free // - Query requests can get queued behind other servers if they hash to the same worker, even if there are other free
// workers elsewhere. Whilst suboptimal, provided we cap how long a single request can last (e.g using context timeouts) // workers elsewhere. Whilst suboptimal, provided we cap how long a single request can last (e.g using context timeouts)
// we guarantee we will get around to it. Also, more users on a given server does not increase the number of requests // we guarantee we will get around to it. Also, more users on a given server does not increase the number of requests
// (as /keys/query allows multiple users to be specified) so being stuck behind matrix.org won't materially be any worse // (as /keys/query allows multiple users to be specified) so being stuck behind matrix.org won't materially be any worse
// than being stuck behind foo.bar // than being stuck behind foo.bar
//
// In the event that the query fails, a lock is acquired and the server name along with the time to wait before retrying is // In the event that the query fails, a lock is acquired and the server name along with the time to wait before retrying is
// set in a map. A restarter goroutine periodically probes this map and injects servers which are ready to be retried. // set in a map. A restarter goroutine periodically probes this map and injects servers which are ready to be retried.
type DeviceListUpdater struct { type DeviceListUpdater struct {

View file

@ -18,7 +18,7 @@ import (
"context" "context"
"crypto/ed25519" "crypto/ed25519"
"fmt" "fmt"
"io/ioutil" "io"
"net/http" "net/http"
"net/url" "net/url"
"reflect" "reflect"
@ -27,8 +27,9 @@ import (
"testing" "testing"
"time" "time"
"github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/keyserver/api"
) )
var ( var (
@ -202,7 +203,7 @@ func TestUpdateNoPrevID(t *testing.T) {
} }
return &http.Response{ return &http.Response{
StatusCode: 200, StatusCode: 200,
Body: ioutil.NopCloser(strings.NewReader(` Body: io.NopCloser(strings.NewReader(`
{ {
"user_id": "` + remoteUserID + `", "user_id": "` + remoteUserID + `",
"stream_id": 5, "stream_id": 5,
@ -317,7 +318,7 @@ func TestDebounce(t *testing.T) {
// now send the response over federation // now send the response over federation
fedCh <- &http.Response{ fedCh <- &http.Response{
StatusCode: 200, StatusCode: 200,
Body: ioutil.NopCloser(strings.NewReader(` Body: io.NopCloser(strings.NewReader(`
{ {
"user_id": "` + userID + `", "user_id": "` + userID + `",
"stream_id": 5, "stream_id": 5,

View file

@ -3,6 +3,7 @@ package storage_test
import ( import (
"context" "context"
"reflect" "reflect"
"sync"
"testing" "testing"
"github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/api"
@ -103,6 +104,9 @@ func TestKeyChangesUpperLimit(t *testing.T) {
}) })
} }
var dbLock sync.Mutex
var deviceArray = []string{"AAA", "another_device"}
// The purpose of this test is to make sure that the storage layer is generating sequential stream IDs per user, // The purpose of this test is to make sure that the storage layer is generating sequential stream IDs per user,
// and that they are returned correctly when querying for device keys. // and that they are returned correctly when querying for device keys.
func TestDeviceKeysStreamIDGeneration(t *testing.T) { func TestDeviceKeysStreamIDGeneration(t *testing.T) {
@ -169,8 +173,11 @@ func TestDeviceKeysStreamIDGeneration(t *testing.T) {
t.Fatalf("Expected StoreLocalDeviceKeys to set StreamID=3 (new key same device) but got %d", msgs[0].StreamID) t.Fatalf("Expected StoreLocalDeviceKeys to set StreamID=3 (new key same device) but got %d", msgs[0].StreamID)
} }
dbLock.Lock()
defer dbLock.Unlock()
// Querying for device keys returns the latest stream IDs // Querying for device keys returns the latest stream IDs
msgs, err = db.DeviceKeysForUser(ctx, alice, []string{"AAA", "another_device"}, false) msgs, err = db.DeviceKeysForUser(ctx, alice, deviceArray, false)
if err != nil { if err != nil {
t.Fatalf("DeviceKeysForUser returned error: %s", err) t.Fatalf("DeviceKeysForUser returned error: %s", err)
} }

View file

@ -21,7 +21,6 @@ import (
"encoding/base64" "encoding/base64"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
@ -180,7 +179,7 @@ func createTempDir(baseDirectory config.Path) (types.Path, error) {
if err := os.MkdirAll(baseTmpDir, 0770); err != nil { if err := os.MkdirAll(baseTmpDir, 0770); err != nil {
return "", fmt.Errorf("failed to create base temp dir: %w", err) return "", fmt.Errorf("failed to create base temp dir: %w", err)
} }
tmpDir, err := ioutil.TempDir(baseTmpDir, "") tmpDir, err := os.MkdirTemp(baseTmpDir, "")
if err != nil { if err != nil {
return "", fmt.Errorf("failed to create temp dir: %w", err) return "", fmt.Errorf("failed to create temp dir: %w", err)
} }

View file

@ -19,7 +19,6 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"mime" "mime"
"net/http" "net/http"
"net/url" "net/url"
@ -695,7 +694,7 @@ func (r *downloadRequest) GetContentLengthAndReader(contentLengthHeader string,
// We successfully parsed the Content-Length, so we'll return a limited // We successfully parsed the Content-Length, so we'll return a limited
// reader that restricts us to reading only up to this size. // reader that restricts us to reading only up to this size.
reader = ioutil.NopCloser(io.LimitReader(*body, parsedLength)) reader = io.NopCloser(io.LimitReader(*body, parsedLength))
contentLength = parsedLength contentLength = parsedLength
} else { } else {
// Content-Length header is missing. If we have a maximum file size // Content-Length header is missing. If we have a maximum file size
@ -704,7 +703,7 @@ func (r *downloadRequest) GetContentLengthAndReader(contentLengthHeader string,
// ultimately it will get rewritten later when the temp file is written // ultimately it will get rewritten later when the temp file is written
// to disk. // to disk.
if maxFileSizeBytes > 0 { if maxFileSizeBytes > 0 {
reader = ioutil.NopCloser(io.LimitReader(*body, int64(maxFileSizeBytes))) reader = io.NopCloser(io.LimitReader(*body, int64(maxFileSizeBytes)))
} }
contentLength = 0 contentLength = 0
} }

View file

@ -25,6 +25,11 @@ import (
"github.com/Arceliar/phony" "github.com/Arceliar/phony"
"github.com/getsentry/sentry-go" "github.com/getsentry/sentry-go"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
fedapi "github.com/matrix-org/dendrite/federationapi/api" fedapi "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/roomserver/acls" "github.com/matrix-org/dendrite/roomserver/acls"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
@ -35,10 +40,6 @@ import (
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
) )
// Inputer is responsible for consuming from the roomserver input // Inputer is responsible for consuming from the roomserver input
@ -60,9 +61,9 @@ import (
// per-room durable consumers will only progress through the stream // per-room durable consumers will only progress through the stream
// as events are processed. // as events are processed.
// //
// A BC * -> positions of each consumer (* = ephemeral) // A BC * -> positions of each consumer (* = ephemeral)
// ⌄ ⌄⌄ ⌄ // ⌄ ⌄⌄ ⌄
// ABAABCAABCAA -> newest (letter = subject for each message) // ABAABCAABCAA -> newest (letter = subject for each message)
// //
// In this example, A is still processing an event but has two // In this example, A is still processing an event but has two
// pending events to process afterwards. Both B and C are caught // pending events to process afterwards. Both B and C are caught

View file

@ -20,32 +20,32 @@ import (
"context" "context"
"fmt" "fmt"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/opentracing/opentracing-go"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/state" "github.com/matrix-org/dendrite/roomserver/state"
"github.com/matrix-org/dendrite/roomserver/storage/shared" "github.com/matrix-org/dendrite/roomserver/storage/shared"
"github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/opentracing/opentracing-go"
"github.com/sirupsen/logrus"
) )
// updateLatestEvents updates the list of latest events for this room in the database and writes the // updateLatestEvents updates the list of latest events for this room in the database and writes the
// event to the output log. // event to the output log.
// The latest events are the events that aren't referenced by another event in the database: // The latest events are the events that aren't referenced by another event in the database:
// //
// Time goes down the page. 1 is the m.room.create event (root). // Time goes down the page. 1 is the m.room.create event (root).
// // 1 After storing 1 the latest events are {1}
// 1 After storing 1 the latest events are {1} // | After storing 2 the latest events are {2}
// | After storing 2 the latest events are {2} // 2 After storing 3 the latest events are {3}
// 2 After storing 3 the latest events are {3} // / \ After storing 4 the latest events are {3,4}
// / \ After storing 4 the latest events are {3,4} // 3 4 After storing 5 the latest events are {5,4}
// 3 4 After storing 5 the latest events are {5,4} // | | After storing 6 the latest events are {5,6}
// | | After storing 6 the latest events are {5,6} // 5 6 <--- latest After storing 7 the latest events are {6,7}
// 5 6 <--- latest After storing 7 the latest events are {6,7} // |
// | // 7 <----- latest
// 7 <----- latest
// //
// Can only be called once at a time // Can only be called once at a time
func (r *Inputer) updateLatestEvents( func (r *Inputer) updateLatestEvents(

View file

@ -19,6 +19,10 @@ import (
"fmt" "fmt"
"github.com/getsentry/sentry-go" "github.com/getsentry/sentry-go"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
federationAPI "github.com/matrix-org/dendrite/federationapi/api" federationAPI "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
@ -26,9 +30,6 @@ import (
"github.com/matrix-org/dendrite/roomserver/internal/helpers" "github.com/matrix-org/dendrite/roomserver/internal/helpers"
"github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
) )
// the max number of servers to backfill from per request. If this is too low we may fail to backfill when // the max number of servers to backfill from per request. If this is too low we may fail to backfill when
@ -522,8 +523,9 @@ func (b *backfillRequester) ProvideEvents(roomVer gomatrixserverlib.RoomVersion,
} }
// joinEventsFromHistoryVisibility returns all CURRENTLY joined members if our server can read the room history // joinEventsFromHistoryVisibility returns all CURRENTLY joined members if our server can read the room history
//
// TODO: Long term we probably want a history_visibility table which stores eventNID | visibility_enum so we can just // TODO: Long term we probably want a history_visibility table which stores eventNID | visibility_enum so we can just
// pull all events and then filter by that table. // pull all events and then filter by that table.
func joinEventsFromHistoryVisibility( func joinEventsFromHistoryVisibility(
ctx context.Context, db storage.Database, roomID string, stateEntries []types.StateEntry, ctx context.Context, db storage.Database, roomID string, stateEntries []types.StateEntry,
thisServer gomatrixserverlib.ServerName) ([]types.Event, error) { thisServer gomatrixserverlib.ServerName) ([]types.Event, error) {

View file

@ -5,9 +5,10 @@ import (
"database/sql" "database/sql"
"errors" "errors"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/tidwall/gjson" "github.com/tidwall/gjson"
"github.com/matrix-org/dendrite/roomserver/types"
) )
var OptimisationNotSupportedError = errors.New("optimisation not supported") var OptimisationNotSupportedError = errors.New("optimisation not supported")
@ -178,7 +179,7 @@ type StrippedEvent struct {
} }
// ExtractContentValue from the given state event. For example, given an m.room.name event with: // ExtractContentValue from the given state event. For example, given an m.room.name event with:
// content: { name: "Foo" } // content: { name: "Foo" }
// this returns "Foo". // this returns "Foo".
func ExtractContentValue(ev *gomatrixserverlib.HeaderedEvent) string { func ExtractContentValue(ev *gomatrixserverlib.HeaderedEvent) string {
content := ev.Content() content := ev.Content()

View file

@ -17,7 +17,7 @@ main() {
if [ -d ../sytest ]; then if [ -d ../sytest ]; then
local tmpdir local tmpdir
tmpdir="$(mktemp -d --tmpdir run-systest.XXXXXXXXXX)" tmpdir="$(mktemp -d -t run-systest.XXXXXXXXXX)"
trap "rm -r '$tmpdir'" EXIT trap "rm -r '$tmpdir'" EXIT
if [ -z "$DISABLE_BUILDING_SYTEST" ]; then if [ -z "$DISABLE_BUILDING_SYTEST" ]; then

View file

@ -19,8 +19,8 @@ import (
"encoding/pem" "encoding/pem"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"net/url" "net/url"
"os"
"path/filepath" "path/filepath"
"regexp" "regexp"
"strings" "strings"
@ -191,7 +191,7 @@ type ConfigErrors []string
// Load a yaml config file for a server run as multiple processes or as a monolith. // Load a yaml config file for a server run as multiple processes or as a monolith.
// Checks the config to ensure that it is valid. // Checks the config to ensure that it is valid.
func Load(configPath string, monolith bool) (*Dendrite, error) { func Load(configPath string, monolith bool) (*Dendrite, error) {
configData, err := ioutil.ReadFile(configPath) configData, err := os.ReadFile(configPath)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -199,9 +199,9 @@ func Load(configPath string, monolith bool) (*Dendrite, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Pass the current working directory and ioutil.ReadFile so that they can // Pass the current working directory and os.ReadFile so that they can
// be mocked in the tests // be mocked in the tests
return loadConfig(basePath, configData, ioutil.ReadFile, monolith) return loadConfig(basePath, configData, os.ReadFile, monolith)
} }
func loadConfig( func loadConfig(
@ -530,7 +530,7 @@ func (config *Dendrite) KeyServerURL() string {
// SetupTracing configures the opentracing using the supplied configuration. // SetupTracing configures the opentracing using the supplied configuration.
func (config *Dendrite) SetupTracing(serviceName string) (closer io.Closer, err error) { func (config *Dendrite) SetupTracing(serviceName string) (closer io.Closer, err error) {
if !config.Tracing.Enabled { if !config.Tracing.Enabled {
return ioutil.NopCloser(bytes.NewReader([]byte{})), nil return io.NopCloser(bytes.NewReader([]byte{})), nil
} }
return config.Tracing.Jaeger.InitGlobalTracer( return config.Tracing.Jaeger.InitGlobalTracer(
serviceName, serviceName,

View file

@ -16,7 +16,7 @@ package config
import ( import (
"fmt" "fmt"
"io/ioutil" "os"
"path/filepath" "path/filepath"
"regexp" "regexp"
"strings" "strings"
@ -181,7 +181,7 @@ func loadAppServices(config *AppServiceAPI, derived *Derived) error {
} }
// Read the application service's config file // Read the application service's config file
configData, err := ioutil.ReadFile(absPath) configData, err := os.ReadFile(absPath)
if err != nil { if err != nil {
return err return err
} }

View file

@ -14,16 +14,16 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
natsserver "github.com/nats-io/nats-server/v2/server" natsserver "github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
natsclient "github.com/nats-io/nats.go" natsclient "github.com/nats-io/nats.go"
) )
type NATSInstance struct { type NATSInstance struct {
*natsserver.Server *natsserver.Server
sync.Mutex
} }
func DeleteAllStreams(js nats.JetStreamContext, cfg *config.JetStream) { var natsLock sync.Mutex
func DeleteAllStreams(js natsclient.JetStreamContext, cfg *config.JetStream) {
for _, stream := range streams { // streams are defined in streams.go for _, stream := range streams { // streams are defined in streams.go
name := cfg.Prefixed(stream.Name) name := cfg.Prefixed(stream.Name)
_ = js.DeleteStream(name) _ = js.DeleteStream(name)
@ -31,11 +31,12 @@ func DeleteAllStreams(js nats.JetStreamContext, cfg *config.JetStream) {
} }
func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) { func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) {
natsLock.Lock()
defer natsLock.Unlock()
// check if we need an in-process NATS Server // check if we need an in-process NATS Server
if len(cfg.Addresses) != 0 { if len(cfg.Addresses) != 0 {
return setupNATS(process, cfg, nil) return setupNATS(process, cfg, nil)
} }
s.Lock()
if s.Server == nil { if s.Server == nil {
var err error var err error
s.Server, err = natsserver.NewServer(&natsserver.Options{ s.Server, err = natsserver.NewServer(&natsserver.Options{
@ -63,7 +64,6 @@ func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetS
process.ComponentFinished() process.ComponentFinished()
}() }()
} }
s.Unlock()
if !s.ReadyForConnections(time.Second * 10) { if !s.ReadyForConnections(time.Second * 10) {
logrus.Fatalln("NATS did not start in time") logrus.Fatalln("NATS did not start in time")
} }
@ -77,9 +77,9 @@ func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetS
func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStreamContext, *natsclient.Conn) { func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStreamContext, *natsclient.Conn) {
if nc == nil { if nc == nil {
var err error var err error
opts := []nats.Option{} opts := []natsclient.Option{}
if cfg.DisableTLSValidation { if cfg.DisableTLSValidation {
opts = append(opts, nats.Secure(&tls.Config{ opts = append(opts, natsclient.Secure(&tls.Config{
InsecureSkipVerify: true, InsecureSkipVerify: true,
})) }))
} }

View file

@ -7,7 +7,7 @@ import (
"crypto/sha256" "crypto/sha256"
"encoding/base64" "encoding/base64"
"encoding/json" "encoding/json"
"io/ioutil" "io"
"net/http" "net/http"
"sort" "sort"
"strings" "strings"
@ -15,6 +15,8 @@ import (
"time" "time"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/internal/hooks" "github.com/matrix-org/dendrite/internal/hooks"
"github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/internal/httputil"
roomserver "github.com/matrix-org/dendrite/roomserver/api" roomserver "github.com/matrix-org/dendrite/roomserver/api"
@ -22,7 +24,6 @@ import (
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/mscs/msc2836" "github.com/matrix-org/dendrite/setup/mscs/msc2836"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
) )
var ( var (
@ -32,15 +33,17 @@ var (
) )
// Basic sanity check of MSC2836 logic. Injects a thread that looks like: // Basic sanity check of MSC2836 logic. Injects a thread that looks like:
// A //
// | // A
// B // |
// / \ // B
// C D // / \
// /|\ // C D
// E F G // /|\
// | // E F G
// H // |
// H
//
// And makes sure POST /event_relationships works with various parameters // And makes sure POST /event_relationships works with various parameters
func TestMSC2836(t *testing.T) { func TestMSC2836(t *testing.T) {
alice := "@alice:localhost" alice := "@alice:localhost"
@ -425,12 +428,12 @@ func postRelationships(t *testing.T, expectCode int, accessToken string, req *ms
t.Fatalf("failed to do request: %s", err) t.Fatalf("failed to do request: %s", err)
} }
if res.StatusCode != expectCode { if res.StatusCode != expectCode {
body, _ := ioutil.ReadAll(res.Body) body, _ := io.ReadAll(res.Body)
t.Fatalf("wrong response code, got %d want %d - body: %s", res.StatusCode, expectCode, string(body)) t.Fatalf("wrong response code, got %d want %d - body: %s", res.StatusCode, expectCode, string(body))
} }
if res.StatusCode == 200 { if res.StatusCode == 200 {
var result msc2836.EventRelationshipResponse var result msc2836.EventRelationshipResponse
body, err := ioutil.ReadAll(res.Body) body, err := io.ReadAll(res.Body)
if err != nil { if err != nil {
t.Fatalf("response 200 OK but failed to read response body: %s", err) t.Fatalf("response 200 OK but failed to read response body: %s", err)
} }

View file

@ -6,12 +6,13 @@ import (
"sort" "sort"
"testing" "testing"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
keyapi "github.com/matrix-org/dendrite/keyserver/api" keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
) )
var ( var (
@ -364,13 +365,14 @@ func TestKeyChangeCatchupChangeAndLeft(t *testing.T) {
// tests that joining/leaving the SAME room puts users in `left` if the final state is leave. // tests that joining/leaving the SAME room puts users in `left` if the final state is leave.
// NB: Consider the case: // NB: Consider the case:
// - Alice and Bob are in a room. // - Alice and Bob are in a room.
// - Alice goes offline, Charlie joins, sends encrypted messages then leaves the room. // - Alice goes offline, Charlie joins, sends encrypted messages then leaves the room.
// - Alice comes back online. Technically nothing has changed in the set of users between those two points in time, // - Alice comes back online. Technically nothing has changed in the set of users between those two points in time,
// it's still just (Alice,Bob) but then we won't be tracking Charlie -- is this okay though? It's device keys // it's still just (Alice,Bob) but then we won't be tracking Charlie -- is this okay though? It's device keys
// which are only relevant when actively sending events I think? And if Alice does need the keys she knows // which are only relevant when actively sending events I think? And if Alice does need the keys she knows
// charlie's (user_id, device_id) so can just hit /keys/query - no need to keep updated about it because she // charlie's (user_id, device_id) so can just hit /keys/query - no need to keep updated about it because she
// doesn't share any rooms with him. // doesn't share any rooms with him.
//
// Ergo, we put them in `left` as it is simpler. // Ergo, we put them in `left` as it is simpler.
func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) { func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) {
newShareUser := "@berta:localhost" newShareUser := "@berta:localhost"

View file

@ -16,16 +16,17 @@ package routing
import ( import (
"encoding/json" "encoding/json"
"io/ioutil" "io"
"net/http" "net/http"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/tidwall/gjson"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/sync" "github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/tidwall/gjson"
) )
// GetFilter implements GET /_matrix/client/r0/user/{userId}/filter/{filterId} // GetFilter implements GET /_matrix/client/r0/user/{userId}/filter/{filterId}
@ -65,7 +66,9 @@ type filterResponse struct {
FilterID string `json:"filter_id"` FilterID string `json:"filter_id"`
} }
//PutFilter implements POST /_matrix/client/r0/user/{userId}/filter // PutFilter implements
//
// POST /_matrix/client/r0/user/{userId}/filter
func PutFilter( func PutFilter(
req *http.Request, device *api.Device, syncDB storage.Database, userID string, req *http.Request, device *api.Device, syncDB storage.Database, userID string,
) util.JSONResponse { ) util.JSONResponse {
@ -85,7 +88,7 @@ func PutFilter(
var filter gomatrixserverlib.Filter var filter gomatrixserverlib.Filter
defer req.Body.Close() // nolint:errcheck defer req.Body.Close() // nolint:errcheck
body, err := ioutil.ReadAll(req.Body) body, err := io.ReadAll(req.Body)
if err != nil { if err != nil {
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusBadRequest, Code: http.StatusBadRequest,

View file

@ -21,6 +21,10 @@ import (
"sort" "sort"
"time" "time"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
@ -30,9 +34,6 @@ import (
"github.com/matrix-org/dendrite/syncapi/sync" "github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
) )
type messagesReq struct { type messagesReq struct {
@ -264,7 +265,7 @@ func (m *messagesResp) applyLazyLoadMembers(
} }
} }
for _, evt := range membershipToUser { for _, evt := range membershipToUser {
m.State = append(m.State, gomatrixserverlib.HeaderedToClientEvent(evt, gomatrixserverlib.FormatSync)) m.State = append(m.State, gomatrixserverlib.HeaderedToClientEvent(evt, gomatrixserverlib.FormatAll))
} }
} }

View file

@ -58,7 +58,7 @@ const upsertRoomUnreadNotificationCountsSQL = `INSERT INTO syncapi_notification_
(user_id, room_id, notification_count, highlight_count) (user_id, room_id, notification_count, highlight_count)
VALUES ($1, $2, $3, $4) VALUES ($1, $2, $3, $4)
ON CONFLICT (user_id, room_id) ON CONFLICT (user_id, room_id)
DO UPDATE SET notification_count = $3, highlight_count = $4 DO UPDATE SET id = nextval('syncapi_notification_data_id_seq'), notification_count = $3, highlight_count = $4
RETURNING id` RETURNING id`
const selectUserUnreadNotificationCountsSQL = `SELECT const selectUserUnreadNotificationCountsSQL = `SELECT

View file

@ -25,12 +25,14 @@ import (
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
) )
func NewSqliteNotificationDataTable(db *sql.DB) (tables.NotificationData, error) { func NewSqliteNotificationDataTable(db *sql.DB, streamID *StreamIDStatements) (tables.NotificationData, error) {
_, err := db.Exec(notificationDataSchema) _, err := db.Exec(notificationDataSchema)
if err != nil { if err != nil {
return nil, err return nil, err
} }
r := &notificationDataStatements{} r := &notificationDataStatements{
streamIDStatements: streamID,
}
return r, sqlutil.StatementList{ return r, sqlutil.StatementList{
{&r.upsertRoomUnreadCounts, upsertRoomUnreadNotificationCountsSQL}, {&r.upsertRoomUnreadCounts, upsertRoomUnreadNotificationCountsSQL},
{&r.selectUserUnreadCounts, selectUserUnreadNotificationCountsSQL}, {&r.selectUserUnreadCounts, selectUserUnreadNotificationCountsSQL},
@ -39,6 +41,7 @@ func NewSqliteNotificationDataTable(db *sql.DB) (tables.NotificationData, error)
} }
type notificationDataStatements struct { type notificationDataStatements struct {
streamIDStatements *StreamIDStatements
upsertRoomUnreadCounts *sql.Stmt upsertRoomUnreadCounts *sql.Stmt
selectUserUnreadCounts *sql.Stmt selectUserUnreadCounts *sql.Stmt
selectMaxID *sql.Stmt selectMaxID *sql.Stmt
@ -58,8 +61,7 @@ const upsertRoomUnreadNotificationCountsSQL = `INSERT INTO syncapi_notification_
(user_id, room_id, notification_count, highlight_count) (user_id, room_id, notification_count, highlight_count)
VALUES ($1, $2, $3, $4) VALUES ($1, $2, $3, $4)
ON CONFLICT (user_id, room_id) ON CONFLICT (user_id, room_id)
DO UPDATE SET notification_count = $3, highlight_count = $4 DO UPDATE SET id = $5, notification_count = $6, highlight_count = $7`
RETURNING id`
const selectUserUnreadNotificationCountsSQL = `SELECT const selectUserUnreadNotificationCountsSQL = `SELECT
id, room_id, notification_count, highlight_count id, room_id, notification_count, highlight_count
@ -71,7 +73,11 @@ const selectUserUnreadNotificationCountsSQL = `SELECT
const selectMaxNotificationIDSQL = `SELECT CASE COUNT(*) WHEN 0 THEN 0 ELSE MAX(id) END FROM syncapi_notification_data` const selectMaxNotificationIDSQL = `SELECT CASE COUNT(*) WHEN 0 THEN 0 ELSE MAX(id) END FROM syncapi_notification_data`
func (r *notificationDataStatements) UpsertRoomUnreadCounts(ctx context.Context, userID, roomID string, notificationCount, highlightCount int) (pos types.StreamPosition, err error) { func (r *notificationDataStatements) UpsertRoomUnreadCounts(ctx context.Context, userID, roomID string, notificationCount, highlightCount int) (pos types.StreamPosition, err error) {
err = r.upsertRoomUnreadCounts.QueryRowContext(ctx, userID, roomID, notificationCount, highlightCount).Scan(&pos) pos, err = r.streamIDStatements.nextNotificationID(ctx, nil)
if err != nil {
return
}
_, err = r.upsertRoomUnreadCounts.ExecContext(ctx, userID, roomID, notificationCount, highlightCount, pos, notificationCount, highlightCount)
return return
} }

View file

@ -26,6 +26,8 @@ INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("invite", 0)
ON CONFLICT DO NOTHING; ON CONFLICT DO NOTHING;
INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("presence", 0) INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("presence", 0)
ON CONFLICT DO NOTHING; ON CONFLICT DO NOTHING;
INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("notification", 0)
ON CONFLICT DO NOTHING;
` `
const increaseStreamIDStmt = "" + const increaseStreamIDStmt = "" +
@ -78,3 +80,9 @@ func (s *StreamIDStatements) nextPresenceID(ctx context.Context, txn *sql.Tx) (p
err = increaseStmt.QueryRowContext(ctx, "presence").Scan(&pos) err = increaseStmt.QueryRowContext(ctx, "presence").Scan(&pos)
return return
} }
func (s *StreamIDStatements) nextNotificationID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) {
increaseStmt := sqlutil.TxStmt(txn, s.increaseStreamIDStmt)
err = increaseStmt.QueryRowContext(ctx, "notification").Scan(&pos)
return
}

View file

@ -97,7 +97,7 @@ func (d *SyncServerDatasource) prepare(ctx context.Context) (err error) {
if err != nil { if err != nil {
return err return err
} }
notificationData, err := NewSqliteNotificationDataTable(d.db) notificationData, err := NewSqliteNotificationDataTable(d.db, &d.streamID)
if err != nil { if err != nil {
return err return err
} }

View file

@ -18,10 +18,11 @@ import (
"context" "context"
"database/sql" "database/sql"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
) )
type AccountData interface { type AccountData interface {
@ -122,12 +123,14 @@ type CurrentRoomState interface {
// //
// We persist the previous event IDs as well, one per row, so when we do fetch even // We persist the previous event IDs as well, one per row, so when we do fetch even
// earlier events we can simply delete rows which referenced it. Consider the graph: // earlier events we can simply delete rows which referenced it. Consider the graph:
// A //
// | Event C has 1 prev_event ID: A. // A
// B C // | Event C has 1 prev_event ID: A.
// |___| Event D has 2 prev_event IDs: B and C. // B C
// | // |___| Event D has 2 prev_event IDs: B and C.
// D // |
// D
//
// The earliest known event we have is D, so this table has 2 rows. // The earliest known event we have is D, so this table has 2 rows.
// A backfill request gives us C but not B. We delete rows where prev_event=C. This // A backfill request gives us C but not B. We delete rows where prev_event=C. This
// still means that D is a backwards extremity as we do not have event B. However, event // still means that D is a backwards extremity as we do not have event B. However, event

View file

@ -14,10 +14,13 @@ import (
"github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/tidwall/gjson" "github.com/tidwall/gjson"
"go.uber.org/atomic" "go.uber.org/atomic"
"github.com/matrix-org/dendrite/syncapi/notifier"
) )
// The max number of per-room goroutines to have running. // The max number of per-room goroutines to have running.
@ -37,6 +40,7 @@ type PDUStreamProvider struct {
// userID+deviceID -> lazy loading cache // userID+deviceID -> lazy loading cache
lazyLoadCache caching.LazyLoadCache lazyLoadCache caching.LazyLoadCache
rsAPI roomserverAPI.SyncRoomserverAPI rsAPI roomserverAPI.SyncRoomserverAPI
notifier *notifier.Notifier
} }
func (p *PDUStreamProvider) worker() { func (p *PDUStreamProvider) worker() {
@ -103,6 +107,15 @@ func (p *PDUStreamProvider) CompleteSync(
req.Log.WithError(err).Error("unable to update event filter with ignored users") req.Log.WithError(err).Error("unable to update event filter with ignored users")
} }
// Invalidate the lazyLoadCache, otherwise we end up with missing displaynames/avatars
// TODO: This might be inefficient, when joined to many and/or large rooms.
for _, roomID := range joinedRoomIDs {
joinedUsers := p.notifier.JoinedUsers(roomID)
for _, sharedUser := range joinedUsers {
p.lazyLoadCache.InvalidateLazyLoadedUser(req.Device, roomID, sharedUser)
}
}
// Build up a /sync response. Add joined rooms. // Build up a /sync response. Add joined rooms.
var reqMutex sync.Mutex var reqMutex sync.Mutex
var reqWaitGroup sync.WaitGroup var reqWaitGroup sync.WaitGroup

View file

@ -34,6 +34,7 @@ func NewSyncStreamProviders(
StreamProvider: StreamProvider{DB: d}, StreamProvider: StreamProvider{DB: d},
lazyLoadCache: lazyLoadCache, lazyLoadCache: lazyLoadCache,
rsAPI: rsAPI, rsAPI: rsAPI,
notifier: notifier,
}, },
TypingStreamProvider: &TypingStreamProvider{ TypingStreamProvider: &TypingStreamProvider{
StreamProvider: StreamProvider{DB: d}, StreamProvider: StreamProvider{DB: d},

View file

@ -12,10 +12,13 @@ import (
) )
type dummyPublisher struct { type dummyPublisher struct {
lock sync.Mutex
count int count int
} }
func (d *dummyPublisher) SendPresence(userID string, presence types.Presence, statusMsg *string) error { func (d *dummyPublisher) SendPresence(userID string, presence types.Presence, statusMsg *string) error {
d.lock.Lock()
defer d.lock.Unlock()
d.count++ d.count++
return nil return nil
} }
@ -125,11 +128,15 @@ func TestRequestPool_updatePresence(t *testing.T) {
go rp.cleanPresence(db, time.Millisecond*50) go rp.cleanPresence(db, time.Millisecond*50)
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
publisher.lock.Lock()
beforeCount := publisher.count beforeCount := publisher.count
publisher.lock.Unlock()
rp.updatePresence(db, tt.args.presence, tt.args.userID) rp.updatePresence(db, tt.args.presence, tt.args.userID)
publisher.lock.Lock()
if tt.wantIncrease && publisher.count <= beforeCount { if tt.wantIncrease && publisher.count <= beforeCount {
t.Fatalf("expected count to increase: %d <= %d", publisher.count, beforeCount) t.Fatalf("expected count to increase: %d <= %d", publisher.count, beforeCount)
} }
publisher.lock.Unlock()
time.Sleep(tt.args.sleep) time.Sleep(tt.args.sleep)
}) })
} }

View file

@ -22,7 +22,6 @@ import (
"encoding/pem" "encoding/pem"
"errors" "errors"
"fmt" "fmt"
"io/ioutil"
"math/big" "math/big"
"os" "os"
"strings" "strings"
@ -144,7 +143,7 @@ func NewTLSKeyWithAuthority(serverName, tlsKeyPath, tlsCertPath, authorityKeyPat
} }
// load the authority key // load the authority key
dat, err := ioutil.ReadFile(authorityKeyPath) dat, err := os.ReadFile(authorityKeyPath)
if err != nil { if err != nil {
return err return err
} }
@ -158,7 +157,7 @@ func NewTLSKeyWithAuthority(serverName, tlsKeyPath, tlsCertPath, authorityKeyPat
} }
// load the authority certificate // load the authority certificate
dat, err = ioutil.ReadFile(authorityCertPath) dat, err = os.ReadFile(authorityCertPath)
if err != nil { if err != nil {
return err return err
} }

View file

@ -20,13 +20,14 @@ import (
"strings" "strings"
"time" "time"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/storage/tables" "github.com/matrix-org/dendrite/userapi/storage/tables"
"github.com/matrix-org/dendrite/userapi/types" "github.com/matrix-org/dendrite/userapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
) )
const userDailyVisitsSchema = ` const userDailyVisitsSchema = `
@ -297,11 +298,10 @@ func (s *statsStatements) monthlyUsers(ctx context.Context, txn *sql.Tx) (result
return return
} }
/* R30Users counts the number of 30 day retained users, defined as: // R30Users counts the number of 30 day retained users, defined as:
- Users who have created their accounts more than 30 days ago // - Users who have created their accounts more than 30 days ago
- Where last seen at most 30 days ago // - Where last seen at most 30 days ago
- Where account creation and last_seen are > 30 days apart // - Where account creation and last_seen are > 30 days apart
*/
func (s *statsStatements) r30Users(ctx context.Context, txn *sql.Tx) (map[string]int64, error) { func (s *statsStatements) r30Users(ctx context.Context, txn *sql.Tx) (map[string]int64, error) {
stmt := sqlutil.TxStmt(txn, s.countR30UsersStmt) stmt := sqlutil.TxStmt(txn, s.countR30UsersStmt)
lastSeenAfter := time.Now().AddDate(0, 0, -30) lastSeenAfter := time.Now().AddDate(0, 0, -30)
@ -334,7 +334,8 @@ func (s *statsStatements) r30Users(ctx context.Context, txn *sql.Tx) (map[string
return result, rows.Err() return result, rows.Err()
} }
/* R30UsersV2 counts the number of 30 day retained users, defined as users that: /*
R30UsersV2 counts the number of 30 day retained users, defined as users that:
- Appear more than once in the past 60 days - Appear more than once in the past 60 days
- Have more than 30 days between the most and least recent appearances that occurred in the past 60 days. - Have more than 30 days between the most and least recent appearances that occurred in the past 60 days.
*/ */