Merge branch 'main' into patch-1

This commit is contained in:
kegsay 2023-01-19 19:04:43 +00:00 committed by GitHub
commit 71ba306c0f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
35 changed files with 1164 additions and 349 deletions

View file

@ -73,3 +73,41 @@ jobs:
path: |
/logs/results.tap
/logs/**/*.log*
element_web:
timeout-minutes: 120
runs-on: ubuntu-latest
steps:
- uses: tecolicom/actions-use-apt-tools@v1
with:
# Our test suite includes some screenshot tests with unusual diacritics, which are
# supposed to be covered by STIXGeneral.
tools: fonts-stix
- uses: actions/checkout@v2
with:
repository: matrix-org/matrix-react-sdk
- uses: actions/setup-node@v3
with:
cache: 'yarn'
- name: Fetch layered build
run: scripts/ci/layered.sh
- name: Copy config
run: cp element.io/develop/config.json config.json
working-directory: ./element-web
- name: Build
env:
CI_PACKAGE: true
run: yarn build
working-directory: ./element-web
- name: Edit Test Config
run: |
sed -i '/HOMESERVER/c\ HOMESERVER: "dendrite",' cypress.config.ts
- name: "Run cypress tests"
uses: cypress-io/github-action@v4.1.1
with:
browser: chrome
start: npx serve -p 8080 ./element-web/webapp
wait-on: 'http://localhost:8080'
env:
PUPPETEER_SKIP_CHROMIUM_DOWNLOAD: true
TMPDIR: ${{ runner.temp }}

View file

