Merge branch 'main' into jb-add-client-well-known

This commit is contained in:
Neil Alexander 2022-07-15 16:36:11 +01:00 committed by GitHub
commit 5204bb79f0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 357 additions and 132 deletions

View file

@ -32,6 +32,15 @@ To create a new **admin account**, add the `-admin` flag:
./bin/create-account -config /path/to/dendrite.yaml -username USERNAME -admin ./bin/create-account -config /path/to/dendrite.yaml -username USERNAME -admin
``` ```
An example of using `create-account` when running in **Docker**, having found the `CONTAINERNAME` from `docker ps`:
```bash
docker exec -it CONTAINERNAME /usr/bin/create-account -config /path/to/dendrite.yaml -username USERNAME
```
```bash
docker exec -it CONTAINERNAME /usr/bin/create-account -config /path/to/dendrite.yaml -username USERNAME -admin
```
## Using shared secret registration ## Using shared secret registration
Dendrite supports the Synapse-compatible shared secret registration endpoint. Dendrite supports the Synapse-compatible shared secret registration endpoint.

View file

@ -0,0 +1,81 @@
---
title: Troubleshooting
parent: Administration
permalink: /administration/troubleshooting
---
# Troubleshooting
If your Dendrite installation is acting strangely, there are a few things you should
check before seeking help.
## 1. Logs
Dendrite, by default, will log all warnings and errors to stdout, in addition to any
other locations configured in the `dendrite.yaml` configuration file. Often there will
be clues in the logs.
You can increase this log level to the more verbose `debug` level if necessary by adding
this to the config and restarting Dendrite:
```
logging:
- type: std
level: debug
```
Look specifically for lines that contain `level=error` or `level=warning`.
## 2. Federation tester
If you are experiencing problems federating with other homeservers, you should check
that the [Federation Tester](https://federationtester.matrix.org) is passing for your
server.
Common reasons that it may not pass include:
1. Incorrect DNS configuration;
2. Misconfigured DNS SRV entries or well-known files;
3. Invalid TLS/SSL certificates;
4. Reverse proxy configuration issues (if applicable).
Correct any errors if shown and re-run the federation tester to check the results.
## 3. System time
Matrix relies heavily on TLS which requires the system time to be correct. If the clock
drifts then you may find that federation no works reliably (or at all) and clients may
struggle to connect to your Dendrite server.
Ensure that your system time is correct and consider syncing to a reliable NTP source.
## 4. Database connections
If you are using the PostgreSQL database, you should ensure that Dendrite's configured
number of database connections does not exceed the maximum allowed by PostgreSQL.
Open your `postgresql.conf` configuration file and check the value of `max_connections`
(which is typically `100` by default). Then open your `dendrite.yaml` configuration file
and ensure that:
1. If you are using the `global.database` section, that `max_open_conns` does not exceed
that number;
2. If you are **not** using the `global.database` section, that the sum total of all
`max_open_conns` across all `database` blocks does not exceed that number.
## 5. File descriptors
Dendrite requires a sufficient number of file descriptors for every connection it makes
to a remote server, every connection to the database engine and every file it is reading
or writing to at a given time (media, logs etc). We recommend ensuring that the limit is
no lower than 65535 for Dendrite.
Dendrite will check at startup if there are a sufficient number of available descriptors.
If there aren't, you will see a log lines like this:
```
level=warning msg="IMPORTANT: Process file descriptor limit is currently 65535, it is recommended to raise the limit for Dendrite to at least 65535 to avoid issues"
```
Follow the [Optimisation](../installation/10_optimisation.md) instructions to correct the
available number of file descriptors.

View file

@ -95,6 +95,11 @@ func (t *OutputSendToDeviceConsumer) onMessage(ctx context.Context, msg *nats.Ms
return true return true
} }
// The SyncAPI is already handling sendToDevice for the local server
if destServerName == t.ServerName {
return true
}
// Pack the EDU and marshal it // Pack the EDU and marshal it
edu := &gomatrixserverlib.EDU{ edu := &gomatrixserverlib.EDU{
Type: gomatrixserverlib.MDirectToDevice, Type: gomatrixserverlib.MDirectToDevice,

2
go.mod
View file

@ -25,7 +25,7 @@ require (
github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e
github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91 github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16
github.com/matrix-org/gomatrixserverlib v0.0.0-20220711125303-3bb2e997a44c github.com/matrix-org/gomatrixserverlib v0.0.0-20220713083127-fc2ea1e62e46
github.com/matrix-org/pinecone v0.0.0-20220708135211-1ce778fcde6a github.com/matrix-org/pinecone v0.0.0-20220708135211-1ce778fcde6a
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
github.com/mattn/go-sqlite3 v1.14.13 github.com/mattn/go-sqlite3 v1.14.13

4
go.sum
View file

@ -341,8 +341,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4=
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
github.com/matrix-org/gomatrixserverlib v0.0.0-20220711125303-3bb2e997a44c h1:mt30TDK8kXKV+nCmVfnqoXsh842N+74kvZw7DXuS/JQ= github.com/matrix-org/gomatrixserverlib v0.0.0-20220713083127-fc2ea1e62e46 h1:5X/kXY3nwqKOwwrE9tnMKrjbsi3PHigQYvrvDBSntO8=
github.com/matrix-org/gomatrixserverlib v0.0.0-20220711125303-3bb2e997a44c/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk= github.com/matrix-org/gomatrixserverlib v0.0.0-20220713083127-fc2ea1e62e46/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk=
github.com/matrix-org/pinecone v0.0.0-20220708135211-1ce778fcde6a h1:DdG8vXMlZ65EAtc4V+3t7zHZ2Gqs24pSnyXS+4BRHUs= github.com/matrix-org/pinecone v0.0.0-20220708135211-1ce778fcde6a h1:DdG8vXMlZ65EAtc4V+3t7zHZ2Gqs24pSnyXS+4BRHUs=
github.com/matrix-org/pinecone v0.0.0-20220708135211-1ce778fcde6a/go.mod h1:ulJzsVOTssIVp1j/m5eI//4VpAGDkMt5NrRuAVX7wpc= github.com/matrix-org/pinecone v0.0.0-20220708135211-1ce778fcde6a/go.mod h1:ulJzsVOTssIVp1j/m5eI//4VpAGDkMt5NrRuAVX7wpc=
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U= github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=

View file

@ -16,18 +16,18 @@ import (
// a room Info cache. It must only be used from the roomserver only // a room Info cache. It must only be used from the roomserver only
// It is not safe for use from other components. // It is not safe for use from other components.
type RoomInfoCache interface { type RoomInfoCache interface {
GetRoomInfo(roomID string) (roomInfo types.RoomInfo, ok bool) GetRoomInfo(roomID string) (roomInfo *types.RoomInfo, ok bool)
StoreRoomInfo(roomID string, roomInfo types.RoomInfo) StoreRoomInfo(roomID string, roomInfo *types.RoomInfo)
} }
// GetRoomInfo must only be called from the roomserver only. It is not // GetRoomInfo must only be called from the roomserver only. It is not
// safe for use from other components. // safe for use from other components.
func (c Caches) GetRoomInfo(roomID string) (types.RoomInfo, bool) { func (c Caches) GetRoomInfo(roomID string) (*types.RoomInfo, bool) {
return c.RoomInfos.Get(roomID) return c.RoomInfos.Get(roomID)
} }
// StoreRoomInfo must only be called from the roomserver only. It is not // StoreRoomInfo must only be called from the roomserver only. It is not
// safe for use from other components. // safe for use from other components.
func (c Caches) StoreRoomInfo(roomID string, roomInfo types.RoomInfo) { func (c Caches) StoreRoomInfo(roomID string, roomInfo *types.RoomInfo) {
c.RoomInfos.Set(roomID, roomInfo) c.RoomInfos.Set(roomID, roomInfo)
} }

View file

@ -28,7 +28,7 @@ type Caches struct {
RoomServerRoomNIDs Cache[string, types.RoomNID] // room ID -> room NID RoomServerRoomNIDs Cache[string, types.RoomNID] // room ID -> room NID
RoomServerRoomIDs Cache[int64, string] // room NID -> room ID RoomServerRoomIDs Cache[int64, string] // room NID -> room ID
RoomServerEvents Cache[int64, *gomatrixserverlib.Event] // event NID -> event RoomServerEvents Cache[int64, *gomatrixserverlib.Event] // event NID -> event
RoomInfos Cache[string, types.RoomInfo] // room ID -> room info RoomInfos Cache[string, *types.RoomInfo] // room ID -> room info
FederationPDUs Cache[int64, *gomatrixserverlib.HeaderedEvent] // queue NID -> PDU FederationPDUs Cache[int64, *gomatrixserverlib.HeaderedEvent] // queue NID -> PDU
FederationEDUs Cache[int64, *gomatrixserverlib.EDU] // queue NID -> EDU FederationEDUs Cache[int64, *gomatrixserverlib.EDU] // queue NID -> EDU
SpaceSummaryRooms Cache[string, gomatrixserverlib.MSC2946SpacesResponse] // room ID -> space response SpaceSummaryRooms Cache[string, gomatrixserverlib.MSC2946SpacesResponse] // room ID -> space response

View file

@ -100,7 +100,7 @@ func NewRistrettoCache(maxCost config.DataUnit, maxAge time.Duration, enableProm
MaxAge: maxAge, MaxAge: maxAge,
}, },
}, },
RoomInfos: &RistrettoCachePartition[string, types.RoomInfo]{ // room ID -> room info RoomInfos: &RistrettoCachePartition[string, *types.RoomInfo]{ // room ID -> room info
cache: cache, cache: cache,
Prefix: roomInfosCache, Prefix: roomInfosCache,
Mutable: true, Mutable: true,

View file

@ -225,13 +225,12 @@ func (u *RoomUpdater) SetLatestEvents(
if err := u.d.RoomsTable.UpdateLatestEventNIDs(u.ctx, txn, roomNID, eventNIDs, lastEventNIDSent, currentStateSnapshotNID); err != nil { if err := u.d.RoomsTable.UpdateLatestEventNIDs(u.ctx, txn, roomNID, eventNIDs, lastEventNIDSent, currentStateSnapshotNID); err != nil {
return fmt.Errorf("u.d.RoomsTable.updateLatestEventNIDs: %w", err) return fmt.Errorf("u.d.RoomsTable.updateLatestEventNIDs: %w", err)
} }
if roomID, ok := u.d.Cache.GetRoomServerRoomID(roomNID); ok {
if roomInfo, ok := u.d.Cache.GetRoomInfo(roomID); ok { // Since it's entirely possible that this types.RoomInfo came from the
roomInfo.StateSnapshotNID = currentStateSnapshotNID // cache, we should make sure to update that entry so that the next run
roomInfo.IsStub = false // works from live data.
u.d.Cache.StoreRoomInfo(roomID, roomInfo) u.roomInfo.StateSnapshotNID = currentStateSnapshotNID
} u.roomInfo.IsStub = false
}
return nil return nil
}) })
} }

