Merge branch 'main' of github.com:matrix-org/dendrite into s7evink/expireedus

This commit is contained in:
Till Faelligen 2022-08-09 09:26:40 +02:00
commit ae4a45f7e9
No known key found for this signature in database
GPG key ID: 3DF82D8AB9211D4E
38 changed files with 167 additions and 142 deletions

View file

@ -21,8 +21,7 @@ As of October 2020 (current [progress below](#progress)), Dendrite has now enter
This does not mean: This does not mean:
- Dendrite is bug-free. It has not yet been battle-tested in the real world and so will be error prone initially. - Dendrite is bug-free. It has not yet been battle-tested in the real world and so will be error prone initially.
- All of the CS/Federation APIs are implemented. We are tracking progress via a script called 'Are We Synapse Yet?'. In particular, - Dendrite is feature-complete. There may be client or federation APIs that are not implemented.
presence and push notifications are entirely missing from Dendrite. See [CHANGES.md](CHANGES.md) for updates.
- Dendrite is ready for massive homeserver deployments. You cannot shard each microservice, only run each one on a different machine. - Dendrite is ready for massive homeserver deployments. You cannot shard each microservice, only run each one on a different machine.
Currently, we expect Dendrite to function well for small (10s/100s of users) homeserver deployments as well as P2P Matrix nodes in-browser or on mobile devices. Currently, we expect Dendrite to function well for small (10s/100s of users) homeserver deployments as well as P2P Matrix nodes in-browser or on mobile devices.
@ -36,6 +35,9 @@ If you have further questions, please take a look at [our FAQ](docs/FAQ.md) or j
## Requirements ## Requirements
See the [Planning your Installation](https://matrix-org.github.io/dendrite/installation/planning) page for
more information on requirements.
To build Dendrite, you will need Go 1.18 or later. To build Dendrite, you will need Go 1.18 or later.
For a usable federating Dendrite deployment, you will also need: For a usable federating Dendrite deployment, you will also need:
@ -83,11 +85,11 @@ $ ./bin/create-account --config dendrite.yaml -username alice
Then point your favourite Matrix client at `http://localhost:8008` or `https://localhost:8448`. Then point your favourite Matrix client at `http://localhost:8008` or `https://localhost:8448`.
## <a id="progress"></a> Progress ## Progress
We use a script called Are We Synapse Yet which checks Sytest compliance rates. Sytest is a black-box homeserver We use a script called Are We Synapse Yet which checks Sytest compliance rates. Sytest is a black-box homeserver
test rig with around 900 tests. The script works out how many of these tests are passing on Dendrite and it test rig with around 900 tests. The script works out how many of these tests are passing on Dendrite and it
updates with CI. As of April 2022 we're at around 83% CS API coverage and 95% Federation coverage, though check updates with CI. As of August 2022 we're at around 83% CS API coverage and 95% Federation coverage, though check
CI for the latest numbers. In practice, this means you can communicate locally and via federation with Synapse CI for the latest numbers. In practice, this means you can communicate locally and via federation with Synapse
servers such as matrix.org reasonably well, although there are still some missing features (like Search). servers such as matrix.org reasonably well, although there are still some missing features (like Search).
@ -119,53 +121,8 @@ We would be grateful for any help on issues marked as
all have related Sytests which need to pass in order for the issue to be closed. Once you've written your all have related Sytests which need to pass in order for the issue to be closed. Once you've written your
code, you can quickly run Sytest to ensure that the test names are now passing. code, you can quickly run Sytest to ensure that the test names are now passing.
For example, if the test `Local device key changes get to remote servers` was marked as failing, find the If you're new to the project, see our
test file (e.g via `grep` or via the [Contributing page](https://matrix-org.github.io/dendrite/development/contributing) to get up to speed, then
[CI log output](https://buildkite.com/matrix-dot-org/dendrite/builds/2826#39cff5de-e032-4ad0-ad26-f819e6919c42)
it's `tests/50federation/40devicelists.pl` ) then to run Sytest:
```
docker run --rm --name sytest
-v "/Users/kegan/github/sytest:/sytest"
-v "/Users/kegan/github/dendrite:/src"
-v "/Users/kegan/logs:/logs"
-v "/Users/kegan/go/:/gopath"
-e "POSTGRES=1" -e "DENDRITE_TRACE_HTTP=1"
matrixdotorg/sytest-dendrite:latest tests/50federation/40devicelists.pl
```
See [sytest.md](docs/sytest.md) for the full description of these flags.
You can try running sytest outside of docker for faster runs, but the dependencies can be temperamental
and we recommend using docker where possible.
```
cd sytest
export PERL5LIB=$HOME/lib/perl5
export PERL_MB_OPT=--install_base=$HOME
export PERL_MM_OPT=INSTALL_BASE=$HOME
./install-deps.pl
./run-tests.pl -I Dendrite::Monolith -d $PATH_TO_DENDRITE_BINARIES
```
Sometimes Sytest is testing the wrong thing or is flakey, so it will need to be patched.
Ask on `#dendrite-dev:matrix.org` if you think this is the case for you and we'll be happy to help.
If you're new to the project, see [CONTRIBUTING.md](docs/CONTRIBUTING.md) to get up to speed then
look for [Good First Issues](https://github.com/matrix-org/dendrite/labels/good%20first%20issue). If you're look for [Good First Issues](https://github.com/matrix-org/dendrite/labels/good%20first%20issue). If you're
familiar with the project, look for [Help Wanted](https://github.com/matrix-org/dendrite/labels/help-wanted) familiar with the project, look for [Help Wanted](https://github.com/matrix-org/dendrite/labels/help-wanted)
issues. issues.
## Hardware requirements
Dendrite in Monolith + SQLite works in a range of environments including iOS and in-browser via WASM.
For small homeserver installations joined on ~10s rooms on matrix.org with ~100s of users in those rooms, including some
encrypted rooms:
- Memory: uses around 100MB of RAM, with peaks at around 200MB.
- Disk space: After a few months of usage, the database grew to around 2GB (in Monolith mode).
- CPU: Brief spikes when processing events, typically idles at 1% CPU.
This means Dendrite should comfortably work on things like Raspberry Pis.

View file

@ -18,7 +18,6 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"io" "io"
"io/ioutil"
"net/http" "net/http"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/auth/authtypes"
@ -34,7 +33,7 @@ import (
// If the final return value is non-nil, an error occurred and the cleanup function // If the final return value is non-nil, an error occurred and the cleanup function
// is nil. // is nil.
func LoginFromJSONReader(ctx context.Context, r io.Reader, useraccountAPI uapi.UserLoginAPI, userAPI UserInternalAPIForLogin, cfg *config.ClientAPI) (*Login, LoginCleanupFunc, *util.JSONResponse) { func LoginFromJSONReader(ctx context.Context, r io.Reader, useraccountAPI uapi.UserLoginAPI, userAPI UserInternalAPIForLogin, cfg *config.ClientAPI) (*Login, LoginCleanupFunc, *util.JSONResponse) {
reqBytes, err := ioutil.ReadAll(r) reqBytes, err := io.ReadAll(r)
if err != nil { if err != nil {
err := &util.JSONResponse{ err := &util.JSONResponse{
Code: http.StatusBadRequest, Code: http.StatusBadRequest,

View file

@ -16,7 +16,7 @@ package httputil
import ( import (
"encoding/json" "encoding/json"
"io/ioutil" "io"
"net/http" "net/http"
"unicode/utf8" "unicode/utf8"
@ -29,9 +29,9 @@ import (
func UnmarshalJSONRequest(req *http.Request, iface interface{}) *util.JSONResponse { func UnmarshalJSONRequest(req *http.Request, iface interface{}) *util.JSONResponse {
// encoding/json allows invalid utf-8, matrix does not // encoding/json allows invalid utf-8, matrix does not
// https://matrix.org/docs/spec/client_server/r0.6.1#api-standards // https://matrix.org/docs/spec/client_server/r0.6.1#api-standards
body, err := ioutil.ReadAll(req.Body) body, err := io.ReadAll(req.Body)
if err != nil { if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("ioutil.ReadAll failed") util.GetLogger(req.Context()).WithError(err).Error("io.ReadAll failed")
resp := jsonerror.InternalServerError() resp := jsonerror.InternalServerError()
return &resp return &resp
} }

View file

@ -17,7 +17,7 @@ package routing
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io/ioutil" "io"
"net/http" "net/http"
"github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/httputil"
@ -101,9 +101,9 @@ func SaveAccountData(
} }
} }
body, err := ioutil.ReadAll(req.Body) body, err := io.ReadAll(req.Body)
if err != nil { if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("ioutil.ReadAll failed") util.GetLogger(req.Context()).WithError(err).Error("io.ReadAll failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }

View file

@ -1,7 +1,7 @@
package routing package routing
import ( import (
"io/ioutil" "io"
"net/http" "net/http"
"github.com/matrix-org/dendrite/clientapi/auth" "github.com/matrix-org/dendrite/clientapi/auth"
@ -20,7 +20,7 @@ func Deactivate(
) util.JSONResponse { ) util.JSONResponse {
ctx := req.Context() ctx := req.Context()
defer req.Body.Close() // nolint:errcheck defer req.Body.Close() // nolint:errcheck
bodyBytes, err := ioutil.ReadAll(req.Body) bodyBytes, err := io.ReadAll(req.Body)
if err != nil { if err != nil {
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusBadRequest, Code: http.StatusBadRequest,

View file

@ -15,7 +15,7 @@
package routing package routing
import ( import (
"io/ioutil" "io"
"net" "net"
"net/http" "net/http"
@ -175,7 +175,7 @@ func DeleteDeviceById(
}() }()
ctx := req.Context() ctx := req.Context()
defer req.Body.Close() // nolint:errcheck defer req.Body.Close() // nolint:errcheck
bodyBytes, err := ioutil.ReadAll(req.Body) bodyBytes, err := io.ReadAll(req.Body)
if err != nil { if err != nil {
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusBadRequest, Code: http.StatusBadRequest,

View file

@ -19,7 +19,7 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io/ioutil" "io"
"net/http" "net/http"
"net/url" "net/url"
"regexp" "regexp"
@ -371,7 +371,7 @@ func validateRecaptcha(
// Grab the body of the response from the captcha server // Grab the body of the response from the captcha server
var r recaptchaResponse var r recaptchaResponse
body, err := ioutil.ReadAll(resp.Body) body, err := io.ReadAll(resp.Body)
if err != nil { if err != nil {
return &util.JSONResponse{ return &util.JSONResponse{
Code: http.StatusGatewayTimeout, Code: http.StatusGatewayTimeout,
@ -539,7 +539,7 @@ func Register(
cfg *config.ClientAPI, cfg *config.ClientAPI,
) util.JSONResponse { ) util.JSONResponse {
defer req.Body.Close() // nolint: errcheck defer req.Body.Close() // nolint: errcheck
reqBody, err := ioutil.ReadAll(req.Body) reqBody, err := io.ReadAll(req.Body)
if err != nil { if err != nil {
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusBadRequest, Code: http.StatusBadRequest,

View file

@ -2,7 +2,7 @@ package routing
import ( import (
"bytes" "bytes"
"io/ioutil" "io"
"testing" "testing"
"github.com/patrickmn/go-cache" "github.com/patrickmn/go-cache"
@ -13,7 +13,7 @@ func TestSharedSecretRegister(t *testing.T) {
jsonStr := []byte(`{"admin":false,"mac":"f1ba8d37123866fd659b40de4bad9b0f8965c565","nonce":"759f047f312b99ff428b21d581256f8592b8976e58bc1b543972dc6147e529a79657605b52d7becd160ff5137f3de11975684319187e06901955f79e5a6c5a79","password":"wonderland","username":"alice"}`) jsonStr := []byte(`{"admin":false,"mac":"f1ba8d37123866fd659b40de4bad9b0f8965c565","nonce":"759f047f312b99ff428b21d581256f8592b8976e58bc1b543972dc6147e529a79657605b52d7becd160ff5137f3de11975684319187e06901955f79e5a6c5a79","password":"wonderland","username":"alice"}`)
sharedSecret := "dendritetest" sharedSecret := "dendritetest"
req, err := NewSharedSecretRegistrationRequest(ioutil.NopCloser(bytes.NewBuffer(jsonStr))) req, err := NewSharedSecretRegistrationRequest(io.NopCloser(bytes.NewBuffer(jsonStr)))
if err != nil { if err != nil {
t.Fatalf("failed to read request: %s", err) t.Fatalf("failed to read request: %s", err)
} }

View file

@ -19,7 +19,6 @@ import (
"flag" "flag"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"os" "os"
"regexp" "regexp"
"strings" "strings"
@ -157,7 +156,7 @@ func main() {
func getPassword(password, pwdFile string, pwdStdin bool, r io.Reader) (string, error) { func getPassword(password, pwdFile string, pwdStdin bool, r io.Reader) (string, error) {
// read password from file // read password from file
if pwdFile != "" { if pwdFile != "" {
pw, err := ioutil.ReadFile(pwdFile) pw, err := os.ReadFile(pwdFile)
if err != nil { if err != nil {
return "", fmt.Errorf("Unable to read password from file: %v", err) return "", fmt.Errorf("Unable to read password from file: %v", err)
} }
@ -166,7 +165,7 @@ func getPassword(password, pwdFile string, pwdStdin bool, r io.Reader) (string,
// read password from stdin // read password from stdin
if pwdStdin { if pwdStdin {
data, err := ioutil.ReadAll(r) data, err := io.ReadAll(r)
if err != nil { if err != nil {
return "", fmt.Errorf("Unable to read password from stdin: %v", err) return "", fmt.Errorf("Unable to read password from stdin: %v", err)
} }

View file

@ -21,7 +21,6 @@ import (
"encoding/hex" "encoding/hex"
"flag" "flag"
"fmt" "fmt"
"io/ioutil"
"net" "net"
"net/http" "net/http"
"os" "os"
@ -76,11 +75,11 @@ func main() {
if pk, sk, err = ed25519.GenerateKey(nil); err != nil { if pk, sk, err = ed25519.GenerateKey(nil); err != nil {
panic(err) panic(err)
} }
if err = ioutil.WriteFile(keyfile, sk, 0644); err != nil { if err = os.WriteFile(keyfile, sk, 0644); err != nil {
panic(err) panic(err)
} }
} else if err == nil { } else if err == nil {
if sk, err = ioutil.ReadFile(keyfile); err != nil { if sk, err = os.ReadFile(keyfile); err != nil {
panic(err) panic(err)
} }
if len(sk) != ed25519.PrivateKeySize { if len(sk) != ed25519.PrivateKeySize {

View file

@ -20,7 +20,6 @@ import (
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io/ioutil"
"log" "log"
"net" "net"
"os" "os"
@ -69,7 +68,7 @@ func Setup(instanceName, storageDirectory, peerURI string) (*Node, error) {
yggfile := fmt.Sprintf("%s/%s-yggdrasil.conf", storageDirectory, instanceName) yggfile := fmt.Sprintf("%s/%s-yggdrasil.conf", storageDirectory, instanceName)
if _, err := os.Stat(yggfile); !os.IsNotExist(err) { if _, err := os.Stat(yggfile); !os.IsNotExist(err) {
yggconf, e := ioutil.ReadFile(yggfile) yggconf, e := os.ReadFile(yggfile)
if e != nil { if e != nil {
panic(err) panic(err)
} }
@ -88,7 +87,7 @@ func Setup(instanceName, storageDirectory, peerURI string) (*Node, error) {
if err != nil { if err != nil {
panic(err) panic(err)
} }
if e := ioutil.WriteFile(yggfile, j, 0600); e != nil { if e := os.WriteFile(yggfile, j, 0600); e != nil {
n.log.Printf("Couldn't write private key to file '%s': %s\n", yggfile, e) n.log.Printf("Couldn't write private key to file '%s': %s\n", yggfile, e)
} }

View file

@ -6,7 +6,7 @@ import (
"encoding/json" "encoding/json"
"flag" "flag"
"fmt" "fmt"
"io/ioutil" "io"
"log" "log"
"net/http" "net/http"
"os" "os"
@ -128,7 +128,7 @@ func downloadArchive(cli *http.Client, tmpDir, archiveURL string, dockerfile []b
return nil, err return nil, err
} }
// add top level Dockerfile // add top level Dockerfile
err = ioutil.WriteFile(path.Join(tmpDir, "Dockerfile"), dockerfile, os.ModePerm) err = os.WriteFile(path.Join(tmpDir, "Dockerfile"), dockerfile, os.ModePerm)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to inject /Dockerfile: %w", err) return nil, fmt.Errorf("failed to inject /Dockerfile: %w", err)
} }
@ -150,7 +150,7 @@ func buildDendrite(httpClient *http.Client, dockerClient *client.Client, tmpDir,
if branchOrTagName == HEAD && *flagHead != "" { if branchOrTagName == HEAD && *flagHead != "" {
log.Printf("%s: Using %s as HEAD", branchOrTagName, *flagHead) log.Printf("%s: Using %s as HEAD", branchOrTagName, *flagHead)
// add top level Dockerfile // add top level Dockerfile
err = ioutil.WriteFile(path.Join(*flagHead, "Dockerfile"), []byte(Dockerfile), os.ModePerm) err = os.WriteFile(path.Join(*flagHead, "Dockerfile"), []byte(Dockerfile), os.ModePerm)
if err != nil { if err != nil {
return "", fmt.Errorf("custom HEAD: failed to inject /Dockerfile: %w", err) return "", fmt.Errorf("custom HEAD: failed to inject /Dockerfile: %w", err)
} }
@ -388,7 +388,7 @@ func runImage(dockerClient *client.Client, volumeName, version, imageID string)
}) })
// ignore errors when cannot get logs, it's just for debugging anyways // ignore errors when cannot get logs, it's just for debugging anyways
if err == nil { if err == nil {
logbody, err := ioutil.ReadAll(logs) logbody, err := io.ReadAll(logs)
if err == nil { if err == nil {
log.Printf("Container logs:\n\n%s\n\n", string(logbody)) log.Printf("Container logs:\n\n%s\n\n", string(logbody))
} }

View file

@ -9,7 +9,6 @@ import (
"encoding/pem" "encoding/pem"
"flag" "flag"
"fmt" "fmt"
"io/ioutil"
"net/url" "net/url"
"os" "os"
@ -30,7 +29,7 @@ func main() {
os.Exit(1) os.Exit(1)
} }
data, err := ioutil.ReadFile(*requestKey) data, err := os.ReadFile(*requestKey)
if err != nil { if err != nil {
panic(err) panic(err)
} }

View file

@ -178,13 +178,16 @@ client_api:
# TURN server information that this homeserver should send to clients. # TURN server information that this homeserver should send to clients.
turn: turn:
turn_user_lifetime: "" turn_user_lifetime: "5m"
turn_uris: turn_uris:
# - turn:turn.server.org?transport=udp # - turn:turn.server.org?transport=udp
# - turn:turn.server.org?transport=tcp # - turn:turn.server.org?transport=tcp
turn_shared_secret: "" turn_shared_secret: ""
turn_username: "" # If your TURN server requires static credentials, then you will need to enter
turn_password: "" # them here instead of supplying a shared secret. Note that these credentials
# will be visible to clients!
# turn_username: ""
# turn_password: ""
# Settings for rate-limited endpoints. Rate limiting kicks in after the threshold # Settings for rate-limited endpoints. Rate limiting kicks in after the threshold
# number of "slots" have been taken by requests from a specific host. Each "slot" # number of "slots" have been taken by requests from a specific host. Each "slot"

View file

@ -181,13 +181,16 @@ client_api:
# TURN server information that this homeserver should send to clients. # TURN server information that this homeserver should send to clients.
turn: turn:
turn_user_lifetime: "" turn_user_lifetime: "5m"
turn_uris: turn_uris:
# - turn:turn.server.org?transport=udp # - turn:turn.server.org?transport=udp
# - turn:turn.server.org?transport=tcp # - turn:turn.server.org?transport=tcp
turn_shared_secret: "" turn_shared_secret: ""
turn_username: "" # If your TURN server requires static credentials, then you will need to enter
turn_password: "" # them here instead of supplying a shared secret. Note that these credentials
# will be visible to clients!
# turn_username: ""
# turn_password: ""
# Settings for rate-limited endpoints. Rate limiting kicks in after the threshold # Settings for rate-limited endpoints. Rate limiting kicks in after the threshold
# number of "slots" have been taken by requests from a specific host. Each "slot" # number of "slots" have been taken by requests from a specific host. Each "slot"

View file

@ -6,7 +6,7 @@ import (
"crypto/ed25519" "crypto/ed25519"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io/ioutil" "io"
"net/http" "net/http"
"os" "os"
"testing" "testing"
@ -66,7 +66,7 @@ func TestMain(m *testing.M) {
s.cache = caching.NewRistrettoCache(8*1024*1024, time.Hour, false) s.cache = caching.NewRistrettoCache(8*1024*1024, time.Hour, false)
// Create a temporary directory for JetStream. // Create a temporary directory for JetStream.
d, err := ioutil.TempDir("./", "jetstream*") d, err := os.MkdirTemp("./", "jetstream*")
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -136,7 +136,7 @@ func (m *MockRoundTripper) RoundTrip(req *http.Request) (res *http.Response, err
// And respond. // And respond.
res = &http.Response{ res = &http.Response{
StatusCode: 200, StatusCode: 200,
Body: ioutil.NopCloser(bytes.NewReader(body)), Body: io.NopCloser(bytes.NewReader(body)),
} }
return return
} }

View file

@ -14,6 +14,7 @@ type lazyLoadingCacheKey struct {
type LazyLoadCache interface { type LazyLoadCache interface {
StoreLazyLoadedUser(device *userapi.Device, roomID, userID, eventID string) StoreLazyLoadedUser(device *userapi.Device, roomID, userID, eventID string)
IsLazyLoadedUserCached(device *userapi.Device, roomID, userID string) (string, bool) IsLazyLoadedUserCached(device *userapi.Device, roomID, userID string) (string, bool)
InvalidateLazyLoadedUser(device *userapi.Device, roomID, userID string)
} }
func (c Caches) StoreLazyLoadedUser(device *userapi.Device, roomID, userID, eventID string) { func (c Caches) StoreLazyLoadedUser(device *userapi.Device, roomID, userID, eventID string) {
@ -33,3 +34,12 @@ func (c Caches) IsLazyLoadedUserCached(device *userapi.Device, roomID, userID st
TargetUserID: userID, TargetUserID: userID,
}) })
} }
func (c Caches) InvalidateLazyLoadedUser(device *userapi.Device, roomID, userID string) {
c.LazyLoading.Unset(lazyLoadingCacheKey{
UserID: device.UserID,
DeviceID: device.ID,
RoomID: roomID,
TargetUserID: userID,
})
}

View file

@ -18,7 +18,7 @@
package internal package internal
import ( import (
"io/ioutil" "io"
"log/syslog" "log/syslog"
"github.com/MFAshby/stdemuxerhook" "github.com/MFAshby/stdemuxerhook"
@ -63,7 +63,7 @@ func SetupHookLogging(hooks []config.LogrusHook, componentName string) {
setupStdLogHook(logrus.InfoLevel) setupStdLogHook(logrus.InfoLevel)
} }
// Hooks are now configured for stdout/err, so throw away the default logger output // Hooks are now configured for stdout/err, so throw away the default logger output
logrus.SetOutput(ioutil.Discard) logrus.SetOutput(io.Discard)
} }
func checkSyslogHookParams(params map[string]interface{}) { func checkSyslogHookParams(params map[string]interface{}) {

View file

@ -18,7 +18,7 @@ import (
"context" "context"
"crypto/ed25519" "crypto/ed25519"
"fmt" "fmt"
"io/ioutil" "io"
"net/http" "net/http"
"net/url" "net/url"
"reflect" "reflect"
@ -203,7 +203,7 @@ func TestUpdateNoPrevID(t *testing.T) {
} }
return &http.Response{ return &http.Response{
StatusCode: 200, StatusCode: 200,
Body: ioutil.NopCloser(strings.NewReader(` Body: io.NopCloser(strings.NewReader(`
{ {
"user_id": "` + remoteUserID + `", "user_id": "` + remoteUserID + `",
"stream_id": 5, "stream_id": 5,
@ -318,7 +318,7 @@ func TestDebounce(t *testing.T) {
// now send the response over federation // now send the response over federation
fedCh <- &http.Response{ fedCh <- &http.Response{
StatusCode: 200, StatusCode: 200,
Body: ioutil.NopCloser(strings.NewReader(` Body: io.NopCloser(strings.NewReader(`
{ {
"user_id": "` + userID + `", "user_id": "` + userID + `",
"stream_id": 5, "stream_id": 5,

View file

@ -18,6 +18,8 @@ import (
"context" "context"
"database/sql" "database/sql"
"github.com/lib/pq"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/keyserver/storage/postgres/deltas" "github.com/matrix-org/dendrite/keyserver/storage/postgres/deltas"
@ -64,7 +66,8 @@ func NewPostgresKeyChangesTable(db *sql.DB) (tables.KeyChanges, error) {
// TODO: Remove when we are sure we are not having goose artefacts in the db // TODO: Remove when we are sure we are not having goose artefacts in the db
// This forces an error, which indicates the migration is already applied, since the // This forces an error, which indicates the migration is already applied, since the
// column partition was removed from the table // column partition was removed from the table
err = db.QueryRow("SELECT partition FROM keyserver_key_changes LIMIT 1;").Scan() var count int
err = db.QueryRow("SELECT partition FROM keyserver_key_changes LIMIT 1;").Scan(&count)
if err == nil { if err == nil {
m := sqlutil.NewMigrator(db) m := sqlutil.NewMigrator(db)
m.AddMigrations(sqlutil.Migration{ m.AddMigrations(sqlutil.Migration{
@ -72,6 +75,16 @@ func NewPostgresKeyChangesTable(db *sql.DB) (tables.KeyChanges, error) {
Up: deltas.UpRefactorKeyChanges, Up: deltas.UpRefactorKeyChanges,
}) })
return s, m.Up(context.Background()) return s, m.Up(context.Background())
} else {
switch e := err.(type) {
case *pq.Error:
// ignore undefined_column (42703) errors, as this is expected at this point
if e.Code != "42703" {
return nil, err
}
default:
return nil, err
}
} }
return s, nil return s, nil
} }

View file

@ -61,7 +61,8 @@ func NewSqliteKeyChangesTable(db *sql.DB) (tables.KeyChanges, error) {
// TODO: Remove when we are sure we are not having goose artefacts in the db // TODO: Remove when we are sure we are not having goose artefacts in the db
// This forces an error, which indicates the migration is already applied, since the // This forces an error, which indicates the migration is already applied, since the
// column partition was removed from the table // column partition was removed from the table
err = db.QueryRow("SELECT partition FROM keyserver_key_changes LIMIT 1;").Scan() var count int
err = db.QueryRow("SELECT partition FROM keyserver_key_changes LIMIT 1;").Scan(&count)
if err == nil { if err == nil {
m := sqlutil.NewMigrator(db) m := sqlutil.NewMigrator(db)
m.AddMigrations(sqlutil.Migration{ m.AddMigrations(sqlutil.Migration{

View file

@ -21,7 +21,6 @@ import (
"encoding/base64" "encoding/base64"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
@ -180,7 +179,7 @@ func createTempDir(baseDirectory config.Path) (types.Path, error) {
if err := os.MkdirAll(baseTmpDir, 0770); err != nil { if err := os.MkdirAll(baseTmpDir, 0770); err != nil {
return "", fmt.Errorf("failed to create base temp dir: %w", err) return "", fmt.Errorf("failed to create base temp dir: %w", err)
} }
tmpDir, err := ioutil.TempDir(baseTmpDir, "") tmpDir, err := os.MkdirTemp(baseTmpDir, "")
if err != nil { if err != nil {
return "", fmt.Errorf("failed to create temp dir: %w", err) return "", fmt.Errorf("failed to create temp dir: %w", err)
} }

View file

@ -19,7 +19,6 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"mime" "mime"
"net/http" "net/http"
"net/url" "net/url"
@ -695,7 +694,7 @@ func (r *downloadRequest) GetContentLengthAndReader(contentLengthHeader string,
// We successfully parsed the Content-Length, so we'll return a limited // We successfully parsed the Content-Length, so we'll return a limited
// reader that restricts us to reading only up to this size. // reader that restricts us to reading only up to this size.
reader = ioutil.NopCloser(io.LimitReader(*body, parsedLength)) reader = io.NopCloser(io.LimitReader(*body, parsedLength))
contentLength = parsedLength contentLength = parsedLength
} else { } else {
// Content-Length header is missing. If we have a maximum file size // Content-Length header is missing. If we have a maximum file size
@ -704,7 +703,7 @@ func (r *downloadRequest) GetContentLengthAndReader(contentLengthHeader string,
// ultimately it will get rewritten later when the temp file is written // ultimately it will get rewritten later when the temp file is written
// to disk. // to disk.
if maxFileSizeBytes > 0 { if maxFileSizeBytes > 0 {
reader = ioutil.NopCloser(io.LimitReader(*body, int64(maxFileSizeBytes))) reader = io.NopCloser(io.LimitReader(*body, int64(maxFileSizeBytes)))
} }
contentLength = 0 contentLength = 0
} }

View file

@ -19,8 +19,10 @@ import (
"database/sql" "database/sql"
"fmt" "fmt"
"github.com/lib/pq"
// Import the postgres database driver. // Import the postgres database driver.
_ "github.com/lib/pq" _ "github.com/lib/pq"
"github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/storage/postgres/deltas" "github.com/matrix-org/dendrite/roomserver/storage/postgres/deltas"
@ -53,21 +55,32 @@ func Open(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, cache c
// TODO: Remove when we are sure we are not having goose artefacts in the db // TODO: Remove when we are sure we are not having goose artefacts in the db
// This forces an error, which indicates the migration is already applied, since the // This forces an error, which indicates the migration is already applied, since the
// column event_nid was removed from the table // column event_nid was removed from the table
err = db.QueryRow("SELECT event_nid FROM roomserver_state_block LIMIT 1;").Scan() var eventNID int
err = db.QueryRow("SELECT event_nid FROM roomserver_state_block LIMIT 1;").Scan(&eventNID)
if err == nil { if err == nil {
m := sqlutil.NewMigrator(db) m := sqlutil.NewMigrator(db)
m.AddMigrations(sqlutil.Migration{ m.AddMigrations(sqlutil.Migration{
Version: "roomserver: state blocks refactor", Version: "roomserver: state blocks refactor",
Up: deltas.UpStateBlocksRefactor, Up: deltas.UpStateBlocksRefactor,
}) })
if err := m.Up(base.Context()); err != nil { if err = m.Up(base.Context()); err != nil {
return nil, err
}
} else {
switch e := err.(type) {
case *pq.Error:
// ignore undefined_column (42703) errors, as this is expected at this point
if e.Code != "42703" {
return nil, err
}
default:
return nil, err return nil, err
} }
} }
// Then prepare the statements. Now that the migrations have run, any columns referred // Then prepare the statements. Now that the migrations have run, any columns referred
// to in the database code should now exist. // to in the database code should now exist.
if err := d.prepare(db, writer, cache); err != nil { if err = d.prepare(db, writer, cache); err != nil {
return nil, err return nil, err
} }

View file

@ -20,6 +20,8 @@ import (
"database/sql" "database/sql"
"fmt" "fmt"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/storage/shared" "github.com/matrix-org/dendrite/roomserver/storage/shared"
@ -27,7 +29,6 @@ import (
"github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/gomatrixserverlib"
) )
// A Database is used to store room events and stream offsets. // A Database is used to store room events and stream offsets.
@ -63,21 +64,22 @@ func Open(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, cache c
// TODO: Remove when we are sure we are not having goose artefacts in the db // TODO: Remove when we are sure we are not having goose artefacts in the db
// This forces an error, which indicates the migration is already applied, since the // This forces an error, which indicates the migration is already applied, since the
// column event_nid was removed from the table // column event_nid was removed from the table
err = db.QueryRow("SELECT event_nid FROM roomserver_state_block LIMIT 1;").Scan() var eventNID int
err = db.QueryRow("SELECT event_nid FROM roomserver_state_block LIMIT 1;").Scan(&eventNID)
if err == nil { if err == nil {
m := sqlutil.NewMigrator(db) m := sqlutil.NewMigrator(db)
m.AddMigrations(sqlutil.Migration{ m.AddMigrations(sqlutil.Migration{
Version: "roomserver: state blocks refactor", Version: "roomserver: state blocks refactor",
Up: deltas.UpStateBlocksRefactor, Up: deltas.UpStateBlocksRefactor,
}) })
if err := m.Up(base.Context()); err != nil { if err = m.Up(base.Context()); err != nil {
return nil, err return nil, err
} }
} }
// Then prepare the statements. Now that the migrations have run, any columns referred // Then prepare the statements. Now that the migrations have run, any columns referred
// to in the database code should now exist. // to in the database code should now exist.
if err := d.prepare(db, writer, cache); err != nil { if err = d.prepare(db, writer, cache); err != nil {
return nil, err return nil, err
} }

View file

@ -19,8 +19,8 @@ import (
"encoding/pem" "encoding/pem"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"net/url" "net/url"
"os"
"path/filepath" "path/filepath"
"regexp" "regexp"
"strings" "strings"
@ -191,7 +191,7 @@ type ConfigErrors []string
// Load a yaml config file for a server run as multiple processes or as a monolith. // Load a yaml config file for a server run as multiple processes or as a monolith.
// Checks the config to ensure that it is valid. // Checks the config to ensure that it is valid.
func Load(configPath string, monolith bool) (*Dendrite, error) { func Load(configPath string, monolith bool) (*Dendrite, error) {
configData, err := ioutil.ReadFile(configPath) configData, err := os.ReadFile(configPath)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -199,9 +199,9 @@ func Load(configPath string, monolith bool) (*Dendrite, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Pass the current working directory and ioutil.ReadFile so that they can // Pass the current working directory and os.ReadFile so that they can
// be mocked in the tests // be mocked in the tests
return loadConfig(basePath, configData, ioutil.ReadFile, monolith) return loadConfig(basePath, configData, os.ReadFile, monolith)
} }
func loadConfig( func loadConfig(
@ -530,7 +530,7 @@ func (config *Dendrite) KeyServerURL() string {
// SetupTracing configures the opentracing using the supplied configuration. // SetupTracing configures the opentracing using the supplied configuration.
func (config *Dendrite) SetupTracing(serviceName string) (closer io.Closer, err error) { func (config *Dendrite) SetupTracing(serviceName string) (closer io.Closer, err error) {
if !config.Tracing.Enabled { if !config.Tracing.Enabled {
return ioutil.NopCloser(bytes.NewReader([]byte{})), nil return io.NopCloser(bytes.NewReader([]byte{})), nil
} }
return config.Tracing.Jaeger.InitGlobalTracer( return config.Tracing.Jaeger.InitGlobalTracer(
serviceName, serviceName,

View file

@ -16,7 +16,7 @@ package config
import ( import (
"fmt" "fmt"
"io/ioutil" "os"
"path/filepath" "path/filepath"
"regexp" "regexp"
"strings" "strings"
@ -181,7 +181,7 @@ func loadAppServices(config *AppServiceAPI, derived *Derived) error {
} }
// Read the application service's config file // Read the application service's config file
configData, err := ioutil.ReadFile(absPath) configData, err := os.ReadFile(absPath)
if err != nil { if err != nil {
return err return err
} }

View file

@ -7,7 +7,7 @@ import (
"crypto/sha256" "crypto/sha256"
"encoding/base64" "encoding/base64"
"encoding/json" "encoding/json"
"io/ioutil" "io"
"net/http" "net/http"
"sort" "sort"
"strings" "strings"
@ -428,12 +428,12 @@ func postRelationships(t *testing.T, expectCode int, accessToken string, req *ms
t.Fatalf("failed to do request: %s", err) t.Fatalf("failed to do request: %s", err)
} }
if res.StatusCode != expectCode { if res.StatusCode != expectCode {
body, _ := ioutil.ReadAll(res.Body) body, _ := io.ReadAll(res.Body)
t.Fatalf("wrong response code, got %d want %d - body: %s", res.StatusCode, expectCode, string(body)) t.Fatalf("wrong response code, got %d want %d - body: %s", res.StatusCode, expectCode, string(body))
} }
if res.StatusCode == 200 { if res.StatusCode == 200 {
var result msc2836.EventRelationshipResponse var result msc2836.EventRelationshipResponse
body, err := ioutil.ReadAll(res.Body) body, err := io.ReadAll(res.Body)
if err != nil { if err != nil {
t.Fatalf("response 200 OK but failed to read response body: %s", err) t.Fatalf("response 200 OK but failed to read response body: %s", err)
} }

View file

@ -21,6 +21,11 @@ import (
"fmt" "fmt"
"github.com/getsentry/sentry-go" "github.com/getsentry/sentry-go"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/jetstream"
@ -29,10 +34,6 @@ import (
"github.com/matrix-org/dendrite/syncapi/producers" "github.com/matrix-org/dendrite/syncapi/producers"
"github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
) )
// OutputClientDataConsumer consumes events that originated in the client API server. // OutputClientDataConsumer consumes events that originated in the client API server.
@ -107,7 +108,8 @@ func (s *OutputClientDataConsumer) onMessage(ctx context.Context, msg *nats.Msg)
"type": output.Type, "type": output.Type,
"room_id": output.RoomID, "room_id": output.RoomID,
log.ErrorKey: err, log.ErrorKey: err,
}).Panicf("could not save account data") }).Errorf("could not save account data")
return false
} }
if err = s.sendReadUpdate(ctx, userID, output); err != nil { if err = s.sendReadUpdate(ctx, userID, output); err != nil {

View file

@ -16,7 +16,7 @@ package routing
import ( import (
"encoding/json" "encoding/json"
"io/ioutil" "io"
"net/http" "net/http"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -88,7 +88,7 @@ func PutFilter(
var filter gomatrixserverlib.Filter var filter gomatrixserverlib.Filter
defer req.Body.Close() // nolint:errcheck defer req.Body.Close() // nolint:errcheck
body, err := ioutil.ReadAll(req.Body) body, err := io.ReadAll(req.Body)
if err != nil { if err != nil {
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusBadRequest, Code: http.StatusBadRequest,

View file

@ -20,6 +20,10 @@ import (
"net/http" "net/http"
"sort" "sort"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
@ -28,9 +32,6 @@ import (
"github.com/matrix-org/dendrite/syncapi/sync" "github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
) )
type messagesReq struct { type messagesReq struct {
@ -262,7 +263,7 @@ func (m *messagesResp) applyLazyLoadMembers(
} }
} }
for _, evt := range membershipToUser { for _, evt := range membershipToUser {
m.State = append(m.State, gomatrixserverlib.HeaderedToClientEvent(evt, gomatrixserverlib.FormatSync)) m.State = append(m.State, gomatrixserverlib.HeaderedToClientEvent(evt, gomatrixserverlib.FormatAll))
} }
} }

View file

@ -58,7 +58,7 @@ const upsertRoomUnreadNotificationCountsSQL = `INSERT INTO syncapi_notification_
(user_id, room_id, notification_count, highlight_count) (user_id, room_id, notification_count, highlight_count)
VALUES ($1, $2, $3, $4) VALUES ($1, $2, $3, $4)
ON CONFLICT (user_id, room_id) ON CONFLICT (user_id, room_id)
DO UPDATE SET notification_count = $3, highlight_count = $4 DO UPDATE SET id = nextval('syncapi_notification_data_id_seq'), notification_count = $3, highlight_count = $4
RETURNING id` RETURNING id`
const selectUserUnreadNotificationCountsSQL = `SELECT const selectUserUnreadNotificationCountsSQL = `SELECT

View file

@ -25,12 +25,14 @@ import (
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
) )
func NewSqliteNotificationDataTable(db *sql.DB) (tables.NotificationData, error) { func NewSqliteNotificationDataTable(db *sql.DB, streamID *StreamIDStatements) (tables.NotificationData, error) {
_, err := db.Exec(notificationDataSchema) _, err := db.Exec(notificationDataSchema)
if err != nil { if err != nil {
return nil, err return nil, err
} }
r := &notificationDataStatements{} r := &notificationDataStatements{
streamIDStatements: streamID,
}
return r, sqlutil.StatementList{ return r, sqlutil.StatementList{
{&r.upsertRoomUnreadCounts, upsertRoomUnreadNotificationCountsSQL}, {&r.upsertRoomUnreadCounts, upsertRoomUnreadNotificationCountsSQL},
{&r.selectUserUnreadCounts, selectUserUnreadNotificationCountsSQL}, {&r.selectUserUnreadCounts, selectUserUnreadNotificationCountsSQL},
@ -39,6 +41,7 @@ func NewSqliteNotificationDataTable(db *sql.DB) (tables.NotificationData, error)
} }
type notificationDataStatements struct { type notificationDataStatements struct {
streamIDStatements *StreamIDStatements
upsertRoomUnreadCounts *sql.Stmt upsertRoomUnreadCounts *sql.Stmt
selectUserUnreadCounts *sql.Stmt selectUserUnreadCounts *sql.Stmt
selectMaxID *sql.Stmt selectMaxID *sql.Stmt
@ -58,8 +61,7 @@ const upsertRoomUnreadNotificationCountsSQL = `INSERT INTO syncapi_notification_
(user_id, room_id, notification_count, highlight_count) (user_id, room_id, notification_count, highlight_count)
VALUES ($1, $2, $3, $4) VALUES ($1, $2, $3, $4)
ON CONFLICT (user_id, room_id) ON CONFLICT (user_id, room_id)
DO UPDATE SET notification_count = $3, highlight_count = $4 DO UPDATE SET id = $5, notification_count = $6, highlight_count = $7`
RETURNING id`
const selectUserUnreadNotificationCountsSQL = `SELECT const selectUserUnreadNotificationCountsSQL = `SELECT
id, room_id, notification_count, highlight_count id, room_id, notification_count, highlight_count
@ -71,7 +73,11 @@ const selectUserUnreadNotificationCountsSQL = `SELECT
const selectMaxNotificationIDSQL = `SELECT CASE COUNT(*) WHEN 0 THEN 0 ELSE MAX(id) END FROM syncapi_notification_data` const selectMaxNotificationIDSQL = `SELECT CASE COUNT(*) WHEN 0 THEN 0 ELSE MAX(id) END FROM syncapi_notification_data`
func (r *notificationDataStatements) UpsertRoomUnreadCounts(ctx context.Context, userID, roomID string, notificationCount, highlightCount int) (pos types.StreamPosition, err error) { func (r *notificationDataStatements) UpsertRoomUnreadCounts(ctx context.Context, userID, roomID string, notificationCount, highlightCount int) (pos types.StreamPosition, err error) {
err = r.upsertRoomUnreadCounts.QueryRowContext(ctx, userID, roomID, notificationCount, highlightCount).Scan(&pos) pos, err = r.streamIDStatements.nextNotificationID(ctx, nil)
if err != nil {
return
}
_, err = r.upsertRoomUnreadCounts.ExecContext(ctx, userID, roomID, notificationCount, highlightCount, pos, notificationCount, highlightCount)
return return
} }

View file

@ -26,6 +26,8 @@ INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("invite", 0)
ON CONFLICT DO NOTHING; ON CONFLICT DO NOTHING;
INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("presence", 0) INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("presence", 0)
ON CONFLICT DO NOTHING; ON CONFLICT DO NOTHING;
INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("notification", 0)
ON CONFLICT DO NOTHING;
` `
const increaseStreamIDStmt = "" + const increaseStreamIDStmt = "" +
@ -78,3 +80,9 @@ func (s *StreamIDStatements) nextPresenceID(ctx context.Context, txn *sql.Tx) (p
err = increaseStmt.QueryRowContext(ctx, "presence").Scan(&pos) err = increaseStmt.QueryRowContext(ctx, "presence").Scan(&pos)
return return
} }
func (s *StreamIDStatements) nextNotificationID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) {
increaseStmt := sqlutil.TxStmt(txn, s.increaseStreamIDStmt)
err = increaseStmt.QueryRowContext(ctx, "notification").Scan(&pos)
return
}

View file

@ -95,7 +95,7 @@ func (d *SyncServerDatasource) prepare() (err error) {
if err != nil { if err != nil {
return err return err
} }
notificationData, err := NewSqliteNotificationDataTable(d.db) notificationData, err := NewSqliteNotificationDataTable(d.db, &d.streamID)
if err != nil { if err != nil {
return err return err
} }

View file

@ -12,9 +12,12 @@ import (
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/tidwall/gjson" "github.com/tidwall/gjson"
"go.uber.org/atomic" "go.uber.org/atomic"
"github.com/matrix-org/dendrite/syncapi/notifier"
) )
// The max number of per-room goroutines to have running. // The max number of per-room goroutines to have running.
@ -34,6 +37,7 @@ type PDUStreamProvider struct {
// userID+deviceID -> lazy loading cache // userID+deviceID -> lazy loading cache
lazyLoadCache caching.LazyLoadCache lazyLoadCache caching.LazyLoadCache
rsAPI roomserverAPI.SyncRoomserverAPI rsAPI roomserverAPI.SyncRoomserverAPI
notifier *notifier.Notifier
} }
func (p *PDUStreamProvider) worker() { func (p *PDUStreamProvider) worker() {
@ -100,6 +104,15 @@ func (p *PDUStreamProvider) CompleteSync(
req.Log.WithError(err).Error("unable to update event filter with ignored users") req.Log.WithError(err).Error("unable to update event filter with ignored users")
} }
// Invalidate the lazyLoadCache, otherwise we end up with missing displaynames/avatars
// TODO: This might be inefficient, when joined to many and/or large rooms.
for _, roomID := range joinedRoomIDs {
joinedUsers := p.notifier.JoinedUsers(roomID)
for _, sharedUser := range joinedUsers {
p.lazyLoadCache.InvalidateLazyLoadedUser(req.Device, roomID, sharedUser)
}
}
// Build up a /sync response. Add joined rooms. // Build up a /sync response. Add joined rooms.
var reqMutex sync.Mutex var reqMutex sync.Mutex
var reqWaitGroup sync.WaitGroup var reqWaitGroup sync.WaitGroup

View file

@ -34,6 +34,7 @@ func NewSyncStreamProviders(
StreamProvider: StreamProvider{DB: d}, StreamProvider: StreamProvider{DB: d},
lazyLoadCache: lazyLoadCache, lazyLoadCache: lazyLoadCache,
rsAPI: rsAPI, rsAPI: rsAPI,
notifier: notifier,
}, },
TypingStreamProvider: &TypingStreamProvider{ TypingStreamProvider: &TypingStreamProvider{
StreamProvider: StreamProvider{DB: d}, StreamProvider: StreamProvider{DB: d},

View file

@ -22,7 +22,6 @@ import (
"encoding/pem" "encoding/pem"
"errors" "errors"
"fmt" "fmt"
"io/ioutil"
"math/big" "math/big"
"os" "os"
"strings" "strings"
@ -144,7 +143,7 @@ func NewTLSKeyWithAuthority(serverName, tlsKeyPath, tlsCertPath, authorityKeyPat
} }
// load the authority key // load the authority key
dat, err := ioutil.ReadFile(authorityKeyPath) dat, err := os.ReadFile(authorityKeyPath)
if err != nil { if err != nil {
return err return err
} }
@ -158,7 +157,7 @@ func NewTLSKeyWithAuthority(serverName, tlsKeyPath, tlsCertPath, authorityKeyPat
} }
// load the authority certificate // load the authority certificate
dat, err = ioutil.ReadFile(authorityCertPath) dat, err = os.ReadFile(authorityCertPath)
if err != nil { if err != nil {
return err return err
} }