diff --git a/docs/administration/1_createusers.md b/docs/administration/1_createusers.md index f40b7f576..61ec2299b 100644 --- a/docs/administration/1_createusers.md +++ b/docs/administration/1_createusers.md @@ -32,6 +32,15 @@ To create a new **admin account**, add the `-admin` flag: ./bin/create-account -config /path/to/dendrite.yaml -username USERNAME -admin ``` +An example of using `create-account` when running in **Docker**, having found the `CONTAINERNAME` from `docker ps`: + +```bash +docker exec -it CONTAINERNAME /usr/bin/create-account -config /path/to/dendrite.yaml -username USERNAME +``` +```bash +docker exec -it CONTAINERNAME /usr/bin/create-account -config /path/to/dendrite.yaml -username USERNAME -admin +``` + ## Using shared secret registration Dendrite supports the Synapse-compatible shared secret registration endpoint. diff --git a/docs/administration/5_troubleshooting.md b/docs/administration/5_troubleshooting.md new file mode 100644 index 000000000..14df2e3fb --- /dev/null +++ b/docs/administration/5_troubleshooting.md @@ -0,0 +1,81 @@ +--- +title: Troubleshooting +parent: Administration +permalink: /administration/troubleshooting +--- + +# Troubleshooting + +If your Dendrite installation is acting strangely, there are a few things you should +check before seeking help. + +## 1. Logs + +Dendrite, by default, will log all warnings and errors to stdout, in addition to any +other locations configured in the `dendrite.yaml` configuration file. Often there will +be clues in the logs. + +You can increase this log level to the more verbose `debug` level if necessary by adding +this to the config and restarting Dendrite: + +``` +logging: +- type: std + level: debug +``` + +Look specifically for lines that contain `level=error` or `level=warning`. + +## 2. Federation tester + +If you are experiencing problems federating with other homeservers, you should check +that the [Federation Tester](https://federationtester.matrix.org) is passing for your +server. + +Common reasons that it may not pass include: + +1. Incorrect DNS configuration; +2. Misconfigured DNS SRV entries or well-known files; +3. Invalid TLS/SSL certificates; +4. Reverse proxy configuration issues (if applicable). + +Correct any errors if shown and re-run the federation tester to check the results. + +## 3. System time + +Matrix relies heavily on TLS which requires the system time to be correct. If the clock +drifts then you may find that federation no works reliably (or at all) and clients may +struggle to connect to your Dendrite server. + +Ensure that your system time is correct and consider syncing to a reliable NTP source. + +## 4. Database connections + +If you are using the PostgreSQL database, you should ensure that Dendrite's configured +number of database connections does not exceed the maximum allowed by PostgreSQL. + +Open your `postgresql.conf` configuration file and check the value of `max_connections` +(which is typically `100` by default). Then open your `dendrite.yaml` configuration file +and ensure that: + +1. If you are using the `global.database` section, that `max_open_conns` does not exceed + that number; +2. If you are **not** using the `global.database` section, that the sum total of all + `max_open_conns` across all `database` blocks does not exceed that number. + +## 5. File descriptors + +Dendrite requires a sufficient number of file descriptors for every connection it makes +to a remote server, every connection to the database engine and every file it is reading +or writing to at a given time (media, logs etc). We recommend ensuring that the limit is +no lower than 65535 for Dendrite. + +Dendrite will check at startup if there are a sufficient number of available descriptors. +If there aren't, you will see a log lines like this: + +``` +level=warning msg="IMPORTANT: Process file descriptor limit is currently 65535, it is recommended to raise the limit for Dendrite to at least 65535 to avoid issues" +``` + +Follow the [Optimisation](../installation/10_optimisation.md) instructions to correct the +available number of file descriptors. diff --git a/federationapi/consumers/sendtodevice.go b/federationapi/consumers/sendtodevice.go index 84c9f620d..f99a895e0 100644 --- a/federationapi/consumers/sendtodevice.go +++ b/federationapi/consumers/sendtodevice.go @@ -95,6 +95,11 @@ func (t *OutputSendToDeviceConsumer) onMessage(ctx context.Context, msg *nats.Ms return true } + // The SyncAPI is already handling sendToDevice for the local server + if destServerName == t.ServerName { + return true + } + // Pack the EDU and marshal it edu := &gomatrixserverlib.EDU{ Type: gomatrixserverlib.MDirectToDevice, diff --git a/go.mod b/go.mod index 2a2a037c1..1e62e1d93 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,7 @@ require ( github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91 github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 - github.com/matrix-org/gomatrixserverlib v0.0.0-20220711125303-3bb2e997a44c + github.com/matrix-org/gomatrixserverlib v0.0.0-20220713083127-fc2ea1e62e46 github.com/matrix-org/pinecone v0.0.0-20220708135211-1ce778fcde6a github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 github.com/mattn/go-sqlite3 v1.14.13 diff --git a/go.sum b/go.sum index 98549f702..86656ccd5 100644 --- a/go.sum +++ b/go.sum @@ -341,8 +341,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= -github.com/matrix-org/gomatrixserverlib v0.0.0-20220711125303-3bb2e997a44c h1:mt30TDK8kXKV+nCmVfnqoXsh842N+74kvZw7DXuS/JQ= -github.com/matrix-org/gomatrixserverlib v0.0.0-20220711125303-3bb2e997a44c/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk= +github.com/matrix-org/gomatrixserverlib v0.0.0-20220713083127-fc2ea1e62e46 h1:5X/kXY3nwqKOwwrE9tnMKrjbsi3PHigQYvrvDBSntO8= +github.com/matrix-org/gomatrixserverlib v0.0.0-20220713083127-fc2ea1e62e46/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk= github.com/matrix-org/pinecone v0.0.0-20220708135211-1ce778fcde6a h1:DdG8vXMlZ65EAtc4V+3t7zHZ2Gqs24pSnyXS+4BRHUs= github.com/matrix-org/pinecone v0.0.0-20220708135211-1ce778fcde6a/go.mod h1:ulJzsVOTssIVp1j/m5eI//4VpAGDkMt5NrRuAVX7wpc= github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U= diff --git a/internal/caching/cache_roominfo.go b/internal/caching/cache_roominfo.go index d03a61077..5dfed3c85 100644 --- a/internal/caching/cache_roominfo.go +++ b/internal/caching/cache_roominfo.go @@ -16,18 +16,18 @@ import ( // a room Info cache. It must only be used from the roomserver only // It is not safe for use from other components. type RoomInfoCache interface { - GetRoomInfo(roomID string) (roomInfo types.RoomInfo, ok bool) - StoreRoomInfo(roomID string, roomInfo types.RoomInfo) + GetRoomInfo(roomID string) (roomInfo *types.RoomInfo, ok bool) + StoreRoomInfo(roomID string, roomInfo *types.RoomInfo) } // GetRoomInfo must only be called from the roomserver only. It is not // safe for use from other components. -func (c Caches) GetRoomInfo(roomID string) (types.RoomInfo, bool) { +func (c Caches) GetRoomInfo(roomID string) (*types.RoomInfo, bool) { return c.RoomInfos.Get(roomID) } // StoreRoomInfo must only be called from the roomserver only. It is not // safe for use from other components. -func (c Caches) StoreRoomInfo(roomID string, roomInfo types.RoomInfo) { +func (c Caches) StoreRoomInfo(roomID string, roomInfo *types.RoomInfo) { c.RoomInfos.Set(roomID, roomInfo) } diff --git a/internal/caching/caches.go b/internal/caching/caches.go index 14b232dd0..e7914ce7d 100644 --- a/internal/caching/caches.go +++ b/internal/caching/caches.go @@ -28,7 +28,7 @@ type Caches struct { RoomServerRoomNIDs Cache[string, types.RoomNID] // room ID -> room NID RoomServerRoomIDs Cache[int64, string] // room NID -> room ID RoomServerEvents Cache[int64, *gomatrixserverlib.Event] // event NID -> event - RoomInfos Cache[string, types.RoomInfo] // room ID -> room info + RoomInfos Cache[string, *types.RoomInfo] // room ID -> room info FederationPDUs Cache[int64, *gomatrixserverlib.HeaderedEvent] // queue NID -> PDU FederationEDUs Cache[int64, *gomatrixserverlib.EDU] // queue NID -> EDU SpaceSummaryRooms Cache[string, gomatrixserverlib.MSC2946SpacesResponse] // room ID -> space response diff --git a/internal/caching/impl_ristretto.go b/internal/caching/impl_ristretto.go index 6d625b552..677218b5e 100644 --- a/internal/caching/impl_ristretto.go +++ b/internal/caching/impl_ristretto.go @@ -100,7 +100,7 @@ func NewRistrettoCache(maxCost config.DataUnit, maxAge time.Duration, enableProm MaxAge: maxAge, }, }, - RoomInfos: &RistrettoCachePartition[string, types.RoomInfo]{ // room ID -> room info + RoomInfos: &RistrettoCachePartition[string, *types.RoomInfo]{ // room ID -> room info cache: cache, Prefix: roomInfosCache, Mutable: true, diff --git a/roomserver/storage/shared/room_updater.go b/roomserver/storage/shared/room_updater.go index 8f4e011bf..c35ac653c 100644 --- a/roomserver/storage/shared/room_updater.go +++ b/roomserver/storage/shared/room_updater.go @@ -225,13 +225,12 @@ func (u *RoomUpdater) SetLatestEvents( if err := u.d.RoomsTable.UpdateLatestEventNIDs(u.ctx, txn, roomNID, eventNIDs, lastEventNIDSent, currentStateSnapshotNID); err != nil { return fmt.Errorf("u.d.RoomsTable.updateLatestEventNIDs: %w", err) } - if roomID, ok := u.d.Cache.GetRoomServerRoomID(roomNID); ok { - if roomInfo, ok := u.d.Cache.GetRoomInfo(roomID); ok { - roomInfo.StateSnapshotNID = currentStateSnapshotNID - roomInfo.IsStub = false - u.d.Cache.StoreRoomInfo(roomID, roomInfo) - } - } + + // Since it's entirely possible that this types.RoomInfo came from the + // cache, we should make sure to update that entry so that the next run + // works from live data. + u.roomInfo.StateSnapshotNID = currentStateSnapshotNID + u.roomInfo.IsStub = false return nil }) } diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go index 692af1f6c..d8d5f67c8 100644 --- a/roomserver/storage/shared/storage.go +++ b/roomserver/storage/shared/storage.go @@ -139,13 +139,13 @@ func (d *Database) RoomInfo(ctx context.Context, roomID string) (*types.RoomInfo } func (d *Database) roomInfo(ctx context.Context, txn *sql.Tx, roomID string) (*types.RoomInfo, error) { - if roomInfo, ok := d.Cache.GetRoomInfo(roomID); ok { - return &roomInfo, nil + if roomInfo, ok := d.Cache.GetRoomInfo(roomID); ok && roomInfo != nil { + return roomInfo, nil } roomInfo, err := d.RoomsTable.SelectRoomInfo(ctx, txn, roomID) if err == nil && roomInfo != nil { d.Cache.StoreRoomServerRoomID(roomInfo.RoomNID, roomID) - d.Cache.StoreRoomInfo(roomID, *roomInfo) + d.Cache.StoreRoomInfo(roomID, roomInfo) } return roomInfo, err } diff --git a/syncapi/internal/keychange.go b/syncapi/internal/keychange.go index d96718d20..03df9285c 100644 --- a/syncapi/internal/keychange.go +++ b/syncapi/internal/keychange.go @@ -21,6 +21,7 @@ import ( keyapi "github.com/matrix-org/dendrite/keyserver/api" keytypes "github.com/matrix-org/dendrite/keyserver/types" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" @@ -46,7 +47,7 @@ func DeviceOTKCounts(ctx context.Context, keyAPI keyapi.SyncKeyAPI, userID, devi // was filled in, else false if there are no new device list changes because there is nothing to catch up on. The response MUST // be already filled in with join/leave information. func DeviceListCatchup( - ctx context.Context, keyAPI keyapi.SyncKeyAPI, rsAPI roomserverAPI.SyncRoomserverAPI, + ctx context.Context, db storage.SharedUsers, keyAPI keyapi.SyncKeyAPI, rsAPI roomserverAPI.SyncRoomserverAPI, userID string, res *types.Response, from, to types.StreamPosition, ) (newPos types.StreamPosition, hasNew bool, err error) { @@ -93,7 +94,7 @@ func DeviceListCatchup( queryRes.UserIDs = append(queryRes.UserIDs, leaveUserIDs...) queryRes.UserIDs = util.UniqueStrings(queryRes.UserIDs) var sharedUsersMap map[string]int - sharedUsersMap, queryRes.UserIDs = filterSharedUsers(ctx, rsAPI, userID, queryRes.UserIDs) + sharedUsersMap, queryRes.UserIDs = filterSharedUsers(ctx, db, userID, queryRes.UserIDs) util.GetLogger(ctx).Debugf( "QueryKeyChanges request off=%d,to=%d response off=%d uids=%v", offset, toOffset, queryRes.Offset, queryRes.UserIDs, @@ -215,30 +216,28 @@ func TrackChangedUsers( return changed, left, nil } +// filterSharedUsers takes a list of remote users whose keys have changed and filters +// it down to include only users who the requesting user shares a room with. func filterSharedUsers( - ctx context.Context, rsAPI roomserverAPI.SyncRoomserverAPI, userID string, usersWithChangedKeys []string, + ctx context.Context, db storage.SharedUsers, userID string, usersWithChangedKeys []string, ) (map[string]int, []string) { - var result []string - var sharedUsersRes roomserverAPI.QuerySharedUsersResponse - err := rsAPI.QuerySharedUsers(ctx, &roomserverAPI.QuerySharedUsersRequest{ - UserID: userID, - OtherUserIDs: usersWithChangedKeys, - }, &sharedUsersRes) + sharedUsersMap := make(map[string]int, len(usersWithChangedKeys)) + for _, userID := range usersWithChangedKeys { + sharedUsersMap[userID] = 0 + } + sharedUsers, err := db.SharedUsers(ctx, userID, usersWithChangedKeys) if err != nil { // default to all users so we do needless queries rather than miss some important device update return nil, usersWithChangedKeys } + for _, userID := range sharedUsers { + sharedUsersMap[userID]++ + } // We forcibly put ourselves in this list because we should be notified about our own device updates // and if we are in 0 rooms then we don't technically share any room with ourselves so we wouldn't // be notified about key changes. - sharedUsersRes.UserIDsToCount[userID] = 1 - - for _, uid := range usersWithChangedKeys { - if sharedUsersRes.UserIDsToCount[uid] > 0 { - result = append(result, uid) - } - } - return sharedUsersRes.UserIDsToCount, result + sharedUsersMap[userID] = 1 + return sharedUsersMap, sharedUsers } func joinedRooms(res *types.Response, userID string) []string { diff --git a/syncapi/internal/keychange_test.go b/syncapi/internal/keychange_test.go index 219b35e2c..79ed440e7 100644 --- a/syncapi/internal/keychange_test.go +++ b/syncapi/internal/keychange_test.go @@ -11,6 +11,7 @@ import ( "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" ) var ( @@ -105,6 +106,22 @@ func (s *mockRoomserverAPI) QuerySharedUsers(ctx context.Context, req *api.Query return nil } +// This is actually a database function, but seeing as we track the state inside the +// *mockRoomserverAPI, we'll just comply with the interface here instead. +func (s *mockRoomserverAPI) SharedUsers(ctx context.Context, userID string, otherUserIDs []string) ([]string, error) { + commonUsers := []string{} + for _, members := range s.roomIDToJoinedMembers { + for _, member := range members { + for _, userID := range otherUserIDs { + if member == userID { + commonUsers = append(commonUsers, userID) + } + } + } + } + return util.UniqueStrings(commonUsers), nil +} + type wantCatchup struct { hasNew bool changed []string @@ -178,7 +195,7 @@ func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) { "!another:room": {syncingUser}, }, } - _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken) + _, hasNew, err := DeviceListCatchup(context.Background(), rsAPI, &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken) if err != nil { t.Fatalf("DeviceListCatchup returned an error: %s", err) } @@ -201,7 +218,7 @@ func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) { "!another:room": {syncingUser}, }, } - _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken) + _, hasNew, err := DeviceListCatchup(context.Background(), rsAPI, &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken) if err != nil { t.Fatalf("DeviceListCatchup returned an error: %s", err) } @@ -224,7 +241,7 @@ func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) { "!another:room": {syncingUser, existingUser}, }, } - _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken) + _, hasNew, err := DeviceListCatchup(context.Background(), rsAPI, &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken) if err != nil { t.Fatalf("Catchup returned an error: %s", err) } @@ -246,7 +263,7 @@ func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) { "!another:room": {syncingUser, existingUser}, }, } - _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken) + _, hasNew, err := DeviceListCatchup(context.Background(), rsAPI, &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken) if err != nil { t.Fatalf("DeviceListCatchup returned an error: %s", err) } @@ -305,7 +322,7 @@ func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) { roomID: {syncingUser, existingUser}, }, } - _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken) + _, hasNew, err := DeviceListCatchup(context.Background(), rsAPI, &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken) if err != nil { t.Fatalf("DeviceListCatchup returned an error: %s", err) } @@ -333,7 +350,7 @@ func TestKeyChangeCatchupChangeAndLeft(t *testing.T) { "!another:room": {syncingUser}, }, } - _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken) + _, hasNew, err := DeviceListCatchup(context.Background(), rsAPI, &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken) if err != nil { t.Fatalf("Catchup returned an error: %s", err) } @@ -419,7 +436,7 @@ func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) { }, } _, hasNew, err := DeviceListCatchup( - context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken, + context.Background(), rsAPI, &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken, ) if err != nil { t.Fatalf("DeviceListCatchup returned an error: %s", err) diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 5a036d889..055426030 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -27,6 +27,8 @@ import ( type Database interface { Presence + SharedUsers + MaxStreamPositionForPDUs(ctx context.Context) (types.StreamPosition, error) MaxStreamPositionForReceipts(ctx context.Context) (types.StreamPosition, error) MaxStreamPositionForInvites(ctx context.Context) (types.StreamPosition, error) @@ -165,3 +167,8 @@ type Presence interface { PresenceAfter(ctx context.Context, after types.StreamPosition, filter gomatrixserverlib.EventFilter) (map[string]*types.PresenceInternal, error) MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error) } + +type SharedUsers interface { + // SharedUsers returns a subset of otherUserIDs that share a room with userID. + SharedUsers(ctx context.Context, userID string, otherUserIDs []string) ([]string, error) +} diff --git a/syncapi/storage/postgres/current_room_state_table.go b/syncapi/storage/postgres/current_room_state_table.go index 8ee387b39..c4667bafd 100644 --- a/syncapi/storage/postgres/current_room_state_table.go +++ b/syncapi/storage/postgres/current_room_state_table.go @@ -107,6 +107,11 @@ const selectEventsWithEventIDsSQL = "" + "SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id" + " FROM syncapi_current_room_state WHERE event_id = ANY($1)" +const selectSharedUsersSQL = "" + + "SELECT state_key FROM syncapi_current_room_state WHERE room_id = ANY(" + + " SELECT room_id FROM syncapi_current_room_state WHERE state_key = $1 AND membership='join'" + + ") AND state_key = ANY($2) AND membership='join';" + type currentRoomStateStatements struct { upsertRoomStateStmt *sql.Stmt deleteRoomStateByEventIDStmt *sql.Stmt @@ -118,6 +123,7 @@ type currentRoomStateStatements struct { selectJoinedUsersInRoomStmt *sql.Stmt selectEventsWithEventIDsStmt *sql.Stmt selectStateEventStmt *sql.Stmt + selectSharedUsersStmt *sql.Stmt } func NewPostgresCurrentRoomStateTable(db *sql.DB) (tables.CurrentRoomState, error) { @@ -156,6 +162,9 @@ func NewPostgresCurrentRoomStateTable(db *sql.DB) (tables.CurrentRoomState, erro if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil { return nil, err } + if s.selectSharedUsersStmt, err = db.Prepare(selectSharedUsersSQL); err != nil { + return nil, err + } return s, nil } @@ -379,3 +388,24 @@ func (s *currentRoomStateStatements) SelectStateEvent( } return &ev, err } + +func (s *currentRoomStateStatements) SelectSharedUsers( + ctx context.Context, txn *sql.Tx, userID string, otherUserIDs []string, +) ([]string, error) { + stmt := sqlutil.TxStmt(txn, s.selectSharedUsersStmt) + rows, err := stmt.QueryContext(ctx, userID, otherUserIDs) + if err != nil { + return nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "selectSharedUsersStmt: rows.close() failed") + + var stateKey string + result := make([]string, 0, len(otherUserIDs)) + for rows.Next() { + if err := rows.Scan(&stateKey); err != nil { + return nil, err + } + result = append(result, stateKey) + } + return result, rows.Err() +} diff --git a/syncapi/storage/postgres/send_to_device_table.go b/syncapi/storage/postgres/send_to_device_table.go index 47c1cdaed..96d6844fd 100644 --- a/syncapi/storage/postgres/send_to_device_table.go +++ b/syncapi/storage/postgres/send_to_device_table.go @@ -23,6 +23,7 @@ import ( "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/types" + "github.com/sirupsen/logrus" ) const sendToDeviceSchema = ` @@ -51,7 +52,7 @@ const selectSendToDeviceMessagesSQL = ` SELECT id, user_id, device_id, content FROM syncapi_send_to_device WHERE user_id = $1 AND device_id = $2 AND id > $3 AND id <= $4 - ORDER BY id DESC + ORDER BY id ASC ` const deleteSendToDeviceMessagesSQL = ` @@ -112,17 +113,18 @@ func (s *sendToDeviceStatements) SelectSendToDeviceMessages( if err = rows.Scan(&id, &userID, &deviceID, &content); err != nil { return } - if id > lastPos { - lastPos = id - } event := types.SendToDeviceEvent{ ID: id, UserID: userID, DeviceID: deviceID, } if err = json.Unmarshal([]byte(content), &event.SendToDeviceEvent); err != nil { + logrus.WithError(err).Errorf("Failed to unmarshal send-to-device message") continue } + if id > lastPos { + lastPos = id + } events = append(events, event) } if lastPos == 0 { diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 76114aff8..d1c5e2d16 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -176,6 +176,10 @@ func (d *Database) AllPeekingDevicesInRooms(ctx context.Context) (map[string][]t return d.Peeks.SelectPeekingDevices(ctx) } +func (d *Database) SharedUsers(ctx context.Context, userID string, otherUserIDs []string) ([]string, error) { + return d.CurrentRoomState.SelectSharedUsers(ctx, nil, userID, otherUserIDs) +} + func (d *Database) GetStateEvent( ctx context.Context, roomID, evType, stateKey string, ) (*gomatrixserverlib.HeaderedEvent, error) { diff --git a/syncapi/storage/sqlite3/current_room_state_table.go b/syncapi/storage/sqlite3/current_room_state_table.go index f0a1c7bb7..376c3a3d2 100644 --- a/syncapi/storage/sqlite3/current_room_state_table.go +++ b/syncapi/storage/sqlite3/current_room_state_table.go @@ -91,6 +91,11 @@ const selectEventsWithEventIDsSQL = "" + "SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id" + " FROM syncapi_current_room_state WHERE event_id IN ($1)" +const selectSharedUsersSQL = "" + + "SELECT state_key FROM syncapi_current_room_state WHERE room_id = ANY(" + + " SELECT room_id FROM syncapi_current_room_state WHERE state_key = $1 AND membership='join'" + + ") AND state_key IN ($2) AND membership='join';" + type currentRoomStateStatements struct { db *sql.DB streamIDStatements *StreamIDStatements @@ -100,8 +105,9 @@ type currentRoomStateStatements struct { selectRoomIDsWithMembershipStmt *sql.Stmt selectRoomIDsWithAnyMembershipStmt *sql.Stmt selectJoinedUsersStmt *sql.Stmt - //selectJoinedUsersInRoomStmt *sql.Stmt - prepared at runtime due to variadic + //selectJoinedUsersInRoomStmt *sql.Stmt - prepared at runtime due to variadic selectStateEventStmt *sql.Stmt + //selectSharedUsersSQL *sql.Stmt - prepared at runtime due to variadic } func NewSqliteCurrentRoomStateTable(db *sql.DB, streamID *StreamIDStatements) (tables.CurrentRoomState, error) { @@ -396,3 +402,29 @@ func (s *currentRoomStateStatements) SelectStateEvent( } return &ev, err } + +func (s *currentRoomStateStatements) SelectSharedUsers( + ctx context.Context, txn *sql.Tx, userID string, otherUserIDs []string, +) ([]string, error) { + query := strings.Replace(selectSharedUsersSQL, "($2)", sqlutil.QueryVariadicOffset(len(otherUserIDs), 1), 1) + stmt, err := s.db.Prepare(query) + if err != nil { + return nil, fmt.Errorf("SelectSharedUsers s.db.Prepare: %w", err) + } + defer internal.CloseAndLogIfError(ctx, stmt, "SelectSharedUsers: stmt.close() failed") + rows, err := sqlutil.TxStmt(txn, stmt).QueryContext(ctx, userID, otherUserIDs) + if err != nil { + return nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "selectSharedUsersStmt: rows.close() failed") + + var stateKey string + result := make([]string, 0, len(otherUserIDs)) + for rows.Next() { + if err := rows.Scan(&stateKey); err != nil { + return nil, err + } + result = append(result, stateKey) + } + return result, rows.Err() +} diff --git a/syncapi/storage/sqlite3/send_to_device_table.go b/syncapi/storage/sqlite3/send_to_device_table.go index 0b1d5bbf2..5285acbe6 100644 --- a/syncapi/storage/sqlite3/send_to_device_table.go +++ b/syncapi/storage/sqlite3/send_to_device_table.go @@ -49,7 +49,7 @@ const selectSendToDeviceMessagesSQL = ` SELECT id, user_id, device_id, content FROM syncapi_send_to_device WHERE user_id = $1 AND device_id = $2 AND id > $3 AND id <= $4 - ORDER BY id DESC + ORDER BY id ASC ` const deleteSendToDeviceMessagesSQL = ` @@ -120,9 +120,6 @@ func (s *sendToDeviceStatements) SelectSendToDeviceMessages( logrus.WithError(err).Errorf("Failed to retrieve send-to-device message") return } - if id > lastPos { - lastPos = id - } event := types.SendToDeviceEvent{ ID: id, UserID: userID, @@ -132,6 +129,9 @@ func (s *sendToDeviceStatements) SelectSendToDeviceMessages( logrus.WithError(err).Errorf("Failed to unmarshal send-to-device message") continue } + if id > lastPos { + lastPos = id + } events = append(events, event) } if lastPos == 0 { diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go index 563c92e34..c74151700 100644 --- a/syncapi/storage/storage_test.go +++ b/syncapi/storage/storage_test.go @@ -1,7 +1,9 @@ package storage_test import ( + "bytes" "context" + "encoding/json" "fmt" "reflect" "testing" @@ -394,90 +396,125 @@ func TestGetEventsInRangeWithEventsInsertedLikeBackfill(t *testing.T) { from = topologyTokenBefore(t, db, paginatedEvents[len(paginatedEvents)-1].EventID()) } } +*/ func TestSendToDeviceBehaviour(t *testing.T) { - //t.Parallel() - db := MustCreateDatabase(t) + t.Parallel() + alice := test.NewUser(t) + bob := test.NewUser(t) + deviceID := "one" + test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { + db, close := MustCreateDatabase(t, dbType) + defer close() + // At this point there should be no messages. We haven't sent anything + // yet. + _, events, err := db.SendToDeviceUpdatesForSync(ctx, alice.ID, deviceID, 0, 100) + if err != nil { + t.Fatal(err) + } + if len(events) != 0 { + t.Fatal("first call should have no updates") + } - // At this point there should be no messages. We haven't sent anything - // yet. - _, events, updates, deletions, err := db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.StreamingToken{}) - if err != nil { - t.Fatal(err) - } - if len(events) != 0 || len(updates) != 0 || len(deletions) != 0 { - t.Fatal("first call should have no updates") - } - err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.StreamingToken{}) - if err != nil { - return - } + err = db.CleanSendToDeviceUpdates(context.Background(), alice.ID, deviceID, 100) + if err != nil { + return + } - // Try sending a message. - streamPos, err := db.StoreNewSendForDeviceMessage(ctx, "alice", "one", gomatrixserverlib.SendToDeviceEvent{ - Sender: "bob", - Type: "m.type", - Content: json.RawMessage("{}"), + // Try sending a message. + streamPos, err := db.StoreNewSendForDeviceMessage(ctx, alice.ID, deviceID, gomatrixserverlib.SendToDeviceEvent{ + Sender: bob.ID, + Type: "m.type", + Content: json.RawMessage("{}"), + }) + if err != nil { + t.Fatal(err) + } + + // At this point we should get exactly one message. We're sending the sync position + // that we were given from the update and the send-to-device update will be updated + // in the database to reflect that this was the sync position we sent the message at. + streamPos, events, err = db.SendToDeviceUpdatesForSync(ctx, alice.ID, deviceID, 0, streamPos) + if err != nil { + t.Fatal(err) + } + if count := len(events); count != 1 { + t.Fatalf("second call should have one update, got %d", count) + } + err = db.CleanSendToDeviceUpdates(context.Background(), alice.ID, deviceID, streamPos) + if err != nil { + return + } + + // At this point we should still have one message because we haven't progressed the + // sync position yet. This is equivalent to the client failing to /sync and retrying + // with the same position. + streamPos, events, err = db.SendToDeviceUpdatesForSync(ctx, alice.ID, deviceID, 0, 100) + if err != nil { + t.Fatal(err) + } + if len(events) != 1 { + t.Fatal("third call should have one update still") + } + err = db.CleanSendToDeviceUpdates(context.Background(), alice.ID, deviceID, streamPos+1) + if err != nil { + return + } + + // At this point we should now have no updates, because we've progressed the sync + // position. Therefore the update from before will not be sent again. + _, events, err = db.SendToDeviceUpdatesForSync(ctx, alice.ID, deviceID, streamPos+1, streamPos+2) + if err != nil { + t.Fatal(err) + } + if len(events) != 0 { + t.Fatal("fourth call should have no updates") + } + err = db.CleanSendToDeviceUpdates(context.Background(), alice.ID, deviceID, streamPos+1) + if err != nil { + return + } + + // At this point we should still have no updates, because no new updates have been + // sent. + _, events, err = db.SendToDeviceUpdatesForSync(ctx, alice.ID, deviceID, streamPos, streamPos+2) + if err != nil { + t.Fatal(err) + } + if len(events) != 0 { + t.Fatal("fifth call should have no updates") + } + + // Send some more messages and verify the ordering is correct ("in order of arrival") + var lastPos types.StreamPosition = 0 + for i := 0; i < 10; i++ { + streamPos, err = db.StoreNewSendForDeviceMessage(ctx, alice.ID, deviceID, gomatrixserverlib.SendToDeviceEvent{ + Sender: bob.ID, + Type: "m.type", + Content: json.RawMessage(fmt.Sprintf(`{ "count": %d }`, i)), + }) + if err != nil { + t.Fatal(err) + } + lastPos = streamPos + } + + _, events, err = db.SendToDeviceUpdatesForSync(ctx, alice.ID, deviceID, 0, lastPos) + if err != nil { + t.Fatalf("unable to get events: %v", err) + } + + for i := 0; i < 10; i++ { + want := json.RawMessage(fmt.Sprintf(`{"count":%d}`, i)) + got := events[i].Content + if !bytes.Equal(got, want) { + t.Fatalf("messages are out of order\nwant: %s\ngot: %s", string(want), string(got)) + } + } }) - if err != nil { - t.Fatal(err) - } - - // At this point we should get exactly one message. We're sending the sync position - // that we were given from the update and the send-to-device update will be updated - // in the database to reflect that this was the sync position we sent the message at. - _, events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.StreamingToken{SendToDevicePosition: streamPos}) - if err != nil { - t.Fatal(err) - } - if len(events) != 1 || len(updates) != 1 || len(deletions) != 0 { - t.Fatal("second call should have one update") - } - err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.StreamingToken{SendToDevicePosition: streamPos}) - if err != nil { - return - } - - // At this point we should still have one message because we haven't progressed the - // sync position yet. This is equivalent to the client failing to /sync and retrying - // with the same position. - _, events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.StreamingToken{SendToDevicePosition: streamPos}) - if err != nil { - t.Fatal(err) - } - if len(events) != 1 || len(updates) != 0 || len(deletions) != 0 { - t.Fatal("third call should have one update still") - } - err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.StreamingToken{SendToDevicePosition: streamPos}) - if err != nil { - return - } - - // At this point we should now have no updates, because we've progressed the sync - // position. Therefore the update from before will not be sent again. - _, events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.StreamingToken{SendToDevicePosition: streamPos + 1}) - if err != nil { - t.Fatal(err) - } - if len(events) != 0 || len(updates) != 0 || len(deletions) != 1 { - t.Fatal("fourth call should have no updates") - } - err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.StreamingToken{SendToDevicePosition: streamPos + 1}) - if err != nil { - return - } - - // At this point we should still have no updates, because no new updates have been - // sent. - _, events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.StreamingToken{SendToDevicePosition: streamPos + 2}) - if err != nil { - t.Fatal(err) - } - if len(events) != 0 || len(updates) != 0 || len(deletions) != 0 { - t.Fatal("fifth call should have no updates") - } } +/* func TestInviteBehaviour(t *testing.T) { db := MustCreateDatabase(t) inviteRoom1 := "!inviteRoom1:somewhere" diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index ccdebfdbd..08568d9ac 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -104,6 +104,8 @@ type CurrentRoomState interface { SelectJoinedUsers(ctx context.Context) (map[string][]string, error) // SelectJoinedUsersInRoom returns a map of room ID to a list of joined user IDs for a given room. SelectJoinedUsersInRoom(ctx context.Context, roomIDs []string) (map[string][]string, error) + // SelectSharedUsers returns a subset of otherUserIDs that share a room with userID. + SelectSharedUsers(ctx context.Context, txn *sql.Tx, userID string, otherUserIDs []string) ([]string, error) } // BackwardsExtremities keeps track of backwards extremities for a room. diff --git a/syncapi/streams/stream_devicelist.go b/syncapi/streams/stream_devicelist.go index f42099510..5448ee5bd 100644 --- a/syncapi/streams/stream_devicelist.go +++ b/syncapi/streams/stream_devicelist.go @@ -28,7 +28,7 @@ func (p *DeviceListStreamProvider) IncrementalSync( from, to types.StreamPosition, ) types.StreamPosition { var err error - to, _, err = internal.DeviceListCatchup(context.Background(), p.keyAPI, p.rsAPI, req.Device.UserID, req.Response, from, to) + to, _, err = internal.DeviceListCatchup(context.Background(), p.DB, p.keyAPI, p.rsAPI, req.Device.UserID, req.Response, from, to) if err != nil { req.Log.WithError(err).Error("internal.DeviceListCatchup failed") return from diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 6f0849e08..b6b4779a5 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -429,7 +429,7 @@ func (rp *RequestPool) OnIncomingKeyChangeRequest(req *http.Request, device *use } rp.streams.PDUStreamProvider.IncrementalSync(req.Context(), syncReq, fromToken.PDUPosition, toToken.PDUPosition) _, _, err = internal.DeviceListCatchup( - req.Context(), rp.keyAPI, rp.rsAPI, syncReq.Device.UserID, + req.Context(), rp.db, rp.keyAPI, rp.rsAPI, syncReq.Device.UserID, syncReq.Response, fromToken.DeviceListPosition, toToken.DeviceListPosition, ) if err != nil { diff --git a/sytest-whitelist b/sytest-whitelist index ea25c75d0..2a145291f 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -719,4 +719,5 @@ registration is idempotent, with username specified Setting state twice is idempotent Joining room twice is idempotent Inbound federation can return missing events for shared visibility -Inbound federation ignores redactions from invalid servers room > v3 \ No newline at end of file +Inbound federation ignores redactions from invalid servers room > v3 +Newly joined room includes presence in incremental sync \ No newline at end of file