View file

@ -139,13 +139,13 @@ func (d *Database) RoomInfo(ctx context.Context, roomID string) (*types.RoomInfo
} }
func (d *Database) roomInfo(ctx context.Context, txn *sql.Tx, roomID string) (*types.RoomInfo, error) { func (d *Database) roomInfo(ctx context.Context, txn *sql.Tx, roomID string) (*types.RoomInfo, error) {
if roomInfo, ok := d.Cache.GetRoomInfo(roomID); ok { if roomInfo, ok := d.Cache.GetRoomInfo(roomID); ok && roomInfo != nil {
return &roomInfo, nil return roomInfo, nil
} }
roomInfo, err := d.RoomsTable.SelectRoomInfo(ctx, txn, roomID) roomInfo, err := d.RoomsTable.SelectRoomInfo(ctx, txn, roomID)
if err == nil && roomInfo != nil { if err == nil && roomInfo != nil {
d.Cache.StoreRoomServerRoomID(roomInfo.RoomNID, roomID) d.Cache.StoreRoomServerRoomID(roomInfo.RoomNID, roomID)
d.Cache.StoreRoomInfo(roomID, *roomInfo) d.Cache.StoreRoomInfo(roomID, roomInfo)
} }
return roomInfo, err return roomInfo, err
} }

