Merge commit '7bf5b3d4a2f32aeb3b6ed199e725c4321918a09f'

This commit is contained in:
Brian Meek 2022-08-05 09:53:14 -07:00
commit 29b29c10b0
58 changed files with 301 additions and 250 deletions

View file

@ -21,8 +21,7 @@ As of October 2020 (current [progress below](#progress)), Dendrite has now enter
This does not mean:
- Dendrite is bug-free. It has not yet been battle-tested in the real world and so will be error prone initially.
- All of the CS/Federation APIs are implemented. We are tracking progress via a script called 'Are We Synapse Yet?'. In particular,
presence and push notifications are entirely missing from Dendrite. See [CHANGES.md](CHANGES.md) for updates.
- Dendrite is feature-complete. There may be client or federation APIs that are not implemented.
- Dendrite is ready for massive homeserver deployments. You cannot shard each microservice, only run each one on a different machine.
Currently, we expect Dendrite to function well for small (10s/100s of users) homeserver deployments as well as P2P Matrix nodes in-browser or on mobile devices.
@ -36,6 +35,9 @@ If you have further questions, please take a look at [our FAQ](docs/FAQ.md) or j
## Requirements
See the [Planning your Installation](https://matrix-org.github.io/dendrite/installation/planning) page for
more information on requirements.
To build Dendrite, you will need Go 1.18 or later.
For a usable federating Dendrite deployment, you will also need:
@ -83,11 +85,11 @@ $ ./bin/create-account --config dendrite.yaml -username alice
Then point your favourite Matrix client at `http://localhost:8008` or `https://localhost:8448`.
## <a id="progress"></a> Progress
## Progress
We use a script called Are We Synapse Yet which checks Sytest compliance rates. Sytest is a black-box homeserver
test rig with around 900 tests. The script works out how many of these tests are passing on Dendrite and it
updates with CI. As of April 2022 we're at around 83% CS API coverage and 95% Federation coverage, though check
updates with CI. As of August 2022 we're at around 83% CS API coverage and 95% Federation coverage, though check
CI for the latest numbers. In practice, this means you can communicate locally and via federation with Synapse
servers such as matrix.org reasonably well, although there are still some missing features (like Search).
@ -119,53 +121,8 @@ We would be grateful for any help on issues marked as
all have related Sytests which need to pass in order for the issue to be closed. Once you've written your
code, you can quickly run Sytest to ensure that the test names are now passing.
For example, if the test `Local device key changes get to remote servers` was marked as failing, find the
test file (e.g via `grep` or via the
[CI log output](https://buildkite.com/matrix-dot-org/dendrite/builds/2826#39cff5de-e032-4ad0-ad26-f819e6919c42)
it's `tests/50federation/40devicelists.pl` ) then to run Sytest:
```
docker run --rm --name sytest
-v "/Users/kegan/github/sytest:/sytest"
-v "/Users/kegan/github/dendrite:/src"
-v "/Users/kegan/logs:/logs"
-v "/Users/kegan/go/:/gopath"
-e "POSTGRES=1" -e "DENDRITE_TRACE_HTTP=1"
matrixdotorg/sytest-dendrite:latest tests/50federation/40devicelists.pl
```
See [sytest.md](docs/sytest.md) for the full description of these flags.
You can try running sytest outside of docker for faster runs, but the dependencies can be temperamental
and we recommend using docker where possible.
```
cd sytest
export PERL5LIB=$HOME/lib/perl5
export PERL_MB_OPT=--install_base=$HOME
export PERL_MM_OPT=INSTALL_BASE=$HOME
./install-deps.pl
./run-tests.pl -I Dendrite::Monolith -d $PATH_TO_DENDRITE_BINARIES
```
Sometimes Sytest is testing the wrong thing or is flakey, so it will need to be patched.
Ask on `#dendrite-dev:matrix.org` if you think this is the case for you and we'll be happy to help.
If you're new to the project, see [CONTRIBUTING.md](docs/CONTRIBUTING.md) to get up to speed then
If you're new to the project, see our
[Contributing page](https://matrix-org.github.io/dendrite/development/contributing) to get up to speed, then
look for [Good First Issues](https://github.com/matrix-org/dendrite/labels/good%20first%20issue). If you're
familiar with the project, look for [Help Wanted](https://github.com/matrix-org/dendrite/labels/help-wanted)
issues.
## Hardware requirements
Dendrite in Monolith + SQLite works in a range of environments including iOS and in-browser via WASM.
For small homeserver installations joined on ~10s rooms on matrix.org with ~100s of users in those rooms, including some
encrypted rooms:
- Memory: uses around 100MB of RAM, with peaks at around 200MB.
- Disk space: After a few months of usage, the database grew to around 2GB (in Monolith mode).
- CPU: Brief spikes when processing events, typically idles at 1% CPU.
This means Dendrite should comfortably work on things like Raspberry Pis.

View file

@ -13,4 +13,4 @@ go build ./cmd/...
./build/scripts/find-lint.sh
echo "Testing..."
go test -v ./...
go test --race -v ./...

View file

@ -18,7 +18,6 @@ import (
"context"
"encoding/json"
"io"
"io/ioutil"
"net/http"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
@ -42,7 +41,7 @@ func LoginFromJSONReader(
userInteractiveAuth *UserInteractive,
cfg *config.ClientAPI,
) (*Login, LoginCleanupFunc, *util.JSONResponse) {
reqBytes, err := ioutil.ReadAll(r)
reqBytes, err := io.ReadAll(r)
if err != nil {
err := &util.JSONResponse{
Code: http.StatusBadRequest,

View file

@ -16,7 +16,7 @@ package httputil
import (
"encoding/json"
"io/ioutil"
"io"
"net/http"
"unicode/utf8"
@ -29,9 +29,9 @@ import (
func UnmarshalJSONRequest(req *http.Request, iface interface{}) *util.JSONResponse {
// encoding/json allows invalid utf-8, matrix does not
// https://matrix.org/docs/spec/client_server/r0.6.1#api-standards
body, err := ioutil.ReadAll(req.Body)
body, err := io.ReadAll(req.Body)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("ioutil.ReadAll failed")
util.GetLogger(req.Context()).WithError(err).Error("io.ReadAll failed")
resp := jsonerror.InternalServerError()
return &resp
}

View file

@ -17,7 +17,7 @@ package routing
import (
"encoding/json"
"fmt"
"io/ioutil"
"io"
"net/http"
"github.com/matrix-org/dendrite/clientapi/httputil"
@ -101,9 +101,9 @@ func SaveAccountData(
}
}
body, err := ioutil.ReadAll(req.Body)
body, err := io.ReadAll(req.Body)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("ioutil.ReadAll failed")
util.GetLogger(req.Context()).WithError(err).Error("io.ReadAll failed")
return jsonerror.InternalServerError()
}

View file

@ -1,7 +1,7 @@
package routing
import (
"io/ioutil"
"io"
"net/http"
"github.com/matrix-org/dendrite/clientapi/auth"
@ -20,7 +20,7 @@ func Deactivate(
) util.JSONResponse {
ctx := req.Context()
defer req.Body.Close() // nolint:errcheck
bodyBytes, err := ioutil.ReadAll(req.Body)
bodyBytes, err := io.ReadAll(req.Body)
if err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,

View file

@ -15,7 +15,7 @@
package routing
import (
"io/ioutil"
"io"
"net"
"net/http"
@ -175,7 +175,7 @@ func DeleteDeviceById(
}()
ctx := req.Context()
defer req.Body.Close() // nolint:errcheck
bodyBytes, err := ioutil.ReadAll(req.Body)
bodyBytes, err := io.ReadAll(req.Body)
if err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,

View file

@ -23,13 +23,14 @@ import (
"strings"
"sync"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/matrix-org/dendrite/clientapi/api"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
var (
@ -196,14 +197,14 @@ func fillPublicRoomsReq(httpReq *http.Request, request *PublicRoomReq) *util.JSO
// sliceInto returns a subslice of `slice` which honours the since/limit values given.
//
// 0 1 2 3 4 5 6 index
// [A, B, C, D, E, F, G] slice
// 0 1 2 3 4 5 6 index
// [A, B, C, D, E, F, G] slice
//
// limit=3 => A,B,C (prev='', next='3')
// limit=3&since=3 => D,E,F (prev='0', next='6')
// limit=3&since=6 => G (prev='3', next='')
// limit=3 => A,B,C (prev='', next='3')
// limit=3&since=3 => D,E,F (prev='0', next='6')
// limit=3&since=6 => G (prev='3', next='')
//
// A value of '-1' for prev/next indicates no position.
// A value of '-1' for prev/next indicates no position.
func sliceInto(slice []gomatrixserverlib.PublicRoom, since int64, limit int16) (subset []gomatrixserverlib.PublicRoom, prev, next int) {
prev = -1
next = -1

View file

@ -19,7 +19,7 @@ import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"io"
"net/http"
"net/url"
"regexp"
@ -388,7 +388,7 @@ func validateRecaptcha(
// Grab the body of the response from the captcha server
var r recaptchaResponse
body, err := ioutil.ReadAll(resp.Body)
body, err := io.ReadAll(resp.Body)
if err != nil {
return &util.JSONResponse{
Code: http.StatusGatewayTimeout,
@ -556,7 +556,7 @@ func Register(
cfg *config.ClientAPI,
) util.JSONResponse {
defer req.Body.Close() // nolint: errcheck
reqBody, err := ioutil.ReadAll(req.Body)
reqBody, err := io.ReadAll(req.Body)
if err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,

View file

@ -2,7 +2,7 @@ package routing
import (
"bytes"
"io/ioutil"
"io"
"testing"
"github.com/patrickmn/go-cache"
@ -13,7 +13,7 @@ func TestSharedSecretRegister(t *testing.T) {
jsonStr := []byte(`{"admin":false,"mac":"f1ba8d37123866fd659b40de4bad9b0f8965c565","nonce":"759f047f312b99ff428b21d581256f8592b8976e58bc1b543972dc6147e529a79657605b52d7becd160ff5137f3de11975684319187e06901955f79e5a6c5a79","password":"wonderland","username":"alice"}`)
sharedSecret := "dendritetest"
req, err := NewSharedSecretRegistrationRequest(ioutil.NopCloser(bytes.NewBuffer(jsonStr)))
req, err := NewSharedSecretRegistrationRequest(io.NopCloser(bytes.NewBuffer(jsonStr)))
if err != nil {
t.Fatalf("failed to read request: %s", err)
}

View file

@ -63,9 +63,10 @@ var sendEventDuration = prometheus.NewHistogramVec(
)
// SendEvent implements:
// /rooms/{roomID}/send/{eventType}
// /rooms/{roomID}/send/{eventType}/{txnID}
// /rooms/{roomID}/state/{eventType}/{stateKey}
//
// /rooms/{roomID}/send/{eventType}
// /rooms/{roomID}/send/{eventType}/{txnID}
// /rooms/{roomID}/state/{eventType}/{stateKey}
func SendEvent(
req *http.Request,
device *userapi.Device,

View file

@ -38,8 +38,9 @@ type threePIDsResponse struct {
}
// RequestEmailToken implements:
// POST /account/3pid/email/requestToken
// POST /register/email/requestToken
//
// POST /account/3pid/email/requestToken
// POST /register/email/requestToken
func RequestEmailToken(req *http.Request, threePIDAPI api.ClientUserAPI, cfg *config.ClientAPI) util.JSONResponse {
var body threepid.EmailAssociationRequest
if reqErr := httputil.UnmarshalJSONRequest(req, &body); reqErr != nil {

View file

@ -22,15 +22,17 @@ import (
"net/http"
"time"
"github.com/matrix-org/gomatrix"
"github.com/matrix-org/util"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrix"
"github.com/matrix-org/util"
)
// RequestTurnServer implements:
// GET /voip/turnServer
//
// GET /voip/turnServer
func RequestTurnServer(req *http.Request, device *api.Device, cfg *config.ClientAPI) util.JSONResponse {
turnConfig := cfg.TURN

View file

@ -19,7 +19,6 @@ import (
"flag"
"fmt"
"io"
"io/ioutil"
"os"
"regexp"
"strings"
@ -157,7 +156,7 @@ func main() {
func getPassword(password, pwdFile string, pwdStdin bool, r io.Reader) (string, error) {
// read password from file
if pwdFile != "" {
pw, err := ioutil.ReadFile(pwdFile)
pw, err := os.ReadFile(pwdFile)
if err != nil {
return "", fmt.Errorf("Unable to read password from file: %v", err)
}
@ -166,7 +165,7 @@ func getPassword(password, pwdFile string, pwdStdin bool, r io.Reader) (string,
// read password from stdin
if pwdStdin {
data, err := ioutil.ReadAll(r)
data, err := io.ReadAll(r)
if err != nil {
return "", fmt.Errorf("Unable to read password from stdin: %v", err)
}

View file

@ -21,7 +21,6 @@ import (
"encoding/hex"
"flag"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
@ -76,11 +75,11 @@ func main() {
if pk, sk, err = ed25519.GenerateKey(nil); err != nil {
panic(err)
}
if err = ioutil.WriteFile(keyfile, sk, 0644); err != nil {
if err = os.WriteFile(keyfile, sk, 0644); err != nil {
panic(err)
}
} else if err == nil {
if sk, err = ioutil.ReadFile(keyfile); err != nil {
if sk, err = os.ReadFile(keyfile); err != nil {
panic(err)
}
if len(sk) != ed25519.PrivateKeySize {

View file

@ -20,7 +20,6 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net"
"os"
@ -69,7 +68,7 @@ func Setup(instanceName, storageDirectory, peerURI string) (*Node, error) {
yggfile := fmt.Sprintf("%s/%s-yggdrasil.conf", storageDirectory, instanceName)
if _, err := os.Stat(yggfile); !os.IsNotExist(err) {
yggconf, e := ioutil.ReadFile(yggfile)
yggconf, e := os.ReadFile(yggfile)
if e != nil {
panic(err)
}
@ -88,7 +87,7 @@ func Setup(instanceName, storageDirectory, peerURI string) (*Node, error) {
if err != nil {
panic(err)
}
if e := ioutil.WriteFile(yggfile, j, 0600); e != nil {
if e := os.WriteFile(yggfile, j, 0600); e != nil {
n.log.Printf("Couldn't write private key to file '%s': %s\n", yggfile, e)
}

View file

@ -6,7 +6,7 @@ import (
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"io"
"log"
"net/http"
"os"
@ -47,7 +47,7 @@ const HEAD = "HEAD"
// We cannot use the dockerfile associated with the repo with each version sadly due to changes in
// Docker versions. Specifically, earlier Dendrite versions are incompatible with newer Docker clients
// due to the error:
// When using COPY with more than one source file, the destination must be a directory and end with a /
// When using COPY with more than one source file, the destination must be a directory and end with a /
// We need to run a postgres anyway, so use the dockerfile associated with Complement instead.
const Dockerfile = `FROM golang:1.18-stretch as build
RUN apt-get update && apt-get install -y postgresql
@ -95,7 +95,9 @@ CMD /build/run_dendrite.sh `
const dendriteUpgradeTestLabel = "dendrite_upgrade_test"
// downloadArchive downloads an arbitrary github archive of the form:
// https://github.com/matrix-org/dendrite/archive/v0.3.11.tar.gz
//
// https://github.com/matrix-org/dendrite/archive/v0.3.11.tar.gz
//
// and re-tarballs it without the top-level directory which contains branch information. It inserts
// the contents of `dockerfile` as a root file `Dockerfile` in the re-tarballed directory such that
// you can directly feed the retarballed archive to `ImageBuild` to have it run said dockerfile.
@ -126,7 +128,7 @@ func downloadArchive(cli *http.Client, tmpDir, archiveURL string, dockerfile []b
return nil, err
}
// add top level Dockerfile
err = ioutil.WriteFile(path.Join(tmpDir, "Dockerfile"), dockerfile, os.ModePerm)
err = os.WriteFile(path.Join(tmpDir, "Dockerfile"), dockerfile, os.ModePerm)
if err != nil {
return nil, fmt.Errorf("failed to inject /Dockerfile: %w", err)
}
@ -148,7 +150,7 @@ func buildDendrite(httpClient *http.Client, dockerClient *client.Client, tmpDir,
if branchOrTagName == HEAD && *flagHead != "" {
log.Printf("%s: Using %s as HEAD", branchOrTagName, *flagHead)
// add top level Dockerfile
err = ioutil.WriteFile(path.Join(*flagHead, "Dockerfile"), []byte(Dockerfile), os.ModePerm)
err = os.WriteFile(path.Join(*flagHead, "Dockerfile"), []byte(Dockerfile), os.ModePerm)
if err != nil {
return "", fmt.Errorf("custom HEAD: failed to inject /Dockerfile: %w", err)
}
@ -386,7 +388,7 @@ func runImage(dockerClient *client.Client, volumeName, version, imageID string)
})
// ignore errors when cannot get logs, it's just for debugging anyways
if err == nil {
logbody, err := ioutil.ReadAll(logs)
logbody, err := io.ReadAll(logs)
if err == nil {
log.Printf("Container logs:\n\n%s\n\n", string(logbody))
}

View file

@ -18,9 +18,9 @@ type user struct {
}
// runTests performs the following operations:
// - register alice and bob with branch name muxed into the localpart
// - create a DM room for the 2 users and exchange messages
// - create/join a public #global room and exchange messages
// - register alice and bob with branch name muxed into the localpart
// - create a DM room for the 2 users and exchange messages
// - create/join a public #global room and exchange messages
func runTests(baseURL, branchName string) error {
// register 2 users
users := []user{

View file

@ -9,7 +9,6 @@ import (
"encoding/pem"
"flag"
"fmt"
"io/ioutil"
"net/url"
"os"
@ -30,7 +29,7 @@ func main() {
os.Exit(1)
}
data, err := ioutil.ReadFile(*requestKey)
data, err := os.ReadFile(*requestKey)
if err != nil {
panic(err)
}

View file

@ -188,13 +188,16 @@ client_api:
# TURN server information that this homeserver should send to clients.
turn:
turn_user_lifetime: ""
turn_user_lifetime: "5m"
turn_uris:
# - turn:turn.server.org?transport=udp
# - turn:turn.server.org?transport=tcp
turn_shared_secret: ""
turn_username: ""
turn_password: ""
# If your TURN server requires static credentials, then you will need to enter
# them here instead of supplying a shared secret. Note that these credentials
# will be visible to clients!
# turn_username: ""
# turn_password: ""
# Settings for rate-limited endpoints. Rate limiting kicks in after the threshold
# number of "slots" have been taken by requests from a specific host. Each "slot"

View file

@ -191,13 +191,16 @@ client_api:
# TURN server information that this homeserver should send to clients.
turn:
turn_user_lifetime: ""
turn_user_lifetime: "5m"
turn_uris:
# - turn:turn.server.org?transport=udp
# - turn:turn.server.org?transport=tcp
turn_shared_secret: ""
turn_username: ""
turn_password: ""
# If your TURN server requires static credentials, then you will need to enter
# them here instead of supplying a shared secret. Note that these credentials
# will be visible to clients!
# turn_username: ""
# turn_password: ""
# Settings for rate-limited endpoints. Rate limiting kicks in after the threshold
# number of "slots" have been taken by requests from a specific host. Each "slot"

View file

@ -64,7 +64,7 @@ comment. Please avoid doing this if you can.
We also have unit tests which we run via:
```bash
go test ./...
go test --race ./...
```
In general, we like submissions that come with tests. Anything that proves that the

View file

@ -208,9 +208,11 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent, rew
// joinedHostsAtEvent works out a list of matrix servers that were joined to
// the room at the event (including peeking ones)
// It is important to use the state at the event for sending messages because:
// 1) We shouldn't send messages to servers that weren't in the room.
// 2) If a server is kicked from the rooms it should still be told about the
// kick event,
//
// 1. We shouldn't send messages to servers that weren't in the room.
// 2. If a server is kicked from the rooms it should still be told about the
// kick event.
//
// Usually the list can be calculated locally, but sometimes it will need fetch
// events from the room server.
// Returns an error if there was a problem talking to the room server.

View file

@ -6,7 +6,7 @@ import (
"crypto/ed25519"
"encoding/json"
"fmt"
"io/ioutil"
"io"
"net/http"
"os"
"testing"
@ -66,7 +66,7 @@ func TestMain(m *testing.M) {
s.cache = caching.NewRistrettoCache(8*1024*1024, time.Hour, false)
// Create a temporary directory for JetStream.
d, err := ioutil.TempDir("./", "jetstream*")
d, err := os.MkdirTemp("./", "jetstream*")
if err != nil {
panic(err)
}
@ -136,7 +136,7 @@ func (m *MockRoundTripper) RoundTrip(req *http.Request) (res *http.Response, err
// And respond.
res = &http.Response{
StatusCode: 200,
Body: ioutil.NopCloser(bytes.NewReader(body)),
Body: io.NopCloser(bytes.NewReader(body)),
}
return
}

View file

@ -6,6 +6,7 @@ import (
"encoding/json"
"fmt"
"strings"
"sync"
"testing"
"time"
@ -48,6 +49,7 @@ func (f *fedRoomserverAPI) QueryRoomsForUser(ctx context.Context, req *rsapi.Que
// TODO: This struct isn't generic, only works for TestFederationAPIJoinThenKeyUpdate
type fedClient struct {
fedClientMutex sync.Mutex
api.FederationClient
allowJoins []*test.Room
keys map[gomatrixserverlib.ServerName]struct {
@ -59,6 +61,8 @@ type fedClient struct {
}
func (f *fedClient) GetServerKeys(ctx context.Context, matrixServer gomatrixserverlib.ServerName) (gomatrixserverlib.ServerKeys, error) {
f.fedClientMutex.Lock()
defer f.fedClientMutex.Unlock()
fmt.Println("GetServerKeys:", matrixServer)
var keys gomatrixserverlib.ServerKeys
var keyID gomatrixserverlib.KeyID
@ -122,6 +126,8 @@ func (f *fedClient) MakeJoin(ctx context.Context, s gomatrixserverlib.ServerName
return
}
func (f *fedClient) SendJoin(ctx context.Context, s gomatrixserverlib.ServerName, event *gomatrixserverlib.Event) (res gomatrixserverlib.RespSendJoin, err error) {
f.fedClientMutex.Lock()
defer f.fedClientMutex.Unlock()
for _, r := range f.allowJoins {
if r.ID == event.RoomID() {
r.InsertEvent(f.t, event.Headered(r.Version))
@ -134,6 +140,8 @@ func (f *fedClient) SendJoin(ctx context.Context, s gomatrixserverlib.ServerName
}
func (f *fedClient) SendTransaction(ctx context.Context, t gomatrixserverlib.Transaction) (res gomatrixserverlib.RespSend, err error) {
f.fedClientMutex.Lock()
defer f.fedClientMutex.Unlock()
for _, edu := range t.EDUs {
if edu.Type == gomatrixserverlib.MDeviceListUpdate {
f.sentTxn = true
@ -242,6 +250,8 @@ func testFederationAPIJoinThenKeyUpdate(t *testing.T, dbType test.DBType) {
testrig.MustPublishMsgs(t, jsctx, msg)
time.Sleep(500 * time.Millisecond)
fc.fedClientMutex.Lock()
defer fc.fedClientMutex.Unlock()
if !fc.sentTxn {
t.Fatalf("did not send device list update")
}

View file

@ -158,7 +158,7 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d
oqs.queuesMutex.Lock()
defer oqs.queuesMutex.Unlock()
oq, ok := oqs.queues[destination]
if !ok || oq != nil {
if !ok || oq == nil {
destinationQueueTotal.Inc()
oq = &destinationQueue{
queues: oqs,

View file

@ -21,13 +21,14 @@ import (
"sort"
"time"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
)
// MakeJoin implements the /make_join API
@ -435,13 +436,13 @@ func SendJoin(
// a restricted room join. If the room version does not support restricted
// joins then this function returns with no side effects. This returns three
// values:
// * an optional JSON response body (i.e. M_UNABLE_TO_AUTHORISE_JOIN) which
// should always be sent back to the client if one is specified
// * a user ID of an authorising user, typically a user that has power to
// issue invites in the room, if one has been found
// * an error if there was a problem finding out if this was allowable,
// like if the room version isn't known or a problem happened talking to
// the roomserver
// - an optional JSON response body (i.e. M_UNABLE_TO_AUTHORISE_JOIN) which
// should always be sent back to the client if one is specified
// - a user ID of an authorising user, typically a user that has power to
// issue invites in the room, if one has been found
// - an error if there was a problem finding out if this was allowable,
// like if the room version isn't known or a problem happened talking to
// the roomserver
func checkRestrictedJoin(
httpReq *http.Request,
rsAPI api.FederationRoomserverAPI,

View file

@ -14,6 +14,7 @@ type lazyLoadingCacheKey struct {
type LazyLoadCache interface {
StoreLazyLoadedUser(device *userapi.Device, roomID, userID, eventID string)
IsLazyLoadedUserCached(device *userapi.Device, roomID, userID string) (string, bool)
InvalidateLazyLoadedUser(device *userapi.Device, roomID, userID string)
}
func (c Caches) StoreLazyLoadedUser(device *userapi.Device, roomID, userID, eventID string) {
@ -33,3 +34,12 @@ func (c Caches) IsLazyLoadedUserCached(device *userapi.Device, roomID, userID st
TargetUserID: userID,
})
}
func (c Caches) InvalidateLazyLoadedUser(device *userapi.Device, roomID, userID string) {
c.LazyLoading.Unset(lazyLoadingCacheKey{
UserID: device.UserID,
DeviceID: device.ID,
RoomID: roomID,
TargetUserID: userID,
})
}

View file

@ -27,9 +27,10 @@ import (
"github.com/matrix-org/util"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dugong"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/setup/config"
)
type utcFormatter struct {
@ -145,7 +146,7 @@ func setupFileHook(hook config.LogrusHook, level logrus.Level, componentName str
})
}
//CloseAndLogIfError Closes io.Closer and logs the error if any
// CloseAndLogIfError Closes io.Closer and logs the error if any
func CloseAndLogIfError(ctx context.Context, closer io.Closer, message string) {
if closer == nil {
return

View file

@ -18,7 +18,7 @@
package internal
import (
"io/ioutil"
"io"
"log/syslog"
"github.com/MFAshby/stdemuxerhook"
@ -63,7 +63,7 @@ func SetupHookLogging(hooks []config.LogrusHook, componentName string) {
setupStdLogHook(logrus.InfoLevel)
}
// Hooks are now configured for stdout/err, so throw away the default logger output
logrus.SetOutput(ioutil.Discard)
logrus.SetOutput(io.Discard)
}
func checkSyslogHookParams(params map[string]interface{}) {

View file

@ -22,12 +22,13 @@ import (
"sync"
"time"
fedsenderapi "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
fedsenderapi "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/keyserver/api"
)
var (
@ -66,12 +67,14 @@ func init() {
// - We don't have unbounded growth in proportion to the number of servers (this is more important in a P2P world where
// we have many many servers)
// - We can adjust concurrency (at the cost of memory usage) by tuning N, to accommodate mobile devices vs servers.
//
// The downsides are that:
// - Query requests can get queued behind other servers if they hash to the same worker, even if there are other free
// workers elsewhere. Whilst suboptimal, provided we cap how long a single request can last (e.g using context timeouts)
// we guarantee we will get around to it. Also, more users on a given server does not increase the number of requests
// (as /keys/query allows multiple users to be specified) so being stuck behind matrix.org won't materially be any worse
// than being stuck behind foo.bar
//
// In the event that the query fails, a lock is acquired and the server name along with the time to wait before retrying is
// set in a map. A restarter goroutine periodically probes this map and injects servers which are ready to be retried.
type DeviceListUpdater struct {

View file

@ -18,7 +18,7 @@ import (
"context"
"crypto/ed25519"
"fmt"
"io/ioutil"
"io"
"net/http"
"net/url"
"reflect"
@ -27,8 +27,9 @@ import (
"testing"
"time"
"github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/keyserver/api"
)
var (
@ -202,7 +203,7 @@ func TestUpdateNoPrevID(t *testing.T) {
}
return &http.Response{
StatusCode: 200,
Body: ioutil.NopCloser(strings.NewReader(`
Body: io.NopCloser(strings.NewReader(`
{
"user_id": "` + remoteUserID + `",
"stream_id": 5,
@ -317,7 +318,7 @@ func TestDebounce(t *testing.T) {
// now send the response over federation
fedCh <- &http.Response{
StatusCode: 200,
Body: ioutil.NopCloser(strings.NewReader(`
Body: io.NopCloser(strings.NewReader(`
{
"user_id": "` + userID + `",
"stream_id": 5,

View file

@ -3,6 +3,7 @@ package storage_test
import (
"context"
"reflect"
"sync"
"testing"
"github.com/matrix-org/dendrite/keyserver/api"
@ -103,6 +104,9 @@ func TestKeyChangesUpperLimit(t *testing.T) {
})
}
var dbLock sync.Mutex
var deviceArray = []string{"AAA", "another_device"}
// The purpose of this test is to make sure that the storage layer is generating sequential stream IDs per user,
// and that they are returned correctly when querying for device keys.
func TestDeviceKeysStreamIDGeneration(t *testing.T) {
@ -169,8 +173,11 @@ func TestDeviceKeysStreamIDGeneration(t *testing.T) {
t.Fatalf("Expected StoreLocalDeviceKeys to set StreamID=3 (new key same device) but got %d", msgs[0].StreamID)
}
dbLock.Lock()
defer dbLock.Unlock()
// Querying for device keys returns the latest stream IDs
msgs, err = db.DeviceKeysForUser(ctx, alice, []string{"AAA", "another_device"}, false)
msgs, err = db.DeviceKeysForUser(ctx, alice, deviceArray, false)
if err != nil {
t.Fatalf("DeviceKeysForUser returned error: %s", err)
}

View file

@ -21,7 +21,6 @@ import (
"encoding/base64"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
@ -180,7 +179,7 @@ func createTempDir(baseDirectory config.Path) (types.Path, error) {
if err := os.MkdirAll(baseTmpDir, 0770); err != nil {
return "", fmt.Errorf("failed to create base temp dir: %w", err)
}
tmpDir, err := ioutil.TempDir(baseTmpDir, "")
tmpDir, err := os.MkdirTemp(baseTmpDir, "")
if err != nil {
return "", fmt.Errorf("failed to create temp dir: %w", err)
}

View file

@ -19,7 +19,6 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"mime"
"net/http"
"net/url"
@ -695,7 +694,7 @@ func (r *downloadRequest) GetContentLengthAndReader(contentLengthHeader string,
// We successfully parsed the Content-Length, so we'll return a limited
// reader that restricts us to reading only up to this size.
reader = ioutil.NopCloser(io.LimitReader(*body, parsedLength))
reader = io.NopCloser(io.LimitReader(*body, parsedLength))
contentLength = parsedLength
} else {
// Content-Length header is missing. If we have a maximum file size
@ -704,7 +703,7 @@ func (r *downloadRequest) GetContentLengthAndReader(contentLengthHeader string,
// ultimately it will get rewritten later when the temp file is written
// to disk.
if maxFileSizeBytes > 0 {
reader = ioutil.NopCloser(io.LimitReader(*body, int64(maxFileSizeBytes)))
reader = io.NopCloser(io.LimitReader(*body, int64(maxFileSizeBytes)))
}
contentLength = 0
}

View file

@ -25,6 +25,11 @@ import (
"github.com/Arceliar/phony"
"github.com/getsentry/sentry-go"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
fedapi "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/roomserver/acls"
"github.com/matrix-org/dendrite/roomserver/api"
@ -35,10 +40,6 @@ import (
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)
// Inputer is responsible for consuming from the roomserver input
@ -60,9 +61,9 @@ import (
// per-room durable consumers will only progress through the stream
// as events are processed.
//
// A BC * -> positions of each consumer (* = ephemeral)
// ⌄ ⌄⌄ ⌄
// ABAABCAABCAA -> newest (letter = subject for each message)
// A BC * -> positions of each consumer (* = ephemeral)
// ⌄ ⌄⌄ ⌄
// ABAABCAABCAA -> newest (letter = subject for each message)
//
// In this example, A is still processing an event but has two
// pending events to process afterwards. Both B and C are caught

View file

@ -20,32 +20,32 @@ import (
"context"
"fmt"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/opentracing/opentracing-go"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/state"
"github.com/matrix-org/dendrite/roomserver/storage/shared"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/opentracing/opentracing-go"
"github.com/sirupsen/logrus"
)
// updateLatestEvents updates the list of latest events for this room in the database and writes the
// event to the output log.
// The latest events are the events that aren't referenced by another event in the database:
//
// Time goes down the page. 1 is the m.room.create event (root).
//
// 1 After storing 1 the latest events are {1}
// | After storing 2 the latest events are {2}
// 2 After storing 3 the latest events are {3}
// / \ After storing 4 the latest events are {3,4}
// 3 4 After storing 5 the latest events are {5,4}
// | | After storing 6 the latest events are {5,6}
// 5 6 <--- latest After storing 7 the latest events are {6,7}
// |
// 7 <----- latest
// Time goes down the page. 1 is the m.room.create event (root).
// 1 After storing 1 the latest events are {1}
// | After storing 2 the latest events are {2}
// 2 After storing 3 the latest events are {3}
// / \ After storing 4 the latest events are {3,4}
// 3 4 After storing 5 the latest events are {5,4}
// | | After storing 6 the latest events are {5,6}
// 5 6 <--- latest After storing 7 the latest events are {6,7}
// |
// 7 <----- latest
//
// Can only be called once at a time
func (r *Inputer) updateLatestEvents(

View file

@ -19,6 +19,10 @@ import (
"fmt"
"github.com/getsentry/sentry-go"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/roomserver/api"
@ -26,9 +30,6 @@ import (
"github.com/matrix-org/dendrite/roomserver/internal/helpers"
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
)
// the max number of servers to backfill from per request. If this is too low we may fail to backfill when
@ -522,8 +523,9 @@ func (b *backfillRequester) ProvideEvents(roomVer gomatrixserverlib.RoomVersion,
}
// joinEventsFromHistoryVisibility returns all CURRENTLY joined members if our server can read the room history
//
// TODO: Long term we probably want a history_visibility table which stores eventNID | visibility_enum so we can just
// pull all events and then filter by that table.
// pull all events and then filter by that table.
func joinEventsFromHistoryVisibility(
ctx context.Context, db storage.Database, roomID string, stateEntries []types.StateEntry,
thisServer gomatrixserverlib.ServerName) ([]types.Event, error) {

View file

@ -5,9 +5,10 @@ import (
"database/sql"
"errors"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/tidwall/gjson"
"github.com/matrix-org/dendrite/roomserver/types"
)
var OptimisationNotSupportedError = errors.New("optimisation not supported")
@ -178,7 +179,7 @@ type StrippedEvent struct {
}
// ExtractContentValue from the given state event. For example, given an m.room.name event with:
// content: { name: "Foo" }
// content: { name: "Foo" }
// this returns "Foo".
func ExtractContentValue(ev *gomatrixserverlib.HeaderedEvent) string {
content := ev.Content()

View file

@ -17,7 +17,7 @@ main() {
if [ -d ../sytest ]; then
local tmpdir
tmpdir="$(mktemp -d --tmpdir run-systest.XXXXXXXXXX)"
tmpdir="$(mktemp -d -t run-systest.XXXXXXXXXX)"
trap "rm -r '$tmpdir'" EXIT
if [ -z "$DISABLE_BUILDING_SYTEST" ]; then

View file

@ -19,8 +19,8 @@ import (
"encoding/pem"
"fmt"
"io"
"io/ioutil"
"net/url"
"os"
"path/filepath"
"regexp"
"strings"
@ -192,7 +192,7 @@ type ConfigErrors []string
// Load a yaml config file for a server run as multiple processes or as a monolith.
// Checks the config to ensure that it is valid.
func Load(configPath string, monolith bool) (*Dendrite, error) {
configData, err := ioutil.ReadFile(configPath)
configData, err := os.ReadFile(configPath)
if err != nil {
return nil, err
}
@ -200,9 +200,9 @@ func Load(configPath string, monolith bool) (*Dendrite, error) {
if err != nil {
return nil, err
}
// Pass the current working directory and ioutil.ReadFile so that they can
// Pass the current working directory and os.ReadFile so that they can
// be mocked in the tests
return loadConfig(basePath, configData, ioutil.ReadFile, monolith)
return loadConfig(basePath, configData, os.ReadFile, monolith)
}
func loadConfig(
@ -541,7 +541,7 @@ func (config *Dendrite) KeyServerURL() string {
// SetupTracing configures the opentracing using the supplied configuration.
func (config *Dendrite) SetupTracing(serviceName string) (closer io.Closer, err error) {
if !config.Tracing.Enabled {
return ioutil.NopCloser(bytes.NewReader([]byte{})), nil
return io.NopCloser(bytes.NewReader([]byte{})), nil
}
return config.Tracing.Jaeger.InitGlobalTracer(
serviceName,

View file

@ -16,7 +16,7 @@ package config
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"regexp"
"strings"
@ -181,7 +181,7 @@ func loadAppServices(config *AppServiceAPI, derived *Derived) error {
}
// Read the application service's config file
configData, err := ioutil.ReadFile(absPath)
configData, err := os.ReadFile(absPath)
if err != nil {
return err
}

View file

@ -14,16 +14,16 @@ import (
"github.com/sirupsen/logrus"
natsserver "github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
natsclient "github.com/nats-io/nats.go"
)
type NATSInstance struct {
*natsserver.Server
sync.Mutex
}
func DeleteAllStreams(js nats.JetStreamContext, cfg *config.JetStream) {
var natsLock sync.Mutex
func DeleteAllStreams(js natsclient.JetStreamContext, cfg *config.JetStream) {
for _, stream := range streams { // streams are defined in streams.go
name := cfg.Prefixed(stream.Name)
_ = js.DeleteStream(name)
@ -31,11 +31,12 @@ func DeleteAllStreams(js nats.JetStreamContext, cfg *config.JetStream) {
}
func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) {
natsLock.Lock()
defer natsLock.Unlock()
// check if we need an in-process NATS Server
if len(cfg.Addresses) != 0 {
return setupNATS(process, cfg, nil)
}
s.Lock()
if s.Server == nil {
var err error
s.Server, err = natsserver.NewServer(&natsserver.Options{
@ -63,7 +64,6 @@ func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetS
process.ComponentFinished()
}()
}
s.Unlock()
if !s.ReadyForConnections(time.Second * 10) {
logrus.Fatalln("NATS did not start in time")
}
@ -77,9 +77,9 @@ func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetS
func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStreamContext, *natsclient.Conn) {
if nc == nil {
var err error
opts := []nats.Option{}
opts := []natsclient.Option{}
if cfg.DisableTLSValidation {
opts = append(opts, nats.Secure(&tls.Config{
opts = append(opts, natsclient.Secure(&tls.Config{
InsecureSkipVerify: true,
}))
}

View file

@ -7,7 +7,7 @@ import (
"crypto/sha256"
"encoding/base64"
"encoding/json"
"io/ioutil"
"io"
"net/http"
"sort"
"strings"
@ -15,6 +15,8 @@ import (
"time"
"github.com/gorilla/mux"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/internal/hooks"
"github.com/matrix-org/dendrite/internal/httputil"
roomserver "github.com/matrix-org/dendrite/roomserver/api"
@ -22,7 +24,6 @@ import (
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/mscs/msc2836"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
)
var (
@ -32,15 +33,17 @@ var (
)
// Basic sanity check of MSC2836 logic. Injects a thread that looks like:
// A
// |
// B
// / \
// C D
// /|\
// E F G
// |
// H
//
// A
// |
// B
// / \
// C D
// /|\
// E F G
// |
// H
//
// And makes sure POST /event_relationships works with various parameters
func TestMSC2836(t *testing.T) {
alice := "@alice:localhost"
@ -425,12 +428,12 @@ func postRelationships(t *testing.T, expectCode int, accessToken string, req *ms
t.Fatalf("failed to do request: %s", err)
}
if res.StatusCode != expectCode {
body, _ := ioutil.ReadAll(res.Body)
body, _ := io.ReadAll(res.Body)
t.Fatalf("wrong response code, got %d want %d - body: %s", res.StatusCode, expectCode, string(body))
}
if res.StatusCode == 200 {
var result msc2836.EventRelationshipResponse
body, err := ioutil.ReadAll(res.Body)
body, err := io.ReadAll(res.Body)
if err != nil {
t.Fatalf("response 200 OK but failed to read response body: %s", err)
}

View file

@ -21,6 +21,11 @@ import (
"fmt"
"github.com/getsentry/sentry-go"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
@ -29,10 +34,6 @@ import (
"github.com/matrix-org/dendrite/syncapi/producers"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
)
// OutputClientDataConsumer consumes events that originated in the client API server.
@ -107,7 +108,8 @@ func (s *OutputClientDataConsumer) onMessage(ctx context.Context, msg *nats.Msg)
"type": output.Type,
"room_id": output.RoomID,
log.ErrorKey: err,
}).Panicf("could not save account data")
}).Errorf("could not save account data")
return false
}
if err = s.sendReadUpdate(ctx, userID, output); err != nil {

View file

@ -6,12 +6,13 @@ import (
"sort"
"testing"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
var (
@ -364,13 +365,14 @@ func TestKeyChangeCatchupChangeAndLeft(t *testing.T) {
// tests that joining/leaving the SAME room puts users in `left` if the final state is leave.
// NB: Consider the case:
// - Alice and Bob are in a room.
// - Alice goes offline, Charlie joins, sends encrypted messages then leaves the room.
// - Alice comes back online. Technically nothing has changed in the set of users between those two points in time,
// it's still just (Alice,Bob) but then we won't be tracking Charlie -- is this okay though? It's device keys
// which are only relevant when actively sending events I think? And if Alice does need the keys she knows
// charlie's (user_id, device_id) so can just hit /keys/query - no need to keep updated about it because she
// doesn't share any rooms with him.
// - Alice and Bob are in a room.
// - Alice goes offline, Charlie joins, sends encrypted messages then leaves the room.
// - Alice comes back online. Technically nothing has changed in the set of users between those two points in time,
// it's still just (Alice,Bob) but then we won't be tracking Charlie -- is this okay though? It's device keys
// which are only relevant when actively sending events I think? And if Alice does need the keys she knows
// charlie's (user_id, device_id) so can just hit /keys/query - no need to keep updated about it because she
// doesn't share any rooms with him.
//
// Ergo, we put them in `left` as it is simpler.
func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) {
newShareUser := "@berta:localhost"

View file

@ -16,16 +16,17 @@ package routing
import (
"encoding/json"
"io/ioutil"
"io"
"net/http"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/tidwall/gjson"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/tidwall/gjson"
)
// GetFilter implements GET /_matrix/client/r0/user/{userId}/filter/{filterId}
@ -65,7 +66,9 @@ type filterResponse struct {
FilterID string `json:"filter_id"`
}
//PutFilter implements POST /_matrix/client/r0/user/{userId}/filter
// PutFilter implements
//
// POST /_matrix/client/r0/user/{userId}/filter
func PutFilter(
req *http.Request, device *api.Device, syncDB storage.Database, userID string,
) util.JSONResponse {
@ -85,7 +88,7 @@ func PutFilter(
var filter gomatrixserverlib.Filter
defer req.Body.Close() // nolint:errcheck
body, err := ioutil.ReadAll(req.Body)
body, err := io.ReadAll(req.Body)
if err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,

View file

@ -20,6 +20,10 @@ import (
"net/http"
"sort"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/roomserver/api"
@ -28,9 +32,6 @@ import (
"github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
)
type messagesReq struct {
@ -262,7 +263,7 @@ func (m *messagesResp) applyLazyLoadMembers(
}
}
for _, evt := range membershipToUser {
m.State = append(m.State, gomatrixserverlib.HeaderedToClientEvent(evt, gomatrixserverlib.FormatSync))
m.State = append(m.State, gomatrixserverlib.HeaderedToClientEvent(evt, gomatrixserverlib.FormatAll))
}
}

View file

@ -58,7 +58,7 @@ const upsertRoomUnreadNotificationCountsSQL = `INSERT INTO syncapi_notification_
(user_id, room_id, notification_count, highlight_count)
VALUES ($1, $2, $3, $4)
ON CONFLICT (user_id, room_id)
DO UPDATE SET notification_count = $3, highlight_count = $4
DO UPDATE SET id = nextval('syncapi_notification_data_id_seq'), notification_count = $3, highlight_count = $4
RETURNING id`
const selectUserUnreadNotificationCountsSQL = `SELECT

View file

@ -25,12 +25,14 @@ import (
"github.com/matrix-org/dendrite/syncapi/types"
)
func NewSqliteNotificationDataTable(db *sql.DB) (tables.NotificationData, error) {
func NewSqliteNotificationDataTable(db *sql.DB, streamID *StreamIDStatements) (tables.NotificationData, error) {
_, err := db.Exec(notificationDataSchema)
if err != nil {
return nil, err
}
r := &notificationDataStatements{}
r := &notificationDataStatements{
streamIDStatements: streamID,
}
return r, sqlutil.StatementList{
{&r.upsertRoomUnreadCounts, upsertRoomUnreadNotificationCountsSQL},
{&r.selectUserUnreadCounts, selectUserUnreadNotificationCountsSQL},
@ -39,6 +41,7 @@ func NewSqliteNotificationDataTable(db *sql.DB) (tables.NotificationData, error)
}
type notificationDataStatements struct {
streamIDStatements *StreamIDStatements
upsertRoomUnreadCounts *sql.Stmt
selectUserUnreadCounts *sql.Stmt
selectMaxID *sql.Stmt
@ -58,8 +61,7 @@ const upsertRoomUnreadNotificationCountsSQL = `INSERT INTO syncapi_notification_
(user_id, room_id, notification_count, highlight_count)
VALUES ($1, $2, $3, $4)
ON CONFLICT (user_id, room_id)
DO UPDATE SET notification_count = $3, highlight_count = $4
RETURNING id`
DO UPDATE SET id = $5, notification_count = $6, highlight_count = $7`
const selectUserUnreadNotificationCountsSQL = `SELECT
id, room_id, notification_count, highlight_count
@ -71,7 +73,11 @@ const selectUserUnreadNotificationCountsSQL = `SELECT
const selectMaxNotificationIDSQL = `SELECT CASE COUNT(*) WHEN 0 THEN 0 ELSE MAX(id) END FROM syncapi_notification_data`
func (r *notificationDataStatements) UpsertRoomUnreadCounts(ctx context.Context, userID, roomID string, notificationCount, highlightCount int) (pos types.StreamPosition, err error) {
err = r.upsertRoomUnreadCounts.QueryRowContext(ctx, userID, roomID, notificationCount, highlightCount).Scan(&pos)
pos, err = r.streamIDStatements.nextNotificationID(ctx, nil)
if err != nil {
return
}
_, err = r.upsertRoomUnreadCounts.ExecContext(ctx, userID, roomID, notificationCount, highlightCount, pos, notificationCount, highlightCount)
return
}

View file

@ -26,6 +26,8 @@ INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("invite", 0)
ON CONFLICT DO NOTHING;
INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("presence", 0)
ON CONFLICT DO NOTHING;
INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("notification", 0)
ON CONFLICT DO NOTHING;
`
const increaseStreamIDStmt = "" +
@ -78,3 +80,9 @@ func (s *StreamIDStatements) nextPresenceID(ctx context.Context, txn *sql.Tx) (p
err = increaseStmt.QueryRowContext(ctx, "presence").Scan(&pos)
return
}
func (s *StreamIDStatements) nextNotificationID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) {
increaseStmt := sqlutil.TxStmt(txn, s.increaseStreamIDStmt)
err = increaseStmt.QueryRowContext(ctx, "notification").Scan(&pos)
return
}

View file

@ -95,7 +95,7 @@ func (d *SyncServerDatasource) prepare() (err error) {
if err != nil {
return err
}
notificationData, err := NewSqliteNotificationDataTable(d.db)
notificationData, err := NewSqliteNotificationDataTable(d.db, &d.streamID)
if err != nil {
return err
}

View file

@ -18,10 +18,11 @@ import (
"context"
"database/sql"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
)
type AccountData interface {
@ -122,12 +123,14 @@ type CurrentRoomState interface {
//
// We persist the previous event IDs as well, one per row, so when we do fetch even
// earlier events we can simply delete rows which referenced it. Consider the graph:
// A
// | Event C has 1 prev_event ID: A.
// B C
// |___| Event D has 2 prev_event IDs: B and C.
// |
// D
//
// A
// | Event C has 1 prev_event ID: A.
// B C
// |___| Event D has 2 prev_event IDs: B and C.
// |
// D
//
// The earliest known event we have is D, so this table has 2 rows.
// A backfill request gives us C but not B. We delete rows where prev_event=C. This
// still means that D is a backwards extremity as we do not have event B. However, event

View file

@ -12,9 +12,12 @@ import (
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/tidwall/gjson"
"go.uber.org/atomic"
"github.com/matrix-org/dendrite/syncapi/notifier"
)
// The max number of per-room goroutines to have running.
@ -34,6 +37,7 @@ type PDUStreamProvider struct {
// userID+deviceID -> lazy loading cache
lazyLoadCache caching.LazyLoadCache
rsAPI roomserverAPI.SyncRoomserverAPI
notifier *notifier.Notifier
}
func (p *PDUStreamProvider) worker() {
@ -100,6 +104,15 @@ func (p *PDUStreamProvider) CompleteSync(
req.Log.WithError(err).Error("unable to update event filter with ignored users")
}
// Invalidate the lazyLoadCache, otherwise we end up with missing displaynames/avatars
// TODO: This might be inefficient, when joined to many and/or large rooms.
for _, roomID := range joinedRoomIDs {
joinedUsers := p.notifier.JoinedUsers(roomID)
for _, sharedUser := range joinedUsers {
p.lazyLoadCache.InvalidateLazyLoadedUser(req.Device, roomID, sharedUser)
}
}
// Build up a /sync response. Add joined rooms.
var reqMutex sync.Mutex
var reqWaitGroup sync.WaitGroup

View file

@ -34,6 +34,7 @@ func NewSyncStreamProviders(
StreamProvider: StreamProvider{DB: d},
lazyLoadCache: lazyLoadCache,
rsAPI: rsAPI,
notifier: notifier,
},
TypingStreamProvider: &TypingStreamProvider{
StreamProvider: StreamProvider{DB: d},

View file

@ -12,10 +12,13 @@ import (
)
type dummyPublisher struct {
lock sync.Mutex
count int
}
func (d *dummyPublisher) SendPresence(userID string, presence types.Presence, statusMsg *string) error {
d.lock.Lock()
defer d.lock.Unlock()
d.count++
return nil
}
@ -125,11 +128,15 @@ func TestRequestPool_updatePresence(t *testing.T) {
go rp.cleanPresence(db, time.Millisecond*50)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
publisher.lock.Lock()
beforeCount := publisher.count
publisher.lock.Unlock()
rp.updatePresence(db, tt.args.presence, tt.args.userID)
publisher.lock.Lock()
if tt.wantIncrease && publisher.count <= beforeCount {
t.Fatalf("expected count to increase: %d <= %d", publisher.count, beforeCount)
}
publisher.lock.Unlock()
time.Sleep(tt.args.sleep)
})
}

View file

@ -22,7 +22,6 @@ import (
"encoding/pem"
"errors"
"fmt"
"io/ioutil"
"math/big"
"os"
"strings"
@ -144,7 +143,7 @@ func NewTLSKeyWithAuthority(serverName, tlsKeyPath, tlsCertPath, authorityKeyPat
}
// load the authority key
dat, err := ioutil.ReadFile(authorityKeyPath)
dat, err := os.ReadFile(authorityKeyPath)
if err != nil {
return err
}
@ -158,7 +157,7 @@ func NewTLSKeyWithAuthority(serverName, tlsKeyPath, tlsCertPath, authorityKeyPat
}
// load the authority certificate
dat, err = ioutil.ReadFile(authorityCertPath)
dat, err = os.ReadFile(authorityCertPath)
if err != nil {
return err
}

View file

@ -20,13 +20,14 @@ import (
"strings"
"time"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/storage/tables"
"github.com/matrix-org/dendrite/userapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
)
const userDailyVisitsSchema = `
@ -297,11 +298,10 @@ func (s *statsStatements) monthlyUsers(ctx context.Context, txn *sql.Tx) (result
return
}
/* R30Users counts the number of 30 day retained users, defined as:
- Users who have created their accounts more than 30 days ago
- Where last seen at most 30 days ago
- Where account creation and last_seen are > 30 days apart
*/
// R30Users counts the number of 30 day retained users, defined as:
// - Users who have created their accounts more than 30 days ago
// - Where last seen at most 30 days ago
// - Where account creation and last_seen are > 30 days apart
func (s *statsStatements) r30Users(ctx context.Context, txn *sql.Tx) (map[string]int64, error) {
stmt := sqlutil.TxStmt(txn, s.countR30UsersStmt)
lastSeenAfter := time.Now().AddDate(0, 0, -30)
@ -334,7 +334,8 @@ func (s *statsStatements) r30Users(ctx context.Context, txn *sql.Tx) (map[string
return result, rows.Err()
}
/* R30UsersV2 counts the number of 30 day retained users, defined as users that:
/*
R30UsersV2 counts the number of 30 day retained users, defined as users that:
- Appear more than once in the past 60 days
- Have more than 30 days between the most and least recent appearances that occurred in the past 60 days.
*/