diff --git a/dendrite-sample.polylith.yaml b/dendrite-sample.polylith.yaml index 83dbba0bf..78a1b3b92 100644 --- a/dendrite-sample.polylith.yaml +++ b/dendrite-sample.polylith.yaml @@ -125,7 +125,7 @@ app_service_api: # Database configuration for this component. database: - connection_string: postgresql://username@password:hostname/dendrite_appservice?sslmode=disable + connection_string: postgresql://username:password@hostname/dendrite_appservice?sslmode=disable max_open_conns: 10 max_idle_conns: 2 conn_max_lifetime: -1 @@ -209,7 +209,7 @@ federation_api: external_api: listen: http://[::]:8072 database: - connection_string: postgresql://username@password:hostname/dendrite_federationapi?sslmode=disable + connection_string: postgresql://username:password@hostname/dendrite_federationapi?sslmode=disable max_open_conns: 10 max_idle_conns: 2 conn_max_lifetime: -1 @@ -246,7 +246,7 @@ key_server: listen: http://[::]:7779 # The listen address for incoming API requests connect: http://key_server:7779 # The connect address for other components to use database: - connection_string: postgresql://username@password:hostname/dendrite_keyserver?sslmode=disable + connection_string: postgresql://username:password@hostname/dendrite_keyserver?sslmode=disable max_open_conns: 10 max_idle_conns: 2 conn_max_lifetime: -1 @@ -259,7 +259,7 @@ media_api: external_api: listen: http://[::]:8074 database: - connection_string: postgresql://username@password:hostname/dendrite_mediaapi?sslmode=disable + connection_string: postgresql://username:password@hostname/dendrite_mediaapi?sslmode=disable max_open_conns: 5 max_idle_conns: 2 conn_max_lifetime: -1 @@ -296,7 +296,7 @@ mscs: # - msc2836 # (Threading, see https://github.com/matrix-org/matrix-doc/pull/2836) # - msc2946 # (Spaces Summary, see https://github.com/matrix-org/matrix-doc/pull/2946) database: - connection_string: postgresql://username@password:hostname/dendrite_mscs?sslmode=disable + connection_string: postgresql://username:password@hostname/dendrite_mscs?sslmode=disable max_open_conns: 5 max_idle_conns: 2 conn_max_lifetime: -1 @@ -307,7 +307,7 @@ room_server: listen: http://[::]:7770 # The listen address for incoming API requests connect: http://room_server:7770 # The connect address for other components to use database: - connection_string: postgresql://username@password:hostname/dendrite_roomserver?sslmode=disable + connection_string: postgresql://username:password@hostname/dendrite_roomserver?sslmode=disable max_open_conns: 10 max_idle_conns: 2 conn_max_lifetime: -1 @@ -320,7 +320,7 @@ sync_api: external_api: listen: http://[::]:8073 database: - connection_string: postgresql://username@password:hostname/dendrite_syncapi?sslmode=disable + connection_string: postgresql://username:password@hostname/dendrite_syncapi?sslmode=disable max_open_conns: 10 max_idle_conns: 2 conn_max_lifetime: -1 @@ -336,7 +336,7 @@ user_api: listen: http://[::]:7781 # The listen address for incoming API requests connect: http://user_api:7781 # The connect address for other components to use account_database: - connection_string: postgresql://username@password:hostname/dendrite_userapi?sslmode=disable + connection_string: postgresql://username:password@hostname/dendrite_userapi?sslmode=disable max_open_conns: 10 max_idle_conns: 2 conn_max_lifetime: -1 diff --git a/docs/FAQ.md b/docs/FAQ.md index 47f39b9e6..f8255684e 100644 --- a/docs/FAQ.md +++ b/docs/FAQ.md @@ -86,9 +86,12 @@ would be a huge help too, as that will help us to understand where the memory us You may need to revisit the connection limit of your PostgreSQL server and/or make changes to the `max_connections` lines in your Dendrite configuration. Be aware that each Dendrite component opens its own database connections and has its own connection limit, even in monolith mode! -## What is being reported when enabling anonymous stats? +## What is being reported when enabling phone-home statistics? -If anonymous stats reporting is enabled, the following data is send to the defined endpoint. +Phone-home statistics contain your server's domain name, some configuration information about +your deployment and aggregated information about active users on your deployment. They are sent +to the endpoint URL configured in your Dendrite configuration file only. The following is an +example of the data that is sent: ```json { @@ -106,7 +109,7 @@ If anonymous stats reporting is enabled, the following data is send to the defin "go_arch": "amd64", "go_os": "linux", "go_version": "go1.16.13", - "homeserver": "localhost:8800", + "homeserver": "my.domain.com", "log_level": "trace", "memory_rss": 93452, "monolith": true, diff --git a/docs/administration/1_createusers.md b/docs/administration/1_createusers.md index f40b7f576..61ec2299b 100644 --- a/docs/administration/1_createusers.md +++ b/docs/administration/1_createusers.md @@ -32,6 +32,15 @@ To create a new **admin account**, add the `-admin` flag: ./bin/create-account -config /path/to/dendrite.yaml -username USERNAME -admin ``` +An example of using `create-account` when running in **Docker**, having found the `CONTAINERNAME` from `docker ps`: + +```bash +docker exec -it CONTAINERNAME /usr/bin/create-account -config /path/to/dendrite.yaml -username USERNAME +``` +```bash +docker exec -it CONTAINERNAME /usr/bin/create-account -config /path/to/dendrite.yaml -username USERNAME -admin +``` + ## Using shared secret registration Dendrite supports the Synapse-compatible shared secret registration endpoint. diff --git a/docs/installation/2_domainname.md b/docs/installation/2_domainname.md index 0d4300eca..54064acb4 100644 --- a/docs/installation/2_domainname.md +++ b/docs/installation/2_domainname.md @@ -14,15 +14,18 @@ that take the format `@user:example.com`. For federation to work, the server name must be resolvable by other homeservers on the internet — that is, the domain must be registered and properly configured with the relevant DNS records. -Matrix servers discover each other when federating using the following methods: +Matrix servers usually discover each other when federating using the following methods: -1. If a well-known delegation exists on `example.com`, use the path server from the +1. If a well-known delegation exists on `example.com`, use the domain and port from the well-known file to connect to the remote homeserver; -2. If a DNS SRV delegation exists on `example.com`, use the hostname and port from the DNS SRV +2. If a DNS SRV delegation exists on `example.com`, use the IP address and port from the DNS SRV record to connect to the remote homeserver; 3. If neither well-known or DNS SRV delegation are configured, attempt to connect to the remote homeserver by connecting to `example.com` port TCP/8448 using HTTPS. +The exact details of how server name resolution works can be found in +[the spec](https://spec.matrix.org/v1.3/server-server-api/#resolving-server-names). + ## TLS certificates Matrix federation requires that valid TLS certificates are present on the domain. You must @@ -51,17 +54,12 @@ you will be able to delegate from `example.com` to `matrix.example.com` so that Delegation can be performed in one of two ways: -* **Well-known delegation**: A well-known text file is served over HTTPS on the domain name - that you want to use, pointing to your server on `matrix.example.com` port 8448; -* **DNS SRV delegation**: A DNS SRV record is created on the domain name that you want to - use, pointing to your server on `matrix.example.com` port TCP/8448. +* **Well-known delegation (preferred)**: A well-known text file is served over HTTPS on the domain + name that you want to use, pointing to your server on `matrix.example.com` port 8448; +* **DNS SRV delegation (not recommended)**: See the SRV delegation section below for details. -If you are using a reverse proxy to forward `/_matrix` to Dendrite, your well-known or DNS SRV -delegation must refer to the hostname and port that the reverse proxy is listening on instead. - -Well-known delegation is typically easier to set up and usually preferred. However, you can use -either or both methods to delegate. If you configure both methods of delegation, it is important -that they both agree and refer to the same hostname and port. +If you are using a reverse proxy to forward `/_matrix` to Dendrite, your well-known or delegation +must refer to the hostname and port that the reverse proxy is listening on instead. ## Well-known delegation @@ -74,20 +72,36 @@ and contain the following JSON document: ```json { - "m.server": "https://matrix.example.com:8448" + "m.server": "matrix.example.com:8448" } ``` +You can also serve `.well-known` with Dendrite itself by setting the `well_known_server_name` config +option to the value you want for `m.server`. This is primarily useful if Dendrite is exposed on +`example.com:443` and you don't want to set up a separate webserver just for serving the `.well-known` +file. + +```yaml +global: +... + well_known_server_name: "example.com:443" +``` + ## DNS SRV delegation -Using DNS SRV delegation requires creating DNS SRV records on the `example.com` zone which -refer to your Dendrite installation. +This method is not recommended, as the behavior of SRV records in Matrix is rather unintuitive: +SRV records will only change the IP address and port that other servers connect to, they won't +affect the domain name. In technical terms, the `Host` header and TLS SNI of federation requests +will still be `example.com` even if the SRV record points at `matrix.example.com`. -Assuming that your Dendrite installation is listening for HTTPS connections at `matrix.example.com` -port 8448, the DNS SRV record must have the following fields: +In practice, this means that the server must be configured with valid TLS certificates for +`example.com`, rather than `matrix.example.com` as one might intuitively expect. If there's a +reverse proxy in between, the proxy configuration must be written as if it's `example.com`, as the +proxy will never see the name `matrix.example.com` in incoming requests. -* Name: `@` (or whichever term your DNS provider uses to signal the root) -* Service: `_matrix` -* Protocol: `_tcp` -* Port: `8448` -* Target: `matrix.example.com` +This behavior also means that if `example.com` and `matrix.example.com` point at the same IP +address, there is no reason to have a SRV record pointing at `matrix.example.com`. It can still +be used to change the port number, but it won't do anything else. + +If you understand how SRV records work and still want to use them, the service name is `_matrix` and +the protocol is `_tcp`. diff --git a/go.mod b/go.mod index c9170a861..7c6345684 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,7 @@ require ( github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91 github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 - github.com/matrix-org/gomatrixserverlib v0.0.0-20220713083127-fc2ea1e62e46 + github.com/matrix-org/gomatrixserverlib v0.0.0-20220718085240-f08f98af7d2d github.com/matrix-org/pinecone v0.0.0-20220708135211-1ce778fcde6a github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 github.com/mattn/go-sqlite3 v1.14.13 diff --git a/go.sum b/go.sum index d86c8cb46..d9ba933d9 100644 --- a/go.sum +++ b/go.sum @@ -476,8 +476,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= -github.com/matrix-org/gomatrixserverlib v0.0.0-20220713083127-fc2ea1e62e46 h1:5X/kXY3nwqKOwwrE9tnMKrjbsi3PHigQYvrvDBSntO8= -github.com/matrix-org/gomatrixserverlib v0.0.0-20220713083127-fc2ea1e62e46/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk= +github.com/matrix-org/gomatrixserverlib v0.0.0-20220718085240-f08f98af7d2d h1:BWInUURXVOW+OiifMapoRIS7i122KWdEKj6fnDFXgBo= +github.com/matrix-org/gomatrixserverlib v0.0.0-20220718085240-f08f98af7d2d/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk= github.com/matrix-org/pinecone v0.0.0-20220708135211-1ce778fcde6a h1:DdG8vXMlZ65EAtc4V+3t7zHZ2Gqs24pSnyXS+4BRHUs= github.com/matrix-org/pinecone v0.0.0-20220708135211-1ce778fcde6a/go.mod h1:ulJzsVOTssIVp1j/m5eI//4VpAGDkMt5NrRuAVX7wpc= github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U= diff --git a/internal/caching/cache_eventstatekeys.go b/internal/caching/cache_eventstatekeys.go new file mode 100644 index 000000000..05580ab05 --- /dev/null +++ b/internal/caching/cache_eventstatekeys.go @@ -0,0 +1,18 @@ +package caching + +import "github.com/matrix-org/dendrite/roomserver/types" + +// EventStateKeyCache contains the subset of functions needed for +// a room event state key cache. +type EventStateKeyCache interface { + GetEventStateKey(eventStateKeyNID types.EventStateKeyNID) (string, bool) + StoreEventStateKey(eventStateKeyNID types.EventStateKeyNID, eventStateKey string) +} + +func (c Caches) GetEventStateKey(eventStateKeyNID types.EventStateKeyNID) (string, bool) { + return c.RoomServerStateKeys.Get(eventStateKeyNID) +} + +func (c Caches) StoreEventStateKey(eventStateKeyNID types.EventStateKeyNID, eventStateKey string) { + c.RoomServerStateKeys.Set(eventStateKeyNID, eventStateKey) +} diff --git a/internal/caching/cache_roomservernids.go b/internal/caching/cache_roomservernids.go index b409aeef2..f27154f19 100644 --- a/internal/caching/cache_roomservernids.go +++ b/internal/caching/cache_roomservernids.go @@ -9,6 +9,7 @@ type RoomServerCaches interface { RoomVersionCache RoomInfoCache RoomServerEventsCache + EventStateKeyCache } // RoomServerNIDsCache contains the subset of functions needed for @@ -19,9 +20,9 @@ type RoomServerNIDsCache interface { } func (c Caches) GetRoomServerRoomID(roomNID types.RoomNID) (string, bool) { - return c.RoomServerRoomIDs.Get(int64(roomNID)) + return c.RoomServerRoomIDs.Get(roomNID) } func (c Caches) StoreRoomServerRoomID(roomNID types.RoomNID, roomID string) { - c.RoomServerRoomIDs.Set(int64(roomNID), roomID) + c.RoomServerRoomIDs.Set(roomNID, roomID) } diff --git a/internal/caching/caches.go b/internal/caching/caches.go index e7914ce7d..f13f743d3 100644 --- a/internal/caching/caches.go +++ b/internal/caching/caches.go @@ -23,16 +23,17 @@ import ( // different implementations as long as they satisfy the Cache // interface. type Caches struct { - RoomVersions Cache[string, gomatrixserverlib.RoomVersion] // room ID -> room version - ServerKeys Cache[string, gomatrixserverlib.PublicKeyLookupResult] // server name -> server keys - RoomServerRoomNIDs Cache[string, types.RoomNID] // room ID -> room NID - RoomServerRoomIDs Cache[int64, string] // room NID -> room ID - RoomServerEvents Cache[int64, *gomatrixserverlib.Event] // event NID -> event - RoomInfos Cache[string, *types.RoomInfo] // room ID -> room info - FederationPDUs Cache[int64, *gomatrixserverlib.HeaderedEvent] // queue NID -> PDU - FederationEDUs Cache[int64, *gomatrixserverlib.EDU] // queue NID -> EDU - SpaceSummaryRooms Cache[string, gomatrixserverlib.MSC2946SpacesResponse] // room ID -> space response - LazyLoading Cache[lazyLoadingCacheKey, string] // composite key -> event ID + RoomVersions Cache[string, gomatrixserverlib.RoomVersion] // room ID -> room version + ServerKeys Cache[string, gomatrixserverlib.PublicKeyLookupResult] // server name -> server keys + RoomServerRoomNIDs Cache[string, types.RoomNID] // room ID -> room NID + RoomServerRoomIDs Cache[types.RoomNID, string] // room NID -> room ID + RoomServerEvents Cache[int64, *gomatrixserverlib.Event] // event NID -> event + RoomServerStateKeys Cache[types.EventStateKeyNID, string] // event NID -> event state key + RoomInfos Cache[string, *types.RoomInfo] // room ID -> room info + FederationPDUs Cache[int64, *gomatrixserverlib.HeaderedEvent] // queue NID -> PDU + FederationEDUs Cache[int64, *gomatrixserverlib.EDU] // queue NID -> EDU + SpaceSummaryRooms Cache[string, gomatrixserverlib.MSC2946SpacesResponse] // room ID -> space response + LazyLoading Cache[lazyLoadingCacheKey, string] // composite key -> event ID } // Cache is the interface that an implementation must satisfy. @@ -44,7 +45,7 @@ type Cache[K keyable, T any] interface { type keyable interface { // from https://github.com/dgraph-io/ristretto/blob/8e850b710d6df0383c375ec6a7beae4ce48fc8d5/z/z.go#L34 - uint64 | string | []byte | byte | int | int32 | uint32 | int64 | lazyLoadingCacheKey + ~uint64 | ~string | []byte | byte | ~int | ~int32 | ~uint32 | ~int64 | lazyLoadingCacheKey } type costable interface { diff --git a/internal/caching/impl_ristretto.go b/internal/caching/impl_ristretto.go index 677218b5e..fdbbb4d72 100644 --- a/internal/caching/impl_ristretto.go +++ b/internal/caching/impl_ristretto.go @@ -40,13 +40,14 @@ const ( federationEDUsCache spaceSummaryRoomsCache lazyLoadingCache + eventStateKeyCache ) func NewRistrettoCache(maxCost config.DataUnit, maxAge time.Duration, enablePrometheus bool) *Caches { cache, err := ristretto.NewCache(&ristretto.Config{ - NumCounters: 1e5, // 10x number of expected cache items, affects bloom filter size, gives us room for 10,000 currently - BufferItems: 64, // recommended by the ristretto godocs as a sane buffer size value - MaxCost: int64(maxCost), + NumCounters: int64((maxCost / 1024) * 10), // 10 counters per 1KB data, affects bloom filter size + BufferItems: 64, // recommended by the ristretto godocs as a sane buffer size value + MaxCost: int64(maxCost), // max cost is in bytes, as per the Dendrite config Metrics: true, KeyToHash: func(key interface{}) (uint64, uint64) { return z.KeyToHash(key) @@ -88,7 +89,7 @@ func NewRistrettoCache(maxCost config.DataUnit, maxAge time.Duration, enableProm Prefix: roomNIDsCache, MaxAge: maxAge, }, - RoomServerRoomIDs: &RistrettoCachePartition[int64, string]{ // room NID -> room ID + RoomServerRoomIDs: &RistrettoCachePartition[types.RoomNID, string]{ // room NID -> room ID cache: cache, Prefix: roomIDsCache, MaxAge: maxAge, @@ -100,6 +101,11 @@ func NewRistrettoCache(maxCost config.DataUnit, maxAge time.Duration, enableProm MaxAge: maxAge, }, }, + RoomServerStateKeys: &RistrettoCachePartition[types.EventStateKeyNID, string]{ // event NID -> event state key + cache: cache, + Prefix: eventStateKeyCache, + MaxAge: maxAge, + }, RoomInfos: &RistrettoCachePartition[string, *types.RoomInfo]{ // room ID -> room info cache: cache, Prefix: roomInfosCache, diff --git a/roomserver/storage/shared/membership_updater.go b/roomserver/storage/shared/membership_updater.go index ebfcef569..c79ca8995 100644 --- a/roomserver/storage/shared/membership_updater.go +++ b/roomserver/storage/shared/membership_updater.go @@ -105,8 +105,15 @@ func (u *MembershipUpdater) SetToInvite(event *gomatrixserverlib.Event) (bool, e if err != nil { return fmt.Errorf("u.d.InvitesTable.InsertInviteEvent: %w", err) } + + // Look up the NID of the invite event + nIDs, err := u.d.eventNIDs(u.ctx, u.txn, []string{event.EventID()}, false) + if err != nil { + return fmt.Errorf("u.d.EventNIDs: %w", err) + } + if u.membership != tables.MembershipStateInvite { - if inserted, err = u.d.MembershipTable.UpdateMembership(u.ctx, u.txn, u.roomNID, u.targetUserNID, senderUserNID, tables.MembershipStateInvite, 0, false); err != nil { + if inserted, err = u.d.MembershipTable.UpdateMembership(u.ctx, u.txn, u.roomNID, u.targetUserNID, senderUserNID, tables.MembershipStateInvite, nIDs[event.EventID()], false); err != nil { return fmt.Errorf("u.d.MembershipTable.UpdateMembership: %w", err) } } diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go index d8d5f67c8..2f9932ff8 100644 --- a/roomserver/storage/shared/storage.go +++ b/roomserver/storage/shared/storage.go @@ -72,7 +72,24 @@ func (d *Database) eventTypeNIDs( func (d *Database) EventStateKeys( ctx context.Context, eventStateKeyNIDs []types.EventStateKeyNID, ) (map[types.EventStateKeyNID]string, error) { - return d.EventStateKeysTable.BulkSelectEventStateKey(ctx, nil, eventStateKeyNIDs) + result := make(map[types.EventStateKeyNID]string, len(eventStateKeyNIDs)) + fetch := make([]types.EventStateKeyNID, 0, len(eventStateKeyNIDs)) + for _, nid := range eventStateKeyNIDs { + if key, ok := d.Cache.GetEventStateKey(nid); ok { + result[nid] = key + } else { + fetch = append(fetch, nid) + } + } + fromDB, err := d.EventStateKeysTable.BulkSelectEventStateKey(ctx, nil, fetch) + if err != nil { + return nil, err + } + for nid, key := range fromDB { + result[nid] = key + d.Cache.StoreEventStateKey(nid, key) + } + return result, nil } func (d *Database) EventStateKeyNIDs( diff --git a/setup/config/config_global.go b/setup/config/config_global.go index ac1380a4e..10827bb21 100644 --- a/setup/config/config_global.go +++ b/setup/config/config_global.go @@ -73,7 +73,7 @@ type Global struct { // ServerNotices configuration used for sending server notices ServerNotices ServerNotices `yaml:"server_notices"` - // ReportStats configures opt-in anonymous stats reporting. + // ReportStats configures opt-in phone-home statistics reporting. ReportStats ReportStats `yaml:"report_stats"` // Configuration for the caches. @@ -189,9 +189,9 @@ func (c *Cache) Verify(errors *ConfigErrors, isMonolith bool) { checkPositive(errors, "max_size_estimated", int64(c.EstimatedMaxSize)) } -// ReportStats configures opt-in anonymous stats reporting. +// ReportStats configures opt-in phone-home statistics reporting. type ReportStats struct { - // Enabled configures anonymous usage stats of the server + // Enabled configures phone-home statistics of the server Enabled bool `yaml:"enabled"` // Endpoint the endpoint to report stats to diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index f0ca2106f..328e1e9dd 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -240,6 +240,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( msg.RemovesStateEventIDs, msg.TransactionID, false, + msg.HistoryVisibility, ) if err != nil { // panic rather than continue with an inconsistent database @@ -289,7 +290,8 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent( []string{}, // adds no state []string{}, // removes no state nil, // no transaction - ev.StateKey() != nil, // exclude from sync? + ev.StateKey() != nil, // exclude from sync?, + msg.HistoryVisibility, ) if err != nil { // panic rather than continue with an inconsistent database diff --git a/syncapi/internal/keychange.go b/syncapi/internal/keychange.go index d96718d20..03df9285c 100644 --- a/syncapi/internal/keychange.go +++ b/syncapi/internal/keychange.go @@ -21,6 +21,7 @@ import ( keyapi "github.com/matrix-org/dendrite/keyserver/api" keytypes "github.com/matrix-org/dendrite/keyserver/types" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" @@ -46,7 +47,7 @@ func DeviceOTKCounts(ctx context.Context, keyAPI keyapi.SyncKeyAPI, userID, devi // was filled in, else false if there are no new device list changes because there is nothing to catch up on. The response MUST // be already filled in with join/leave information. func DeviceListCatchup( - ctx context.Context, keyAPI keyapi.SyncKeyAPI, rsAPI roomserverAPI.SyncRoomserverAPI, + ctx context.Context, db storage.SharedUsers, keyAPI keyapi.SyncKeyAPI, rsAPI roomserverAPI.SyncRoomserverAPI, userID string, res *types.Response, from, to types.StreamPosition, ) (newPos types.StreamPosition, hasNew bool, err error) { @@ -93,7 +94,7 @@ func DeviceListCatchup( queryRes.UserIDs = append(queryRes.UserIDs, leaveUserIDs...) queryRes.UserIDs = util.UniqueStrings(queryRes.UserIDs) var sharedUsersMap map[string]int - sharedUsersMap, queryRes.UserIDs = filterSharedUsers(ctx, rsAPI, userID, queryRes.UserIDs) + sharedUsersMap, queryRes.UserIDs = filterSharedUsers(ctx, db, userID, queryRes.UserIDs) util.GetLogger(ctx).Debugf( "QueryKeyChanges request off=%d,to=%d response off=%d uids=%v", offset, toOffset, queryRes.Offset, queryRes.UserIDs, @@ -215,30 +216,28 @@ func TrackChangedUsers( return changed, left, nil } +// filterSharedUsers takes a list of remote users whose keys have changed and filters +// it down to include only users who the requesting user shares a room with. func filterSharedUsers( - ctx context.Context, rsAPI roomserverAPI.SyncRoomserverAPI, userID string, usersWithChangedKeys []string, + ctx context.Context, db storage.SharedUsers, userID string, usersWithChangedKeys []string, ) (map[string]int, []string) { - var result []string - var sharedUsersRes roomserverAPI.QuerySharedUsersResponse - err := rsAPI.QuerySharedUsers(ctx, &roomserverAPI.QuerySharedUsersRequest{ - UserID: userID, - OtherUserIDs: usersWithChangedKeys, - }, &sharedUsersRes) + sharedUsersMap := make(map[string]int, len(usersWithChangedKeys)) + for _, userID := range usersWithChangedKeys { + sharedUsersMap[userID] = 0 + } + sharedUsers, err := db.SharedUsers(ctx, userID, usersWithChangedKeys) if err != nil { // default to all users so we do needless queries rather than miss some important device update return nil, usersWithChangedKeys } + for _, userID := range sharedUsers { + sharedUsersMap[userID]++ + } // We forcibly put ourselves in this list because we should be notified about our own device updates // and if we are in 0 rooms then we don't technically share any room with ourselves so we wouldn't // be notified about key changes. - sharedUsersRes.UserIDsToCount[userID] = 1 - - for _, uid := range usersWithChangedKeys { - if sharedUsersRes.UserIDsToCount[uid] > 0 { - result = append(result, uid) - } - } - return sharedUsersRes.UserIDsToCount, result + sharedUsersMap[userID] = 1 + return sharedUsersMap, sharedUsers } func joinedRooms(res *types.Response, userID string) []string { diff --git a/syncapi/internal/keychange_test.go b/syncapi/internal/keychange_test.go index 219b35e2c..79ed440e7 100644 --- a/syncapi/internal/keychange_test.go +++ b/syncapi/internal/keychange_test.go @@ -11,6 +11,7 @@ import ( "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" ) var ( @@ -105,6 +106,22 @@ func (s *mockRoomserverAPI) QuerySharedUsers(ctx context.Context, req *api.Query return nil } +// This is actually a database function, but seeing as we track the state inside the +// *mockRoomserverAPI, we'll just comply with the interface here instead. +func (s *mockRoomserverAPI) SharedUsers(ctx context.Context, userID string, otherUserIDs []string) ([]string, error) { + commonUsers := []string{} + for _, members := range s.roomIDToJoinedMembers { + for _, member := range members { + for _, userID := range otherUserIDs { + if member == userID { + commonUsers = append(commonUsers, userID) + } + } + } + } + return util.UniqueStrings(commonUsers), nil +} + type wantCatchup struct { hasNew bool changed []string @@ -178,7 +195,7 @@ func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) { "!another:room": {syncingUser}, }, } - _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken) + _, hasNew, err := DeviceListCatchup(context.Background(), rsAPI, &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken) if err != nil { t.Fatalf("DeviceListCatchup returned an error: %s", err) } @@ -201,7 +218,7 @@ func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) { "!another:room": {syncingUser}, }, } - _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken) + _, hasNew, err := DeviceListCatchup(context.Background(), rsAPI, &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken) if err != nil { t.Fatalf("DeviceListCatchup returned an error: %s", err) } @@ -224,7 +241,7 @@ func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) { "!another:room": {syncingUser, existingUser}, }, } - _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken) + _, hasNew, err := DeviceListCatchup(context.Background(), rsAPI, &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken) if err != nil { t.Fatalf("Catchup returned an error: %s", err) } @@ -246,7 +263,7 @@ func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) { "!another:room": {syncingUser, existingUser}, }, } - _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken) + _, hasNew, err := DeviceListCatchup(context.Background(), rsAPI, &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken) if err != nil { t.Fatalf("DeviceListCatchup returned an error: %s", err) } @@ -305,7 +322,7 @@ func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) { roomID: {syncingUser, existingUser}, }, } - _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken) + _, hasNew, err := DeviceListCatchup(context.Background(), rsAPI, &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken) if err != nil { t.Fatalf("DeviceListCatchup returned an error: %s", err) } @@ -333,7 +350,7 @@ func TestKeyChangeCatchupChangeAndLeft(t *testing.T) { "!another:room": {syncingUser}, }, } - _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken) + _, hasNew, err := DeviceListCatchup(context.Background(), rsAPI, &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken) if err != nil { t.Fatalf("Catchup returned an error: %s", err) } @@ -419,7 +436,7 @@ func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) { }, } _, hasNew, err := DeviceListCatchup( - context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken, + context.Background(), rsAPI, &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, emptyToken, ) if err != nil { t.Fatalf("DeviceListCatchup returned an error: %s", err) diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index 24745cd55..990ca55b1 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -594,6 +594,7 @@ func (r *messagesReq) backfill(roomID string, backwardsExtremities map[string][] []string{}, []string{}, nil, true, + gomatrixserverlib.HistoryVisibilityShared, ) if err != nil { return nil, err diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 5a036d889..9751670b2 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -27,6 +27,8 @@ import ( type Database interface { Presence + SharedUsers + MaxStreamPositionForPDUs(ctx context.Context) (types.StreamPosition, error) MaxStreamPositionForReceipts(ctx context.Context) (types.StreamPosition, error) MaxStreamPositionForInvites(ctx context.Context) (types.StreamPosition, error) @@ -67,7 +69,9 @@ type Database interface { // when generating the sync stream position for this event. Returns the sync stream position for the inserted event. // Returns an error if there was a problem inserting this event. WriteEvent(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent, addStateEvents []*gomatrixserverlib.HeaderedEvent, - addStateEventIDs []string, removeStateEventIDs []string, transactionID *api.TransactionID, excludeFromSync bool) (types.StreamPosition, error) + addStateEventIDs []string, removeStateEventIDs []string, transactionID *api.TransactionID, excludeFromSync bool, + historyVisibility gomatrixserverlib.HistoryVisibility, + ) (types.StreamPosition, error) // PurgeRoomState completely purges room state from the sync API. This is done when // receiving an output event that completely resets the state. PurgeRoomState(ctx context.Context, roomID string) error @@ -165,3 +169,8 @@ type Presence interface { PresenceAfter(ctx context.Context, after types.StreamPosition, filter gomatrixserverlib.EventFilter) (map[string]*types.PresenceInternal, error) MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error) } + +type SharedUsers interface { + // SharedUsers returns a subset of otherUserIDs that share a room with userID. + SharedUsers(ctx context.Context, userID string, otherUserIDs []string) ([]string, error) +} diff --git a/syncapi/storage/postgres/current_room_state_table.go b/syncapi/storage/postgres/current_room_state_table.go index 8ee387b39..aae2d8c33 100644 --- a/syncapi/storage/postgres/current_room_state_table.go +++ b/syncapi/storage/postgres/current_room_state_table.go @@ -51,6 +51,7 @@ CREATE TABLE IF NOT EXISTS syncapi_current_room_state ( -- The serial ID of the output_room_events table when this event became -- part of the current state of the room. added_at BIGINT, + history_visibility SMALLINT NOT NULL DEFAULT 2, -- Clobber based on 3-uple of room_id, type and state_key CONSTRAINT syncapi_room_state_unique UNIQUE (room_id, type, state_key) ); @@ -63,8 +64,8 @@ CREATE UNIQUE INDEX IF NOT EXISTS syncapi_current_room_state_eventid_idx ON sync ` const upsertRoomStateSQL = "" + - "INSERT INTO syncapi_current_room_state (room_id, event_id, type, sender, contains_url, state_key, headered_event_json, membership, added_at)" + - " VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)" + + "INSERT INTO syncapi_current_room_state (room_id, event_id, type, sender, contains_url, state_key, headered_event_json, membership, added_at, history_visibility)" + + " VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)" + " ON CONFLICT ON CONSTRAINT syncapi_room_state_unique" + " DO UPDATE SET event_id = $2, sender=$4, contains_url=$5, headered_event_json = $7, membership = $8, added_at = $9" @@ -100,13 +101,18 @@ const selectStateEventSQL = "" + "SELECT headered_event_json FROM syncapi_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3" const selectEventsWithEventIDsSQL = "" + - // TODO: The session_id and transaction_id blanks are here because otherwise - // the rowsToStreamEvents expects there to be exactly six columns. We need to + // TODO: The session_id and transaction_id blanks are here because + // the rowsToStreamEvents expects there to be exactly seven columns. We need to // figure out if these really need to be in the DB, and if so, we need a // better permanent fix for this. - neilalexander, 2 Jan 2020 - "SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id" + + "SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id, history_visibility" + " FROM syncapi_current_room_state WHERE event_id = ANY($1)" +const selectSharedUsersSQL = "" + + "SELECT state_key FROM syncapi_current_room_state WHERE room_id = ANY(" + + " SELECT room_id FROM syncapi_current_room_state WHERE state_key = $1 AND membership='join'" + + ") AND state_key = ANY($2) AND membership='join';" + type currentRoomStateStatements struct { upsertRoomStateStmt *sql.Stmt deleteRoomStateByEventIDStmt *sql.Stmt @@ -118,6 +124,7 @@ type currentRoomStateStatements struct { selectJoinedUsersInRoomStmt *sql.Stmt selectEventsWithEventIDsStmt *sql.Stmt selectStateEventStmt *sql.Stmt + selectSharedUsersStmt *sql.Stmt } func NewPostgresCurrentRoomStateTable(db *sql.DB) (tables.CurrentRoomState, error) { @@ -156,6 +163,9 @@ func NewPostgresCurrentRoomStateTable(db *sql.DB) (tables.CurrentRoomState, erro if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil { return nil, err } + if s.selectSharedUsersStmt, err = db.Prepare(selectSharedUsersSQL); err != nil { + return nil, err + } return s, nil } @@ -327,6 +337,7 @@ func (s *currentRoomStateStatements) UpsertRoomState( headeredJSON, membership, addedAt, + event.Visibility, ) return err } @@ -379,3 +390,24 @@ func (s *currentRoomStateStatements) SelectStateEvent( } return &ev, err } + +func (s *currentRoomStateStatements) SelectSharedUsers( + ctx context.Context, txn *sql.Tx, userID string, otherUserIDs []string, +) ([]string, error) { + stmt := sqlutil.TxStmt(txn, s.selectSharedUsersStmt) + rows, err := stmt.QueryContext(ctx, userID, otherUserIDs) + if err != nil { + return nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "selectSharedUsersStmt: rows.close() failed") + + var stateKey string + result := make([]string, 0, len(otherUserIDs)) + for rows.Next() { + if err := rows.Scan(&stateKey); err != nil { + return nil, err + } + result = append(result, stateKey) + } + return result, rows.Err() +} diff --git a/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go b/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go new file mode 100644 index 000000000..2b4b65287 --- /dev/null +++ b/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go @@ -0,0 +1,50 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package deltas + +import ( + "database/sql" + "fmt" + + "github.com/matrix-org/dendrite/internal/sqlutil" +) + +func LoadAddHistoryVisibilityColumn(m *sqlutil.Migrations) { + m.AddMigration(UpAddHistoryVisibilityColumn, DownAddHistoryVisibilityColumn) +} + +func UpAddHistoryVisibilityColumn(tx *sql.Tx) error { + _, err := tx.Exec(` + ALTER TABLE syncapi_output_room_events ADD COLUMN IF NOT EXISTS history_visibility SMALLINT NOT NULL DEFAULT 2; + UPDATE syncapi_output_room_events SET history_visibility = 4 WHERE type IN ('m.room.message', 'm.room.encrypted'); + ALTER TABLE syncapi_current_room_state ADD COLUMN IF NOT EXISTS history_visibility SMALLINT NOT NULL DEFAULT 2; + UPDATE syncapi_current_room_state SET history_visibility = 4 WHERE type IN ('m.room.message', 'm.room.encrypted'); + `) + if err != nil { + return fmt.Errorf("failed to execute upgrade: %w", err) + } + return nil +} + +func DownAddHistoryVisibilityColumn(tx *sql.Tx) error { + _, err := tx.Exec(` + ALTER TABLE syncapi_output_room_events DROP COLUMN IF EXISTS history_visibility; + ALTER TABLE syncapi_current_room_state DROP COLUMN IF EXISTS history_visibility; + `) + if err != nil { + return fmt.Errorf("failed to execute downgrade: %w", err) + } + return nil +} diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index d84d0cfa2..ddef27383 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -67,7 +67,9 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events ( -- events retrieved through backfilling that have a position in the stream -- that relates to the moment these were retrieved rather than the moment these -- were emitted. - exclude_from_sync BOOL DEFAULT FALSE + exclude_from_sync BOOL DEFAULT FALSE, + -- The history visibility before this event (1 - world_readable; 2 - shared; 3 - invited; 4 - joined) + history_visibility SMALLINT NOT NULL DEFAULT 2 ); CREATE INDEX IF NOT EXISTS syncapi_output_room_events_type_idx ON syncapi_output_room_events (type); @@ -78,16 +80,16 @@ CREATE INDEX IF NOT EXISTS syncapi_output_room_events_exclude_from_sync_idx ON s const insertEventSQL = "" + "INSERT INTO syncapi_output_room_events (" + - "room_id, event_id, headered_event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync" + - ") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) " + + "room_id, event_id, headered_event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync, history_visibility" + + ") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) " + "ON CONFLICT ON CONSTRAINT syncapi_event_id_idx DO UPDATE SET exclude_from_sync = (excluded.exclude_from_sync AND $11) " + "RETURNING id" const selectEventsSQL = "" + - "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events WHERE event_id = ANY($1)" + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events WHERE event_id = ANY($1)" const selectEventsWithFilterSQL = "" + - "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events WHERE event_id = ANY($1)" + + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events WHERE event_id = ANY($1)" + " AND ( $2::text[] IS NULL OR sender = ANY($2) )" + " AND ( $3::text[] IS NULL OR NOT(sender = ANY($3)) )" + " AND ( $4::text[] IS NULL OR type LIKE ANY($4) )" + @@ -96,7 +98,7 @@ const selectEventsWithFilterSQL = "" + " LIMIT $7" const selectRecentEventsSQL = "" + - "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" + " WHERE room_id = $1 AND id > $2 AND id <= $3" + " AND ( $4::text[] IS NULL OR sender = ANY($4) )" + " AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" + @@ -105,7 +107,7 @@ const selectRecentEventsSQL = "" + " ORDER BY id DESC LIMIT $8" const selectRecentEventsForSyncSQL = "" + - "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" + " WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE" + " AND ( $4::text[] IS NULL OR sender = ANY($4) )" + " AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" + @@ -114,7 +116,7 @@ const selectRecentEventsForSyncSQL = "" + " ORDER BY id DESC LIMIT $8" const selectEarlyEventsSQL = "" + - "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" + " WHERE room_id = $1 AND id > $2 AND id <= $3" + " AND ( $4::text[] IS NULL OR sender = ANY($4) )" + " AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" + @@ -130,7 +132,7 @@ const updateEventJSONSQL = "" + // In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id). const selectStateInRangeSQL = "" + - "SELECT event_id, id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" + + "SELECT event_id, id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids, history_visibility" + " FROM syncapi_output_room_events" + " WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" + " AND room_id = ANY($3)" + @@ -146,10 +148,10 @@ const deleteEventsForRoomSQL = "" + "DELETE FROM syncapi_output_room_events WHERE room_id = $1" const selectContextEventSQL = "" + - "SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND event_id = $2" + "SELECT id, headered_event_json, history_visibility FROM syncapi_output_room_events WHERE room_id = $1 AND event_id = $2" const selectContextBeforeEventSQL = "" + - "SELECT headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id < $2" + + "SELECT headered_event_json, history_visibility FROM syncapi_output_room_events WHERE room_id = $1 AND id < $2" + " AND ( $4::text[] IS NULL OR sender = ANY($4) )" + " AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" + " AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" + @@ -157,7 +159,7 @@ const selectContextBeforeEventSQL = "" + " ORDER BY id DESC LIMIT $3" const selectContextAfterEventSQL = "" + - "SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2" + + "SELECT id, headered_event_json, history_visibility FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2" + " AND ( $4::text[] IS NULL OR sender = ANY($4) )" + " AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" + " AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" + @@ -246,14 +248,15 @@ func (s *outputRoomEventsStatements) SelectStateInRange( for rows.Next() { var ( - eventID string - streamPos types.StreamPosition - eventBytes []byte - excludeFromSync bool - addIDs pq.StringArray - delIDs pq.StringArray + eventID string + streamPos types.StreamPosition + eventBytes []byte + excludeFromSync bool + addIDs pq.StringArray + delIDs pq.StringArray + historyVisibility gomatrixserverlib.HistoryVisibility ) - if err := rows.Scan(&eventID, &streamPos, &eventBytes, &excludeFromSync, &addIDs, &delIDs); err != nil { + if err := rows.Scan(&eventID, &streamPos, &eventBytes, &excludeFromSync, &addIDs, &delIDs, &historyVisibility); err != nil { return nil, nil, err } // Sanity check for deleted state and whine if we see it. We don't need to do anything @@ -283,6 +286,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange( needSet[id] = true } stateNeeded[ev.RoomID()] = needSet + ev.Visibility = historyVisibility eventIDToEvent[eventID] = types.StreamEvent{ HeaderedEvent: &ev, @@ -314,7 +318,7 @@ func (s *outputRoomEventsStatements) SelectMaxEventID( func (s *outputRoomEventsStatements) InsertEvent( ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, addState, removeState []string, - transactionID *api.TransactionID, excludeFromSync bool, + transactionID *api.TransactionID, excludeFromSync bool, historyVisibility gomatrixserverlib.HistoryVisibility, ) (streamPos types.StreamPosition, err error) { var txnID *string var sessionID *int64 @@ -351,6 +355,7 @@ func (s *outputRoomEventsStatements) InsertEvent( sessionID, txnID, excludeFromSync, + historyVisibility, ).Scan(&streamPos) return } @@ -504,13 +509,15 @@ func (s *outputRoomEventsStatements) SelectContextEvent(ctx context.Context, txn row := sqlutil.TxStmt(txn, s.selectContextEventStmt).QueryRowContext(ctx, roomID, eventID) var eventAsString string - if err = row.Scan(&id, &eventAsString); err != nil { + var historyVisibility gomatrixserverlib.HistoryVisibility + if err = row.Scan(&id, &eventAsString, &historyVisibility); err != nil { return 0, evt, err } if err = json.Unmarshal([]byte(eventAsString), &evt); err != nil { return 0, evt, err } + evt.Visibility = historyVisibility return id, evt, nil } @@ -532,15 +539,17 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent( for rows.Next() { var ( - eventBytes []byte - evt *gomatrixserverlib.HeaderedEvent + eventBytes []byte + evt *gomatrixserverlib.HeaderedEvent + historyVisibility gomatrixserverlib.HistoryVisibility ) - if err = rows.Scan(&eventBytes); err != nil { + if err = rows.Scan(&eventBytes, &historyVisibility); err != nil { return evts, err } if err = json.Unmarshal(eventBytes, &evt); err != nil { return evts, err } + evt.Visibility = historyVisibility evts = append(evts, evt) } @@ -565,15 +574,17 @@ func (s *outputRoomEventsStatements) SelectContextAfterEvent( for rows.Next() { var ( - eventBytes []byte - evt *gomatrixserverlib.HeaderedEvent + eventBytes []byte + evt *gomatrixserverlib.HeaderedEvent + historyVisibility gomatrixserverlib.HistoryVisibility ) - if err = rows.Scan(&lastID, &eventBytes); err != nil { + if err = rows.Scan(&lastID, &eventBytes, &historyVisibility); err != nil { return 0, evts, err } if err = json.Unmarshal(eventBytes, &evt); err != nil { return 0, evts, err } + evt.Visibility = historyVisibility evts = append(evts, evt) } @@ -584,15 +595,16 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) { var result []types.StreamEvent for rows.Next() { var ( - eventID string - streamPos types.StreamPosition - eventBytes []byte - excludeFromSync bool - sessionID *int64 - txnID *string - transactionID *api.TransactionID + eventID string + streamPos types.StreamPosition + eventBytes []byte + excludeFromSync bool + sessionID *int64 + txnID *string + transactionID *api.TransactionID + historyVisibility gomatrixserverlib.HistoryVisibility ) - if err := rows.Scan(&eventID, &streamPos, &eventBytes, &sessionID, &excludeFromSync, &txnID); err != nil { + if err := rows.Scan(&eventID, &streamPos, &eventBytes, &sessionID, &excludeFromSync, &txnID, &historyVisibility); err != nil { return nil, err } // TODO: Handle redacted events @@ -607,7 +619,7 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) { TransactionID: *txnID, } } - + ev.Visibility = historyVisibility result = append(result, types.StreamEvent{ HeaderedEvent: &ev, StreamPosition: streamPos, diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index 9cfe7c070..c6121e419 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -42,18 +42,16 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) if d.db, d.writer, err = base.DatabaseConnection(dbProperties, sqlutil.NewDummyWriter()); err != nil { return nil, err } + if _, err = d.db.Exec(outputRoomEventsSchema); err != nil { + return nil, err + } + if _, err = d.db.Exec(currentRoomStateSchema); err != nil { + return nil, err + } accountData, err := NewPostgresAccountDataTable(d.db) if err != nil { return nil, err } - events, err := NewPostgresEventsTable(d.db) - if err != nil { - return nil, err - } - currState, err := NewPostgresCurrentRoomStateTable(d.db) - if err != nil { - return nil, err - } invites, err := NewPostgresInvitesTable(d.db) if err != nil { return nil, err @@ -101,9 +99,19 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) m := sqlutil.NewMigrations() deltas.LoadFixSequences(m) deltas.LoadRemoveSendToDeviceSentColumn(m) + deltas.LoadAddHistoryVisibilityColumn(m) if err = m.RunDeltas(d.db, dbProperties); err != nil { return nil, err } + // prepare statements after the migrations have run + events, err := NewPostgresEventsTable(d.db) + if err != nil { + return nil, err + } + currState, err := NewPostgresCurrentRoomStateTable(d.db) + if err != nil { + return nil, err + } d.Database = shared.Database{ DB: d.db, Writer: d.writer, diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 76114aff8..932bda164 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -176,6 +176,10 @@ func (d *Database) AllPeekingDevicesInRooms(ctx context.Context) (map[string][]t return d.Peeks.SelectPeekingDevices(ctx) } +func (d *Database) SharedUsers(ctx context.Context, userID string, otherUserIDs []string) ([]string, error) { + return d.CurrentRoomState.SelectSharedUsers(ctx, nil, userID, otherUserIDs) +} + func (d *Database) GetStateEvent( ctx context.Context, roomID, evType, stateKey string, ) (*gomatrixserverlib.HeaderedEvent, error) { @@ -364,11 +368,12 @@ func (d *Database) WriteEvent( addStateEvents []*gomatrixserverlib.HeaderedEvent, addStateEventIDs, removeStateEventIDs []string, transactionID *api.TransactionID, excludeFromSync bool, + historyVisibility gomatrixserverlib.HistoryVisibility, ) (pduPosition types.StreamPosition, returnErr error) { returnErr = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { var err error pos, err := d.OutputEvents.InsertEvent( - ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID, excludeFromSync, + ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID, excludeFromSync, historyVisibility, ) if err != nil { return fmt.Errorf("d.OutputEvents.InsertEvent: %w", err) @@ -387,7 +392,9 @@ func (d *Database) WriteEvent( // Nothing to do, the event may have just been a message event. return nil } - + for i := range addStateEvents { + addStateEvents[i].Visibility = historyVisibility + } return d.updateRoomState(ctx, txn, removeStateEventIDs, addStateEvents, pduPosition, topoPosition) }) diff --git a/syncapi/storage/sqlite3/current_room_state_table.go b/syncapi/storage/sqlite3/current_room_state_table.go index f0a1c7bb7..501d0ee98 100644 --- a/syncapi/storage/sqlite3/current_room_state_table.go +++ b/syncapi/storage/sqlite3/current_room_state_table.go @@ -41,6 +41,7 @@ CREATE TABLE IF NOT EXISTS syncapi_current_room_state ( headered_event_json TEXT NOT NULL, membership TEXT, added_at BIGINT, + history_visibility SMALLINT NOT NULL DEFAULT 2, -- The history visibility before this event (1 - world_readable; 2 - shared; 3 - invited; 4 - joined) UNIQUE (room_id, type, state_key) ); -- for event deletion @@ -52,8 +53,8 @@ CREATE UNIQUE INDEX IF NOT EXISTS syncapi_current_room_state_eventid_idx ON sync ` const upsertRoomStateSQL = "" + - "INSERT INTO syncapi_current_room_state (room_id, event_id, type, sender, contains_url, state_key, headered_event_json, membership, added_at)" + - " VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)" + + "INSERT INTO syncapi_current_room_state (room_id, event_id, type, sender, contains_url, state_key, headered_event_json, membership, added_at, history_visibility)" + + " VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)" + " ON CONFLICT (room_id, type, state_key)" + " DO UPDATE SET event_id = $2, sender=$4, contains_url=$5, headered_event_json = $7, membership = $8, added_at = $9" @@ -84,13 +85,18 @@ const selectStateEventSQL = "" + "SELECT headered_event_json FROM syncapi_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3" const selectEventsWithEventIDsSQL = "" + - // TODO: The session_id and transaction_id blanks are here because otherwise - // the rowsToStreamEvents expects there to be exactly six columns. We need to + // TODO: The session_id and transaction_id blanks are here because + // the rowsToStreamEvents expects there to be exactly seven columns. We need to // figure out if these really need to be in the DB, and if so, we need a // better permanent fix for this. - neilalexander, 2 Jan 2020 - "SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id" + + "SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id, history_visibility" + " FROM syncapi_current_room_state WHERE event_id IN ($1)" +const selectSharedUsersSQL = "" + + "SELECT state_key FROM syncapi_current_room_state WHERE room_id = ANY(" + + " SELECT room_id FROM syncapi_current_room_state WHERE state_key = $1 AND membership='join'" + + ") AND state_key IN ($2) AND membership='join';" + type currentRoomStateStatements struct { db *sql.DB streamIDStatements *StreamIDStatements @@ -100,8 +106,9 @@ type currentRoomStateStatements struct { selectRoomIDsWithMembershipStmt *sql.Stmt selectRoomIDsWithAnyMembershipStmt *sql.Stmt selectJoinedUsersStmt *sql.Stmt - //selectJoinedUsersInRoomStmt *sql.Stmt - prepared at runtime due to variadic + //selectJoinedUsersInRoomStmt *sql.Stmt - prepared at runtime due to variadic selectStateEventStmt *sql.Stmt + //selectSharedUsersSQL *sql.Stmt - prepared at runtime due to variadic } func NewSqliteCurrentRoomStateTable(db *sql.DB, streamID *StreamIDStatements) (tables.CurrentRoomState, error) { @@ -322,6 +329,7 @@ func (s *currentRoomStateStatements) UpsertRoomState( headeredJSON, membership, addedAt, + event.Visibility, ) return err } @@ -396,3 +404,29 @@ func (s *currentRoomStateStatements) SelectStateEvent( } return &ev, err } + +func (s *currentRoomStateStatements) SelectSharedUsers( + ctx context.Context, txn *sql.Tx, userID string, otherUserIDs []string, +) ([]string, error) { + query := strings.Replace(selectSharedUsersSQL, "($2)", sqlutil.QueryVariadicOffset(len(otherUserIDs), 1), 1) + stmt, err := s.db.Prepare(query) + if err != nil { + return nil, fmt.Errorf("SelectSharedUsers s.db.Prepare: %w", err) + } + defer internal.CloseAndLogIfError(ctx, stmt, "SelectSharedUsers: stmt.close() failed") + rows, err := sqlutil.TxStmt(txn, stmt).QueryContext(ctx, userID, otherUserIDs) + if err != nil { + return nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "selectSharedUsersStmt: rows.close() failed") + + var stateKey string + result := make([]string, 0, len(otherUserIDs)) + for rows.Next() { + if err := rows.Scan(&stateKey); err != nil { + return nil, err + } + result = append(result, stateKey) + } + return result, rows.Err() +} diff --git a/syncapi/storage/sqlite3/deltas/2022061412000000_history_visibility_column.go b/syncapi/storage/sqlite3/deltas/2022061412000000_history_visibility_column.go new file mode 100644 index 000000000..f6bcaddfb --- /dev/null +++ b/syncapi/storage/sqlite3/deltas/2022061412000000_history_visibility_column.go @@ -0,0 +1,82 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package deltas + +import ( + "database/sql" + "fmt" + + "github.com/matrix-org/dendrite/internal/sqlutil" +) + +func LoadAddHistoryVisibilityColumn(m *sqlutil.Migrations) { + m.AddMigration(UpAddHistoryVisibilityColumn, DownAddHistoryVisibilityColumn) +} + +func UpAddHistoryVisibilityColumn(tx *sql.Tx) error { + // SQLite doesn't have "if exists", so check if the column exists. If the query doesn't return an error, it already exists. + // Required for unit tests, as otherwise a duplicate column error will show up. + _, err := tx.Query("SELECT history_visibility FROM syncapi_output_room_events LIMIT 1") + if err == nil { + return nil + } + _, err = tx.Exec(` + ALTER TABLE syncapi_output_room_events ADD COLUMN history_visibility SMALLINT NOT NULL DEFAULT 2; + UPDATE syncapi_output_room_events SET history_visibility = 4 WHERE type IN ('m.room.message', 'm.room.encrypted'); + `) + if err != nil { + return fmt.Errorf("failed to execute upgrade: %w", err) + } + + _, err = tx.Query("SELECT history_visibility FROM syncapi_current_room_state LIMIT 1") + if err == nil { + return nil + } + _, err = tx.Exec(` + ALTER TABLE syncapi_current_room_state ADD COLUMN history_visibility SMALLINT NOT NULL DEFAULT 2; + UPDATE syncapi_current_room_state SET history_visibility = 4 WHERE type IN ('m.room.message', 'm.room.encrypted'); + `) + if err != nil { + return fmt.Errorf("failed to execute upgrade: %w", err) + } + return nil +} + +func DownAddHistoryVisibilityColumn(tx *sql.Tx) error { + // SQLite doesn't have "if exists", so check if the column exists. + _, err := tx.Query("SELECT history_visibility FROM syncapi_output_room_events LIMIT 1") + if err != nil { + // The column probably doesn't exist + return nil + } + _, err = tx.Exec(` + ALTER TABLE syncapi_output_room_events DROP COLUMN history_visibility; + `) + if err != nil { + return fmt.Errorf("failed to execute downgrade: %w", err) + } + _, err = tx.Query("SELECT history_visibility FROM syncapi_current_room_state LIMIT 1") + if err != nil { + // The column probably doesn't exist + return nil + } + _, err = tx.Exec(` + ALTER TABLE syncapi_current_room_state DROP COLUMN history_visibility; + `) + if err != nil { + return fmt.Errorf("failed to execute downgrade: %w", err) + } + return nil +} diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go index f9961a9d1..b3dcb44cb 100644 --- a/syncapi/storage/sqlite3/output_room_events_table.go +++ b/syncapi/storage/sqlite3/output_room_events_table.go @@ -47,7 +47,8 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events ( remove_state_ids TEXT, -- JSON encoded string array session_id BIGINT, transaction_id TEXT, - exclude_from_sync BOOL NOT NULL DEFAULT FALSE + exclude_from_sync BOOL NOT NULL DEFAULT FALSE, + history_visibility SMALLINT NOT NULL DEFAULT 2 -- The history visibility before this event (1 - world_readable; 2 - shared; 3 - invited; 4 - joined) ); CREATE INDEX IF NOT EXISTS syncapi_output_room_events_type_idx ON syncapi_output_room_events (type); @@ -58,27 +59,27 @@ CREATE INDEX IF NOT EXISTS syncapi_output_room_events_exclude_from_sync_idx ON s const insertEventSQL = "" + "INSERT INTO syncapi_output_room_events (" + - "id, room_id, event_id, headered_event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync" + - ") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) " + - "ON CONFLICT (event_id) DO UPDATE SET exclude_from_sync = (excluded.exclude_from_sync AND $13)" + "id, room_id, event_id, headered_event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync, history_visibility" + + ") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) " + + "ON CONFLICT (event_id) DO UPDATE SET exclude_from_sync = (excluded.exclude_from_sync AND $14)" const selectEventsSQL = "" + - "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events WHERE event_id IN ($1)" + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events WHERE event_id IN ($1)" const selectRecentEventsSQL = "" + - "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" + " WHERE room_id = $1 AND id > $2 AND id <= $3" // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters const selectRecentEventsForSyncSQL = "" + - "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" + " WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE" // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters const selectEarlyEventsSQL = "" + - "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" + " WHERE room_id = $1 AND id > $2 AND id <= $3" // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters @@ -90,7 +91,7 @@ const updateEventJSONSQL = "" + "UPDATE syncapi_output_room_events SET headered_event_json=$1 WHERE event_id=$2" const selectStateInRangeSQL = "" + - "SELECT event_id, id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" + + "SELECT event_id, id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids, history_visibility" + " FROM syncapi_output_room_events" + " WHERE (id > $1 AND id <= $2)" + " AND room_id IN ($3)" + @@ -102,15 +103,15 @@ const deleteEventsForRoomSQL = "" + "DELETE FROM syncapi_output_room_events WHERE room_id = $1" const selectContextEventSQL = "" + - "SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND event_id = $2" + "SELECT id, headered_event_json, history_visibility FROM syncapi_output_room_events WHERE room_id = $1 AND event_id = $2" const selectContextBeforeEventSQL = "" + - "SELECT headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id < $2" + "SELECT headered_event_json, history_visibility FROM syncapi_output_room_events WHERE room_id = $1 AND id < $2" // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters const selectContextAfterEventSQL = "" + - "SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2" + "SELECT id, headered_event_json, history_visibility FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2" // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters @@ -196,14 +197,15 @@ func (s *outputRoomEventsStatements) SelectStateInRange( for rows.Next() { var ( - eventID string - streamPos types.StreamPosition - eventBytes []byte - excludeFromSync bool - addIDsJSON string - delIDsJSON string + eventID string + streamPos types.StreamPosition + eventBytes []byte + excludeFromSync bool + addIDsJSON string + delIDsJSON string + historyVisibility gomatrixserverlib.HistoryVisibility ) - if err := rows.Scan(&eventID, &streamPos, &eventBytes, &excludeFromSync, &addIDsJSON, &delIDsJSON); err != nil { + if err := rows.Scan(&eventID, &streamPos, &eventBytes, &excludeFromSync, &addIDsJSON, &delIDsJSON, &historyVisibility); err != nil { return nil, nil, err } @@ -239,6 +241,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange( needSet[id] = true } stateNeeded[ev.RoomID()] = needSet + ev.Visibility = historyVisibility eventIDToEvent[eventID] = types.StreamEvent{ HeaderedEvent: &ev, @@ -270,7 +273,7 @@ func (s *outputRoomEventsStatements) SelectMaxEventID( func (s *outputRoomEventsStatements) InsertEvent( ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, addState, removeState []string, - transactionID *api.TransactionID, excludeFromSync bool, + transactionID *api.TransactionID, excludeFromSync bool, historyVisibility gomatrixserverlib.HistoryVisibility, ) (types.StreamPosition, error) { var txnID *string var sessionID *int64 @@ -326,6 +329,7 @@ func (s *outputRoomEventsStatements) InsertEvent( sessionID, txnID, excludeFromSync, + historyVisibility, excludeFromSync, ) return streamPos, err @@ -481,15 +485,16 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) { var result []types.StreamEvent for rows.Next() { var ( - eventID string - streamPos types.StreamPosition - eventBytes []byte - excludeFromSync bool - sessionID *int64 - txnID *string - transactionID *api.TransactionID + eventID string + streamPos types.StreamPosition + eventBytes []byte + excludeFromSync bool + sessionID *int64 + txnID *string + transactionID *api.TransactionID + historyVisibility gomatrixserverlib.HistoryVisibility ) - if err := rows.Scan(&eventID, &streamPos, &eventBytes, &sessionID, &excludeFromSync, &txnID); err != nil { + if err := rows.Scan(&eventID, &streamPos, &eventBytes, &sessionID, &excludeFromSync, &txnID, &historyVisibility); err != nil { return nil, err } // TODO: Handle redacted events @@ -505,6 +510,8 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) { } } + ev.Visibility = historyVisibility + result = append(result, types.StreamEvent{ HeaderedEvent: &ev, StreamPosition: streamPos, @@ -519,13 +526,15 @@ func (s *outputRoomEventsStatements) SelectContextEvent( ) (id int, evt gomatrixserverlib.HeaderedEvent, err error) { row := sqlutil.TxStmt(txn, s.selectContextEventStmt).QueryRowContext(ctx, roomID, eventID) var eventAsString string - if err = row.Scan(&id, &eventAsString); err != nil { + var historyVisibility gomatrixserverlib.HistoryVisibility + if err = row.Scan(&id, &eventAsString, &historyVisibility); err != nil { return 0, evt, err } if err = json.Unmarshal([]byte(eventAsString), &evt); err != nil { return 0, evt, err } + evt.Visibility = historyVisibility return id, evt, nil } @@ -550,15 +559,17 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent( for rows.Next() { var ( - eventBytes []byte - evt *gomatrixserverlib.HeaderedEvent + eventBytes []byte + evt *gomatrixserverlib.HeaderedEvent + historyVisibility gomatrixserverlib.HistoryVisibility ) - if err = rows.Scan(&eventBytes); err != nil { + if err = rows.Scan(&eventBytes, &historyVisibility); err != nil { return evts, err } if err = json.Unmarshal(eventBytes, &evt); err != nil { return evts, err } + evt.Visibility = historyVisibility evts = append(evts, evt) } @@ -586,15 +597,17 @@ func (s *outputRoomEventsStatements) SelectContextAfterEvent( for rows.Next() { var ( - eventBytes []byte - evt *gomatrixserverlib.HeaderedEvent + eventBytes []byte + evt *gomatrixserverlib.HeaderedEvent + historyVisibility gomatrixserverlib.HistoryVisibility ) - if err = rows.Scan(&lastID, &eventBytes); err != nil { + if err = rows.Scan(&lastID, &eventBytes, &historyVisibility); err != nil { return 0, evts, err } if err = json.Unmarshal(eventBytes, &evt); err != nil { return 0, evts, err } + evt.Visibility = historyVisibility evts = append(evts, evt) } return lastID, evts, rows.Err() diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index e08a0ba82..39ceec81d 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -52,18 +52,16 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er if err = d.streamID.Prepare(d.db); err != nil { return err } + if _, err = d.db.Exec(outputRoomEventsSchema); err != nil { + return err + } + if _, err = d.db.Exec(currentRoomStateSchema); err != nil { + return err + } accountData, err := NewSqliteAccountDataTable(d.db, &d.streamID) if err != nil { return err } - events, err := NewSqliteEventsTable(d.db, &d.streamID) - if err != nil { - return err - } - roomState, err := NewSqliteCurrentRoomStateTable(d.db, &d.streamID) - if err != nil { - return err - } invites, err := NewSqliteInvitesTable(d.db, &d.streamID) if err != nil { return err @@ -111,9 +109,19 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er m := sqlutil.NewMigrations() deltas.LoadFixSequences(m) deltas.LoadRemoveSendToDeviceSentColumn(m) + deltas.LoadAddHistoryVisibilityColumn(m) if err = m.RunDeltas(d.db, dbProperties); err != nil { return err } + // prepare statements after the migrations have run + events, err := NewSqliteEventsTable(d.db, &d.streamID) + if err != nil { + return err + } + roomState, err := NewSqliteCurrentRoomStateTable(d.db, &d.streamID) + if err != nil { + return err + } d.Database = shared.Database{ DB: d.db, Writer: d.writer, diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go index c74151700..df03a33c2 100644 --- a/syncapi/storage/storage_test.go +++ b/syncapi/storage/storage_test.go @@ -37,7 +37,7 @@ func MustWriteEvents(t *testing.T, db storage.Database, events []*gomatrixserver addStateEvents = append(addStateEvents, ev) addStateEventIDs = append(addStateEventIDs, ev.EventID()) } - pos, err := db.WriteEvent(ctx, ev, addStateEvents, addStateEventIDs, removeStateEventIDs, nil, false) + pos, err := db.WriteEvent(ctx, ev, addStateEvents, addStateEventIDs, removeStateEventIDs, nil, false, gomatrixserverlib.HistoryVisibilityShared) if err != nil { t.Fatalf("WriteEvent failed: %s", err) } diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index ccdebfdbd..d8fc8f508 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -52,7 +52,14 @@ type Peeks interface { type Events interface { SelectStateInRange(ctx context.Context, txn *sql.Tx, r types.Range, stateFilter *gomatrixserverlib.StateFilter, roomIDs []string) (map[string]map[string]bool, map[string]types.StreamEvent, error) SelectMaxEventID(ctx context.Context, txn *sql.Tx) (id int64, err error) - InsertEvent(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, addState, removeState []string, transactionID *api.TransactionID, excludeFromSync bool) (streamPos types.StreamPosition, err error) + InsertEvent( + ctx context.Context, txn *sql.Tx, + event *gomatrixserverlib.HeaderedEvent, + addState, removeState []string, + transactionID *api.TransactionID, + excludeFromSync bool, + historyVisibility gomatrixserverlib.HistoryVisibility, + ) (streamPos types.StreamPosition, err error) // SelectRecentEvents returns events between the two stream positions: exclusive of low and inclusive of high. // If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude from sync. // Returns up to `limit` events. Returns `limited=true` if there are more events in this range but we hit the `limit`. @@ -104,6 +111,8 @@ type CurrentRoomState interface { SelectJoinedUsers(ctx context.Context) (map[string][]string, error) // SelectJoinedUsersInRoom returns a map of room ID to a list of joined user IDs for a given room. SelectJoinedUsersInRoom(ctx context.Context, roomIDs []string) (map[string][]string, error) + // SelectSharedUsers returns a subset of otherUserIDs that share a room with userID. + SelectSharedUsers(ctx context.Context, txn *sql.Tx, userID string, otherUserIDs []string) ([]string, error) } // BackwardsExtremities keeps track of backwards extremities for a room. diff --git a/syncapi/storage/tables/output_room_events_test.go b/syncapi/storage/tables/output_room_events_test.go index 69bbd04c9..bdb17ae20 100644 --- a/syncapi/storage/tables/output_room_events_test.go +++ b/syncapi/storage/tables/output_room_events_test.go @@ -53,7 +53,7 @@ func TestOutputRoomEventsTable(t *testing.T) { events := room.Events() err := sqlutil.WithTransaction(db, func(txn *sql.Tx) error { for _, ev := range events { - _, err := tab.InsertEvent(ctx, txn, ev, nil, nil, nil, false) + _, err := tab.InsertEvent(ctx, txn, ev, nil, nil, nil, false, gomatrixserverlib.HistoryVisibilityShared) if err != nil { return fmt.Errorf("failed to InsertEvent: %s", err) } @@ -79,7 +79,7 @@ func TestOutputRoomEventsTable(t *testing.T) { "body": "test.txt", "url": "mxc://test.txt", }) - if _, err = tab.InsertEvent(ctx, txn, urlEv, nil, nil, nil, false); err != nil { + if _, err = tab.InsertEvent(ctx, txn, urlEv, nil, nil, nil, false, gomatrixserverlib.HistoryVisibilityShared); err != nil { return fmt.Errorf("failed to InsertEvent: %s", err) } wantEventID := []string{urlEv.EventID()} diff --git a/syncapi/streams/stream_devicelist.go b/syncapi/streams/stream_devicelist.go index f42099510..5448ee5bd 100644 --- a/syncapi/streams/stream_devicelist.go +++ b/syncapi/streams/stream_devicelist.go @@ -28,7 +28,7 @@ func (p *DeviceListStreamProvider) IncrementalSync( from, to types.StreamPosition, ) types.StreamPosition { var err error - to, _, err = internal.DeviceListCatchup(context.Background(), p.keyAPI, p.rsAPI, req.Device.UserID, req.Response, from, to) + to, _, err = internal.DeviceListCatchup(context.Background(), p.DB, p.keyAPI, p.rsAPI, req.Device.UserID, req.Response, from, to) if err != nil { req.Log.WithError(err).Error("internal.DeviceListCatchup failed") return from diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 6f0849e08..b6b4779a5 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -429,7 +429,7 @@ func (rp *RequestPool) OnIncomingKeyChangeRequest(req *http.Request, device *use } rp.streams.PDUStreamProvider.IncrementalSync(req.Context(), syncReq, fromToken.PDUPosition, toToken.PDUPosition) _, _, err = internal.DeviceListCatchup( - req.Context(), rp.keyAPI, rp.rsAPI, syncReq.Device.UserID, + req.Context(), rp.db, rp.keyAPI, rp.rsAPI, syncReq.Device.UserID, syncReq.Response, fromToken.DeviceListPosition, toToken.DeviceListPosition, ) if err != nil { diff --git a/userapi/util/phonehomestats.go b/userapi/util/phonehomestats.go index ad93a50e3..b17f62060 100644 --- a/userapi/util/phonehomestats.go +++ b/userapi/util/phonehomestats.go @@ -139,7 +139,7 @@ func (p *phoneHomeStats) collect() { output := bytes.Buffer{} if err = json.NewEncoder(&output).Encode(p.stats); err != nil { - logrus.WithError(err).Error("unable to encode anonymous stats") + logrus.WithError(err).Error("Unable to encode phone-home statistics") return } @@ -147,14 +147,14 @@ func (p *phoneHomeStats) collect() { request, err := http.NewRequestWithContext(ctx, http.MethodPost, p.cfg.Global.ReportStats.Endpoint, &output) if err != nil { - logrus.WithError(err).Error("unable to create anonymous stats request") + logrus.WithError(err).Error("Unable to create phone-home statistics request") return } request.Header.Set("User-Agent", "Dendrite/"+internal.VersionString()) _, err = p.client.Do(request) if err != nil { - logrus.WithError(err).Error("unable to send anonymous stats") + logrus.WithError(err).Error("Unable to send phone-home statistics") return } }