View file

@ -21,6 +21,7 @@ import (
keyapi "github.com/matrix-org/dendrite/keyserver/api" keyapi "github.com/matrix-org/dendrite/keyserver/api"
keytypes "github.com/matrix-org/dendrite/keyserver/types" keytypes "github.com/matrix-org/dendrite/keyserver/types"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
@ -46,7 +47,7 @@ func DeviceOTKCounts(ctx context.Context, keyAPI keyapi.SyncKeyAPI, userID, devi
// was filled in, else false if there are no new device list changes because there is nothing to catch up on. The response MUST // was filled in, else false if there are no new device list changes because there is nothing to catch up on. The response MUST
// be already filled in with join/leave information. // be already filled in with join/leave information.
func DeviceListCatchup( func DeviceListCatchup(
ctx context.Context, keyAPI keyapi.SyncKeyAPI, rsAPI roomserverAPI.SyncRoomserverAPI, ctx context.Context, db storage.SharedUsers, keyAPI keyapi.SyncKeyAPI, rsAPI roomserverAPI.SyncRoomserverAPI,
userID string, res *types.Response, from, to types.StreamPosition, userID string, res *types.Response, from, to types.StreamPosition,
) (newPos types.StreamPosition, hasNew bool, err error) { ) (newPos types.StreamPosition, hasNew bool, err error) {
@ -93,7 +94,7 @@ func DeviceListCatchup(
queryRes.UserIDs = append(queryRes.UserIDs, leaveUserIDs...) queryRes.UserIDs = append(queryRes.UserIDs, leaveUserIDs...)
queryRes.UserIDs = util.UniqueStrings(queryRes.UserIDs) queryRes.UserIDs = util.UniqueStrings(queryRes.UserIDs)
var sharedUsersMap map[string]int var sharedUsersMap map[string]int
sharedUsersMap, queryRes.UserIDs = filterSharedUsers(ctx, rsAPI, userID, queryRes.UserIDs) sharedUsersMap, queryRes.UserIDs = filterSharedUsers(ctx, db, userID, queryRes.UserIDs)
util.GetLogger(ctx).Debugf( util.GetLogger(ctx).Debugf(
"QueryKeyChanges request off=%d,to=%d response off=%d uids=%v", "QueryKeyChanges request off=%d,to=%d response off=%d uids=%v",
offset, toOffset, queryRes.Offset, queryRes.UserIDs, offset, toOffset, queryRes.Offset, queryRes.UserIDs,
@ -215,30 +216,28 @@ func TrackChangedUsers(
return changed, left, nil return changed, left, nil
} }
// filterSharedUsers takes a list of remote users whose keys have changed and filters
// it down to include only users who the requesting user shares a room with.
func filterSharedUsers( func filterSharedUsers(
ctx context.Context, rsAPI roomserverAPI.SyncRoomserverAPI, userID string, usersWithChangedKeys []string, ctx context.Context, db storage.SharedUsers, userID string, usersWithChangedKeys []string,
) (map[string]int, []string) { ) (map[string]int, []string) {
var result []string sharedUsersMap := make(map[string]int, len(usersWithChangedKeys))
var sharedUsersRes roomserverAPI.QuerySharedUsersResponse for _, userID := range usersWithChangedKeys {
err := rsAPI.QuerySharedUsers(ctx, &roomserverAPI.QuerySharedUsersRequest{ sharedUsersMap[userID] = 0
UserID: userID, }
OtherUserIDs: usersWithChangedKeys, sharedUsers, err := db.SharedUsers(ctx, userID, usersWithChangedKeys)
}, &sharedUsersRes)
if err != nil { if err != nil {
// default to all users so we do needless queries rather than miss some important device update // default to all users so we do needless queries rather than miss some important device update
return nil, usersWithChangedKeys return nil, usersWithChangedKeys
} }
for _, userID := range sharedUsers {
sharedUsersMap[userID]++
}
// We forcibly put ourselves in this list because we should be notified about our own device updates // We forcibly put ourselves in this list because we should be notified about our own device updates
// and if we are in 0 rooms then we don't technically share any room with ourselves so we wouldn't // and if we are in 0 rooms then we don't technically share any room with ourselves so we wouldn't
// be notified about key changes. // be notified about key changes.
sharedUsersRes.UserIDsToCount[userID] = 1 sharedUsersMap[userID] = 1
return sharedUsersMap, sharedUsers
for _, uid := range usersWithChangedKeys {
if sharedUsersRes.UserIDsToCount[uid] > 0 {
result = append(result, uid)
}
}
return sharedUsersRes.UserIDsToCount, result
} }
func joinedRooms(res *types.Response, userID string) []string { func joinedRooms(res *types.Response, userID string) []string {

View file

@ -11,6 +11,7 @@ import (
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
) )
var ( var (
@ -105,6 +106,22 @@ func (s *mockRoomserverAPI) QuerySharedUsers(ctx context.Context, req *api.Query
return nil return nil
} }
// This is actually a database function, but seeing as we track the state inside the
// *mockRoomserverAPI, we'll just comply with the interface here instead.
func (s *mockRoomserverAPI) SharedUsers(ctx context.Context, userID string, otherUserIDs []string) ([]string, error) {
commonUsers := []string{}
for _, members := range s.roomIDToJoinedMembers {
for _, member := range members {
for _, userID := range otherUserIDs {
if member == userID {
commonUsers = append(commonUsers, userID)
}
}
}
}
return util.UniqueStrings(commonUsers), nil
}
type wantCatchup struct { type wantCatchup struct {
hasNew bool hasNew bool
changed []string changed []string
@ -178,7 +195,7 @@ func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) {
"!another:room": {syncingUser}, "!another:room": {syncingUser},
}, },
} }
_, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken) _, hasNew, err := DeviceListCatchup(context.Background(), rsAPI, &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken)
if err != nil { if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err) t.Fatalf("DeviceListCatchup returned an error: %s", err)
} }
@ -201,7 +218,7 @@ func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) {
"!another:room": {syncingUser}, "!another:room": {syncingUser},
}, },
} }
_, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken) _, hasNew, err := DeviceListCatchup(context.Background(), rsAPI, &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken)
if err != nil { if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err) t.Fatalf("DeviceListCatchup returned an error: %s", err)
} }
@ -224,7 +241,7 @@ func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) {
"!another:room": {syncingUser, existingUser}, "!another:room": {syncingUser, existingUser},
}, },
} }
_, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken) _, hasNew, err := DeviceListCatchup(context.Background(), rsAPI, &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken)
if err != nil { if err != nil {
t.Fatalf("Catchup returned an error: %s", err) t.Fatalf("Catchup returned an error: %s", err)
} }
@ -246,7 +263,7 @@ func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) {
"!another:room": {syncingUser, existingUser}, "!another:room": {syncingUser, existingUser},
}, },
} }
_, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken) _, hasNew, err := DeviceListCatchup(context.Background(), rsAPI, &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken)
if err != nil { if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err) t.Fatalf("DeviceListCatchup returned an error: %s", err)
} }
@ -305,7 +322,7 @@ func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) {
roomID: {syncingUser, existingUser}, roomID: {syncingUser, existingUser},
}, },
} }
_, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken) _, hasNew, err := DeviceListCatchup(context.Background(), rsAPI, &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken)
if err != nil { if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err) t.Fatalf("DeviceListCatchup returned an error: %s", err)
} }
@ -333,7 +350,7 @@ func TestKeyChangeCatchupChangeAndLeft(t *testing.T) {
"!another:room": {syncingUser}, "!another:room": {syncingUser},
}, },
} }
_, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken) _, hasNew, err := DeviceListCatchup(context.Background(), rsAPI, &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken)
if err != nil { if err != nil {
t.Fatalf("Catchup returned an error: %s", err) t.Fatalf("Catchup returned an error: %s", err)
} }
@ -419,7 +436,7 @@ func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) {
}, },
} }
_, hasNew, err := DeviceListCatchup( _, hasNew, err := DeviceListCatchup(
context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken, context.Background(), rsAPI, &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken,
) )
if err != nil { if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err) t.Fatalf("DeviceListCatchup returned an error: %s", err)