@ -1,5 +1,25 @@
# Changelog
## Dendrite 0.10.9 (2023-01-17)
### Features
* Stale device lists are now cleaned up on startup, removing entries for users the server doesn't share a room with anymore
* Dendrite now has its own Helm chart
* Guest access is now handled correctly (disallow joins, kick guests on revocation of guest access, as well as over federation)
### Fixes
* Push rules have seen several tweaks and fixes, which should, for example, fix notifications for `m.read_receipts`
* Outgoing presence will now correctly be sent to newly joined hosts
* Fixes the `/_dendrite/admin/resetPassword/{userID}` admin endpoint to use the correct variable
* Federated backfilling for medium/large rooms has been fixed
* `/login` causing wrong device list updates has been resolved
* `/sync` should now return the correct room summary heroes
* The default config options for `recaptcha_sitekey_class` and `recaptcha_form_field` are now set correctly
* `/messages` now omits empty `state` to be more spec compliant (contributed by [handlerug](https://github.com/handlerug))
* `/sync` has been optimised to only query state events for history visibility if they are really needed
## Dendrite 0.10.8 (2022-11-29)
### Features

View file

@ -22,6 +22,7 @@ import (
"testing"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/userutil"
"github.com/matrix-org/dendrite/setup/config"
uapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
@ -47,7 +48,7 @@ func TestLoginFromJSONReader(t *testing.T) {
"password": "herpassword",
"device_id": "adevice"
}`,
WantUsername: "alice",
WantUsername: "@alice:example.com",
WantDeviceID: "adevice",
},
{
@ -174,7 +175,7 @@ func (ua *fakeUserInternalAPI) QueryAccountByPassword(ctx context.Context, req *
return nil
}
res.Exists = true
res.Account = &uapi.Account{}
res.Account = &uapi.Account{UserID: userutil.MakeUserID(req.Localpart, req.ServerName)}
return nil
}

View file

@ -101,6 +101,8 @@ func (t *LoginTypePassword) Login(ctx context.Context, req interface{}) (*Login,
}
}
// If we couldn't find the user by the lower cased localpart, try the provided
// localpart as is.
if !res.Exists {
err = t.GetAccountByPassword(ctx, &api.QueryAccountByPasswordRequest{
Localpart: localpart,
@ -122,5 +124,8 @@ func (t *LoginTypePassword) Login(ctx context.Context, req interface{}) (*Login,
}
}
}
// Set the user, so login.Username() can do the right thing
r.Identifier.User = res.Account.UserID
r.User = res.Account.UserID
return &r.Login, nil
}

View file

@ -0,0 +1,152 @@
package routing
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"strings"
"testing"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/keyserver"
"github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/matrix-org/dendrite/test"
"github.com/matrix-org/dendrite/test/testrig"
"github.com/matrix-org/dendrite/userapi"
uapi "github.com/matrix-org/dendrite/userapi/api"
)
func TestLogin(t *testing.T) {
aliceAdmin := test.NewUser(t, test.WithAccountType(uapi.AccountTypeAdmin))
bobUser := &test.User{ID: "@bob:test", AccountType: uapi.AccountTypeUser}
charlie := &test.User{ID: "@Charlie:test", AccountType: uapi.AccountTypeUser}
vhUser := &test.User{ID: "@vhuser:vh1"}
ctx := context.Background()
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
base, baseClose := testrig.CreateBaseDendrite(t, dbType)
defer baseClose()
base.Cfg.ClientAPI.RateLimiting.Enabled = false
// add a vhost
base.Cfg.Global.VirtualHosts = append(base.Cfg.Global.VirtualHosts, &config.VirtualHost{
SigningIdentity: gomatrixserverlib.SigningIdentity{ServerName: "vh1"},
})
rsAPI := roomserver.NewInternalAPI(base)
// Needed for /login
keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, nil, rsAPI)
userAPI := userapi.NewInternalAPI(base, &base.Cfg.UserAPI, nil, keyAPI, rsAPI, nil)
keyAPI.SetUserAPI(userAPI)
// We mostly need the userAPI for this test, so nil for other APIs/caches etc.
Setup(base, &base.Cfg.ClientAPI, nil, nil, userAPI, nil, nil, nil, nil, nil, keyAPI, nil, &base.Cfg.MSCs, nil)
// Create password
password := util.RandomString(8)
// create the users
for _, u := range []*test.User{aliceAdmin, bobUser, vhUser, charlie} {
localpart, serverName, _ := gomatrixserverlib.SplitID('@', u.ID)
userRes := &uapi.PerformAccountCreationResponse{}
if err := userAPI.PerformAccountCreation(ctx, &uapi.PerformAccountCreationRequest{
AccountType: u.AccountType,
Localpart: localpart,
ServerName: serverName,
Password: password,
}, userRes); err != nil {
t.Errorf("failed to create account: %s", err)
}
if !userRes.AccountCreated {
t.Fatalf("account not created")
}
}
testCases := []struct {
name string
userID string
wantOK bool
}{
{
name: "aliceAdmin can login",
userID: aliceAdmin.ID,
wantOK: true,
},
{
name: "bobUser can login",
userID: bobUser.ID,
wantOK: true,
},
{
name: "vhuser can login",
userID: vhUser.ID,
wantOK: true,
},
{
name: "bob with uppercase can login",
userID: "@Bob:test",
wantOK: true,
},
{
name: "Charlie can login (existing uppercase)",
userID: charlie.ID,
wantOK: true,
},
{
name: "Charlie can not login with lowercase userID",
userID: strings.ToLower(charlie.ID),
wantOK: false,
},
}
ctx := context.Background()
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
req := test.NewRequest(t, http.MethodPost, "/_matrix/client/v3/login", test.WithJSONBody(t, map[string]interface{}{
"type": authtypes.LoginTypePassword,
"identifier": map[string]interface{}{
"type": "m.id.user",
"user": tc.userID,
},
"password": password,
}))
rec := httptest.NewRecorder()
base.PublicClientAPIMux.ServeHTTP(rec, req)
if tc.wantOK && rec.Code != http.StatusOK {
t.Fatalf("failed to login: %s", rec.Body.String())
}
t.Logf("Response: %s", rec.Body.String())
// get the response
resp := loginResponse{}
if err := json.Unmarshal(rec.Body.Bytes(), &resp); err != nil {
t.Fatal(err)
}
// everything OK
if !tc.wantOK && resp.AccessToken == "" {
return
}
if tc.wantOK && resp.AccessToken == "" {
t.Fatalf("expected accessToken after successful login but got none: %+v", resp)
}
devicesResp := &uapi.QueryDevicesResponse{}
if err := userAPI.QueryDevices(ctx, &uapi.QueryDevicesRequest{UserID: resp.UserID}, devicesResp); err != nil {
t.Fatal(err)
}
for _, dev := range devicesResp.Devices {
// We expect the userID on the device to be the same as resp.UserID
if dev.UserID != resp.UserID {
t.Fatalf("unexpected userID on device: %s", dev.UserID)
}
}
})
}
})
}

View file

@ -780,7 +780,7 @@ func handleApplicationServiceRegistration(
// Don't need to worry about appending to registration stages as
// application service registration is entirely separate.
return completeRegistration(
req.Context(), userAPI, r.Username, r.ServerName, "", appserviceID, req.RemoteAddr,
req.Context(), userAPI, r.Username, r.ServerName, "", "", appserviceID, req.RemoteAddr,
req.UserAgent(), r.Auth.Session, r.InhibitLogin, r.InitialDisplayName, r.DeviceID,
userapi.AccountTypeAppService,
)
@ -800,7 +800,7 @@ func checkAndCompleteFlow(
if checkFlowCompleted(flow, cfg.Derived.Registration.Flows) {
// This flow was completed, registration can continue
return completeRegistration(
req.Context(), userAPI, r.Username, r.ServerName, r.Password, "", req.RemoteAddr,
req.Context(), userAPI, r.Username, r.ServerName, "", r.Password, "", req.RemoteAddr,
req.UserAgent(), sessionID, r.InhibitLogin, r.InitialDisplayName, r.DeviceID,
userapi.AccountTypeUser,
)
@ -824,10 +824,10 @@ func checkAndCompleteFlow(
func completeRegistration(
ctx context.Context,
userAPI userapi.ClientUserAPI,
username string, serverName gomatrixserverlib.ServerName,
username string, serverName gomatrixserverlib.ServerName, displayName string,
password, appserviceID, ipAddr, userAgent, sessionID string,
inhibitLogin eventutil.WeakBoolean,
displayName, deviceID *string,
deviceDisplayName, deviceID *string,
accType userapi.AccountType,
) util.JSONResponse {
if username == "" {
@ -887,12 +887,28 @@ func completeRegistration(
}
}
if displayName != "" {
nameReq := userapi.PerformUpdateDisplayNameRequest{
Localpart: username,
ServerName: serverName,
DisplayName: displayName,
}
var nameRes userapi.PerformUpdateDisplayNameResponse
err = userAPI.SetDisplayName(ctx, &nameReq, &nameRes)
if err != nil {
return util.JSONResponse{
Code: http.StatusInternalServerError,
JSON: jsonerror.Unknown("failed to set display name: " + err.Error()),
}
}
}
var devRes userapi.PerformDeviceCreationResponse
err = userAPI.PerformDeviceCreation(ctx, &userapi.PerformDeviceCreationRequest{
Localpart: username,
ServerName: serverName,
AccessToken: token,
DeviceDisplayName: displayName,
DeviceDisplayName: deviceDisplayName,
DeviceID: deviceID,
IPAddr: ipAddr,
UserAgent: userAgent,
@ -1077,5 +1093,5 @@ func handleSharedSecretRegistration(cfg *config.ClientAPI, userAPI userapi.Clien
if ssrr.Admin {
accType = userapi.AccountTypeAdmin
}
return completeRegistration(req.Context(), userAPI, ssrr.User, cfg.Matrix.ServerName, ssrr.Password, "", req.RemoteAddr, req.UserAgent(), "", false, &ssrr.User, &deviceID, accType)
return completeRegistration(req.Context(), userAPI, ssrr.User, cfg.Matrix.ServerName, ssrr.DisplayName, ssrr.Password, "", req.RemoteAddr, req.UserAgent(), "", false, &ssrr.User, &deviceID, accType)
}

View file

@ -18,12 +18,13 @@ import (
)
type SharedSecretRegistrationRequest struct {
User string `json:"username"`
Password string `json:"password"`
Nonce string `json:"nonce"`
MacBytes []byte
MacStr string `json:"mac"`
Admin bool `json:"admin"`
User string `json:"username"`
Password string `json:"password"`
Nonce string `json:"nonce"`
MacBytes []byte
MacStr string `json:"mac"`
Admin bool `json:"admin"`
DisplayName string `json:"displayname,omitempty"`
}
func NewSharedSecretRegistrationRequest(reader io.ReadCloser) (*SharedSecretRegistrationRequest, error) {

View file

@ -10,7 +10,7 @@ import (
func TestSharedSecretRegister(t *testing.T) {
// these values have come from a local synapse instance to ensure compatibility
jsonStr := []byte(`{"admin":false,"mac":"f1ba8d37123866fd659b40de4bad9b0f8965c565","nonce":"759f047f312b99ff428b21d581256f8592b8976e58bc1b543972dc6147e529a79657605b52d7becd160ff5137f3de11975684319187e06901955f79e5a6c5a79","password":"wonderland","username":"alice"}`)
jsonStr := []byte(`{"admin":false,"mac":"f1ba8d37123866fd659b40de4bad9b0f8965c565","nonce":"759f047f312b99ff428b21d581256f8592b8976e58bc1b543972dc6147e529a79657605b52d7becd160ff5137f3de11975684319187e06901955f79e5a6c5a79","password":"wonderland","username":"alice","displayname":"rabbit"}`)
sharedSecret := "dendritetest"
req, err := NewSharedSecretRegistrationRequest(io.NopCloser(bytes.NewBuffer(jsonStr)))

View file

@ -18,6 +18,7 @@ import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"net/http/httptest"
"reflect"
@ -35,7 +36,10 @@ import (
"github.com/matrix-org/dendrite/test"
"github.com/matrix-org/dendrite/test/testrig"
"github.com/matrix-org/dendrite/userapi"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/util"
"github.com/patrickmn/go-cache"
"github.com/stretchr/testify/assert"
)
var (
@ -570,3 +574,92 @@ func Test_register(t *testing.T) {
}
})
}
func TestRegisterUserWithDisplayName(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
base, baseClose := testrig.CreateBaseDendrite(t, dbType)
defer baseClose()
base.Cfg.Global.ServerName = "server"
rsAPI := roomserver.NewInternalAPI(base)
keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, nil, rsAPI)
userAPI := userapi.NewInternalAPI(base, &base.Cfg.UserAPI, nil, keyAPI, rsAPI, nil)
keyAPI.SetUserAPI(userAPI)
deviceName, deviceID := "deviceName", "deviceID"
expectedDisplayName := "DisplayName"
response := completeRegistration(
base.Context(),
userAPI,
"user",
"server",
expectedDisplayName,
"password",
"",
"localhost",
"user agent",
"session",
false,
&deviceName,
&deviceID,
api.AccountTypeAdmin,
)
assert.Equal(t, http.StatusOK, response.Code)
req := api.QueryProfileRequest{UserID: "@user:server"}
var res api.QueryProfileResponse
err := userAPI.QueryProfile(base.Context(), &req, &res)
assert.NoError(t, err)
assert.Equal(t, expectedDisplayName, res.DisplayName)
})
}
func TestRegisterAdminUsingSharedSecret(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
base, baseClose := testrig.CreateBaseDendrite(t, dbType)
defer baseClose()
base.Cfg.Global.ServerName = "server"
sharedSecret := "dendritetest"
base.Cfg.ClientAPI.RegistrationSharedSecret = sharedSecret
rsAPI := roomserver.NewInternalAPI(base)
keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, nil, rsAPI)
userAPI := userapi.NewInternalAPI(base, &base.Cfg.UserAPI, nil, keyAPI, rsAPI, nil)
keyAPI.SetUserAPI(userAPI)
expectedDisplayName := "rabbit"
jsonStr := []byte(`{"admin":true,"mac":"24dca3bba410e43fe64b9b5c28306693bf3baa9f","nonce":"759f047f312b99ff428b21d581256f8592b8976e58bc1b543972dc6147e529a79657605b52d7becd160ff5137f3de11975684319187e06901955f79e5a6c5a79","password":"wonderland","username":"alice","displayname":"rabbit"}`)
req, err := NewSharedSecretRegistrationRequest(io.NopCloser(bytes.NewBuffer(jsonStr)))
assert.NoError(t, err)
if err != nil {
t.Fatalf("failed to read request: %s", err)
}
r := NewSharedSecretRegistration(sharedSecret)
// force the nonce to be known
r.nonces.Set(req.Nonce, true, cache.DefaultExpiration)
_, err = r.IsValidMacLogin(req.Nonce, req.User, req.Password, req.Admin, req.MacBytes)
assert.NoError(t, err)
body := &bytes.Buffer{}
err = json.NewEncoder(body).Encode(req)
assert.NoError(t, err)
ssrr := httptest.NewRequest(http.MethodPost, "/", body)
response := handleSharedSecretRegistration(
&base.Cfg.ClientAPI,
userAPI,
r,
ssrr,
)
assert.Equal(t, http.StatusOK, response.Code)
profilReq := api.QueryProfileRequest{UserID: "@alice:server"}
var profileRes api.QueryProfileResponse
err = userAPI.QueryProfile(base.Context(), &profilReq, &profileRes)
assert.NoError(t, err)
assert.Equal(t, expectedDisplayName, profileRes.DisplayName)
})
}

View file

@ -6,6 +6,12 @@ permalink: /faq
# FAQ
## Why does Dendrite exist?
Dendrite aims to provide a matrix compatible server that has low resource usage compared to [Synapse](https://github.com/matrix-org/synapse).
It also aims to provide more flexibility when scaling either up or down.
Dendrite's code is also very easy to hack on which makes it suitable for experimenting with new matrix features such as peer-to-peer.
## Is Dendrite stable?
Mostly, although there are still bugs and missing features. If you are a confident power user and you are happy to spend some time debugging things when they go wrong, then please try out Dendrite. If you are a community, organisation or business that demands stability and uptime, then Dendrite is not for you yet - please install Synapse instead.
@ -34,6 +40,10 @@ No, Dendrite has a very different database schema to Synapse and the two are not
Monolith deployments are always preferred where possible, and at this time, are far better tested than polylith deployments are. The only reason to consider a polylith deployment is if you wish to run different Dendrite components on separate physical machines, but this is an advanced configuration which we don't
recommend.
## Can I configure which port Dendrite listens on?
Yes, use the cli flag `-http-bind-address`.
## I've installed Dendrite but federation isn't working
Check the [Federation Tester](https://federationtester.matrix.org). You need at least:
@ -42,6 +52,10 @@ Check the [Federation Tester](https://federationtester.matrix.org). You need at
* A valid TLS certificate for that DNS name
* Either DNS SRV records or well-known files
## Whenever I try to connect from Element it says unable to connect to homeserver
Check that your dendrite instance is running. Otherwise this is most likely due to a reverse proxy misconfiguration.
## Does Dendrite work with my favourite client?
It should do, although we are aware of some minor issues:
@ -49,6 +63,10 @@ It should do, although we are aware of some minor issues:
* **Element Android**: registration does not work, but logging in with an existing account does
* **Hydrogen**: occasionally sync can fail due to gaps in the `since` parameter, but clearing the cache fixes this
## Is there a public instance of Dendrite I can try out?
Use [dendrite.matrix.org](https://dendrite.matrix.org) which we officially support.
## Does Dendrite support Space Summaries?
Yes, [Space Summaries](https://github.com/matrix-org/matrix-spec-proposals/pull/2946) were merged into the Matrix Spec as of 2022-01-17 however, they are still treated as an MSC (Matrix Specification Change) in Dendrite. In order to enable Space Summaries in Dendrite, you must add the MSC to the MSC configuration section in the configuration YAML. If the MSC is not enabled, a user will typically see a perpetual loading icon on the summary page. See below for a demonstration of how to add to the Dendrite configuration:
@ -84,10 +102,42 @@ Remember to add the config file(s) to the `app_service_api` section of the confi
Yes, you can do this by disabling federation - set `disable_federation` to `true` in the `global` section of the Dendrite configuration file.
## How can I migrate a room in order to change the internal ID?
This can be done by performing a room upgrade. Use the command `/upgraderoom <version>` in Element to do this.
## How do I reset somebody's password on my server?
Use the admin endpoint [resetpassword](https://matrix-org.github.io/dendrite/administration/adminapi#post-_dendriteadminresetpassworduserid)
## Should I use PostgreSQL or SQLite for my databases?
Please use PostgreSQL wherever possible, especially if you are planning to run a homeserver that caters to more than a couple of users.
## What data needs to be kept if transferring/backing up Dendrite?
The list of files that need to be stored is:
- matrix-key.pem
- dendrite.yaml
- the postgres or sqlite DB
- the media store
- the search index (although this can be regenerated)
Note that this list may change / be out of date. We don't officially maintain instructions for migrations like this.
## How can I prepare enough storage for media caches?
This might be what you want: [matrix-media-repo](https://github.com/turt2live/matrix-media-repo)
We don't officially support this or any other dedicated media storage solutions.
## Is there an upgrade guide for Dendrite?
Run a newer docker image. We don't officially support deployments other than Docker.
Most of the time you should be able to just
- stop
- replace binary
- start
## Dendrite is using a lot of CPU
Generally speaking, you should expect to see some CPU spikes, particularly if you are joining or participating in large rooms. However, constant/sustained high CPU usage is not expected - if you are experiencing that, please join `#dendrite-dev:matrix.org` and let us know what you were doing when the
@ -102,6 +152,10 @@ not expected. Join `#dendrite-dev:matrix.org` and let us know what you were doin
ballooned, or file a GitHub issue if you can. If you can take a [memory profile](development/PROFILING.md) then that
would be a huge help too, as that will help us to understand where the memory usage is happening.
## Do I need to generate the self-signed certificate if I'm going to use a reverse proxy?
No, if you already have a proper certificate from some provider, like Let's Encrypt, and use that on your reverse proxy, and the reverse proxy does TLS termination, then youre good and can use HTTP to the dendrite process.
## Dendrite is running out of PostgreSQL database connections
You may need to revisit the connection limit of your PostgreSQL server and/or make changes to the `max_connections` lines in your Dendrite configuration. Be aware that each Dendrite component opens its own database connections and has its own connection limit, even in monolith mode!

View file

@ -195,7 +195,7 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent, rew
}
// If we added new hosts, inform them about our known presence events for this room
if len(addsJoinedHosts) > 0 && ore.Event.Type() == gomatrixserverlib.MRoomMember && ore.Event.StateKey() != nil {
if s.cfg.Matrix.Presence.EnableOutbound && len(addsJoinedHosts) > 0 && ore.Event.Type() == gomatrixserverlib.MRoomMember && ore.Event.StateKey() != nil {
membership, _ := ore.Event.Membership()
if membership == gomatrixserverlib.Join {
s.sendPresence(ore.Event.RoomID(), addsJoinedHosts)

View file

@ -35,18 +35,13 @@ func NewStatistics(db storage.Database, failuresUntilBlacklist uint32) Statistic
DB: db,
FailuresUntilBlacklist: failuresUntilBlacklist,
backoffTimers: make(map[gomatrixserverlib.ServerName]*time.Timer),
servers: make(map[gomatrixserverlib.ServerName]*ServerStatistics),
}
}
// ForServer returns server statistics for the given server name. If it
// does not exist, it will create empty statistics and return those.
func (s *Statistics) ForServer(serverName gomatrixserverlib.ServerName) *ServerStatistics {
// If the map hasn't been initialised yet then do that.
if s.servers == nil {
s.mutex.Lock()
s.servers = make(map[gomatrixserverlib.ServerName]*ServerStatistics)
s.mutex.Unlock()
}
// Look up if we have statistics for this server already.
s.mutex.RLock()
server, found := s.servers[serverName]

View file

@ -1,7 +1,7 @@
apiVersion: v2
name: dendrite
version: "0.10.8"
appVersion: "0.10.8"
version: "0.10.9"
appVersion: "0.10.9"
description: Dendrite Matrix Homeserver
type: application
keywords:

View file

@ -15,9 +15,11 @@
{{- define "image.name" -}}
image: {{ .name }}
{{- with .Values.image -}}
image: {{ .repository }}:{{ .tag | default (printf "v%s" $.Chart.AppVersion) }}
imagePullPolicy: {{ .pullPolicy }}
{{- end -}}
{{- end -}}
{{/*
Expand the name of the chart.

View file

@ -45,8 +45,8 @@ spec:
persistentVolumeClaim:
claimName: {{ default (print ( include "dendrite.fullname" . ) "-search-pvc") $.Values.persistence.search.existingClaim | quote }}
containers:
- name: {{ $.Chart.Name }}
{{- include "image.name" $.Values.image | nindent 8 }}
- name: {{ .Chart.Name }}
{{- include "image.name" . | nindent 8 }}
args:
- '--config'
- '/etc/dendrite/dendrite.yaml'

View file

@ -8,6 +8,7 @@ metadata:
name: {{ $name }}
labels:
app.kubernetes.io/component: signingkey-job
{{- include "dendrite.labels" . | nindent 4 }}
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
@ -80,7 +81,7 @@ spec:
name: signing-key
readOnly: true
- name: generate-key
{{- include "image.name" $.Values.image | nindent 8 }}
{{- include "image.name" . | nindent 8 }}
command:
- sh
- -c

View file

@ -13,5 +13,5 @@ spec:
ports:
- name: http
protocol: TCP
port: 8008
port: {{ .Values.service.port }}
targetPort: 8008

View file

@ -1,8 +1,10 @@
image:
# -- Docker repository/image to use
name: "ghcr.io/matrix-org/dendrite-monolith:v0.10.8"
repository: "ghcr.io/matrix-org/dendrite-monolith"
# -- Kubernetes pullPolicy
pullPolicy: IfNotPresent
# Overrides the image tag whose default is the chart appVersion.
tag: ""
# signing key to use
@ -345,4 +347,4 @@ ingress:
service:
type: ClusterIP
port: 80
port: 8008

View file

@ -17,7 +17,7 @@ var build string
const (
VersionMajor = 0
VersionMinor = 10
VersionPatch = 8
VersionPatch = 9
VersionTag = "" // example: "rc1"
)

View file

@ -85,10 +85,10 @@ func (c *ClientAPI) Verify(configErrs *ConfigErrors, isMonolith bool) {
c.RecaptchaApiJsUrl = "https://www.google.com/recaptcha/api.js"
}
if c.RecaptchaFormField == "" {
c.RecaptchaFormField = "g-recaptcha"
c.RecaptchaFormField = "g-recaptcha-response"
}
if c.RecaptchaSitekeyClass == "" {
c.RecaptchaSitekeyClass = "g-recaptcha-response"
c.RecaptchaSitekeyClass = "g-recaptcha"
}
checkNotEmpty(configErrs, "client_api.recaptcha_public_key", c.RecaptchaPublicKey)
checkNotEmpty(configErrs, "client_api.recaptcha_private_key", c.RecaptchaPrivateKey)

View file

@ -16,16 +16,16 @@ package routing
import (
"encoding/json"
"math"
"net/http"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
type getMembershipResponse struct {
@ -87,19 +87,18 @@ func GetMemberships(
if err != nil {
return jsonerror.InternalServerError()
}
defer db.Rollback() // nolint: errcheck
atToken, err := types.NewTopologyTokenFromString(at)
if err != nil {
atToken = types.TopologyToken{Depth: math.MaxInt64, PDUPosition: math.MaxInt64}
if queryRes.HasBeenInRoom && !queryRes.IsInRoom {
// If you have left the room then this will be the members of the room when you left.
atToken, err = db.EventPositionInTopology(req.Context(), queryRes.EventID)
} else {
// If you are joined to the room then this will be the current members of the room.
atToken, err = db.MaxTopologicalPosition(req.Context(), roomID)
}
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("unable to get 'atToken'")
return jsonerror.InternalServerError()
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("unable to get 'atToken'")
return jsonerror.InternalServerError()
}
}
}

View file

@ -17,6 +17,7 @@ package routing
import (
"context"
"fmt"
"math"
"net/http"
"sort"
"time"
@ -57,7 +58,7 @@ type messagesResp struct {
StartStream string `json:"start_stream,omitempty"` // NOTSPEC: used by Cerulean, so clients can hit /messages then immediately /sync with a latest sync token
End string `json:"end,omitempty"`
Chunk []gomatrixserverlib.ClientEvent `json:"chunk"`
State []gomatrixserverlib.ClientEvent `json:"state"`
State []gomatrixserverlib.ClientEvent `json:"state,omitempty"`
}
// OnIncomingMessagesRequest implements the /messages endpoint from the
@ -177,10 +178,11 @@ func OnIncomingMessagesRequest(
// If "to" isn't provided, it defaults to either the earliest stream
// position (if we're going backward) or to the latest one (if we're
// going forward).
to, err = setToDefault(req.Context(), snapshot, backwardOrdering, roomID)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("setToDefault failed")
return jsonerror.InternalServerError()
to = types.TopologyToken{Depth: math.MaxInt64, PDUPosition: math.MaxInt64}
if backwardOrdering {
// go 1 earlier than the first event so we correctly fetch the earliest event
// this is because Database.GetEventsInTopologicalRange is exclusive of the lower-bound.
to = types.TopologyToken{}
}
wasToProvided = false
}
@ -577,24 +579,3 @@ func (r *messagesReq) backfill(roomID string, backwardsExtremities map[string][]
return events, nil
}
// setToDefault returns the default value for the "to" query parameter of a
// request to /messages if not provided. It defaults to either the earliest
// topological position (if we're going backward) or to the latest one (if we're
// going forward).
// Returns an error if there was an issue with retrieving the latest position
// from the database
func setToDefault(
ctx context.Context, snapshot storage.DatabaseTransaction, backwardOrdering bool,
roomID string,
) (to types.TopologyToken, err error) {
if backwardOrdering {
// go 1 earlier than the first event so we correctly fetch the earliest event
// this is because Database.GetEventsInTopologicalRange is exclusive of the lower-bound.
to = types.TopologyToken{}
} else {
to, err = snapshot.MaxTopologicalPosition(ctx, roomID)
}
return
}

View file

@ -45,7 +45,7 @@ type DatabaseTransaction interface {
GetStateDeltas(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error)
RoomIDsWithMembership(ctx context.Context, userID string, membership string) ([]string, error)
MembershipCount(ctx context.Context, roomID, membership string, pos types.StreamPosition) (int, error)
GetRoomHeroes(ctx context.Context, roomID, userID string, memberships []string) ([]string, error)
GetRoomSummary(ctx context.Context, roomID, userID string) (summary *types.Summary, err error)
RecentEvents(ctx context.Context, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error)
GetBackwardTopologyPos(ctx context.Context, events []*gomatrixserverlib.HeaderedEvent) (types.TopologyToken, error)
PositionInTopology(ctx context.Context, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error)
@ -84,8 +84,6 @@ type DatabaseTransaction interface {
EventPositionInTopology(ctx context.Context, eventID string) (types.TopologyToken, error)
// BackwardExtremitiesForRoom returns a map of backwards extremity event ID to a list of its prev_events.
BackwardExtremitiesForRoom(ctx context.Context, roomID string) (backwardExtremities map[string][]string, err error)
// MaxTopologicalPosition returns the highest topological position for a given room.
MaxTopologicalPosition(ctx context.Context, roomID string) (types.TopologyToken, error)
// StreamEventsToEvents converts streamEvent to Event. If device is non-nil and
// matches the streamevent.transactionID device then the transaction ID gets
// added to the unsigned section of the output event.

View file

@ -19,6 +19,7 @@ import (
"context"
"database/sql"
"encoding/json"
"errors"
"github.com/lib/pq"
"github.com/matrix-org/dendrite/internal"
@ -110,6 +111,15 @@ const selectSharedUsersSQL = "" +
" SELECT DISTINCT room_id FROM syncapi_current_room_state WHERE state_key = $1 AND membership='join'" +
") AND type = 'm.room.member' AND state_key = ANY($2) AND membership IN ('join', 'invite');"
const selectMembershipCount = `SELECT count(*) FROM syncapi_current_room_state WHERE type = 'm.room.member' AND room_id = $1 AND membership = $2`
const selectRoomHeroes = `
SELECT state_key FROM syncapi_current_room_state
WHERE type = 'm.room.member' AND room_id = $1 AND membership = ANY($2) AND state_key != $3
ORDER BY added_at, state_key
LIMIT 5
`
type currentRoomStateStatements struct {
upsertRoomStateStmt *sql.Stmt
deleteRoomStateByEventIDStmt *sql.Stmt
@ -122,6 +132,8 @@ type currentRoomStateStatements struct {
selectEventsWithEventIDsStmt *sql.Stmt
selectStateEventStmt *sql.Stmt
selectSharedUsersStmt *sql.Stmt
selectMembershipCountStmt *sql.Stmt
selectRoomHeroesStmt *sql.Stmt
}
func NewPostgresCurrentRoomStateTable(db *sql.DB) (tables.CurrentRoomState, error) {
@ -141,40 +153,21 @@ func NewPostgresCurrentRoomStateTable(db *sql.DB) (tables.CurrentRoomState, erro
return nil, err
}
if s.upsertRoomStateStmt, err = db.Prepare(upsertRoomStateSQL); err != nil {
return nil, err
}
if s.deleteRoomStateByEventIDStmt, err = db.Prepare(deleteRoomStateByEventIDSQL); err != nil {
return nil, err
}
if s.deleteRoomStateForRoomStmt, err = db.Prepare(deleteRoomStateForRoomSQL); err != nil {
return nil, err
}
if s.selectRoomIDsWithMembershipStmt, err = db.Prepare(selectRoomIDsWithMembershipSQL); err != nil {
return nil, err
}
if s.selectRoomIDsWithAnyMembershipStmt, err = db.Prepare(selectRoomIDsWithAnyMembershipSQL); err != nil {
return nil, err
}
if s.selectCurrentStateStmt, err = db.Prepare(selectCurrentStateSQL); err != nil {
return nil, err
}
if s.selectJoinedUsersStmt, err = db.Prepare(selectJoinedUsersSQL); err != nil {
return nil, err
}
if s.selectJoinedUsersInRoomStmt, err = db.Prepare(selectJoinedUsersInRoomSQL); err != nil {
return nil, err
}
if s.selectEventsWithEventIDsStmt, err = db.Prepare(selectEventsWithEventIDsSQL); err != nil {
return nil, err
}
if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil {
return nil, err
}
if s.selectSharedUsersStmt, err = db.Prepare(selectSharedUsersSQL); err != nil {
return nil, err
}
return s, nil
return s, sqlutil.StatementList{
{&s.upsertRoomStateStmt, upsertRoomStateSQL},
{&s.deleteRoomStateByEventIDStmt, deleteRoomStateByEventIDSQL},
{&s.deleteRoomStateForRoomStmt, deleteRoomStateForRoomSQL},
{&s.selectRoomIDsWithMembershipStmt, selectRoomIDsWithMembershipSQL},
{&s.selectRoomIDsWithAnyMembershipStmt, selectRoomIDsWithAnyMembershipSQL},
{&s.selectCurrentStateStmt, selectCurrentStateSQL},
{&s.selectJoinedUsersStmt, selectJoinedUsersSQL},
{&s.selectJoinedUsersInRoomStmt, selectJoinedUsersInRoomSQL},
{&s.selectEventsWithEventIDsStmt, selectEventsWithEventIDsSQL},
{&s.selectStateEventStmt, selectStateEventSQL},
{&s.selectSharedUsersStmt, selectSharedUsersSQL},
{&s.selectMembershipCountStmt, selectMembershipCount},
{&s.selectRoomHeroesStmt, selectRoomHeroes},
}.Prepare(db)
}
// SelectJoinedUsers returns a map of room ID to a list of joined user IDs.
@ -447,3 +440,34 @@ func (s *currentRoomStateStatements) SelectSharedUsers(
}
return result, rows.Err()
}
func (s *currentRoomStateStatements) SelectRoomHeroes(ctx context.Context, txn *sql.Tx, roomID, excludeUserID string, memberships []string) ([]string, error) {
stmt := sqlutil.TxStmt(txn, s.selectRoomHeroesStmt)
rows, err := stmt.QueryContext(ctx, roomID, pq.StringArray(memberships), excludeUserID)
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectRoomHeroesStmt: rows.close() failed")
var stateKey string
result := make([]string, 0, 5)
for rows.Next() {
if err = rows.Scan(&stateKey); err != nil {
return nil, err
}
result = append(result, stateKey)
}
return result, rows.Err()
}
func (s *currentRoomStateStatements) SelectMembershipCount(ctx context.Context, txn *sql.Tx, roomID, membership string) (count int, err error) {
stmt := sqlutil.TxStmt(txn, s.selectMembershipCountStmt)
err = stmt.QueryRowContext(ctx, roomID, membership).Scan(&count)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return 0, nil
}
return 0, err
}
return count, nil
}

View file

@ -19,10 +19,8 @@ import (
"database/sql"
"fmt"
"github.com/lib/pq"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
@ -64,9 +62,6 @@ const selectMembershipCountSQL = "" +
" SELECT DISTINCT ON (room_id, user_id) room_id, user_id, membership FROM syncapi_memberships WHERE room_id = $1 AND stream_pos <= $2 ORDER BY room_id, user_id, stream_pos DESC" +
") t WHERE t.membership = $3"
const selectHeroesSQL = "" +
"SELECT DISTINCT user_id FROM syncapi_memberships WHERE room_id = $1 AND user_id != $2 AND membership = ANY($3) LIMIT 5"
const selectMembershipBeforeSQL = "" +
"SELECT membership, topological_pos FROM syncapi_memberships WHERE room_id = $1 and user_id = $2 AND topological_pos <= $3 ORDER BY topological_pos DESC LIMIT 1"
@ -81,7 +76,6 @@ WHERE ($3::text IS NULL OR t.membership = $3)
type membershipsStatements struct {
upsertMembershipStmt *sql.Stmt
selectMembershipCountStmt *sql.Stmt
selectHeroesStmt *sql.Stmt
selectMembershipForUserStmt *sql.Stmt
selectMembersStmt *sql.Stmt
}
@ -95,7 +89,6 @@ func NewPostgresMembershipsTable(db *sql.DB) (tables.Memberships, error) {
return s, sqlutil.StatementList{
{&s.upsertMembershipStmt, upsertMembershipSQL},
{&s.selectMembershipCountStmt, selectMembershipCountSQL},
{&s.selectHeroesStmt, selectHeroesSQL},
{&s.selectMembershipForUserStmt, selectMembershipBeforeSQL},
{&s.selectMembersStmt, selectMembersSQL},
}.Prepare(db)
@ -129,26 +122,6 @@ func (s *membershipsStatements) SelectMembershipCount(
return
}
func (s *membershipsStatements) SelectHeroes(
ctx context.Context, txn *sql.Tx, roomID, userID string, memberships []string,
) (heroes []string, err error) {
stmt := sqlutil.TxStmt(txn, s.selectHeroesStmt)
var rows *sql.Rows
rows, err = stmt.QueryContext(ctx, roomID, userID, pq.StringArray(memberships))
if err != nil {
return
}
defer internal.CloseAndLogIfError(ctx, rows, "SelectHeroes: rows.close() failed")
var hero string
for rows.Next() {
if err = rows.Scan(&hero); err != nil {
return
}
heroes = append(heroes, hero)
}
return heroes, rows.Err()
}
// SelectMembershipForUser returns the membership of the user before and including the given position. If no membership can be found
// returns "leave", the topological position and no error. If an error occurs, other than sql.ErrNoRows, returns that and an empty
// string as the membership.

View file

@ -65,14 +65,6 @@ const selectPositionInTopologySQL = "" +
"SELECT topological_position, stream_position FROM syncapi_output_room_events_topology" +
" WHERE event_id = $1"
// Select the max topological position for the room, then sort by stream position and take the highest,
// returning both topological and stream positions.
const selectMaxPositionInTopologySQL = "" +
"SELECT topological_position, stream_position FROM syncapi_output_room_events_topology" +
" WHERE topological_position=(" +
"SELECT MAX(topological_position) FROM syncapi_output_room_events_topology WHERE room_id=$1" +
") ORDER BY stream_position DESC LIMIT 1"
const selectStreamToTopologicalPositionAscSQL = "" +
"SELECT topological_position FROM syncapi_output_room_events_topology WHERE room_id = $1 AND stream_position >= $2 ORDER BY topological_position ASC LIMIT 1;"
@ -84,7 +76,6 @@ type outputRoomEventsTopologyStatements struct {
selectEventIDsInRangeASCStmt *sql.Stmt
selectEventIDsInRangeDESCStmt *sql.Stmt
selectPositionInTopologyStmt *sql.Stmt
selectMaxPositionInTopologyStmt *sql.Stmt
selectStreamToTopologicalPositionAscStmt *sql.Stmt
selectStreamToTopologicalPositionDescStmt *sql.Stmt
}
@ -107,9 +98,6 @@ func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) {
if s.selectPositionInTopologyStmt, err = db.Prepare(selectPositionInTopologySQL); err != nil {
return nil, err
}
if s.selectMaxPositionInTopologyStmt, err = db.Prepare(selectMaxPositionInTopologySQL); err != nil {
return nil, err
}
if s.selectStreamToTopologicalPositionAscStmt, err = db.Prepare(selectStreamToTopologicalPositionAscSQL); err != nil {
return nil, err
}
@ -189,10 +177,3 @@ func (s *outputRoomEventsTopologyStatements) SelectStreamToTopologicalPosition(
}
return
}
func (s *outputRoomEventsTopologyStatements) SelectMaxPositionInTopology(
ctx context.Context, txn *sql.Tx, roomID string,
) (pos types.StreamPosition, spos types.StreamPosition, err error) {
err = sqlutil.TxStmt(txn, s.selectMaxPositionInTopologyStmt).QueryRowContext(ctx, roomID).Scan(&pos, &spos)
return
}

View file

@ -4,8 +4,10 @@ import (
"context"
"database/sql"
"fmt"
"math"
"github.com/matrix-org/gomatrixserverlib"
"github.com/tidwall/gjson"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/syncapi/types"
@ -92,8 +94,61 @@ func (d *DatabaseTransaction) MembershipCount(ctx context.Context, roomID, membe
return d.Memberships.SelectMembershipCount(ctx, d.txn, roomID, membership, pos)
}
func (d *DatabaseTransaction) GetRoomHeroes(ctx context.Context, roomID, userID string, memberships []string) ([]string, error) {
return d.Memberships.SelectHeroes(ctx, d.txn, roomID, userID, memberships)
func (d *DatabaseTransaction) GetRoomSummary(ctx context.Context, roomID, userID string) (*types.Summary, error) {
summary := &types.Summary{Heroes: []string{}}
joinCount, err := d.CurrentRoomState.SelectMembershipCount(ctx, d.txn, roomID, gomatrixserverlib.Join)
if err != nil {
return summary, err
}
inviteCount, err := d.CurrentRoomState.SelectMembershipCount(ctx, d.txn, roomID, gomatrixserverlib.Invite)
if err != nil {
return summary, err
}
summary.InvitedMemberCount = &inviteCount
summary.JoinedMemberCount = &joinCount
// Get the room name and canonical alias, if any
filter := gomatrixserverlib.DefaultStateFilter()
filterTypes := []string{gomatrixserverlib.MRoomName, gomatrixserverlib.MRoomCanonicalAlias}
filterRooms := []string{roomID}
filter.Types = &filterTypes
filter.Rooms = &filterRooms
evs, err := d.CurrentRoomState.SelectCurrentState(ctx, d.txn, roomID, &filter, nil)
if err != nil {
return summary, err
}
for _, ev := range evs {
switch ev.Type() {
case gomatrixserverlib.MRoomName:
if gjson.GetBytes(ev.Content(), "name").Str != "" {
return summary, nil
}
case gomatrixserverlib.MRoomCanonicalAlias:
if gjson.GetBytes(ev.Content(), "alias").Str != "" {
return summary, nil
}
}
}
// If there's no room name or canonical alias, get the room heroes, excluding the user
heroes, err := d.CurrentRoomState.SelectRoomHeroes(ctx, d.txn, roomID, userID, []string{gomatrixserverlib.Join, gomatrixserverlib.Invite})
if err != nil {
return summary, err
}
// "When no joined or invited members are available, this should consist of the banned and left users"
if len(heroes) == 0 {
heroes, err = d.CurrentRoomState.SelectRoomHeroes(ctx, d.txn, roomID, userID, []string{gomatrixserverlib.Leave, gomatrixserverlib.Ban})
if err != nil {
return summary, err
}
}
summary.Heroes = heroes
return summary, nil
}
func (d *DatabaseTransaction) RecentEvents(ctx context.Context, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) {
@ -215,16 +270,6 @@ func (d *DatabaseTransaction) BackwardExtremitiesForRoom(
return d.BackwardExtremities.SelectBackwardExtremitiesForRoom(ctx, d.txn, roomID)
}
func (d *DatabaseTransaction) MaxTopologicalPosition(
ctx context.Context, roomID string,
) (types.TopologyToken, error) {
depth, streamPos, err := d.Topology.SelectMaxPositionInTopology(ctx, d.txn, roomID)
if err != nil {
return types.TopologyToken{}, err
}
return types.TopologyToken{Depth: depth, PDUPosition: streamPos}, nil
}
func (d *DatabaseTransaction) EventPositionInTopology(
ctx context.Context, eventID string,
) (types.TopologyToken, error) {
@ -243,11 +288,7 @@ func (d *DatabaseTransaction) StreamToTopologicalPosition(
case err == sql.ErrNoRows && backwardOrdering: // no events in range, going backward
return types.TopologyToken{PDUPosition: streamPos}, nil
case err == sql.ErrNoRows && !backwardOrdering: // no events in range, going forward
topoPos, streamPos, err = d.Topology.SelectMaxPositionInTopology(ctx, d.txn, roomID)
if err != nil {
return types.TopologyToken{}, fmt.Errorf("d.Topology.SelectMaxPositionInTopology: %w", err)
}
return types.TopologyToken{Depth: topoPos, PDUPosition: streamPos}, nil
return types.TopologyToken{Depth: math.MaxInt64, PDUPosition: math.MaxInt64}, nil
case err != nil: // some other error happened
return types.TopologyToken{}, fmt.Errorf("d.Topology.SelectStreamToTopologicalPosition: %w", err)
default:

View file

@ -19,6 +19,7 @@ import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"strings"
@ -95,6 +96,15 @@ const selectSharedUsersSQL = "" +
" SELECT DISTINCT room_id FROM syncapi_current_room_state WHERE state_key = $1 AND membership='join'" +
") AND type = 'm.room.member' AND state_key IN ($2) AND membership IN ('join', 'invite');"
const selectMembershipCount = `SELECT count(*) FROM syncapi_current_room_state WHERE type = 'm.room.member' AND room_id = $1 AND membership = $2`
const selectRoomHeroes = `
SELECT state_key FROM syncapi_current_room_state
WHERE type = 'm.room.member' AND room_id = $1 AND state_key != $2 AND membership IN ($3)
ORDER BY added_at, state_key
LIMIT 5
`
type currentRoomStateStatements struct {
db *sql.DB
streamIDStatements *StreamIDStatements
@ -107,6 +117,8 @@ type currentRoomStateStatements struct {
//selectJoinedUsersInRoomStmt *sql.Stmt - prepared at runtime due to variadic
selectStateEventStmt *sql.Stmt
//selectSharedUsersSQL *sql.Stmt - prepared at runtime due to variadic
selectMembershipCountStmt *sql.Stmt
//selectRoomHeroes *sql.Stmt - prepared at runtime due to variadic
}
func NewSqliteCurrentRoomStateTable(db *sql.DB, streamID *StreamIDStatements) (tables.CurrentRoomState, error) {
@ -129,31 +141,16 @@ func NewSqliteCurrentRoomStateTable(db *sql.DB, streamID *StreamIDStatements) (t
return nil, err
}
if s.upsertRoomStateStmt, err = db.Prepare(upsertRoomStateSQL); err != nil {
return nil, err
}
if s.deleteRoomStateByEventIDStmt, err = db.Prepare(deleteRoomStateByEventIDSQL); err != nil {
return nil, err
}
if s.deleteRoomStateForRoomStmt, err = db.Prepare(deleteRoomStateForRoomSQL); err != nil {
return nil, err
}
if s.selectRoomIDsWithMembershipStmt, err = db.Prepare(selectRoomIDsWithMembershipSQL); err != nil {
return nil, err
}
if s.selectRoomIDsWithAnyMembershipStmt, err = db.Prepare(selectRoomIDsWithAnyMembershipSQL); err != nil {
return nil, err
}
if s.selectJoinedUsersStmt, err = db.Prepare(selectJoinedUsersSQL); err != nil {
return nil, err
}
//if s.selectJoinedUsersInRoomStmt, err = db.Prepare(selectJoinedUsersInRoomSQL); err != nil {
// return nil, err
//}
if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil {
return nil, err
}
return s, nil
return s, sqlutil.StatementList{
{&s.upsertRoomStateStmt, upsertRoomStateSQL},
{&s.deleteRoomStateByEventIDStmt, deleteRoomStateByEventIDSQL},
{&s.deleteRoomStateForRoomStmt, deleteRoomStateForRoomSQL},
{&s.selectRoomIDsWithMembershipStmt, selectRoomIDsWithMembershipSQL},
{&s.selectRoomIDsWithAnyMembershipStmt, selectRoomIDsWithAnyMembershipSQL},
{&s.selectJoinedUsersStmt, selectJoinedUsersSQL},
{&s.selectStateEventStmt, selectStateEventSQL},
{&s.selectMembershipCountStmt, selectMembershipCount},
}.Prepare(db)
}
// SelectJoinedUsers returns a map of room ID to a list of joined user IDs.
@ -485,3 +482,53 @@ func (s *currentRoomStateStatements) SelectSharedUsers(
return result, err
}
func (s *currentRoomStateStatements) SelectRoomHeroes(ctx context.Context, txn *sql.Tx, roomID, excludeUserID string, memberships []string) ([]string, error) {
params := make([]interface{}, len(memberships)+2)
params[0] = roomID
params[1] = excludeUserID
for k, v := range memberships {
params[k+2] = v
}
query := strings.Replace(selectRoomHeroes, "($3)", sqlutil.QueryVariadicOffset(len(memberships), 2), 1)
var stmt *sql.Stmt
var err error
if txn != nil {
stmt, err = txn.Prepare(query)
} else {
stmt, err = s.db.Prepare(query)
}
if err != nil {
return []string{}, err
}
defer internal.CloseAndLogIfError(ctx, stmt, "selectRoomHeroes: stmt.close() failed")
rows, err := stmt.QueryContext(ctx, params...)
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectRoomHeroes: rows.close() failed")
var stateKey string
result := make([]string, 0, 5)
for rows.Next() {
if err = rows.Scan(&stateKey); err != nil {
return nil, err
}
result = append(result, stateKey)
}
return result, rows.Err()
}
func (s *currentRoomStateStatements) SelectMembershipCount(ctx context.Context, txn *sql.Tx, roomID, membership string) (count int, err error) {
stmt := sqlutil.TxStmt(txn, s.selectMembershipCountStmt)
err = stmt.QueryRowContext(ctx, roomID, membership).Scan(&count)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return 0, nil
}
return 0, err
}
return count, nil
}

View file

@ -18,11 +18,9 @@ import (
"context"
"database/sql"
"fmt"
"strings"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
@ -64,9 +62,6 @@ const selectMembershipCountSQL = "" +
" SELECT * FROM syncapi_memberships WHERE room_id = $1 AND stream_pos <= $2 GROUP BY user_id HAVING(max(stream_pos))" +
") t WHERE t.membership = $3"
const selectHeroesSQL = "" +
"SELECT DISTINCT user_id FROM syncapi_memberships WHERE room_id = $1 AND user_id != $2 AND membership IN ($3) LIMIT 5"
const selectMembershipBeforeSQL = "" +
"SELECT membership, topological_pos FROM syncapi_memberships WHERE room_id = $1 and user_id = $2 AND topological_pos <= $3 ORDER BY topological_pos DESC LIMIT 1"
@ -99,7 +94,6 @@ func NewSqliteMembershipsTable(db *sql.DB) (tables.Memberships, error) {
{&s.selectMembershipCountStmt, selectMembershipCountSQL},
{&s.selectMembershipForUserStmt, selectMembershipBeforeSQL},
{&s.selectMembersStmt, selectMembersSQL},
// {&s.selectHeroesStmt, selectHeroesSQL}, - prepared at runtime due to variadic
}.Prepare(db)
}
@ -131,39 +125,6 @@ func (s *membershipsStatements) SelectMembershipCount(
return
}
func (s *membershipsStatements) SelectHeroes(
ctx context.Context, txn *sql.Tx, roomID, userID string, memberships []string,
) (heroes []string, err error) {
stmtSQL := strings.Replace(selectHeroesSQL, "($3)", sqlutil.QueryVariadicOffset(len(memberships), 2), 1)
stmt, err := s.db.PrepareContext(ctx, stmtSQL)
if err != nil {
return
}
defer internal.CloseAndLogIfError(ctx, stmt, "SelectHeroes: stmt.close() failed")
params := []interface{}{
roomID, userID,
}
for _, membership := range memberships {
params = append(params, membership)
}
stmt = sqlutil.TxStmt(txn, stmt)
var rows *sql.Rows
rows, err = stmt.QueryContext(ctx, params...)
if err != nil {
return
}
defer internal.CloseAndLogIfError(ctx, rows, "SelectHeroes: rows.close() failed")
var hero string
for rows.Next() {
if err = rows.Scan(&hero); err != nil {
return
}
heroes = append(heroes, hero)
}
return heroes, rows.Err()
}
// SelectMembershipForUser returns the membership of the user before and including the given position. If no membership can be found
// returns "leave", the topological position and no error. If an error occurs, other than sql.ErrNoRows, returns that and an empty
// string as the membership.

View file

@ -61,10 +61,6 @@ const selectPositionInTopologySQL = "" +
"SELECT topological_position, stream_position FROM syncapi_output_room_events_topology" +
" WHERE event_id = $1"
const selectMaxPositionInTopologySQL = "" +
"SELECT MAX(topological_position), stream_position FROM syncapi_output_room_events_topology" +
" WHERE room_id = $1 ORDER BY stream_position DESC"
const selectStreamToTopologicalPositionAscSQL = "" +
"SELECT topological_position FROM syncapi_output_room_events_topology WHERE room_id = $1 AND stream_position >= $2 ORDER BY topological_position ASC LIMIT 1;"
@ -77,7 +73,6 @@ type outputRoomEventsTopologyStatements struct {
selectEventIDsInRangeASCStmt *sql.Stmt
selectEventIDsInRangeDESCStmt *sql.Stmt
selectPositionInTopologyStmt *sql.Stmt
selectMaxPositionInTopologyStmt *sql.Stmt
selectStreamToTopologicalPositionAscStmt *sql.Stmt
selectStreamToTopologicalPositionDescStmt *sql.Stmt
}
@ -102,9 +97,6 @@ func NewSqliteTopologyTable(db *sql.DB) (tables.Topology, error) {
if s.selectPositionInTopologyStmt, err = db.Prepare(selectPositionInTopologySQL); err != nil {
return nil, err
}
if s.selectMaxPositionInTopologyStmt, err = db.Prepare(selectMaxPositionInTopologySQL); err != nil {
return nil, err
}
if s.selectStreamToTopologicalPositionAscStmt, err = db.Prepare(selectStreamToTopologicalPositionAscSQL); err != nil {
return nil, err
}
@ -182,11 +174,3 @@ func (s *outputRoomEventsTopologyStatements) SelectStreamToTopologicalPosition(
}
return
}
func (s *outputRoomEventsTopologyStatements) SelectMaxPositionInTopology(
ctx context.Context, txn *sql.Tx, roomID string,
) (pos types.StreamPosition, spos types.StreamPosition, err error) {
stmt := sqlutil.TxStmt(txn, s.selectMaxPositionInTopologyStmt)
err = stmt.QueryRowContext(ctx, roomID).Scan(&pos, &spos)
return
}

View file

@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"fmt"
"math"
"reflect"
"testing"
@ -14,6 +15,7 @@ import (
"github.com/matrix-org/dendrite/test"
"github.com/matrix-org/dendrite/test/testrig"
"github.com/matrix-org/gomatrixserverlib"
"github.com/stretchr/testify/assert"
)
var ctx = context.Background()
@ -198,10 +200,7 @@ func TestGetEventsInRangeWithTopologyToken(t *testing.T) {
_ = MustWriteEvents(t, db, events)
WithSnapshot(t, db, func(snapshot storage.DatabaseTransaction) {
from, err := snapshot.MaxTopologicalPosition(ctx, r.ID)
if err != nil {
t.Fatalf("failed to get MaxTopologicalPosition: %s", err)
}
from := types.TopologyToken{Depth: math.MaxInt64, PDUPosition: math.MaxInt64}
t.Logf("max topo pos = %+v", from)
// head towards the beginning of time
to := types.TopologyToken{}
@ -218,6 +217,88 @@ func TestGetEventsInRangeWithTopologyToken(t *testing.T) {
})
}
func TestStreamToTopologicalPosition(t *testing.T) {
alice := test.NewUser(t)
r := test.NewRoom(t, alice)
testCases := []struct {
name string
roomID string
streamPos types.StreamPosition
backwardOrdering bool
wantToken types.TopologyToken
}{
{
name: "forward ordering found streamPos returns found position",
roomID: r.ID,
streamPos: 1,
backwardOrdering: false,
wantToken: types.TopologyToken{Depth: 1, PDUPosition: 1},
},
{
name: "forward ordering not found streamPos returns max position",
roomID: r.ID,
streamPos: 100,
backwardOrdering: false,
wantToken: types.TopologyToken{Depth: math.MaxInt64, PDUPosition: math.MaxInt64},
},
{
name: "backward ordering found streamPos returns found position",
roomID: r.ID,
streamPos: 1,
backwardOrdering: true,
wantToken: types.TopologyToken{Depth: 1, PDUPosition: 1},
},
{
name: "backward ordering not found streamPos returns maxDepth with param pduPosition",
roomID: r.ID,
streamPos: 100,
backwardOrdering: true,
wantToken: types.TopologyToken{Depth: 5, PDUPosition: 100},
},
{
name: "backward non-existent room returns zero token",
roomID: "!doesnotexist:localhost",
streamPos: 1,
backwardOrdering: true,
wantToken: types.TopologyToken{Depth: 0, PDUPosition: 1},
},
{
name: "forward non-existent room returns max token",
roomID: "!doesnotexist:localhost",
streamPos: 1,
backwardOrdering: false,
wantToken: types.TopologyToken{Depth: math.MaxInt64, PDUPosition: math.MaxInt64},
},
}
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
db, close, closeBase := MustCreateDatabase(t, dbType)
defer close()
defer closeBase()
txn, err := db.NewDatabaseTransaction(ctx)
if err != nil {
t.Fatal(err)
}
defer txn.Rollback()
MustWriteEvents(t, db, r.Events())
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
token, err := txn.StreamToTopologicalPosition(ctx, tc.roomID, tc.streamPos, tc.backwardOrdering)
if err != nil {
t.Fatal(err)
}
if tc.wantToken != token {
t.Fatalf("expected token %q, got %q", tc.wantToken, token)
}
})
}
})
}
/*
// The purpose of this test is to make sure that backpagination returns all events, even if some events have the same depth.
// For cases where events have the same depth, the streaming token should be used to tie break so events written via WriteEvent
@ -664,3 +745,181 @@ func topologyTokenBefore(t *testing.T, db storage.Database, eventID string) *typ
return &tok
}
*/
func pointer[t any](s t) *t {
return &s
}
func TestRoomSummary(t *testing.T) {
alice := test.NewUser(t)
bob := test.NewUser(t)
charlie := test.NewUser(t)
// Create some dummy users
moreUsers := []*test.User{}
moreUserIDs := []string{}
for i := 0; i < 10; i++ {
u := test.NewUser(t)
moreUsers = append(moreUsers, u)
moreUserIDs = append(moreUserIDs, u.ID)
}
testCases := []struct {
name string
wantSummary *types.Summary
additionalEvents func(t *testing.T, room *test.Room)
}{
{
name: "after initial creation",
wantSummary: &types.Summary{JoinedMemberCount: pointer(1), InvitedMemberCount: pointer(0), Heroes: []string{}},
},
{
name: "invited user",
wantSummary: &types.Summary{JoinedMemberCount: pointer(1), InvitedMemberCount: pointer(1), Heroes: []string{bob.ID}},
additionalEvents: func(t *testing.T, room *test.Room) {
room.CreateAndInsert(t, alice, gomatrixserverlib.MRoomMember, map[string]interface{}{
"membership": "invite",
}, test.WithStateKey(bob.ID))
},
},
{
name: "invited user, but declined",
wantSummary: &types.Summary{JoinedMemberCount: pointer(1), InvitedMemberCount: pointer(0), Heroes: []string{bob.ID}},
additionalEvents: func(t *testing.T, room *test.Room) {
room.CreateAndInsert(t, alice, gomatrixserverlib.MRoomMember, map[string]interface{}{
"membership": "invite",
}, test.WithStateKey(bob.ID))
room.CreateAndInsert(t, bob, gomatrixserverlib.MRoomMember, map[string]interface{}{
"membership": "leave",
}, test.WithStateKey(bob.ID))
},
},
{
name: "joined user after invitation",
wantSummary: &types.Summary{JoinedMemberCount: pointer(2), InvitedMemberCount: pointer(0), Heroes: []string{bob.ID}},
additionalEvents: func(t *testing.T, room *test.Room) {
room.CreateAndInsert(t, alice, gomatrixserverlib.MRoomMember, map[string]interface{}{
"membership": "invite",
}, test.WithStateKey(bob.ID))
room.CreateAndInsert(t, bob, gomatrixserverlib.MRoomMember, map[string]interface{}{
"membership": "join",
}, test.WithStateKey(bob.ID))
},
},
{
name: "multiple joined user",
wantSummary: &types.Summary{JoinedMemberCount: pointer(3), InvitedMemberCount: pointer(0), Heroes: []string{charlie.ID, bob.ID}},
additionalEvents: func(t *testing.T, room *test.Room) {
room.CreateAndInsert(t, charlie, gomatrixserverlib.MRoomMember, map[string]interface{}{
"membership": "join",
}, test.WithStateKey(charlie.ID))
room.CreateAndInsert(t, bob, gomatrixserverlib.MRoomMember, map[string]interface{}{
"membership": "join",
}, test.WithStateKey(bob.ID))
},
},
{
name: "multiple joined/invited user",
wantSummary: &types.Summary{JoinedMemberCount: pointer(2), InvitedMemberCount: pointer(1), Heroes: []string{charlie.ID, bob.ID}},
additionalEvents: func(t *testing.T, room *test.Room) {
room.CreateAndInsert(t, alice, gomatrixserverlib.MRoomMember, map[string]interface{}{
"membership": "invite",
}, test.WithStateKey(charlie.ID))
room.CreateAndInsert(t, bob, gomatrixserverlib.MRoomMember, map[string]interface{}{
"membership": "join",
}, test.WithStateKey(bob.ID))
},
},
{
name: "multiple joined/invited/left user",
wantSummary: &types.Summary{JoinedMemberCount: pointer(1), InvitedMemberCount: pointer(1), Heroes: []string{charlie.ID}},
additionalEvents: func(t *testing.T, room *test.Room) {
room.CreateAndInsert(t, alice, gomatrixserverlib.MRoomMember, map[string]interface{}{
"membership": "invite",
}, test.WithStateKey(charlie.ID))
room.CreateAndInsert(t, bob, gomatrixserverlib.MRoomMember, map[string]interface{}{
"membership": "join",
}, test.WithStateKey(bob.ID))
room.CreateAndInsert(t, bob, gomatrixserverlib.MRoomMember, map[string]interface{}{
"membership": "leave",
}, test.WithStateKey(bob.ID))
},
},
{
name: "leaving user after joining",
wantSummary: &types.Summary{JoinedMemberCount: pointer(1), InvitedMemberCount: pointer(0), Heroes: []string{bob.ID}},
additionalEvents: func(t *testing.T, room *test.Room) {
room.CreateAndInsert(t, bob, gomatrixserverlib.MRoomMember, map[string]interface{}{
"membership": "join",
}, test.WithStateKey(bob.ID))
room.CreateAndInsert(t, bob, gomatrixserverlib.MRoomMember, map[string]interface{}{
"membership": "leave",
}, test.WithStateKey(bob.ID))
},
},
{
name: "many users", // heroes ordered by stream id
wantSummary: &types.Summary{JoinedMemberCount: pointer(len(moreUserIDs) + 1), InvitedMemberCount: pointer(0), Heroes: moreUserIDs[:5]},
additionalEvents: func(t *testing.T, room *test.Room) {
for _, x := range moreUsers {
room.CreateAndInsert(t, x, gomatrixserverlib.MRoomMember, map[string]interface{}{
"membership": "join",
}, test.WithStateKey(x.ID))
}
},
},
{
name: "canonical alias set",
wantSummary: &types.Summary{JoinedMemberCount: pointer(2), InvitedMemberCount: pointer(0), Heroes: []string{}},
additionalEvents: func(t *testing.T, room *test.Room) {
room.CreateAndInsert(t, bob, gomatrixserverlib.MRoomMember, map[string]interface{}{
"membership": "join",
}, test.WithStateKey(bob.ID))
room.CreateAndInsert(t, alice, gomatrixserverlib.MRoomCanonicalAlias, map[string]interface{}{
"alias": "myalias",
}, test.WithStateKey(""))
},
},
{
name: "room name set",
wantSummary: &types.Summary{JoinedMemberCount: pointer(2), InvitedMemberCount: pointer(0), Heroes: []string{}},
additionalEvents: func(t *testing.T, room *test.Room) {
room.CreateAndInsert(t, bob, gomatrixserverlib.MRoomMember, map[string]interface{}{
"membership": "join",
}, test.WithStateKey(bob.ID))
room.CreateAndInsert(t, alice, gomatrixserverlib.MRoomName, map[string]interface{}{
"name": "my room name",
}, test.WithStateKey(""))
},
},
}
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
db, close, closeBase := MustCreateDatabase(t, dbType)
defer close()
defer closeBase()
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
r := test.NewRoom(t, alice)
if tc.additionalEvents != nil {
tc.additionalEvents(t, r)
}
// write the room before creating a transaction
MustWriteEvents(t, db, r.Events())
transaction, err := db.NewDatabaseTransaction(ctx)
assert.NoError(t, err)
defer transaction.Rollback()
summary, err := transaction.GetRoomSummary(ctx, r.ID, alice.ID)
assert.NoError(t, err)
assert.Equal(t, tc.wantSummary, summary)
})
}
})
}

View file

@ -91,8 +91,6 @@ type Topology interface {
SelectEventIDsInRange(ctx context.Context, txn *sql.Tx, roomID string, minDepth, maxDepth, maxStreamPos types.StreamPosition, limit int, chronologicalOrder bool) (eventIDs []string, err error)
// SelectPositionInTopology returns the depth and stream position of a given event in the topology of the room it belongs to.
SelectPositionInTopology(ctx context.Context, txn *sql.Tx, eventID string) (depth, spos types.StreamPosition, err error)
// SelectMaxPositionInTopology returns the event which has the highest depth, and if there are multiple, the event with the highest stream position.
SelectMaxPositionInTopology(ctx context.Context, txn *sql.Tx, roomID string) (depth types.StreamPosition, spos types.StreamPosition, err error)
// SelectStreamToTopologicalPosition converts a stream position to a topological position by finding the nearest topological position in the room.
SelectStreamToTopologicalPosition(ctx context.Context, txn *sql.Tx, roomID string, streamPos types.StreamPosition, forward bool) (topoPos types.StreamPosition, err error)
}
@ -115,6 +113,9 @@ type CurrentRoomState interface {
SelectJoinedUsersInRoom(ctx context.Context, txn *sql.Tx, roomIDs []string) (map[string][]string, error)
// SelectSharedUsers returns a subset of otherUserIDs that share a room with userID.
SelectSharedUsers(ctx context.Context, txn *sql.Tx, userID string, otherUserIDs []string) ([]string, error)
SelectRoomHeroes(ctx context.Context, txn *sql.Tx, roomID, excludeUserID string, memberships []string) ([]string, error)
SelectMembershipCount(ctx context.Context, txn *sql.Tx, roomID, membership string) (int, error)
}
// BackwardsExtremities keeps track of backwards extremities for a room.
@ -185,7 +186,6 @@ type Receipts interface {
type Memberships interface {
UpsertMembership(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, streamPos, topologicalPos types.StreamPosition) error
SelectMembershipCount(ctx context.Context, txn *sql.Tx, roomID, membership string, pos types.StreamPosition) (count int, err error)
SelectHeroes(ctx context.Context, txn *sql.Tx, roomID, userID string, memberships []string) (heroes []string, err error)
SelectMembershipForUser(ctx context.Context, txn *sql.Tx, roomID, userID string, pos int64) (membership string, topologicalPos int, err error)
SelectMemberships(
ctx context.Context, txn *sql.Tx,

View file

@ -3,8 +3,6 @@ package tables_test
import (
"context"
"database/sql"
"reflect"
"sort"
"testing"
"time"
@ -88,43 +86,9 @@ func TestMembershipsTable(t *testing.T) {
testUpsert(t, ctx, table, userEvents[0], alice, room)
testMembershipCount(t, ctx, table, room)
testHeroes(t, ctx, table, alice, room, users)
})
}
func testHeroes(t *testing.T, ctx context.Context, table tables.Memberships, user *test.User, room *test.Room, users []string) {
// Re-slice and sort the expected users
users = users[1:]
sort.Strings(users)
type testCase struct {
name string
memberships []string
wantHeroes []string
}
testCases := []testCase{
{name: "no memberships queried", memberships: []string{}},
{name: "joined memberships queried should be limited", memberships: []string{gomatrixserverlib.Join}, wantHeroes: users[:5]},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
got, err := table.SelectHeroes(ctx, nil, room.ID, user.ID, tc.memberships)
if err != nil {
t.Fatalf("unable to select heroes: %s", err)
}
if gotLen := len(got); gotLen != len(tc.wantHeroes) {
t.Fatalf("expected %d heroes, got %d", len(tc.wantHeroes), gotLen)
}
if !reflect.DeepEqual(got, tc.wantHeroes) {
t.Fatalf("expected heroes to be %+v, got %+v", tc.wantHeroes, got)
}
})
}
}
func testMembershipCount(t *testing.T, ctx context.Context, table tables.Memberships, room *test.Room) {
t.Run("membership counts are correct", func(t *testing.T) {
// After 10 events, we should have 6 users (5 create related [incl. one member event], 5 member events = 6 users)

View file

@ -4,7 +4,6 @@ import (
"context"
"database/sql"
"fmt"
"sort"
"time"
"github.com/matrix-org/dendrite/internal/caching"
@ -14,11 +13,9 @@ import (
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
"github.com/matrix-org/dendrite/syncapi/notifier"
)
// The max number of per-room goroutines to have running.
@ -339,7 +336,10 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
case gomatrixserverlib.Join:
jr := types.NewJoinResponse()
if hasMembershipChange {
p.addRoomSummary(ctx, snapshot, jr, delta.RoomID, device.UserID, latestPosition)
jr.Summary, err = snapshot.GetRoomSummary(ctx, delta.RoomID, device.UserID)
if err != nil {
logrus.WithError(err).Warn("failed to get room summary")
}
}
jr.Timeline.PrevBatch = &prevBatch
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatSync)
@ -384,19 +384,32 @@ func applyHistoryVisibilityFilter(
roomID, userID string,
recentEvents []*gomatrixserverlib.HeaderedEvent,
) ([]*gomatrixserverlib.HeaderedEvent, error) {
// We need to make sure we always include the latest states events, if they are in the timeline.
// We grep at least limit * 2 events, to ensure we really get the needed events.
filter := gomatrixserverlib.DefaultStateFilter()
stateEvents, err := snapshot.CurrentState(ctx, roomID, &filter, nil)
if err != nil {
// Not a fatal error, we can continue without the stateEvents,
// they are only needed if there are state events in the timeline.
logrus.WithError(err).Warnf("Failed to get current room state for history visibility")
// We need to make sure we always include the latest state events, if they are in the timeline.
alwaysIncludeIDs := make(map[string]struct{})
var stateTypes []string
var senders []string
for _, ev := range recentEvents {
if ev.StateKey() != nil {
stateTypes = append(stateTypes, ev.Type())
senders = append(senders, ev.Sender())
}
}
alwaysIncludeIDs := make(map[string]struct{}, len(stateEvents))
for _, ev := range stateEvents {
alwaysIncludeIDs[ev.EventID()] = struct{}{}
// Only get the state again if there are state events in the timeline
if len(stateTypes) > 0 {
filter := gomatrixserverlib.DefaultStateFilter()
filter.Types = &stateTypes
filter.Senders = &senders
stateEvents, err := snapshot.CurrentState(ctx, roomID, &filter, nil)
if err != nil {
return nil, fmt.Errorf("failed to get current room state for history visibility calculation: %w", err)
}
for _, ev := range stateEvents {
alwaysIncludeIDs[ev.EventID()] = struct{}{}
}
}
startTime := time.Now()
events, err := internal.ApplyHistoryVisibilityFilter(ctx, snapshot, rsAPI, recentEvents, alwaysIncludeIDs, userID, "sync")
if err != nil {
@ -411,45 +424,6 @@ func applyHistoryVisibilityFilter(
return events, nil
}
func (p *PDUStreamProvider) addRoomSummary(ctx context.Context, snapshot storage.DatabaseTransaction, jr *types.JoinResponse, roomID, userID string, latestPosition types.StreamPosition) {
// Work out how many members are in the room.
joinedCount, _ := snapshot.MembershipCount(ctx, roomID, gomatrixserverlib.Join, latestPosition)
invitedCount, _ := snapshot.MembershipCount(ctx, roomID, gomatrixserverlib.Invite, latestPosition)
jr.Summary.JoinedMemberCount = &joinedCount
jr.Summary.InvitedMemberCount = &invitedCount
fetchStates := []gomatrixserverlib.StateKeyTuple{
{EventType: gomatrixserverlib.MRoomName},
{EventType: gomatrixserverlib.MRoomCanonicalAlias},
}
// Check if the room has a name or a canonical alias
latestState := &roomserverAPI.QueryLatestEventsAndStateResponse{}
err := p.rsAPI.QueryLatestEventsAndState(ctx, &roomserverAPI.QueryLatestEventsAndStateRequest{StateToFetch: fetchStates, RoomID: roomID}, latestState)
if err != nil {
return
}
// Check if the room has a name or canonical alias, if so, return.
for _, ev := range latestState.StateEvents {
switch ev.Type() {
case gomatrixserverlib.MRoomName:
if gjson.GetBytes(ev.Content(), "name").Str != "" {
return
}
case gomatrixserverlib.MRoomCanonicalAlias:
if gjson.GetBytes(ev.Content(), "alias").Str != "" {
return
}
}
}
heroes, err := snapshot.GetRoomHeroes(ctx, roomID, userID, []string{"join", "invite"})
if err != nil {
return
}
sort.Strings(heroes)
jr.Summary.Heroes = heroes
}
func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
ctx context.Context,
snapshot storage.DatabaseTransaction,
@ -493,7 +467,10 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
return
}
p.addRoomSummary(ctx, snapshot, jr, roomID, device.UserID, r.From)
jr.Summary, err = snapshot.GetRoomSummary(ctx, roomID, device.UserID)
if err != nil {
logrus.WithError(err).Warn("failed to get room summary")
}
// We don't include a device here as we don't need to send down
// transaction IDs for complete syncs, but we do it anyway because Sytest demands it for:

View file

@ -521,6 +521,252 @@ func verifyEventVisible(t *testing.T, wantVisible bool, wantVisibleEvent *gomatr
}
}
func TestGetMembership(t *testing.T) {
alice := test.NewUser(t)
aliceDev := userapi.Device{
ID: "ALICEID",
UserID: alice.ID,
AccessToken: "ALICE_BEARER_TOKEN",
DisplayName: "Alice",
AccountType: userapi.AccountTypeUser,
}
bob := test.NewUser(t)
bobDev := userapi.Device{
ID: "BOBID",
UserID: bob.ID,
AccessToken: "notjoinedtoanyrooms",
}
testCases := []struct {
name string
roomID string
additionalEvents func(t *testing.T, room *test.Room)
request func(t *testing.T, room *test.Room) *http.Request
wantOK bool
wantMemberCount int
useSleep bool // :/
}{
{
name: "/members - Alice joined",
request: func(t *testing.T, room *test.Room) *http.Request {
return test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/members", room.ID), test.WithQueryParams(map[string]string{
"access_token": aliceDev.AccessToken,
}))
},
wantOK: true,
wantMemberCount: 1,
},
{
name: "/members - Bob never joined",
request: func(t *testing.T, room *test.Room) *http.Request {
return test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/members", room.ID), test.WithQueryParams(map[string]string{
"access_token": bobDev.AccessToken,
}))
},
wantOK: false,
},
{
name: "/joined_members - Bob never joined",
request: func(t *testing.T, room *test.Room) *http.Request {
return test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/joined_members", room.ID), test.WithQueryParams(map[string]string{
"access_token": bobDev.AccessToken,
}))
},
wantOK: false,
},
{
name: "/joined_members - Alice joined",
request: func(t *testing.T, room *test.Room) *http.Request {
return test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/joined_members", room.ID), test.WithQueryParams(map[string]string{
"access_token": aliceDev.AccessToken,
}))
},
wantOK: true,
},
{
name: "Alice leaves before Bob joins, should not be able to see Bob",
request: func(t *testing.T, room *test.Room) *http.Request {
return test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/members", room.ID), test.WithQueryParams(map[string]string{
"access_token": aliceDev.AccessToken,
}))
},
additionalEvents: func(t *testing.T, room *test.Room) {
room.CreateAndInsert(t, alice, gomatrixserverlib.MRoomMember, map[string]interface{}{
"membership": "leave",
}, test.WithStateKey(alice.ID))
room.CreateAndInsert(t, bob, gomatrixserverlib.MRoomMember, map[string]interface{}{
"membership": "join",
}, test.WithStateKey(bob.ID))
},
useSleep: true,
wantOK: true,
wantMemberCount: 1,
},
{
name: "Alice leaves after Bob joins, should be able to see Bob",
request: func(t *testing.T, room *test.Room) *http.Request {
return test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/members", room.ID), test.WithQueryParams(map[string]string{
"access_token": aliceDev.AccessToken,
}))
},
additionalEvents: func(t *testing.T, room *test.Room) {
room.CreateAndInsert(t, bob, gomatrixserverlib.MRoomMember, map[string]interface{}{
"membership": "join",
}, test.WithStateKey(bob.ID))
room.CreateAndInsert(t, alice, gomatrixserverlib.MRoomMember, map[string]interface{}{
"membership": "leave",
}, test.WithStateKey(alice.ID))
},
useSleep: true,
wantOK: true,
wantMemberCount: 2,
},
{
name: "/joined_members - Alice leaves, shouldn't be able to see members ",
request: func(t *testing.T, room *test.Room) *http.Request {
return test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/joined_members", room.ID), test.WithQueryParams(map[string]string{
"access_token": aliceDev.AccessToken,
}))
},
additionalEvents: func(t *testing.T, room *test.Room) {
room.CreateAndInsert(t, alice, gomatrixserverlib.MRoomMember, map[string]interface{}{
"membership": "leave",
}, test.WithStateKey(alice.ID))
},
useSleep: true,
wantOK: false,
},
{
name: "'at' specified, returns memberships before Bob joins",
request: func(t *testing.T, room *test.Room) *http.Request {
return test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/members", room.ID), test.WithQueryParams(map[string]string{
"access_token": aliceDev.AccessToken,
"at": "t2_5",
}))
},
additionalEvents: func(t *testing.T, room *test.Room) {
room.CreateAndInsert(t, bob, gomatrixserverlib.MRoomMember, map[string]interface{}{
"membership": "join",
}, test.WithStateKey(bob.ID))
},
useSleep: true,
wantOK: true,
wantMemberCount: 1,
},
{
name: "'membership=leave' specified, returns no memberships",
request: func(t *testing.T, room *test.Room) *http.Request {
return test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/members", room.ID), test.WithQueryParams(map[string]string{
"access_token": aliceDev.AccessToken,
"membership": "leave",
}))
},
wantOK: true,
wantMemberCount: 0,
},
{
name: "'not_membership=join' specified, returns no memberships",
request: func(t *testing.T, room *test.Room) *http.Request {
return test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/members", room.ID), test.WithQueryParams(map[string]string{
"access_token": aliceDev.AccessToken,
"not_membership": "join",
}))
},
wantOK: true,
wantMemberCount: 0,
},
{
name: "'not_membership=leave' & 'membership=join' specified, returns correct memberships",
request: func(t *testing.T, room *test.Room) *http.Request {
return test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/members", room.ID), test.WithQueryParams(map[string]string{
"access_token": aliceDev.AccessToken,
"not_membership": "leave",
"membership": "join",
}))
},
additionalEvents: func(t *testing.T, room *test.Room) {
room.CreateAndInsert(t, bob, gomatrixserverlib.MRoomMember, map[string]interface{}{
"membership": "join",
}, test.WithStateKey(bob.ID))
room.CreateAndInsert(t, bob, gomatrixserverlib.MRoomMember, map[string]interface{}{
"membership": "leave",
}, test.WithStateKey(bob.ID))
},
wantOK: true,
wantMemberCount: 1,
},
{
name: "non-existent room ID",
request: func(t *testing.T, room *test.Room) *http.Request {
return test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/members", "!notavalidroom:test"), test.WithQueryParams(map[string]string{
"access_token": aliceDev.AccessToken,
}))
},
wantOK: false,
},
}
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
base, close := testrig.CreateBaseDendrite(t, dbType)
defer close()
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
// Use an actual roomserver for this
rsAPI := roomserver.NewInternalAPI(base)
rsAPI.SetFederationAPI(nil, nil)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, &syncKeyAPI{})
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
room := test.NewRoom(t, alice)
t.Cleanup(func() {
t.Logf("running cleanup for %s", tc.name)
})
// inject additional events
if tc.additionalEvents != nil {
tc.additionalEvents(t, room)
}
if err := api.SendEvents(context.Background(), rsAPI, api.KindNew, room.Events(), "test", "test", "test", nil, false); err != nil {
t.Fatalf("failed to send events: %v", err)
}
// wait for the events to come down sync
if tc.useSleep {
time.Sleep(time.Millisecond * 100)
} else {
syncUntil(t, base, aliceDev.AccessToken, false, func(syncBody string) bool {
// wait for the last sent eventID to come down sync
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(event_id=="%s")`, room.ID, room.Events()[len(room.Events())-1].EventID())
return gjson.Get(syncBody, path).Exists()
})
}
w := httptest.NewRecorder()
base.PublicClientAPIMux.ServeHTTP(w, tc.request(t, room))
if w.Code != 200 && tc.wantOK {
t.Logf("%s", w.Body.String())
t.Fatalf("got HTTP %d want %d", w.Code, 200)
}
t.Logf("[%s] Resp: %s", tc.name, w.Body.String())
// check we got the expected events
if tc.wantOK {
memberCount := len(gjson.GetBytes(w.Body.Bytes(), "chunk").Array())
if memberCount != tc.wantMemberCount {
t.Fatalf("expected %d members, got %d", tc.wantMemberCount, memberCount)
}
}
})
}
})
}
func TestSendToDevice(t *testing.T) {
test.WithAllDatabases(t, testSendToDevice)
}