diff --git a/README.md b/README.md index 8f54db7b7..10e2b1b86 100644 --- a/README.md +++ b/README.md @@ -21,8 +21,7 @@ As of October 2020 (current [progress below](#progress)), Dendrite has now enter This does not mean: - Dendrite is bug-free. It has not yet been battle-tested in the real world and so will be error prone initially. -- All of the CS/Federation APIs are implemented. We are tracking progress via a script called 'Are We Synapse Yet?'. In particular, - presence and push notifications are entirely missing from Dendrite. See [CHANGES.md](CHANGES.md) for updates. +- Dendrite is feature-complete. There may be client or federation APIs that are not implemented. - Dendrite is ready for massive homeserver deployments. You cannot shard each microservice, only run each one on a different machine. Currently, we expect Dendrite to function well for small (10s/100s of users) homeserver deployments as well as P2P Matrix nodes in-browser or on mobile devices. @@ -36,6 +35,9 @@ If you have further questions, please take a look at [our FAQ](docs/FAQ.md) or j ## Requirements +See the [Planning your Installation](https://matrix-org.github.io/dendrite/installation/planning) page for +more information on requirements. + To build Dendrite, you will need Go 1.18 or later. For a usable federating Dendrite deployment, you will also need: @@ -83,11 +85,11 @@ $ ./bin/create-account --config dendrite.yaml -username alice Then point your favourite Matrix client at `http://localhost:8008` or `https://localhost:8448`. -## Progress +## Progress We use a script called Are We Synapse Yet which checks Sytest compliance rates. Sytest is a black-box homeserver test rig with around 900 tests. The script works out how many of these tests are passing on Dendrite and it -updates with CI. As of April 2022 we're at around 83% CS API coverage and 95% Federation coverage, though check +updates with CI. As of August 2022 we're at around 83% CS API coverage and 95% Federation coverage, though check CI for the latest numbers. In practice, this means you can communicate locally and via federation with Synapse servers such as matrix.org reasonably well, although there are still some missing features (like Search). @@ -119,53 +121,8 @@ We would be grateful for any help on issues marked as all have related Sytests which need to pass in order for the issue to be closed. Once you've written your code, you can quickly run Sytest to ensure that the test names are now passing. -For example, if the test `Local device key changes get to remote servers` was marked as failing, find the -test file (e.g via `grep` or via the -[CI log output](https://buildkite.com/matrix-dot-org/dendrite/builds/2826#39cff5de-e032-4ad0-ad26-f819e6919c42) -it's `tests/50federation/40devicelists.pl` ) then to run Sytest: - -``` -docker run --rm --name sytest --v "/Users/kegan/github/sytest:/sytest" --v "/Users/kegan/github/dendrite:/src" --v "/Users/kegan/logs:/logs" --v "/Users/kegan/go/:/gopath" --e "POSTGRES=1" -e "DENDRITE_TRACE_HTTP=1" -matrixdotorg/sytest-dendrite:latest tests/50federation/40devicelists.pl -``` - -See [sytest.md](docs/sytest.md) for the full description of these flags. - -You can try running sytest outside of docker for faster runs, but the dependencies can be temperamental -and we recommend using docker where possible. - -``` -cd sytest -export PERL5LIB=$HOME/lib/perl5 -export PERL_MB_OPT=--install_base=$HOME -export PERL_MM_OPT=INSTALL_BASE=$HOME -./install-deps.pl - -./run-tests.pl -I Dendrite::Monolith -d $PATH_TO_DENDRITE_BINARIES -``` - -Sometimes Sytest is testing the wrong thing or is flakey, so it will need to be patched. -Ask on `#dendrite-dev:matrix.org` if you think this is the case for you and we'll be happy to help. - -If you're new to the project, see [CONTRIBUTING.md](docs/CONTRIBUTING.md) to get up to speed then +If you're new to the project, see our +[Contributing page](https://matrix-org.github.io/dendrite/development/contributing) to get up to speed, then look for [Good First Issues](https://github.com/matrix-org/dendrite/labels/good%20first%20issue). If you're familiar with the project, look for [Help Wanted](https://github.com/matrix-org/dendrite/labels/help-wanted) issues. - -## Hardware requirements - -Dendrite in Monolith + SQLite works in a range of environments including iOS and in-browser via WASM. - -For small homeserver installations joined on ~10s rooms on matrix.org with ~100s of users in those rooms, including some -encrypted rooms: - -- Memory: uses around 100MB of RAM, with peaks at around 200MB. -- Disk space: After a few months of usage, the database grew to around 2GB (in Monolith mode). -- CPU: Brief spikes when processing events, typically idles at 1% CPU. - -This means Dendrite should comfortably work on things like Raspberry Pis. diff --git a/build/scripts/build-test-lint.sh b/build/scripts/build-test-lint.sh index 8f0b775b1..32f89c076 100755 --- a/build/scripts/build-test-lint.sh +++ b/build/scripts/build-test-lint.sh @@ -13,4 +13,4 @@ go build ./cmd/... ./build/scripts/find-lint.sh echo "Testing..." -go test -v ./... +go test --race -v ./... diff --git a/clientapi/auth/login.go b/clientapi/auth/login.go index 8047bc08d..fc1f1a2e6 100644 --- a/clientapi/auth/login.go +++ b/clientapi/auth/login.go @@ -18,7 +18,6 @@ import ( "context" "encoding/json" "io" - "io/ioutil" "net/http" "github.com/matrix-org/dendrite/clientapi/auth/authtypes" @@ -42,7 +41,7 @@ func LoginFromJSONReader( userInteractiveAuth *UserInteractive, cfg *config.ClientAPI, ) (*Login, LoginCleanupFunc, *util.JSONResponse) { - reqBytes, err := ioutil.ReadAll(r) + reqBytes, err := io.ReadAll(r) if err != nil { err := &util.JSONResponse{ Code: http.StatusBadRequest, diff --git a/clientapi/httputil/httputil.go b/clientapi/httputil/httputil.go index b47701368..74f84f1e7 100644 --- a/clientapi/httputil/httputil.go +++ b/clientapi/httputil/httputil.go @@ -16,7 +16,7 @@ package httputil import ( "encoding/json" - "io/ioutil" + "io" "net/http" "unicode/utf8" @@ -29,9 +29,9 @@ import ( func UnmarshalJSONRequest(req *http.Request, iface interface{}) *util.JSONResponse { // encoding/json allows invalid utf-8, matrix does not // https://matrix.org/docs/spec/client_server/r0.6.1#api-standards - body, err := ioutil.ReadAll(req.Body) + body, err := io.ReadAll(req.Body) if err != nil { - util.GetLogger(req.Context()).WithError(err).Error("ioutil.ReadAll failed") + util.GetLogger(req.Context()).WithError(err).Error("io.ReadAll failed") resp := jsonerror.InternalServerError() return &resp } diff --git a/clientapi/routing/account_data.go b/clientapi/routing/account_data.go index 0d3a49495..b28f0bb1f 100644 --- a/clientapi/routing/account_data.go +++ b/clientapi/routing/account_data.go @@ -17,7 +17,7 @@ package routing import ( "encoding/json" "fmt" - "io/ioutil" + "io" "net/http" "github.com/matrix-org/dendrite/clientapi/httputil" @@ -101,9 +101,9 @@ func SaveAccountData( } } - body, err := ioutil.ReadAll(req.Body) + body, err := io.ReadAll(req.Body) if err != nil { - util.GetLogger(req.Context()).WithError(err).Error("ioutil.ReadAll failed") + util.GetLogger(req.Context()).WithError(err).Error("io.ReadAll failed") return jsonerror.InternalServerError() } diff --git a/clientapi/routing/deactivate.go b/clientapi/routing/deactivate.go index c8aa6a3bc..f213db7f3 100644 --- a/clientapi/routing/deactivate.go +++ b/clientapi/routing/deactivate.go @@ -1,7 +1,7 @@ package routing import ( - "io/ioutil" + "io" "net/http" "github.com/matrix-org/dendrite/clientapi/auth" @@ -20,7 +20,7 @@ func Deactivate( ) util.JSONResponse { ctx := req.Context() defer req.Body.Close() // nolint:errcheck - bodyBytes, err := ioutil.ReadAll(req.Body) + bodyBytes, err := io.ReadAll(req.Body) if err != nil { return util.JSONResponse{ Code: http.StatusBadRequest, diff --git a/clientapi/routing/device.go b/clientapi/routing/device.go index bb1cf47bd..e3a02661c 100644 --- a/clientapi/routing/device.go +++ b/clientapi/routing/device.go @@ -15,7 +15,7 @@ package routing import ( - "io/ioutil" + "io" "net" "net/http" @@ -175,7 +175,7 @@ func DeleteDeviceById( }() ctx := req.Context() defer req.Body.Close() // nolint:errcheck - bodyBytes, err := ioutil.ReadAll(req.Body) + bodyBytes, err := io.ReadAll(req.Body) if err != nil { return util.JSONResponse{ Code: http.StatusBadRequest, diff --git a/clientapi/routing/directory_public.go b/clientapi/routing/directory_public.go index c3e6141b2..8ddb3267a 100644 --- a/clientapi/routing/directory_public.go +++ b/clientapi/routing/directory_public.go @@ -23,13 +23,14 @@ import ( "strings" "sync" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" + "github.com/matrix-org/dendrite/clientapi/api" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/util" ) var ( @@ -196,14 +197,14 @@ func fillPublicRoomsReq(httpReq *http.Request, request *PublicRoomReq) *util.JSO // sliceInto returns a subslice of `slice` which honours the since/limit values given. // -// 0 1 2 3 4 5 6 index -// [A, B, C, D, E, F, G] slice +// 0 1 2 3 4 5 6 index +// [A, B, C, D, E, F, G] slice // -// limit=3 => A,B,C (prev='', next='3') -// limit=3&since=3 => D,E,F (prev='0', next='6') -// limit=3&since=6 => G (prev='3', next='') +// limit=3 => A,B,C (prev='', next='3') +// limit=3&since=3 => D,E,F (prev='0', next='6') +// limit=3&since=6 => G (prev='3', next='') // -// A value of '-1' for prev/next indicates no position. +// A value of '-1' for prev/next indicates no position. func sliceInto(slice []gomatrixserverlib.PublicRoom, since int64, limit int16) (subset []gomatrixserverlib.PublicRoom, prev, next int) { prev = -1 next = -1 diff --git a/clientapi/routing/register.go b/clientapi/routing/register.go index 63a8db6b2..b0e8746a6 100644 --- a/clientapi/routing/register.go +++ b/clientapi/routing/register.go @@ -19,7 +19,7 @@ import ( "context" "encoding/json" "fmt" - "io/ioutil" + "io" "net/http" "net/url" "regexp" @@ -388,7 +388,7 @@ func validateRecaptcha( // Grab the body of the response from the captcha server var r recaptchaResponse - body, err := ioutil.ReadAll(resp.Body) + body, err := io.ReadAll(resp.Body) if err != nil { return &util.JSONResponse{ Code: http.StatusGatewayTimeout, @@ -556,7 +556,7 @@ func Register( cfg *config.ClientAPI, ) util.JSONResponse { defer req.Body.Close() // nolint: errcheck - reqBody, err := ioutil.ReadAll(req.Body) + reqBody, err := io.ReadAll(req.Body) if err != nil { return util.JSONResponse{ Code: http.StatusBadRequest, diff --git a/clientapi/routing/register_secret_test.go b/clientapi/routing/register_secret_test.go index e702b2152..a2ed35853 100644 --- a/clientapi/routing/register_secret_test.go +++ b/clientapi/routing/register_secret_test.go @@ -2,7 +2,7 @@ package routing import ( "bytes" - "io/ioutil" + "io" "testing" "github.com/patrickmn/go-cache" @@ -13,7 +13,7 @@ func TestSharedSecretRegister(t *testing.T) { jsonStr := []byte(`{"admin":false,"mac":"f1ba8d37123866fd659b40de4bad9b0f8965c565","nonce":"759f047f312b99ff428b21d581256f8592b8976e58bc1b543972dc6147e529a79657605b52d7becd160ff5137f3de11975684319187e06901955f79e5a6c5a79","password":"wonderland","username":"alice"}`) sharedSecret := "dendritetest" - req, err := NewSharedSecretRegistrationRequest(ioutil.NopCloser(bytes.NewBuffer(jsonStr))) + req, err := NewSharedSecretRegistrationRequest(io.NopCloser(bytes.NewBuffer(jsonStr))) if err != nil { t.Fatalf("failed to read request: %s", err) } diff --git a/clientapi/routing/sendevent.go b/clientapi/routing/sendevent.go index 2e864adef..85f1053f3 100644 --- a/clientapi/routing/sendevent.go +++ b/clientapi/routing/sendevent.go @@ -63,9 +63,10 @@ var sendEventDuration = prometheus.NewHistogramVec( ) // SendEvent implements: -// /rooms/{roomID}/send/{eventType} -// /rooms/{roomID}/send/{eventType}/{txnID} -// /rooms/{roomID}/state/{eventType}/{stateKey} +// +// /rooms/{roomID}/send/{eventType} +// /rooms/{roomID}/send/{eventType}/{txnID} +// /rooms/{roomID}/state/{eventType}/{stateKey} func SendEvent( req *http.Request, device *userapi.Device, diff --git a/clientapi/routing/threepid.go b/clientapi/routing/threepid.go index 94b658ee3..4b7989ecb 100644 --- a/clientapi/routing/threepid.go +++ b/clientapi/routing/threepid.go @@ -38,8 +38,9 @@ type threePIDsResponse struct { } // RequestEmailToken implements: -// POST /account/3pid/email/requestToken -// POST /register/email/requestToken +// +// POST /account/3pid/email/requestToken +// POST /register/email/requestToken func RequestEmailToken(req *http.Request, threePIDAPI api.ClientUserAPI, cfg *config.ClientAPI) util.JSONResponse { var body threepid.EmailAssociationRequest if reqErr := httputil.UnmarshalJSONRequest(req, &body); reqErr != nil { diff --git a/clientapi/routing/voip.go b/clientapi/routing/voip.go index c7ddaabcf..f0f69ce3c 100644 --- a/clientapi/routing/voip.go +++ b/clientapi/routing/voip.go @@ -22,15 +22,17 @@ import ( "net/http" "time" + "github.com/matrix-org/gomatrix" + "github.com/matrix-org/util" + "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/userapi/api" - "github.com/matrix-org/gomatrix" - "github.com/matrix-org/util" ) // RequestTurnServer implements: -// GET /voip/turnServer +// +// GET /voip/turnServer func RequestTurnServer(req *http.Request, device *api.Device, cfg *config.ClientAPI) util.JSONResponse { turnConfig := cfg.TURN diff --git a/cmd/create-account/main.go b/cmd/create-account/main.go index 7f6d5105e..92179a049 100644 --- a/cmd/create-account/main.go +++ b/cmd/create-account/main.go @@ -19,7 +19,6 @@ import ( "flag" "fmt" "io" - "io/ioutil" "os" "regexp" "strings" @@ -157,7 +156,7 @@ func main() { func getPassword(password, pwdFile string, pwdStdin bool, r io.Reader) (string, error) { // read password from file if pwdFile != "" { - pw, err := ioutil.ReadFile(pwdFile) + pw, err := os.ReadFile(pwdFile) if err != nil { return "", fmt.Errorf("Unable to read password from file: %v", err) } @@ -166,7 +165,7 @@ func getPassword(password, pwdFile string, pwdStdin bool, r io.Reader) (string, // read password from stdin if pwdStdin { - data, err := ioutil.ReadAll(r) + data, err := io.ReadAll(r) if err != nil { return "", fmt.Errorf("Unable to read password from stdin: %v", err) } diff --git a/cmd/dendrite-demo-pinecone/main.go b/cmd/dendrite-demo-pinecone/main.go index 8fa935ddf..75f29fe27 100644 --- a/cmd/dendrite-demo-pinecone/main.go +++ b/cmd/dendrite-demo-pinecone/main.go @@ -21,7 +21,6 @@ import ( "encoding/hex" "flag" "fmt" - "io/ioutil" "net" "net/http" "os" @@ -76,11 +75,11 @@ func main() { if pk, sk, err = ed25519.GenerateKey(nil); err != nil { panic(err) } - if err = ioutil.WriteFile(keyfile, sk, 0644); err != nil { + if err = os.WriteFile(keyfile, sk, 0644); err != nil { panic(err) } } else if err == nil { - if sk, err = ioutil.ReadFile(keyfile); err != nil { + if sk, err = os.ReadFile(keyfile); err != nil { panic(err) } if len(sk) != ed25519.PrivateKeySize { diff --git a/cmd/dendrite-demo-yggdrasil/yggconn/node.go b/cmd/dendrite-demo-yggdrasil/yggconn/node.go index d93272e2e..ff3c73ec8 100644 --- a/cmd/dendrite-demo-yggdrasil/yggconn/node.go +++ b/cmd/dendrite-demo-yggdrasil/yggconn/node.go @@ -20,7 +20,6 @@ import ( "encoding/hex" "encoding/json" "fmt" - "io/ioutil" "log" "net" "os" @@ -69,7 +68,7 @@ func Setup(instanceName, storageDirectory, peerURI string) (*Node, error) { yggfile := fmt.Sprintf("%s/%s-yggdrasil.conf", storageDirectory, instanceName) if _, err := os.Stat(yggfile); !os.IsNotExist(err) { - yggconf, e := ioutil.ReadFile(yggfile) + yggconf, e := os.ReadFile(yggfile) if e != nil { panic(err) } @@ -88,7 +87,7 @@ func Setup(instanceName, storageDirectory, peerURI string) (*Node, error) { if err != nil { panic(err) } - if e := ioutil.WriteFile(yggfile, j, 0600); e != nil { + if e := os.WriteFile(yggfile, j, 0600); e != nil { n.log.Printf("Couldn't write private key to file '%s': %s\n", yggfile, e) } diff --git a/cmd/dendrite-upgrade-tests/main.go b/cmd/dendrite-upgrade-tests/main.go index 39843dccb..dce22472d 100644 --- a/cmd/dendrite-upgrade-tests/main.go +++ b/cmd/dendrite-upgrade-tests/main.go @@ -6,7 +6,7 @@ import ( "encoding/json" "flag" "fmt" - "io/ioutil" + "io" "log" "net/http" "os" @@ -47,7 +47,7 @@ const HEAD = "HEAD" // We cannot use the dockerfile associated with the repo with each version sadly due to changes in // Docker versions. Specifically, earlier Dendrite versions are incompatible with newer Docker clients // due to the error: -// When using COPY with more than one source file, the destination must be a directory and end with a / +// When using COPY with more than one source file, the destination must be a directory and end with a / // We need to run a postgres anyway, so use the dockerfile associated with Complement instead. const Dockerfile = `FROM golang:1.18-stretch as build RUN apt-get update && apt-get install -y postgresql @@ -95,7 +95,9 @@ CMD /build/run_dendrite.sh ` const dendriteUpgradeTestLabel = "dendrite_upgrade_test" // downloadArchive downloads an arbitrary github archive of the form: -// https://github.com/matrix-org/dendrite/archive/v0.3.11.tar.gz +// +// https://github.com/matrix-org/dendrite/archive/v0.3.11.tar.gz +// // and re-tarballs it without the top-level directory which contains branch information. It inserts // the contents of `dockerfile` as a root file `Dockerfile` in the re-tarballed directory such that // you can directly feed the retarballed archive to `ImageBuild` to have it run said dockerfile. @@ -126,7 +128,7 @@ func downloadArchive(cli *http.Client, tmpDir, archiveURL string, dockerfile []b return nil, err } // add top level Dockerfile - err = ioutil.WriteFile(path.Join(tmpDir, "Dockerfile"), dockerfile, os.ModePerm) + err = os.WriteFile(path.Join(tmpDir, "Dockerfile"), dockerfile, os.ModePerm) if err != nil { return nil, fmt.Errorf("failed to inject /Dockerfile: %w", err) } @@ -148,7 +150,7 @@ func buildDendrite(httpClient *http.Client, dockerClient *client.Client, tmpDir, if branchOrTagName == HEAD && *flagHead != "" { log.Printf("%s: Using %s as HEAD", branchOrTagName, *flagHead) // add top level Dockerfile - err = ioutil.WriteFile(path.Join(*flagHead, "Dockerfile"), []byte(Dockerfile), os.ModePerm) + err = os.WriteFile(path.Join(*flagHead, "Dockerfile"), []byte(Dockerfile), os.ModePerm) if err != nil { return "", fmt.Errorf("custom HEAD: failed to inject /Dockerfile: %w", err) } @@ -386,7 +388,7 @@ func runImage(dockerClient *client.Client, volumeName, version, imageID string) }) // ignore errors when cannot get logs, it's just for debugging anyways if err == nil { - logbody, err := ioutil.ReadAll(logs) + logbody, err := io.ReadAll(logs) if err == nil { log.Printf("Container logs:\n\n%s\n\n", string(logbody)) } diff --git a/cmd/dendrite-upgrade-tests/tests.go b/cmd/dendrite-upgrade-tests/tests.go index e02af92a9..ff1e09dda 100644 --- a/cmd/dendrite-upgrade-tests/tests.go +++ b/cmd/dendrite-upgrade-tests/tests.go @@ -18,9 +18,9 @@ type user struct { } // runTests performs the following operations: -// - register alice and bob with branch name muxed into the localpart -// - create a DM room for the 2 users and exchange messages -// - create/join a public #global room and exchange messages +// - register alice and bob with branch name muxed into the localpart +// - create a DM room for the 2 users and exchange messages +// - create/join a public #global room and exchange messages func runTests(baseURL, branchName string) error { // register 2 users users := []user{ diff --git a/cmd/furl/main.go b/cmd/furl/main.go index 75e223388..f59f9c8ce 100644 --- a/cmd/furl/main.go +++ b/cmd/furl/main.go @@ -9,7 +9,6 @@ import ( "encoding/pem" "flag" "fmt" - "io/ioutil" "net/url" "os" @@ -30,7 +29,7 @@ func main() { os.Exit(1) } - data, err := ioutil.ReadFile(*requestKey) + data, err := os.ReadFile(*requestKey) if err != nil { panic(err) } diff --git a/dendrite-sample.monolith.yaml b/dendrite-sample.monolith.yaml index 44d424e27..2ca0f41e9 100644 --- a/dendrite-sample.monolith.yaml +++ b/dendrite-sample.monolith.yaml @@ -188,13 +188,16 @@ client_api: # TURN server information that this homeserver should send to clients. turn: - turn_user_lifetime: "" + turn_user_lifetime: "5m" turn_uris: # - turn:turn.server.org?transport=udp # - turn:turn.server.org?transport=tcp turn_shared_secret: "" - turn_username: "" - turn_password: "" + # If your TURN server requires static credentials, then you will need to enter + # them here instead of supplying a shared secret. Note that these credentials + # will be visible to clients! + # turn_username: "" + # turn_password: "" # Settings for rate-limited endpoints. Rate limiting kicks in after the threshold # number of "slots" have been taken by requests from a specific host. Each "slot" diff --git a/dendrite-sample.polylith.yaml b/dendrite-sample.polylith.yaml index c5225f5c8..74405e940 100644 --- a/dendrite-sample.polylith.yaml +++ b/dendrite-sample.polylith.yaml @@ -191,13 +191,16 @@ client_api: # TURN server information that this homeserver should send to clients. turn: - turn_user_lifetime: "" + turn_user_lifetime: "5m" turn_uris: # - turn:turn.server.org?transport=udp # - turn:turn.server.org?transport=tcp turn_shared_secret: "" - turn_username: "" - turn_password: "" + # If your TURN server requires static credentials, then you will need to enter + # them here instead of supplying a shared secret. Note that these credentials + # will be visible to clients! + # turn_username: "" + # turn_password: "" # Settings for rate-limited endpoints. Rate limiting kicks in after the threshold # number of "slots" have been taken by requests from a specific host. Each "slot" diff --git a/docs/CONTRIBUTING.md b/docs/CONTRIBUTING.md index 169224b9e..771af9ecf 100644 --- a/docs/CONTRIBUTING.md +++ b/docs/CONTRIBUTING.md @@ -64,7 +64,7 @@ comment. Please avoid doing this if you can. We also have unit tests which we run via: ```bash -go test ./... +go test --race ./... ``` In general, we like submissions that come with tests. Anything that proves that the diff --git a/federationapi/consumers/roomserver.go b/federationapi/consumers/roomserver.go index e50ec66ad..2622ecb3f 100644 --- a/federationapi/consumers/roomserver.go +++ b/federationapi/consumers/roomserver.go @@ -208,9 +208,11 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent, rew // joinedHostsAtEvent works out a list of matrix servers that were joined to // the room at the event (including peeking ones) // It is important to use the state at the event for sending messages because: -// 1) We shouldn't send messages to servers that weren't in the room. -// 2) If a server is kicked from the rooms it should still be told about the -// kick event, +// +// 1. We shouldn't send messages to servers that weren't in the room. +// 2. If a server is kicked from the rooms it should still be told about the +// kick event. +// // Usually the list can be calculated locally, but sometimes it will need fetch // events from the room server. // Returns an error if there was a problem talking to the room server. diff --git a/federationapi/federationapi_keys_test.go b/federationapi/federationapi_keys_test.go index d1bfe1847..9c3446222 100644 --- a/federationapi/federationapi_keys_test.go +++ b/federationapi/federationapi_keys_test.go @@ -6,7 +6,7 @@ import ( "crypto/ed25519" "encoding/json" "fmt" - "io/ioutil" + "io" "net/http" "os" "testing" @@ -66,7 +66,7 @@ func TestMain(m *testing.M) { s.cache = caching.NewRistrettoCache(8*1024*1024, time.Hour, false) // Create a temporary directory for JetStream. - d, err := ioutil.TempDir("./", "jetstream*") + d, err := os.MkdirTemp("./", "jetstream*") if err != nil { panic(err) } @@ -136,7 +136,7 @@ func (m *MockRoundTripper) RoundTrip(req *http.Request) (res *http.Response, err // And respond. res = &http.Response{ StatusCode: 200, - Body: ioutil.NopCloser(bytes.NewReader(body)), + Body: io.NopCloser(bytes.NewReader(body)), } return } diff --git a/federationapi/federationapi_test.go b/federationapi/federationapi_test.go index ae244c566..8884e34c6 100644 --- a/federationapi/federationapi_test.go +++ b/federationapi/federationapi_test.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "strings" + "sync" "testing" "time" @@ -48,6 +49,7 @@ func (f *fedRoomserverAPI) QueryRoomsForUser(ctx context.Context, req *rsapi.Que // TODO: This struct isn't generic, only works for TestFederationAPIJoinThenKeyUpdate type fedClient struct { + fedClientMutex sync.Mutex api.FederationClient allowJoins []*test.Room keys map[gomatrixserverlib.ServerName]struct { @@ -59,6 +61,8 @@ type fedClient struct { } func (f *fedClient) GetServerKeys(ctx context.Context, matrixServer gomatrixserverlib.ServerName) (gomatrixserverlib.ServerKeys, error) { + f.fedClientMutex.Lock() + defer f.fedClientMutex.Unlock() fmt.Println("GetServerKeys:", matrixServer) var keys gomatrixserverlib.ServerKeys var keyID gomatrixserverlib.KeyID @@ -122,6 +126,8 @@ func (f *fedClient) MakeJoin(ctx context.Context, s gomatrixserverlib.ServerName return } func (f *fedClient) SendJoin(ctx context.Context, s gomatrixserverlib.ServerName, event *gomatrixserverlib.Event) (res gomatrixserverlib.RespSendJoin, err error) { + f.fedClientMutex.Lock() + defer f.fedClientMutex.Unlock() for _, r := range f.allowJoins { if r.ID == event.RoomID() { r.InsertEvent(f.t, event.Headered(r.Version)) @@ -134,6 +140,8 @@ func (f *fedClient) SendJoin(ctx context.Context, s gomatrixserverlib.ServerName } func (f *fedClient) SendTransaction(ctx context.Context, t gomatrixserverlib.Transaction) (res gomatrixserverlib.RespSend, err error) { + f.fedClientMutex.Lock() + defer f.fedClientMutex.Unlock() for _, edu := range t.EDUs { if edu.Type == gomatrixserverlib.MDeviceListUpdate { f.sentTxn = true @@ -242,6 +250,8 @@ func testFederationAPIJoinThenKeyUpdate(t *testing.T, dbType test.DBType) { testrig.MustPublishMsgs(t, jsctx, msg) time.Sleep(500 * time.Millisecond) + fc.fedClientMutex.Lock() + defer fc.fedClientMutex.Unlock() if !fc.sentTxn { t.Fatalf("did not send device list update") } diff --git a/federationapi/queue/queue.go b/federationapi/queue/queue.go index 4c25c4ce6..88664fcf9 100644 --- a/federationapi/queue/queue.go +++ b/federationapi/queue/queue.go @@ -158,7 +158,7 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d oqs.queuesMutex.Lock() defer oqs.queuesMutex.Unlock() oq, ok := oqs.queues[destination] - if !ok || oq != nil { + if !ok || oq == nil { destinationQueueTotal.Inc() oq = &destinationQueue{ queues: oqs, diff --git a/federationapi/routing/join.go b/federationapi/routing/join.go index 30406a155..9ad3bd8eb 100644 --- a/federationapi/routing/join.go +++ b/federationapi/routing/join.go @@ -21,13 +21,14 @@ import ( "sort" "time" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" + "github.com/sirupsen/logrus" + "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/util" - "github.com/sirupsen/logrus" ) // MakeJoin implements the /make_join API @@ -435,13 +436,13 @@ func SendJoin( // a restricted room join. If the room version does not support restricted // joins then this function returns with no side effects. This returns three // values: -// * an optional JSON response body (i.e. M_UNABLE_TO_AUTHORISE_JOIN) which -// should always be sent back to the client if one is specified -// * a user ID of an authorising user, typically a user that has power to -// issue invites in the room, if one has been found -// * an error if there was a problem finding out if this was allowable, -// like if the room version isn't known or a problem happened talking to -// the roomserver +// - an optional JSON response body (i.e. M_UNABLE_TO_AUTHORISE_JOIN) which +// should always be sent back to the client if one is specified +// - a user ID of an authorising user, typically a user that has power to +// issue invites in the room, if one has been found +// - an error if there was a problem finding out if this was allowable, +// like if the room version isn't known or a problem happened talking to +// the roomserver func checkRestrictedJoin( httpReq *http.Request, rsAPI api.FederationRoomserverAPI, diff --git a/internal/caching/cache_lazy_load_members.go b/internal/caching/cache_lazy_load_members.go index 0d7009c94..390334da7 100644 --- a/internal/caching/cache_lazy_load_members.go +++ b/internal/caching/cache_lazy_load_members.go @@ -14,6 +14,7 @@ type lazyLoadingCacheKey struct { type LazyLoadCache interface { StoreLazyLoadedUser(device *userapi.Device, roomID, userID, eventID string) IsLazyLoadedUserCached(device *userapi.Device, roomID, userID string) (string, bool) + InvalidateLazyLoadedUser(device *userapi.Device, roomID, userID string) } func (c Caches) StoreLazyLoadedUser(device *userapi.Device, roomID, userID, eventID string) { @@ -33,3 +34,12 @@ func (c Caches) IsLazyLoadedUserCached(device *userapi.Device, roomID, userID st TargetUserID: userID, }) } + +func (c Caches) InvalidateLazyLoadedUser(device *userapi.Device, roomID, userID string) { + c.LazyLoading.Unset(lazyLoadingCacheKey{ + UserID: device.UserID, + DeviceID: device.ID, + RoomID: roomID, + TargetUserID: userID, + }) +} diff --git a/internal/log.go b/internal/log.go index bba0ac6e6..a171555ab 100644 --- a/internal/log.go +++ b/internal/log.go @@ -27,9 +27,10 @@ import ( "github.com/matrix-org/util" - "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dugong" "github.com/sirupsen/logrus" + + "github.com/matrix-org/dendrite/setup/config" ) type utcFormatter struct { @@ -145,7 +146,7 @@ func setupFileHook(hook config.LogrusHook, level logrus.Level, componentName str }) } -//CloseAndLogIfError Closes io.Closer and logs the error if any +// CloseAndLogIfError Closes io.Closer and logs the error if any func CloseAndLogIfError(ctx context.Context, closer io.Closer, message string) { if closer == nil { return diff --git a/internal/log_unix.go b/internal/log_unix.go index 1e1094f23..75332af73 100644 --- a/internal/log_unix.go +++ b/internal/log_unix.go @@ -18,7 +18,7 @@ package internal import ( - "io/ioutil" + "io" "log/syslog" "github.com/MFAshby/stdemuxerhook" @@ -63,7 +63,7 @@ func SetupHookLogging(hooks []config.LogrusHook, componentName string) { setupStdLogHook(logrus.InfoLevel) } // Hooks are now configured for stdout/err, so throw away the default logger output - logrus.SetOutput(ioutil.Discard) + logrus.SetOutput(io.Discard) } func checkSyslogHookParams(params map[string]interface{}) { diff --git a/keyserver/internal/device_list_update.go b/keyserver/internal/device_list_update.go index acbcd5b8f..e317755e4 100644 --- a/keyserver/internal/device_list_update.go +++ b/keyserver/internal/device_list_update.go @@ -22,12 +22,13 @@ import ( "sync" "time" - fedsenderapi "github.com/matrix-org/dendrite/federationapi/api" - "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" + + fedsenderapi "github.com/matrix-org/dendrite/federationapi/api" + "github.com/matrix-org/dendrite/keyserver/api" ) var ( @@ -66,12 +67,14 @@ func init() { // - We don't have unbounded growth in proportion to the number of servers (this is more important in a P2P world where // we have many many servers) // - We can adjust concurrency (at the cost of memory usage) by tuning N, to accommodate mobile devices vs servers. +// // The downsides are that: // - Query requests can get queued behind other servers if they hash to the same worker, even if there are other free // workers elsewhere. Whilst suboptimal, provided we cap how long a single request can last (e.g using context timeouts) // we guarantee we will get around to it. Also, more users on a given server does not increase the number of requests // (as /keys/query allows multiple users to be specified) so being stuck behind matrix.org won't materially be any worse // than being stuck behind foo.bar +// // In the event that the query fails, a lock is acquired and the server name along with the time to wait before retrying is // set in a map. A restarter goroutine periodically probes this map and injects servers which are ready to be retried. type DeviceListUpdater struct { diff --git a/keyserver/internal/device_list_update_test.go b/keyserver/internal/device_list_update_test.go index 0033a5086..0c2405f34 100644 --- a/keyserver/internal/device_list_update_test.go +++ b/keyserver/internal/device_list_update_test.go @@ -18,7 +18,7 @@ import ( "context" "crypto/ed25519" "fmt" - "io/ioutil" + "io" "net/http" "net/url" "reflect" @@ -27,8 +27,9 @@ import ( "testing" "time" - "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/gomatrixserverlib" + + "github.com/matrix-org/dendrite/keyserver/api" ) var ( @@ -202,7 +203,7 @@ func TestUpdateNoPrevID(t *testing.T) { } return &http.Response{ StatusCode: 200, - Body: ioutil.NopCloser(strings.NewReader(` + Body: io.NopCloser(strings.NewReader(` { "user_id": "` + remoteUserID + `", "stream_id": 5, @@ -317,7 +318,7 @@ func TestDebounce(t *testing.T) { // now send the response over federation fedCh <- &http.Response{ StatusCode: 200, - Body: ioutil.NopCloser(strings.NewReader(` + Body: io.NopCloser(strings.NewReader(` { "user_id": "` + userID + `", "stream_id": 5, diff --git a/keyserver/storage/storage_test.go b/keyserver/storage/storage_test.go index 44cfb5f2a..e7a2af7c2 100644 --- a/keyserver/storage/storage_test.go +++ b/keyserver/storage/storage_test.go @@ -3,6 +3,7 @@ package storage_test import ( "context" "reflect" + "sync" "testing" "github.com/matrix-org/dendrite/keyserver/api" @@ -103,6 +104,9 @@ func TestKeyChangesUpperLimit(t *testing.T) { }) } +var dbLock sync.Mutex +var deviceArray = []string{"AAA", "another_device"} + // The purpose of this test is to make sure that the storage layer is generating sequential stream IDs per user, // and that they are returned correctly when querying for device keys. func TestDeviceKeysStreamIDGeneration(t *testing.T) { @@ -169,8 +173,11 @@ func TestDeviceKeysStreamIDGeneration(t *testing.T) { t.Fatalf("Expected StoreLocalDeviceKeys to set StreamID=3 (new key same device) but got %d", msgs[0].StreamID) } + dbLock.Lock() + defer dbLock.Unlock() // Querying for device keys returns the latest stream IDs - msgs, err = db.DeviceKeysForUser(ctx, alice, []string{"AAA", "another_device"}, false) + msgs, err = db.DeviceKeysForUser(ctx, alice, deviceArray, false) + if err != nil { t.Fatalf("DeviceKeysForUser returned error: %s", err) } diff --git a/mediaapi/fileutils/fileutils.go b/mediaapi/fileutils/fileutils.go index 754e4644b..2e719dc82 100644 --- a/mediaapi/fileutils/fileutils.go +++ b/mediaapi/fileutils/fileutils.go @@ -21,7 +21,6 @@ import ( "encoding/base64" "fmt" "io" - "io/ioutil" "os" "path/filepath" "strings" @@ -180,7 +179,7 @@ func createTempDir(baseDirectory config.Path) (types.Path, error) { if err := os.MkdirAll(baseTmpDir, 0770); err != nil { return "", fmt.Errorf("failed to create base temp dir: %w", err) } - tmpDir, err := ioutil.TempDir(baseTmpDir, "") + tmpDir, err := os.MkdirTemp(baseTmpDir, "") if err != nil { return "", fmt.Errorf("failed to create temp dir: %w", err) } diff --git a/mediaapi/routing/download.go b/mediaapi/routing/download.go index 10b25a5cd..c9299b1fc 100644 --- a/mediaapi/routing/download.go +++ b/mediaapi/routing/download.go @@ -19,7 +19,6 @@ import ( "encoding/json" "fmt" "io" - "io/ioutil" "mime" "net/http" "net/url" @@ -695,7 +694,7 @@ func (r *downloadRequest) GetContentLengthAndReader(contentLengthHeader string, // We successfully parsed the Content-Length, so we'll return a limited // reader that restricts us to reading only up to this size. - reader = ioutil.NopCloser(io.LimitReader(*body, parsedLength)) + reader = io.NopCloser(io.LimitReader(*body, parsedLength)) contentLength = parsedLength } else { // Content-Length header is missing. If we have a maximum file size @@ -704,7 +703,7 @@ func (r *downloadRequest) GetContentLengthAndReader(contentLengthHeader string, // ultimately it will get rewritten later when the temp file is written // to disk. if maxFileSizeBytes > 0 { - reader = ioutil.NopCloser(io.LimitReader(*body, int64(maxFileSizeBytes))) + reader = io.NopCloser(io.LimitReader(*body, int64(maxFileSizeBytes))) } contentLength = 0 } diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index ecd4ecbb5..339c9796c 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -25,6 +25,11 @@ import ( "github.com/Arceliar/phony" "github.com/getsentry/sentry-go" + "github.com/matrix-org/gomatrixserverlib" + "github.com/nats-io/nats.go" + "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" + fedapi "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/roomserver/acls" "github.com/matrix-org/dendrite/roomserver/api" @@ -35,10 +40,6 @@ import ( "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" - "github.com/matrix-org/gomatrixserverlib" - "github.com/nats-io/nats.go" - "github.com/prometheus/client_golang/prometheus" - "github.com/sirupsen/logrus" ) // Inputer is responsible for consuming from the roomserver input @@ -60,9 +61,9 @@ import ( // per-room durable consumers will only progress through the stream // as events are processed. // -// A BC * -> positions of each consumer (* = ephemeral) -// ⌄ ⌄⌄ ⌄ -// ABAABCAABCAA -> newest (letter = subject for each message) +// A BC * -> positions of each consumer (* = ephemeral) +// ⌄ ⌄⌄ ⌄ +// ABAABCAABCAA -> newest (letter = subject for each message) // // In this example, A is still processing an event but has two // pending events to process afterwards. Both B and C are caught diff --git a/roomserver/internal/input/input_latest_events.go b/roomserver/internal/input/input_latest_events.go index f7d15fdb5..d6efad79d 100644 --- a/roomserver/internal/input/input_latest_events.go +++ b/roomserver/internal/input/input_latest_events.go @@ -20,32 +20,32 @@ import ( "context" "fmt" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" + "github.com/opentracing/opentracing-go" + "github.com/sirupsen/logrus" + "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/state" "github.com/matrix-org/dendrite/roomserver/storage/shared" "github.com/matrix-org/dendrite/roomserver/types" - "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/util" - "github.com/opentracing/opentracing-go" - "github.com/sirupsen/logrus" ) // updateLatestEvents updates the list of latest events for this room in the database and writes the // event to the output log. // The latest events are the events that aren't referenced by another event in the database: // -// Time goes down the page. 1 is the m.room.create event (root). -// -// 1 After storing 1 the latest events are {1} -// | After storing 2 the latest events are {2} -// 2 After storing 3 the latest events are {3} -// / \ After storing 4 the latest events are {3,4} -// 3 4 After storing 5 the latest events are {5,4} -// | | After storing 6 the latest events are {5,6} -// 5 6 <--- latest After storing 7 the latest events are {6,7} -// | -// 7 <----- latest +// Time goes down the page. 1 is the m.room.create event (root). +// 1 After storing 1 the latest events are {1} +// | After storing 2 the latest events are {2} +// 2 After storing 3 the latest events are {3} +// / \ After storing 4 the latest events are {3,4} +// 3 4 After storing 5 the latest events are {5,4} +// | | After storing 6 the latest events are {5,6} +// 5 6 <--- latest After storing 7 the latest events are {6,7} +// | +// 7 <----- latest // // Can only be called once at a time func (r *Inputer) updateLatestEvents( diff --git a/roomserver/internal/perform/perform_backfill.go b/roomserver/internal/perform/perform_backfill.go index 5b7ed22ee..298ba04f6 100644 --- a/roomserver/internal/perform/perform_backfill.go +++ b/roomserver/internal/perform/perform_backfill.go @@ -19,6 +19,10 @@ import ( "fmt" "github.com/getsentry/sentry-go" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" + "github.com/sirupsen/logrus" + federationAPI "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/roomserver/api" @@ -26,9 +30,6 @@ import ( "github.com/matrix-org/dendrite/roomserver/internal/helpers" "github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/roomserver/types" - "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/util" - "github.com/sirupsen/logrus" ) // the max number of servers to backfill from per request. If this is too low we may fail to backfill when @@ -522,8 +523,9 @@ func (b *backfillRequester) ProvideEvents(roomVer gomatrixserverlib.RoomVersion, } // joinEventsFromHistoryVisibility returns all CURRENTLY joined members if our server can read the room history +// // TODO: Long term we probably want a history_visibility table which stores eventNID | visibility_enum so we can just -// pull all events and then filter by that table. +// pull all events and then filter by that table. func joinEventsFromHistoryVisibility( ctx context.Context, db storage.Database, roomID string, stateEntries []types.StateEntry, thisServer gomatrixserverlib.ServerName) ([]types.Event, error) { diff --git a/roomserver/storage/tables/interface.go b/roomserver/storage/tables/interface.go index 58c43ac45..0bc389b80 100644 --- a/roomserver/storage/tables/interface.go +++ b/roomserver/storage/tables/interface.go @@ -5,9 +5,10 @@ import ( "database/sql" "errors" - "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/gomatrixserverlib" "github.com/tidwall/gjson" + + "github.com/matrix-org/dendrite/roomserver/types" ) var OptimisationNotSupportedError = errors.New("optimisation not supported") @@ -178,7 +179,7 @@ type StrippedEvent struct { } // ExtractContentValue from the given state event. For example, given an m.room.name event with: -// content: { name: "Foo" } +// content: { name: "Foo" } // this returns "Foo". func ExtractContentValue(ev *gomatrixserverlib.HeaderedEvent) string { content := ev.Content() diff --git a/run-sytest.sh b/run-sytest.sh index 47635fd12..e23982397 100755 --- a/run-sytest.sh +++ b/run-sytest.sh @@ -17,7 +17,7 @@ main() { if [ -d ../sytest ]; then local tmpdir - tmpdir="$(mktemp -d --tmpdir run-systest.XXXXXXXXXX)" + tmpdir="$(mktemp -d -t run-systest.XXXXXXXXXX)" trap "rm -r '$tmpdir'" EXIT if [ -z "$DISABLE_BUILDING_SYTEST" ]; then diff --git a/setup/config/config.go b/setup/config/config.go index f8f18c595..b2dc08edd 100644 --- a/setup/config/config.go +++ b/setup/config/config.go @@ -19,8 +19,8 @@ import ( "encoding/pem" "fmt" "io" - "io/ioutil" "net/url" + "os" "path/filepath" "regexp" "strings" @@ -192,7 +192,7 @@ type ConfigErrors []string // Load a yaml config file for a server run as multiple processes or as a monolith. // Checks the config to ensure that it is valid. func Load(configPath string, monolith bool) (*Dendrite, error) { - configData, err := ioutil.ReadFile(configPath) + configData, err := os.ReadFile(configPath) if err != nil { return nil, err } @@ -200,9 +200,9 @@ func Load(configPath string, monolith bool) (*Dendrite, error) { if err != nil { return nil, err } - // Pass the current working directory and ioutil.ReadFile so that they can + // Pass the current working directory and os.ReadFile so that they can // be mocked in the tests - return loadConfig(basePath, configData, ioutil.ReadFile, monolith) + return loadConfig(basePath, configData, os.ReadFile, monolith) } func loadConfig( @@ -541,7 +541,7 @@ func (config *Dendrite) KeyServerURL() string { // SetupTracing configures the opentracing using the supplied configuration. func (config *Dendrite) SetupTracing(serviceName string) (closer io.Closer, err error) { if !config.Tracing.Enabled { - return ioutil.NopCloser(bytes.NewReader([]byte{})), nil + return io.NopCloser(bytes.NewReader([]byte{})), nil } return config.Tracing.Jaeger.InitGlobalTracer( serviceName, diff --git a/setup/config/config_appservice.go b/setup/config/config_appservice.go index 9b89fc9af..b8f99a612 100644 --- a/setup/config/config_appservice.go +++ b/setup/config/config_appservice.go @@ -16,7 +16,7 @@ package config import ( "fmt" - "io/ioutil" + "os" "path/filepath" "regexp" "strings" @@ -181,7 +181,7 @@ func loadAppServices(config *AppServiceAPI, derived *Derived) error { } // Read the application service's config file - configData, err := ioutil.ReadFile(absPath) + configData, err := os.ReadFile(absPath) if err != nil { return err } diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go index be216a02a..051d55a35 100644 --- a/setup/jetstream/nats.go +++ b/setup/jetstream/nats.go @@ -14,16 +14,16 @@ import ( "github.com/sirupsen/logrus" natsserver "github.com/nats-io/nats-server/v2/server" - "github.com/nats-io/nats.go" natsclient "github.com/nats-io/nats.go" ) type NATSInstance struct { *natsserver.Server - sync.Mutex } -func DeleteAllStreams(js nats.JetStreamContext, cfg *config.JetStream) { +var natsLock sync.Mutex + +func DeleteAllStreams(js natsclient.JetStreamContext, cfg *config.JetStream) { for _, stream := range streams { // streams are defined in streams.go name := cfg.Prefixed(stream.Name) _ = js.DeleteStream(name) @@ -31,11 +31,12 @@ func DeleteAllStreams(js nats.JetStreamContext, cfg *config.JetStream) { } func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) { + natsLock.Lock() + defer natsLock.Unlock() // check if we need an in-process NATS Server if len(cfg.Addresses) != 0 { return setupNATS(process, cfg, nil) } - s.Lock() if s.Server == nil { var err error s.Server, err = natsserver.NewServer(&natsserver.Options{ @@ -63,7 +64,6 @@ func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetS process.ComponentFinished() }() } - s.Unlock() if !s.ReadyForConnections(time.Second * 10) { logrus.Fatalln("NATS did not start in time") } @@ -77,9 +77,9 @@ func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetS func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStreamContext, *natsclient.Conn) { if nc == nil { var err error - opts := []nats.Option{} + opts := []natsclient.Option{} if cfg.DisableTLSValidation { - opts = append(opts, nats.Secure(&tls.Config{ + opts = append(opts, natsclient.Secure(&tls.Config{ InsecureSkipVerify: true, })) } diff --git a/setup/mscs/msc2836/msc2836_test.go b/setup/mscs/msc2836/msc2836_test.go index 9044823af..eeded4275 100644 --- a/setup/mscs/msc2836/msc2836_test.go +++ b/setup/mscs/msc2836/msc2836_test.go @@ -7,7 +7,7 @@ import ( "crypto/sha256" "encoding/base64" "encoding/json" - "io/ioutil" + "io" "net/http" "sort" "strings" @@ -15,6 +15,8 @@ import ( "time" "github.com/gorilla/mux" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/dendrite/internal/hooks" "github.com/matrix-org/dendrite/internal/httputil" roomserver "github.com/matrix-org/dendrite/roomserver/api" @@ -22,7 +24,6 @@ import ( "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/mscs/msc2836" userapi "github.com/matrix-org/dendrite/userapi/api" - "github.com/matrix-org/gomatrixserverlib" ) var ( @@ -32,15 +33,17 @@ var ( ) // Basic sanity check of MSC2836 logic. Injects a thread that looks like: -// A -// | -// B -// / \ -// C D -// /|\ -// E F G -// | -// H +// +// A +// | +// B +// / \ +// C D +// /|\ +// E F G +// | +// H +// // And makes sure POST /event_relationships works with various parameters func TestMSC2836(t *testing.T) { alice := "@alice:localhost" @@ -425,12 +428,12 @@ func postRelationships(t *testing.T, expectCode int, accessToken string, req *ms t.Fatalf("failed to do request: %s", err) } if res.StatusCode != expectCode { - body, _ := ioutil.ReadAll(res.Body) + body, _ := io.ReadAll(res.Body) t.Fatalf("wrong response code, got %d want %d - body: %s", res.StatusCode, expectCode, string(body)) } if res.StatusCode == 200 { var result msc2836.EventRelationshipResponse - body, err := ioutil.ReadAll(res.Body) + body, err := io.ReadAll(res.Body) if err != nil { t.Fatalf("response 200 OK but failed to read response body: %s", err) } diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go index eec369c1a..02633b567 100644 --- a/syncapi/consumers/clientapi.go +++ b/syncapi/consumers/clientapi.go @@ -21,6 +21,11 @@ import ( "fmt" "github.com/getsentry/sentry-go" + "github.com/matrix-org/gomatrixserverlib" + "github.com/nats-io/nats.go" + "github.com/sirupsen/logrus" + log "github.com/sirupsen/logrus" + "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/jetstream" @@ -29,10 +34,6 @@ import ( "github.com/matrix-org/dendrite/syncapi/producers" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" - "github.com/matrix-org/gomatrixserverlib" - "github.com/nats-io/nats.go" - "github.com/sirupsen/logrus" - log "github.com/sirupsen/logrus" ) // OutputClientDataConsumer consumes events that originated in the client API server. @@ -107,7 +108,8 @@ func (s *OutputClientDataConsumer) onMessage(ctx context.Context, msg *nats.Msg) "type": output.Type, "room_id": output.RoomID, log.ErrorKey: err, - }).Panicf("could not save account data") + }).Errorf("could not save account data") + return false } if err = s.sendReadUpdate(ctx, userID, output); err != nil { diff --git a/syncapi/internal/keychange_test.go b/syncapi/internal/keychange_test.go index 6bfc91edd..c7d8df740 100644 --- a/syncapi/internal/keychange_test.go +++ b/syncapi/internal/keychange_test.go @@ -6,12 +6,13 @@ import ( "sort" "testing" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" + keyapi "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" - "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/util" ) var ( @@ -364,13 +365,14 @@ func TestKeyChangeCatchupChangeAndLeft(t *testing.T) { // tests that joining/leaving the SAME room puts users in `left` if the final state is leave. // NB: Consider the case: -// - Alice and Bob are in a room. -// - Alice goes offline, Charlie joins, sends encrypted messages then leaves the room. -// - Alice comes back online. Technically nothing has changed in the set of users between those two points in time, -// it's still just (Alice,Bob) but then we won't be tracking Charlie -- is this okay though? It's device keys -// which are only relevant when actively sending events I think? And if Alice does need the keys she knows -// charlie's (user_id, device_id) so can just hit /keys/query - no need to keep updated about it because she -// doesn't share any rooms with him. +// - Alice and Bob are in a room. +// - Alice goes offline, Charlie joins, sends encrypted messages then leaves the room. +// - Alice comes back online. Technically nothing has changed in the set of users between those two points in time, +// it's still just (Alice,Bob) but then we won't be tracking Charlie -- is this okay though? It's device keys +// which are only relevant when actively sending events I think? And if Alice does need the keys she knows +// charlie's (user_id, device_id) so can just hit /keys/query - no need to keep updated about it because she +// doesn't share any rooms with him. +// // Ergo, we put them in `left` as it is simpler. func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) { newShareUser := "@berta:localhost" diff --git a/syncapi/routing/filter.go b/syncapi/routing/filter.go index 1a10bd649..f5acdbde3 100644 --- a/syncapi/routing/filter.go +++ b/syncapi/routing/filter.go @@ -16,16 +16,17 @@ package routing import ( "encoding/json" - "io/ioutil" + "io" "net/http" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" + "github.com/tidwall/gjson" + "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/sync" "github.com/matrix-org/dendrite/userapi/api" - "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/util" - "github.com/tidwall/gjson" ) // GetFilter implements GET /_matrix/client/r0/user/{userId}/filter/{filterId} @@ -65,7 +66,9 @@ type filterResponse struct { FilterID string `json:"filter_id"` } -//PutFilter implements POST /_matrix/client/r0/user/{userId}/filter +// PutFilter implements +// +// POST /_matrix/client/r0/user/{userId}/filter func PutFilter( req *http.Request, device *api.Device, syncDB storage.Database, userID string, ) util.JSONResponse { @@ -85,7 +88,7 @@ func PutFilter( var filter gomatrixserverlib.Filter defer req.Body.Close() // nolint:errcheck - body, err := ioutil.ReadAll(req.Body) + body, err := io.ReadAll(req.Body) if err != nil { return util.JSONResponse{ Code: http.StatusBadRequest, diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index 990ca55b1..b4c9a5423 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -20,6 +20,10 @@ import ( "net/http" "sort" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" + "github.com/sirupsen/logrus" + "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/roomserver/api" @@ -28,9 +32,6 @@ import ( "github.com/matrix-org/dendrite/syncapi/sync" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" - "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/util" - "github.com/sirupsen/logrus" ) type messagesReq struct { @@ -262,7 +263,7 @@ func (m *messagesResp) applyLazyLoadMembers( } } for _, evt := range membershipToUser { - m.State = append(m.State, gomatrixserverlib.HeaderedToClientEvent(evt, gomatrixserverlib.FormatSync)) + m.State = append(m.State, gomatrixserverlib.HeaderedToClientEvent(evt, gomatrixserverlib.FormatAll)) } } diff --git a/syncapi/storage/postgres/notification_data_table.go b/syncapi/storage/postgres/notification_data_table.go index f3fc4451f..9cd8b7362 100644 --- a/syncapi/storage/postgres/notification_data_table.go +++ b/syncapi/storage/postgres/notification_data_table.go @@ -58,7 +58,7 @@ const upsertRoomUnreadNotificationCountsSQL = `INSERT INTO syncapi_notification_ (user_id, room_id, notification_count, highlight_count) VALUES ($1, $2, $3, $4) ON CONFLICT (user_id, room_id) - DO UPDATE SET notification_count = $3, highlight_count = $4 + DO UPDATE SET id = nextval('syncapi_notification_data_id_seq'), notification_count = $3, highlight_count = $4 RETURNING id` const selectUserUnreadNotificationCountsSQL = `SELECT diff --git a/syncapi/storage/sqlite3/notification_data_table.go b/syncapi/storage/sqlite3/notification_data_table.go index 4b3f074db..eaa11a8c0 100644 --- a/syncapi/storage/sqlite3/notification_data_table.go +++ b/syncapi/storage/sqlite3/notification_data_table.go @@ -25,12 +25,14 @@ import ( "github.com/matrix-org/dendrite/syncapi/types" ) -func NewSqliteNotificationDataTable(db *sql.DB) (tables.NotificationData, error) { +func NewSqliteNotificationDataTable(db *sql.DB, streamID *StreamIDStatements) (tables.NotificationData, error) { _, err := db.Exec(notificationDataSchema) if err != nil { return nil, err } - r := ¬ificationDataStatements{} + r := ¬ificationDataStatements{ + streamIDStatements: streamID, + } return r, sqlutil.StatementList{ {&r.upsertRoomUnreadCounts, upsertRoomUnreadNotificationCountsSQL}, {&r.selectUserUnreadCounts, selectUserUnreadNotificationCountsSQL}, @@ -39,6 +41,7 @@ func NewSqliteNotificationDataTable(db *sql.DB) (tables.NotificationData, error) } type notificationDataStatements struct { + streamIDStatements *StreamIDStatements upsertRoomUnreadCounts *sql.Stmt selectUserUnreadCounts *sql.Stmt selectMaxID *sql.Stmt @@ -58,8 +61,7 @@ const upsertRoomUnreadNotificationCountsSQL = `INSERT INTO syncapi_notification_ (user_id, room_id, notification_count, highlight_count) VALUES ($1, $2, $3, $4) ON CONFLICT (user_id, room_id) - DO UPDATE SET notification_count = $3, highlight_count = $4 - RETURNING id` + DO UPDATE SET id = $5, notification_count = $6, highlight_count = $7` const selectUserUnreadNotificationCountsSQL = `SELECT id, room_id, notification_count, highlight_count @@ -71,7 +73,11 @@ const selectUserUnreadNotificationCountsSQL = `SELECT const selectMaxNotificationIDSQL = `SELECT CASE COUNT(*) WHEN 0 THEN 0 ELSE MAX(id) END FROM syncapi_notification_data` func (r *notificationDataStatements) UpsertRoomUnreadCounts(ctx context.Context, userID, roomID string, notificationCount, highlightCount int) (pos types.StreamPosition, err error) { - err = r.upsertRoomUnreadCounts.QueryRowContext(ctx, userID, roomID, notificationCount, highlightCount).Scan(&pos) + pos, err = r.streamIDStatements.nextNotificationID(ctx, nil) + if err != nil { + return + } + _, err = r.upsertRoomUnreadCounts.ExecContext(ctx, userID, roomID, notificationCount, highlightCount, pos, notificationCount, highlightCount) return } diff --git a/syncapi/storage/sqlite3/stream_id_table.go b/syncapi/storage/sqlite3/stream_id_table.go index 71980b806..1160a437e 100644 --- a/syncapi/storage/sqlite3/stream_id_table.go +++ b/syncapi/storage/sqlite3/stream_id_table.go @@ -26,6 +26,8 @@ INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("invite", 0) ON CONFLICT DO NOTHING; INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("presence", 0) ON CONFLICT DO NOTHING; +INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("notification", 0) + ON CONFLICT DO NOTHING; ` const increaseStreamIDStmt = "" + @@ -78,3 +80,9 @@ func (s *StreamIDStatements) nextPresenceID(ctx context.Context, txn *sql.Tx) (p err = increaseStmt.QueryRowContext(ctx, "presence").Scan(&pos) return } + +func (s *StreamIDStatements) nextNotificationID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) { + increaseStmt := sqlutil.TxStmt(txn, s.increaseStreamIDStmt) + err = increaseStmt.QueryRowContext(ctx, "notification").Scan(&pos) + return +} diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index 65b2bb38a..5c5eb0f55 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -95,7 +95,7 @@ func (d *SyncServerDatasource) prepare() (err error) { if err != nil { return err } - notificationData, err := NewSqliteNotificationDataTable(d.db) + notificationData, err := NewSqliteNotificationDataTable(d.db, &d.streamID) if err != nil { return err } diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index d8fc8f508..d68351d4c 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -18,10 +18,11 @@ import ( "context" "database/sql" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/types" - "github.com/matrix-org/gomatrixserverlib" ) type AccountData interface { @@ -122,12 +123,14 @@ type CurrentRoomState interface { // // We persist the previous event IDs as well, one per row, so when we do fetch even // earlier events we can simply delete rows which referenced it. Consider the graph: -// A -// | Event C has 1 prev_event ID: A. -// B C -// |___| Event D has 2 prev_event IDs: B and C. -// | -// D +// +// A +// | Event C has 1 prev_event ID: A. +// B C +// |___| Event D has 2 prev_event IDs: B and C. +// | +// D +// // The earliest known event we have is D, so this table has 2 rows. // A backfill request gives us C but not B. We delete rows where prev_event=C. This // still means that D is a backwards extremity as we do not have event B. However, event diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 1003208fd..1ad3adc4c 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -12,9 +12,12 @@ import ( roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/gomatrixserverlib" "github.com/tidwall/gjson" "go.uber.org/atomic" + + "github.com/matrix-org/dendrite/syncapi/notifier" ) // The max number of per-room goroutines to have running. @@ -34,6 +37,7 @@ type PDUStreamProvider struct { // userID+deviceID -> lazy loading cache lazyLoadCache caching.LazyLoadCache rsAPI roomserverAPI.SyncRoomserverAPI + notifier *notifier.Notifier } func (p *PDUStreamProvider) worker() { @@ -100,6 +104,15 @@ func (p *PDUStreamProvider) CompleteSync( req.Log.WithError(err).Error("unable to update event filter with ignored users") } + // Invalidate the lazyLoadCache, otherwise we end up with missing displaynames/avatars + // TODO: This might be inefficient, when joined to many and/or large rooms. + for _, roomID := range joinedRoomIDs { + joinedUsers := p.notifier.JoinedUsers(roomID) + for _, sharedUser := range joinedUsers { + p.lazyLoadCache.InvalidateLazyLoadedUser(req.Device, roomID, sharedUser) + } + } + // Build up a /sync response. Add joined rooms. var reqMutex sync.Mutex var reqWaitGroup sync.WaitGroup diff --git a/syncapi/streams/streams.go b/syncapi/streams/streams.go index 1ca4ee8c3..dbc053bd8 100644 --- a/syncapi/streams/streams.go +++ b/syncapi/streams/streams.go @@ -34,6 +34,7 @@ func NewSyncStreamProviders( StreamProvider: StreamProvider{DB: d}, lazyLoadCache: lazyLoadCache, rsAPI: rsAPI, + notifier: notifier, }, TypingStreamProvider: &TypingStreamProvider{ StreamProvider: StreamProvider{DB: d}, diff --git a/syncapi/sync/requestpool_test.go b/syncapi/sync/requestpool_test.go index 48e6c6c7a..3e5769d8c 100644 --- a/syncapi/sync/requestpool_test.go +++ b/syncapi/sync/requestpool_test.go @@ -12,10 +12,13 @@ import ( ) type dummyPublisher struct { + lock sync.Mutex count int } func (d *dummyPublisher) SendPresence(userID string, presence types.Presence, statusMsg *string) error { + d.lock.Lock() + defer d.lock.Unlock() d.count++ return nil } @@ -125,11 +128,15 @@ func TestRequestPool_updatePresence(t *testing.T) { go rp.cleanPresence(db, time.Millisecond*50) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + publisher.lock.Lock() beforeCount := publisher.count + publisher.lock.Unlock() rp.updatePresence(db, tt.args.presence, tt.args.userID) + publisher.lock.Lock() if tt.wantIncrease && publisher.count <= beforeCount { t.Fatalf("expected count to increase: %d <= %d", publisher.count, beforeCount) } + publisher.lock.Unlock() time.Sleep(tt.args.sleep) }) } diff --git a/test/keys.go b/test/keys.go index 75e3800e0..327c6ed7b 100644 --- a/test/keys.go +++ b/test/keys.go @@ -22,7 +22,6 @@ import ( "encoding/pem" "errors" "fmt" - "io/ioutil" "math/big" "os" "strings" @@ -144,7 +143,7 @@ func NewTLSKeyWithAuthority(serverName, tlsKeyPath, tlsCertPath, authorityKeyPat } // load the authority key - dat, err := ioutil.ReadFile(authorityKeyPath) + dat, err := os.ReadFile(authorityKeyPath) if err != nil { return err } @@ -158,7 +157,7 @@ func NewTLSKeyWithAuthority(serverName, tlsKeyPath, tlsCertPath, authorityKeyPat } // load the authority certificate - dat, err = ioutil.ReadFile(authorityCertPath) + dat, err = os.ReadFile(authorityCertPath) if err != nil { return err } diff --git a/userapi/storage/sqlite3/stats_table.go b/userapi/storage/sqlite3/stats_table.go index e00ed417b..8aa1746c5 100644 --- a/userapi/storage/sqlite3/stats_table.go +++ b/userapi/storage/sqlite3/stats_table.go @@ -20,13 +20,14 @@ import ( "strings" "time" + "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" + "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/storage/tables" "github.com/matrix-org/dendrite/userapi/types" - "github.com/matrix-org/gomatrixserverlib" - "github.com/sirupsen/logrus" ) const userDailyVisitsSchema = ` @@ -297,11 +298,10 @@ func (s *statsStatements) monthlyUsers(ctx context.Context, txn *sql.Tx) (result return } -/* R30Users counts the number of 30 day retained users, defined as: -- Users who have created their accounts more than 30 days ago -- Where last seen at most 30 days ago -- Where account creation and last_seen are > 30 days apart -*/ +// R30Users counts the number of 30 day retained users, defined as: +// - Users who have created their accounts more than 30 days ago +// - Where last seen at most 30 days ago +// - Where account creation and last_seen are > 30 days apart func (s *statsStatements) r30Users(ctx context.Context, txn *sql.Tx) (map[string]int64, error) { stmt := sqlutil.TxStmt(txn, s.countR30UsersStmt) lastSeenAfter := time.Now().AddDate(0, 0, -30) @@ -334,7 +334,8 @@ func (s *statsStatements) r30Users(ctx context.Context, txn *sql.Tx) (map[string return result, rows.Err() } -/* R30UsersV2 counts the number of 30 day retained users, defined as users that: +/* +R30UsersV2 counts the number of 30 day retained users, defined as users that: - Appear more than once in the past 60 days - Have more than 30 days between the most and least recent appearances that occurred in the past 60 days. */