View file

@ -27,6 +27,8 @@ import (
type Database interface { type Database interface {
Presence Presence
SharedUsers
MaxStreamPositionForPDUs(ctx context.Context) (types.StreamPosition, error) MaxStreamPositionForPDUs(ctx context.Context) (types.StreamPosition, error)
MaxStreamPositionForReceipts(ctx context.Context) (types.StreamPosition, error) MaxStreamPositionForReceipts(ctx context.Context) (types.StreamPosition, error)
MaxStreamPositionForInvites(ctx context.Context) (types.StreamPosition, error) MaxStreamPositionForInvites(ctx context.Context) (types.StreamPosition, error)
@ -165,3 +167,8 @@ type Presence interface {
PresenceAfter(ctx context.Context, after types.StreamPosition, filter gomatrixserverlib.EventFilter) (map[string]*types.PresenceInternal, error) PresenceAfter(ctx context.Context, after types.StreamPosition, filter gomatrixserverlib.EventFilter) (map[string]*types.PresenceInternal, error)
MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error) MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error)
} }
type SharedUsers interface {
// SharedUsers returns a subset of otherUserIDs that share a room with userID.
SharedUsers(ctx context.Context, userID string, otherUserIDs []string) ([]string, error)
}

View file

@ -107,6 +107,11 @@ const selectEventsWithEventIDsSQL = "" +
"SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id" + "SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id" +
" FROM syncapi_current_room_state WHERE event_id = ANY($1)" " FROM syncapi_current_room_state WHERE event_id = ANY($1)"
const selectSharedUsersSQL = "" +
"SELECT state_key FROM syncapi_current_room_state WHERE room_id = ANY(" +
" SELECT room_id FROM syncapi_current_room_state WHERE state_key = $1 AND membership='join'" +
") AND state_key = ANY($2) AND membership='join';"
type currentRoomStateStatements struct { type currentRoomStateStatements struct {
upsertRoomStateStmt *sql.Stmt upsertRoomStateStmt *sql.Stmt
deleteRoomStateByEventIDStmt *sql.Stmt deleteRoomStateByEventIDStmt *sql.Stmt
@ -118,6 +123,7 @@ type currentRoomStateStatements struct {
selectJoinedUsersInRoomStmt *sql.Stmt selectJoinedUsersInRoomStmt *sql.Stmt
selectEventsWithEventIDsStmt *sql.Stmt selectEventsWithEventIDsStmt *sql.Stmt
selectStateEventStmt *sql.Stmt selectStateEventStmt *sql.Stmt
selectSharedUsersStmt *sql.Stmt
} }
func NewPostgresCurrentRoomStateTable(db *sql.DB) (tables.CurrentRoomState, error) { func NewPostgresCurrentRoomStateTable(db *sql.DB) (tables.CurrentRoomState, error) {
@ -156,6 +162,9 @@ func NewPostgresCurrentRoomStateTable(db *sql.DB) (tables.CurrentRoomState, erro
if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil { if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil {
return nil, err return nil, err
} }
if s.selectSharedUsersStmt, err = db.Prepare(selectSharedUsersSQL); err != nil {
return nil, err
}
return s, nil return s, nil
} }
@ -379,3 +388,24 @@ func (s *currentRoomStateStatements) SelectStateEvent(
} }
return &ev, err return &ev, err
} }
func (s *currentRoomStateStatements) SelectSharedUsers(
ctx context.Context, txn *sql.Tx, userID string, otherUserIDs []string,
) ([]string, error) {
stmt := sqlutil.TxStmt(txn, s.selectSharedUsersStmt)
rows, err := stmt.QueryContext(ctx, userID, otherUserIDs)
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectSharedUsersStmt: rows.close() failed")
var stateKey string
result := make([]string, 0, len(otherUserIDs))
for rows.Next() {
if err := rows.Scan(&stateKey); err != nil {
return nil, err
}
result = append(result, stateKey)
}
return result, rows.Err()
}

View file

@ -23,6 +23,7 @@ import (
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
"github.com/sirupsen/logrus"
) )
const sendToDeviceSchema = ` const sendToDeviceSchema = `
@ -51,7 +52,7 @@ const selectSendToDeviceMessagesSQL = `
SELECT id, user_id, device_id, content SELECT id, user_id, device_id, content
FROM syncapi_send_to_device FROM syncapi_send_to_device
WHERE user_id = $1 AND device_id = $2 AND id > $3 AND id <= $4 WHERE user_id = $1 AND device_id = $2 AND id > $3 AND id <= $4
ORDER BY id DESC ORDER BY id ASC
` `
const deleteSendToDeviceMessagesSQL = ` const deleteSendToDeviceMessagesSQL = `
@ -112,17 +113,18 @@ func (s *sendToDeviceStatements) SelectSendToDeviceMessages(
if err = rows.Scan(&id, &userID, &deviceID, &content); err != nil { if err = rows.Scan(&id, &userID, &deviceID, &content); err != nil {
return return
} }
if id > lastPos {
lastPos = id
}
event := types.SendToDeviceEvent{ event := types.SendToDeviceEvent{
ID: id, ID: id,
UserID: userID, UserID: userID,
DeviceID: deviceID, DeviceID: deviceID,
} }
if err = json.Unmarshal([]byte(content), &event.SendToDeviceEvent); err != nil { if err = json.Unmarshal([]byte(content), &event.SendToDeviceEvent); err != nil {
logrus.WithError(err).Errorf("Failed to unmarshal send-to-device message")
continue continue
} }
if id > lastPos {
lastPos = id
}
events = append(events, event) events = append(events, event)
} }
if lastPos == 0 { if lastPos == 0 {

View file

@ -176,6 +176,10 @@ func (d *Database) AllPeekingDevicesInRooms(ctx context.Context) (map[string][]t
return d.Peeks.SelectPeekingDevices(ctx) return d.Peeks.SelectPeekingDevices(ctx)
} }
func (d *Database) SharedUsers(ctx context.Context, userID string, otherUserIDs []string) ([]string, error) {
return d.CurrentRoomState.SelectSharedUsers(ctx, nil, userID, otherUserIDs)
}
func (d *Database) GetStateEvent( func (d *Database) GetStateEvent(
ctx context.Context, roomID, evType, stateKey string, ctx context.Context, roomID, evType, stateKey string,
) (*gomatrixserverlib.HeaderedEvent, error) { ) (*gomatrixserverlib.HeaderedEvent, error) {

View file

@ -91,6 +91,11 @@ const selectEventsWithEventIDsSQL = "" +
"SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id" + "SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id" +
" FROM syncapi_current_room_state WHERE event_id IN ($1)" " FROM syncapi_current_room_state WHERE event_id IN ($1)"
const selectSharedUsersSQL = "" +
"SELECT state_key FROM syncapi_current_room_state WHERE room_id = ANY(" +
" SELECT room_id FROM syncapi_current_room_state WHERE state_key = $1 AND membership='join'" +
") AND state_key IN ($2) AND membership='join';"
type currentRoomStateStatements struct { type currentRoomStateStatements struct {
db *sql.DB db *sql.DB
streamIDStatements *StreamIDStatements streamIDStatements *StreamIDStatements
@ -102,6 +107,7 @@ type currentRoomStateStatements struct {
selectJoinedUsersStmt *sql.Stmt selectJoinedUsersStmt *sql.Stmt
//selectJoinedUsersInRoomStmt *sql.Stmt - prepared at runtime due to variadic //selectJoinedUsersInRoomStmt *sql.Stmt - prepared at runtime due to variadic
selectStateEventStmt *sql.Stmt selectStateEventStmt *sql.Stmt
//selectSharedUsersSQL *sql.Stmt - prepared at runtime due to variadic
} }
func NewSqliteCurrentRoomStateTable(db *sql.DB, streamID *StreamIDStatements) (tables.CurrentRoomState, error) { func NewSqliteCurrentRoomStateTable(db *sql.DB, streamID *StreamIDStatements) (tables.CurrentRoomState, error) {
@ -396,3 +402,29 @@ func (s *currentRoomStateStatements) SelectStateEvent(
} }
return &ev, err return &ev, err
} }
func (s *currentRoomStateStatements) SelectSharedUsers(
ctx context.Context, txn *sql.Tx, userID string, otherUserIDs []string,
) ([]string, error) {
query := strings.Replace(selectSharedUsersSQL, "($2)", sqlutil.QueryVariadicOffset(len(otherUserIDs), 1), 1)
stmt, err := s.db.Prepare(query)
if err != nil {
return nil, fmt.Errorf("SelectSharedUsers s.db.Prepare: %w", err)
}
defer internal.CloseAndLogIfError(ctx, stmt, "SelectSharedUsers: stmt.close() failed")
rows, err := sqlutil.TxStmt(txn, stmt).QueryContext(ctx, userID, otherUserIDs)
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectSharedUsersStmt: rows.close() failed")
var stateKey string
result := make([]string, 0, len(otherUserIDs))
for rows.Next() {
if err := rows.Scan(&stateKey); err != nil {
return nil, err
}
result = append(result, stateKey)
}
return result, rows.Err()
}

View file

@ -49,7 +49,7 @@ const selectSendToDeviceMessagesSQL = `
SELECT id, user_id, device_id, content SELECT id, user_id, device_id, content
FROM syncapi_send_to_device FROM syncapi_send_to_device
WHERE user_id = $1 AND device_id = $2 AND id > $3 AND id <= $4 WHERE user_id = $1 AND device_id = $2 AND id > $3 AND id <= $4
ORDER BY id DESC ORDER BY id ASC
` `
const deleteSendToDeviceMessagesSQL = ` const deleteSendToDeviceMessagesSQL = `
@ -120,9 +120,6 @@ func (s *sendToDeviceStatements) SelectSendToDeviceMessages(
logrus.WithError(err).Errorf("Failed to retrieve send-to-device message") logrus.WithError(err).Errorf("Failed to retrieve send-to-device message")
return return
} }
if id > lastPos {
lastPos = id
}
event := types.SendToDeviceEvent{ event := types.SendToDeviceEvent{
ID: id, ID: id,
UserID: userID, UserID: userID,
@ -132,6 +129,9 @@ func (s *sendToDeviceStatements) SelectSendToDeviceMessages(
logrus.WithError(err).Errorf("Failed to unmarshal send-to-device message") logrus.WithError(err).Errorf("Failed to unmarshal send-to-device message")
continue continue
} }
if id > lastPos {
lastPos = id
}
events = append(events, event) events = append(events, event)
} }
if lastPos == 0 { if lastPos == 0 {

View file

@ -1,7 +1,9 @@
package storage_test package storage_test
import ( import (
"bytes"
"context" "context"
"encoding/json"
"fmt" "fmt"
"reflect" "reflect"
"testing" "testing"
@ -394,28 +396,34 @@ func TestGetEventsInRangeWithEventsInsertedLikeBackfill(t *testing.T) {
from = topologyTokenBefore(t, db, paginatedEvents[len(paginatedEvents)-1].EventID()) from = topologyTokenBefore(t, db, paginatedEvents[len(paginatedEvents)-1].EventID())
} }
} }
*/
func TestSendToDeviceBehaviour(t *testing.T) { func TestSendToDeviceBehaviour(t *testing.T) {
//t.Parallel() t.Parallel()
db := MustCreateDatabase(t) alice := test.NewUser(t)
bob := test.NewUser(t)
deviceID := "one"
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
db, close := MustCreateDatabase(t, dbType)
defer close()
// At this point there should be no messages. We haven't sent anything // At this point there should be no messages. We haven't sent anything
// yet. // yet.
_, events, updates, deletions, err := db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.StreamingToken{}) _, events, err := db.SendToDeviceUpdatesForSync(ctx, alice.ID, deviceID, 0, 100)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if len(events) != 0 || len(updates) != 0 || len(deletions) != 0 { if len(events) != 0 {
t.Fatal("first call should have no updates") t.Fatal("first call should have no updates")
} }
err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.StreamingToken{})
err = db.CleanSendToDeviceUpdates(context.Background(), alice.ID, deviceID, 100)
if err != nil { if err != nil {
return return
} }
// Try sending a message. // Try sending a message.
streamPos, err := db.StoreNewSendForDeviceMessage(ctx, "alice", "one", gomatrixserverlib.SendToDeviceEvent{ streamPos, err := db.StoreNewSendForDeviceMessage(ctx, alice.ID, deviceID, gomatrixserverlib.SendToDeviceEvent{
Sender: "bob", Sender: bob.ID,
Type: "m.type", Type: "m.type",
Content: json.RawMessage("{}"), Content: json.RawMessage("{}"),
}) })
@ -426,14 +434,14 @@ func TestSendToDeviceBehaviour(t *testing.T) {
// At this point we should get exactly one message. We're sending the sync position // At this point we should get exactly one message. We're sending the sync position
// that we were given from the update and the send-to-device update will be updated // that we were given from the update and the send-to-device update will be updated
// in the database to reflect that this was the sync position we sent the message at. // in the database to reflect that this was the sync position we sent the message at.
_, events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.StreamingToken{SendToDevicePosition: streamPos}) streamPos, events, err = db.SendToDeviceUpdatesForSync(ctx, alice.ID, deviceID, 0, streamPos)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if len(events) != 1 || len(updates) != 1 || len(deletions) != 0 { if count := len(events); count != 1 {
t.Fatal("second call should have one update") t.Fatalf("second call should have one update, got %d", count)
} }
err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.StreamingToken{SendToDevicePosition: streamPos}) err = db.CleanSendToDeviceUpdates(context.Background(), alice.ID, deviceID, streamPos)
if err != nil { if err != nil {
return return
} }
@ -441,43 +449,72 @@ func TestSendToDeviceBehaviour(t *testing.T) {
// At this point we should still have one message because we haven't progressed the // At this point we should still have one message because we haven't progressed the
// sync position yet. This is equivalent to the client failing to /sync and retrying // sync position yet. This is equivalent to the client failing to /sync and retrying
// with the same position. // with the same position.
_, events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.StreamingToken{SendToDevicePosition: streamPos}) streamPos, events, err = db.SendToDeviceUpdatesForSync(ctx, alice.ID, deviceID, 0, 100)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if len(events) != 1 || len(updates) != 0 || len(deletions) != 0 { if len(events) != 1 {
t.Fatal("third call should have one update still") t.Fatal("third call should have one update still")
} }
err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.StreamingToken{SendToDevicePosition: streamPos}) err = db.CleanSendToDeviceUpdates(context.Background(), alice.ID, deviceID, streamPos+1)
if err != nil { if err != nil {
return return
} }
// At this point we should now have no updates, because we've progressed the sync // At this point we should now have no updates, because we've progressed the sync
// position. Therefore the update from before will not be sent again. // position. Therefore the update from before will not be sent again.
_, events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.StreamingToken{SendToDevicePosition: streamPos + 1}) _, events, err = db.SendToDeviceUpdatesForSync(ctx, alice.ID, deviceID, streamPos+1, streamPos+2)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if len(events) != 0 || len(updates) != 0 || len(deletions) != 1 { if len(events) != 0 {
t.Fatal("fourth call should have no updates") t.Fatal("fourth call should have no updates")
} }
err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.StreamingToken{SendToDevicePosition: streamPos + 1}) err = db.CleanSendToDeviceUpdates(context.Background(), alice.ID, deviceID, streamPos+1)
if err != nil { if err != nil {
return return
} }
// At this point we should still have no updates, because no new updates have been // At this point we should still have no updates, because no new updates have been
// sent. // sent.
_, events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.StreamingToken{SendToDevicePosition: streamPos + 2}) _, events, err = db.SendToDeviceUpdatesForSync(ctx, alice.ID, deviceID, streamPos, streamPos+2)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if len(events) != 0 || len(updates) != 0 || len(deletions) != 0 { if len(events) != 0 {
t.Fatal("fifth call should have no updates") t.Fatal("fifth call should have no updates")
} }
// Send some more messages and verify the ordering is correct ("in order of arrival")
var lastPos types.StreamPosition = 0
for i := 0; i < 10; i++ {
streamPos, err = db.StoreNewSendForDeviceMessage(ctx, alice.ID, deviceID, gomatrixserverlib.SendToDeviceEvent{
Sender: bob.ID,
Type: "m.type",
Content: json.RawMessage(fmt.Sprintf(`{ "count": %d }`, i)),
})
if err != nil {
t.Fatal(err)
}
lastPos = streamPos
}
_, events, err = db.SendToDeviceUpdatesForSync(ctx, alice.ID, deviceID, 0, lastPos)
if err != nil {
t.Fatalf("unable to get events: %v", err)
}
for i := 0; i < 10; i++ {
want := json.RawMessage(fmt.Sprintf(`{"count":%d}`, i))
got := events[i].Content
if !bytes.Equal(got, want) {
t.Fatalf("messages are out of order\nwant: %s\ngot: %s", string(want), string(got))
}
}
})
} }
/*
func TestInviteBehaviour(t *testing.T) { func TestInviteBehaviour(t *testing.T) {
db := MustCreateDatabase(t) db := MustCreateDatabase(t)
inviteRoom1 := "!inviteRoom1:somewhere" inviteRoom1 := "!inviteRoom1:somewhere"

View file

@ -104,6 +104,8 @@ type CurrentRoomState interface {
SelectJoinedUsers(ctx context.Context) (map[string][]string, error) SelectJoinedUsers(ctx context.Context) (map[string][]string, error)
// SelectJoinedUsersInRoom returns a map of room ID to a list of joined user IDs for a given room. // SelectJoinedUsersInRoom returns a map of room ID to a list of joined user IDs for a given room.
SelectJoinedUsersInRoom(ctx context.Context, roomIDs []string) (map[string][]string, error) SelectJoinedUsersInRoom(ctx context.Context, roomIDs []string) (map[string][]string, error)
// SelectSharedUsers returns a subset of otherUserIDs that share a room with userID.
SelectSharedUsers(ctx context.Context, txn *sql.Tx, userID string, otherUserIDs []string) ([]string, error)
} }
// BackwardsExtremities keeps track of backwards extremities for a room. // BackwardsExtremities keeps track of backwards extremities for a room.

View file

@ -28,7 +28,7 @@ func (p *DeviceListStreamProvider) IncrementalSync(
from, to types.StreamPosition, from, to types.StreamPosition,
) types.StreamPosition { ) types.StreamPosition {
var err error var err error
to, _, err = internal.DeviceListCatchup(context.Background(), p.keyAPI, p.rsAPI, req.Device.UserID, req.Response, from, to) to, _, err = internal.DeviceListCatchup(context.Background(), p.DB, p.keyAPI, p.rsAPI, req.Device.UserID, req.Response, from, to)
if err != nil { if err != nil {
req.Log.WithError(err).Error("internal.DeviceListCatchup failed") req.Log.WithError(err).Error("internal.DeviceListCatchup failed")
return from return from

View file

@ -429,7 +429,7 @@ func (rp *RequestPool) OnIncomingKeyChangeRequest(req *http.Request, device *use
} }
rp.streams.PDUStreamProvider.IncrementalSync(req.Context(), syncReq, fromToken.PDUPosition, toToken.PDUPosition) rp.streams.PDUStreamProvider.IncrementalSync(req.Context(), syncReq, fromToken.PDUPosition, toToken.PDUPosition)
_, _, err = internal.DeviceListCatchup( _, _, err = internal.DeviceListCatchup(
req.Context(), rp.keyAPI, rp.rsAPI, syncReq.Device.UserID, req.Context(), rp.db, rp.keyAPI, rp.rsAPI, syncReq.Device.UserID,
syncReq.Response, fromToken.DeviceListPosition, toToken.DeviceListPosition, syncReq.Response, fromToken.DeviceListPosition, toToken.DeviceListPosition,
) )
if err != nil { if err != nil {

View file

@ -720,3 +720,4 @@ Setting state twice is idempotent
Joining room twice is idempotent Joining room twice is idempotent
Inbound federation can return missing events for shared visibility Inbound federation can return missing events for shared visibility
Inbound federation ignores redactions from invalid servers room > v3 Inbound federation ignores redactions from invalid servers room > v3
Newly joined room includes presence in incremental sync