mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-08 14:43:09 -06:00
Merge branch 'brianathere/test_race' into brianathere/test_race_keyserver
This commit is contained in:
commit
761bcb3fa0
30
CHANGES.md
30
CHANGES.md
|
|
@ -1,5 +1,35 @@
|
|||
# Changelog
|
||||
|
||||
## Dendrite 0.9.0 (2022-08-01)
|
||||
|
||||
### Features
|
||||
|
||||
* Dendrite now uses Ristretto for managing in-memory caches
|
||||
* Should improve cache utilisation considerably over time by more intelligently selecting and managing cache entries compared to the previous LRU-based cache
|
||||
* Defaults to a 1GB cache size if not configured otherwise
|
||||
* The estimated cache size in memory and maximum age can now be configured with new [configuration options](https://github.com/matrix-org/dendrite/blob/e94ef84aaba30e12baf7f524c4e7a36d2fdeb189/dendrite-sample.monolith.yaml#L44-L61) to prevent unbounded cache growth
|
||||
* Added support for serving the `/.well-known/matrix/client` hint directly from Dendrite
|
||||
* Configurable with the new [configuration option](https://github.com/matrix-org/dendrite/blob/e94ef84aaba30e12baf7f524c4e7a36d2fdeb189/dendrite-sample.monolith.yaml#L67-L69)
|
||||
* Refactored membership updater, which should eliminate some bugs caused by the membership table getting out of sync with the room state
|
||||
* The User API is now responsible for sending account data updates to other components, which may fix some races and duplicate account data events
|
||||
* Optimised database query for checking whether a remote server is allowed to request an event over federation without using anywhere near as much CPU time (PostgreSQL only)
|
||||
* Database migrations have been refactored to eliminate some problems that were present with `goose` and upgrading from older Dendrite versions
|
||||
* Media fetching will now use the `/v3` endpoints for downloading media from remote homeservers
|
||||
* HTTP 404 and HTTP 405 errors from the client-facing APIs should now be returned with CORS headers so that web-based clients do not produce incorrect access control warnings for unknown endpoints
|
||||
* Some preparation work for full history visibility support
|
||||
|
||||
### Fixes
|
||||
|
||||
* Fixes a crash that could occur during event redaction
|
||||
* The `/members` endpoint will no longer incorrectly return HTTP 500 as a result of some invite events
|
||||
* Send-to-device messages should now be ordered more reliably and the last position in the stream updated correctly
|
||||
* Parsing of appservice configuration files is now less strict (contributed by [Kab1r](https://github.com/Kab1r))
|
||||
* The sync API should now identify shared users correctly when waking up for E2EE key changes
|
||||
* The federation `/state` endpoint will now return a HTTP 403 when the state before an event isn't known instead of a HTTP 500
|
||||
* Presence timestamps should now be calculated with the correct precision
|
||||
* A race condition in the roomserver's room info has been fixed
|
||||
* A race condition in the sync API has been fixed
|
||||
|
||||
## Dendrite 0.8.9 (2022-07-01)
|
||||
|
||||
### Features
|
||||
|
|
|
|||
|
|
@ -113,6 +113,11 @@ global:
|
|||
addresses:
|
||||
# - localhost:4222
|
||||
|
||||
# Disable the validation of TLS certificates of NATS. This is
|
||||
# not recommended in production since it may allow NATS traffic
|
||||
# to be sent to an insecure endpoint.
|
||||
disable_tls_validation: false
|
||||
|
||||
# Persistent directory to store JetStream streams in. This directory should be
|
||||
# preserved across Dendrite restarts.
|
||||
storage_path: ./
|
||||
|
|
|
|||
|
|
@ -103,6 +103,11 @@ global:
|
|||
addresses:
|
||||
- hostname:4222
|
||||
|
||||
# Disable the validation of TLS certificates of NATS. This is
|
||||
# not recommended in production since it may allow NATS traffic
|
||||
# to be sent to an insecure endpoint.
|
||||
disable_tls_validation: false
|
||||
|
||||
# The prefix to use for stream names for this homeserver - really only useful
|
||||
# if you are running more than one Dendrite server on the same NATS deployment.
|
||||
topic_prefix: Dendrite
|
||||
|
|
|
|||
|
|
@ -202,6 +202,14 @@ func SendJoin(
|
|||
}
|
||||
}
|
||||
|
||||
// Check that the event is from the server sending the request.
|
||||
if event.Origin() != request.Origin() {
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusForbidden,
|
||||
JSON: jsonerror.Forbidden("The join must be sent by the server it originated on"),
|
||||
}
|
||||
}
|
||||
|
||||
// Check that a state key is provided.
|
||||
if event.StateKey() == nil || event.StateKeyEquals("") {
|
||||
return util.JSONResponse{
|
||||
|
|
@ -216,6 +224,22 @@ func SendJoin(
|
|||
}
|
||||
}
|
||||
|
||||
// Check that the sender belongs to the server that is sending us
|
||||
// the request. By this point we've already asserted that the sender
|
||||
// and the state key are equal so we don't need to check both.
|
||||
var domain gomatrixserverlib.ServerName
|
||||
if _, domain, err = gomatrixserverlib.SplitID('@', event.Sender()); err != nil {
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusForbidden,
|
||||
JSON: jsonerror.Forbidden("The sender of the join is invalid"),
|
||||
}
|
||||
} else if domain != request.Origin() {
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusForbidden,
|
||||
JSON: jsonerror.Forbidden("The sender of the join must belong to the origin server"),
|
||||
}
|
||||
}
|
||||
|
||||
// Check that the room ID is correct.
|
||||
if event.RoomID() != roomID {
|
||||
return util.JSONResponse{
|
||||
|
|
@ -242,14 +266,6 @@ func SendJoin(
|
|||
}
|
||||
}
|
||||
|
||||
// Check that the event is from the server sending the request.
|
||||
if event.Origin() != request.Origin() {
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusForbidden,
|
||||
JSON: jsonerror.Forbidden("The join must be sent by the server it originated on"),
|
||||
}
|
||||
}
|
||||
|
||||
// Check that this is in fact a join event
|
||||
membership, err := event.Membership()
|
||||
if err != nil {
|
||||
|
|
|
|||
14
go.mod
14
go.mod
|
|
@ -1,9 +1,5 @@
|
|||
module github.com/matrix-org/dendrite
|
||||
|
||||
replace github.com/nats-io/nats-server/v2 => github.com/neilalexander/nats-server/v2 v2.8.3-0.20220513095553-73a9a246d34f
|
||||
|
||||
replace github.com/nats-io/nats.go => github.com/neilalexander/nats.go v1.13.1-0.20220621084451-ac518c356673
|
||||
|
||||
require (
|
||||
github.com/Arceliar/ironwood v0.0.0-20220306165321-319147a02d98
|
||||
github.com/Arceliar/phony v0.0.0-20210209235338-dde1a8dca979
|
||||
|
|
@ -29,8 +25,8 @@ require (
|
|||
github.com/matrix-org/pinecone v0.0.0-20220708135211-1ce778fcde6a
|
||||
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
|
||||
github.com/mattn/go-sqlite3 v1.14.13
|
||||
github.com/nats-io/nats-server/v2 v2.7.4-0.20220309205833-773636c1c5bb
|
||||
github.com/nats-io/nats.go v1.14.0
|
||||
github.com/nats-io/nats-server/v2 v2.8.5-0.20220731184415-903a06a5b4ee
|
||||
github.com/nats-io/nats.go v1.16.1-0.20220731182438-87bbea85922b
|
||||
github.com/neilalexander/utp v0.1.1-0.20210727203401-54ae7b1cd5f9
|
||||
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646
|
||||
github.com/ngrok/sqlmw v0.0.0-20220520173518-97c9c04efc79
|
||||
|
|
@ -76,7 +72,7 @@ require (
|
|||
github.com/h2non/filetype v1.1.3 // indirect
|
||||
github.com/juju/errors v0.0.0-20220203013757-bd733f3c86b9 // indirect
|
||||
github.com/juju/testing v0.0.0-20220203020004-a0ff61f03494 // indirect
|
||||
github.com/klauspost/compress v1.14.4 // indirect
|
||||
github.com/klauspost/compress v1.15.9 // indirect
|
||||
github.com/lucas-clemente/quic-go v0.26.0 // indirect
|
||||
github.com/marten-seemann/qtls-go1-16 v0.1.5 // indirect
|
||||
github.com/marten-seemann/qtls-go1-17 v0.1.1 // indirect
|
||||
|
|
@ -86,7 +82,7 @@ require (
|
|||
github.com/minio/highwayhash v1.0.2 // indirect
|
||||
github.com/moby/term v0.0.0-20210610120745-9d4ed1856297 // indirect
|
||||
github.com/morikuni/aec v1.0.0 // indirect
|
||||
github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a // indirect
|
||||
github.com/nats-io/jwt/v2 v2.3.0 // indirect
|
||||
github.com/nats-io/nkeys v0.3.0 // indirect
|
||||
github.com/nats-io/nuid v1.0.1 // indirect
|
||||
github.com/nxadm/tail v1.4.8 // indirect
|
||||
|
|
@ -104,7 +100,7 @@ require (
|
|||
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect
|
||||
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
|
||||
golang.org/x/text v0.3.8-0.20211004125949-5bd84dd9b33b // indirect
|
||||
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect
|
||||
golang.org/x/time v0.0.0-20220411224347-583f2d630306 // indirect
|
||||
golang.org/x/tools v0.1.10 // indirect
|
||||
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
|
||||
google.golang.org/protobuf v1.27.1 // indirect
|
||||
|
|
|
|||
20
go.sum
20
go.sum
|
|
@ -302,8 +302,8 @@ github.com/kardianos/minwinsvc v1.0.0/go.mod h1:Bgd0oc+D0Qo3bBytmNtyRKVlp85dAloL
|
|||
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
|
||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||
github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
|
||||
github.com/klauspost/compress v1.14.4 h1:eijASRJcobkVtSt81Olfh7JX43osYLwy5krOJo6YEu4=
|
||||
github.com/klauspost/compress v1.14.4/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
|
||||
github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY=
|
||||
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
|
|
@ -381,8 +381,12 @@ github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7P
|
|||
github.com/mschoch/smat v0.0.0-20160514031455-90eadee771ae/go.mod h1:qAyveg+e4CE+eKJXWVjKXM4ck2QobLqTDytGJbLLhJg=
|
||||
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
|
||||
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
|
||||
github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a h1:lem6QCvxR0Y28gth9P+wV2K/zYUUAkJ+55U8cpS0p5I=
|
||||
github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
|
||||
github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI=
|
||||
github.com/nats-io/jwt/v2 v2.3.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
|
||||
github.com/nats-io/nats-server/v2 v2.8.5-0.20220731184415-903a06a5b4ee h1:vAtoZ+LW6eIUjkCWWwO1DZ6o16UGrVOG+ot/AkwejO8=
|
||||
github.com/nats-io/nats-server/v2 v2.8.5-0.20220731184415-903a06a5b4ee/go.mod h1:3Yg3ApyQxPlAs1KKHKV5pobV5VtZk+TtOiUJx/iqkkg=
|
||||
github.com/nats-io/nats.go v1.16.1-0.20220731182438-87bbea85922b h1:CE9wSYLvwq8aC/0+6zH8lhhtZYvJ9p8PzwvZeYgdBc0=
|
||||
github.com/nats-io/nats.go v1.16.1-0.20220731182438-87bbea85922b/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
|
||||
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
|
||||
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
|
||||
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
||||
|
|
@ -390,10 +394,6 @@ github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OS
|
|||
github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32/go.mod h1:9wM+0iRr9ahx58uYLpLIr5fm8diHn0JbqRycJi6w0Ms=
|
||||
github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJEU3ofeGjhHklVoIGuVj85JJwZ6kWPaJwCIxgnFmo=
|
||||
github.com/neelance/sourcemap v0.0.0-20151028013722-8c68805598ab/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM=
|
||||
github.com/neilalexander/nats-server/v2 v2.8.3-0.20220513095553-73a9a246d34f h1:Fc+TjdV1mOy0oISSzfoxNWdTqjg7tN/Vdgf+B2cwvdo=
|
||||
github.com/neilalexander/nats-server/v2 v2.8.3-0.20220513095553-73a9a246d34f/go.mod h1:vIdpKz3OG+DCg4q/xVPdXHoztEyKDWRtykQ4N7hd7C4=
|
||||
github.com/neilalexander/nats.go v1.13.1-0.20220621084451-ac518c356673 h1:TcKfa3Tf0qwUotv63PQVu2d1bBoLi2iEA4RHVMGDh5M=
|
||||
github.com/neilalexander/nats.go v1.13.1-0.20220621084451-ac518c356673/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
|
||||
github.com/neilalexander/utp v0.1.1-0.20210727203401-54ae7b1cd5f9 h1:lrVQzBtkeQEGGYUHwSX1XPe1E5GL6U3KYCNe2G4bncQ=
|
||||
github.com/neilalexander/utp v0.1.1-0.20210727203401-54ae7b1cd5f9/go.mod h1:NPHGhPc0/wudcaCqL/H5AOddkRf8GPRhzOujuUKGQu8=
|
||||
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 h1:zYyBkD/k9seD2A7fsi6Oo2LfFZAehjjQMERAvZLEDnQ=
|
||||
|
|
@ -764,8 +764,8 @@ golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxb
|
|||
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 h1:GZokNIeuVkl3aZHJchRrr13WCsols02MLUcz1U9is6M=
|
||||
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20220411224347-583f2d630306 h1:+gHMid33q6pen7kv9xvT+JRinntgeXO2AeZVd0AWD3w=
|
||||
golang.org/x/time v0.0.0-20220411224347-583f2d630306/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
|
|
|
|||
|
|
@ -48,9 +48,11 @@ type Migration struct {
|
|||
Down func(ctx context.Context, txn *sql.Tx) error
|
||||
}
|
||||
|
||||
// Gaurs the internal state of Goose from being modified by concurrent tests or goroutines
|
||||
var gooseMutex sync.Mutex
|
||||
|
||||
// Migrator
|
||||
type Migrator struct {
|
||||
gooseMutex sync.Mutex
|
||||
db *sql.DB
|
||||
migrations []Migration
|
||||
knownMigrations map[string]struct{}
|
||||
|
|
@ -82,8 +84,8 @@ func (m *Migrator) AddMigrations(migrations ...Migration) {
|
|||
|
||||
// Up executes all migrations in order they were added.
|
||||
func (m *Migrator) Up(ctx context.Context) error {
|
||||
m.gooseMutex.Lock()
|
||||
defer m.gooseMutex.Unlock()
|
||||
gooseMutex.Lock()
|
||||
defer gooseMutex.Unlock()
|
||||
var (
|
||||
err error
|
||||
dendriteVersion = internal.VersionString()
|
||||
|
|
|
|||
|
|
@ -16,8 +16,8 @@ var build string
|
|||
|
||||
const (
|
||||
VersionMajor = 0
|
||||
VersionMinor = 8
|
||||
VersionPatch = 9
|
||||
VersionMinor = 9
|
||||
VersionPatch = 0
|
||||
VersionTag = "" // example: "rc1"
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -50,14 +50,14 @@ func CheckForSoftFail(
|
|||
if err != nil {
|
||||
return false, fmt.Errorf("db.RoomNID: %w", err)
|
||||
}
|
||||
if roomInfo == nil || roomInfo.IsStub {
|
||||
if roomInfo == nil || roomInfo.IsStub() {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Then get the state entries for the current state snapshot.
|
||||
// We'll use this to check if the event is allowed right now.
|
||||
roomState := state.NewStateResolution(db, roomInfo)
|
||||
authStateEntries, err = roomState.LoadStateAtSnapshot(ctx, roomInfo.StateSnapshotNID)
|
||||
authStateEntries, err = roomState.LoadStateAtSnapshot(ctx, roomInfo.StateSnapshotNID())
|
||||
if err != nil {
|
||||
return true, fmt.Errorf("roomState.LoadStateAtSnapshot: %w", err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -236,13 +236,34 @@ func LoadStateEvents(
|
|||
func CheckServerAllowedToSeeEvent(
|
||||
ctx context.Context, db storage.Database, info *types.RoomInfo, eventID string, serverName gomatrixserverlib.ServerName, isServerInRoom bool,
|
||||
) (bool, error) {
|
||||
stateAtEvent, err := db.GetHistoryVisibilityState(ctx, info, eventID, string(serverName))
|
||||
switch err {
|
||||
case nil:
|
||||
// No error, so continue normally
|
||||
case tables.OptimisationNotSupportedError:
|
||||
// The database engine didn't support this optimisation, so fall back to using
|
||||
// the old and slow method
|
||||
stateAtEvent, err = slowGetHistoryVisibilityState(ctx, db, info, eventID, serverName)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
default:
|
||||
// Something else went wrong
|
||||
return false, err
|
||||
}
|
||||
return auth.IsServerAllowed(serverName, isServerInRoom, stateAtEvent), nil
|
||||
}
|
||||
|
||||
func slowGetHistoryVisibilityState(
|
||||
ctx context.Context, db storage.Database, info *types.RoomInfo, eventID string, serverName gomatrixserverlib.ServerName,
|
||||
) ([]*gomatrixserverlib.Event, error) {
|
||||
roomState := state.NewStateResolution(db, info)
|
||||
stateEntries, err := roomState.LoadStateAtEvent(ctx, eventID)
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return false, nil
|
||||
return nil, nil
|
||||
}
|
||||
return false, fmt.Errorf("roomState.LoadStateAtEvent: %w", err)
|
||||
return nil, fmt.Errorf("roomState.LoadStateAtEvent: %w", err)
|
||||
}
|
||||
|
||||
// Extract all of the event state key NIDs from the room state.
|
||||
|
|
@ -254,7 +275,7 @@ func CheckServerAllowedToSeeEvent(
|
|||
// Then request those state key NIDs from the database.
|
||||
stateKeys, err := db.EventStateKeys(ctx, stateKeyNIDs)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("db.EventStateKeys: %w", err)
|
||||
return nil, fmt.Errorf("db.EventStateKeys: %w", err)
|
||||
}
|
||||
|
||||
// If the event state key doesn't match the given servername
|
||||
|
|
@ -277,15 +298,10 @@ func CheckServerAllowedToSeeEvent(
|
|||
}
|
||||
|
||||
if len(filteredEntries) == 0 {
|
||||
return false, nil
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
stateAtEvent, err := LoadStateEvents(ctx, db, filteredEntries)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return auth.IsServerAllowed(serverName, isServerInRoom, stateAtEvent), nil
|
||||
return LoadStateEvents(ctx, db, filteredEntries)
|
||||
}
|
||||
|
||||
// TODO: Remove this when we have tests to assert correctness of this function
|
||||
|
|
@ -393,7 +409,7 @@ func QueryLatestEventsAndState(
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if roomInfo == nil || roomInfo.IsStub {
|
||||
if roomInfo == nil || roomInfo.IsStub() {
|
||||
response.RoomExists = false
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -375,11 +375,7 @@ func (t *missingStateReq) lookupStateAfterEventLocally(ctx context.Context, room
|
|||
defer span.Finish()
|
||||
|
||||
var res parsedRespState
|
||||
roomInfo, err := t.db.RoomInfo(ctx, roomID)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
roomState := state.NewStateResolution(t.db, roomInfo)
|
||||
roomState := state.NewStateResolution(t.db, t.roomInfo)
|
||||
stateAtEvents, err := t.db.StateAtEventIDs(ctx, []string{eventID})
|
||||
if err != nil {
|
||||
util.GetLogger(ctx).WithField("room_id", roomID).WithError(err).Warnf("failed to get state after %s locally", eventID)
|
||||
|
|
@ -754,9 +750,8 @@ func (t *missingStateReq) lookupMissingStateViaStateIDs(ctx context.Context, roo
|
|||
|
||||
// Define what we'll do in order to fetch the missing event ID.
|
||||
fetch := func(missingEventID string) {
|
||||
var h *gomatrixserverlib.Event
|
||||
h, err = t.lookupEvent(ctx, roomVersion, roomID, missingEventID, false)
|
||||
switch err.(type) {
|
||||
h, herr := t.lookupEvent(ctx, roomVersion, roomID, missingEventID, false)
|
||||
switch herr.(type) {
|
||||
case verifySigError:
|
||||
return
|
||||
case nil:
|
||||
|
|
@ -765,7 +760,7 @@ func (t *missingStateReq) lookupMissingStateViaStateIDs(ctx context.Context, roo
|
|||
util.GetLogger(ctx).WithFields(logrus.Fields{
|
||||
"event_id": missingEventID,
|
||||
"room_id": roomID,
|
||||
}).Warn("Failed to fetch missing event")
|
||||
}).WithError(herr).Warn("Failed to fetch missing event")
|
||||
return
|
||||
}
|
||||
haveEventsMutex.Lock()
|
||||
|
|
|
|||
|
|
@ -52,7 +52,7 @@ func (r *Admin) PerformAdminEvacuateRoom(
|
|||
}
|
||||
return
|
||||
}
|
||||
if roomInfo == nil || roomInfo.IsStub {
|
||||
if roomInfo == nil || roomInfo.IsStub() {
|
||||
res.Error = &api.PerformError{
|
||||
Code: api.PerformErrorNoRoom,
|
||||
Msg: fmt.Sprintf("Room %s not found", req.RoomID),
|
||||
|
|
|
|||
|
|
@ -73,7 +73,7 @@ func (r *Backfiller) PerformBackfill(
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if info == nil || info.IsStub {
|
||||
if info == nil || info.IsStub() {
|
||||
return fmt.Errorf("PerformBackfill: missing room info for room %s", request.RoomID)
|
||||
}
|
||||
|
||||
|
|
@ -106,7 +106,7 @@ func (r *Backfiller) backfillViaFederation(ctx context.Context, req *api.Perform
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if info == nil || info.IsStub {
|
||||
if info == nil || info.IsStub() {
|
||||
return fmt.Errorf("backfillViaFederation: missing room info for room %s", req.RoomID)
|
||||
}
|
||||
requester := newBackfillRequester(r.DB, r.FSAPI, r.ServerName, req.BackwardsExtremities, r.PreferServers)
|
||||
|
|
@ -434,7 +434,7 @@ FindSuccessor:
|
|||
logrus.WithError(err).WithField("room_id", roomID).Error("ServersAtEvent: failed to get RoomInfo for room")
|
||||
return nil
|
||||
}
|
||||
if info == nil || info.IsStub {
|
||||
if info == nil || info.IsStub() {
|
||||
logrus.WithField("room_id", roomID).Error("ServersAtEvent: failed to get RoomInfo for room, room is missing")
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -50,7 +50,7 @@ func (r *InboundPeeker) PerformInboundPeek(
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if info == nil || info.IsStub {
|
||||
if info == nil || info.IsStub() {
|
||||
return nil
|
||||
}
|
||||
response.RoomExists = true
|
||||
|
|
|
|||
|
|
@ -125,7 +125,7 @@ func (r *Inviter) PerformInvite(
|
|||
return outputUpdates, nil
|
||||
}
|
||||
|
||||
if (info == nil || info.IsStub) && !isOriginLocal && isTargetLocal {
|
||||
if (info == nil || info.IsStub()) && !isOriginLocal && isTargetLocal {
|
||||
// The invite came in over federation for a room that we don't know about
|
||||
// yet. We need to handle this a bit differently to most invites because
|
||||
// we don't know the room state, therefore the roomserver can't process
|
||||
|
|
@ -276,7 +276,7 @@ func buildInviteStrippedState(
|
|||
}
|
||||
roomState := state.NewStateResolution(db, info)
|
||||
stateEntries, err := roomState.LoadStateAtSnapshotForStringTuples(
|
||||
ctx, info.StateSnapshotNID, stateWanted,
|
||||
ctx, info.StateSnapshotNID(), stateWanted,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
|||
|
|
@ -61,7 +61,7 @@ func (r *Queryer) QueryStateAfterEvents(
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if info == nil || info.IsStub {
|
||||
if info == nil || info.IsStub() {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -302,7 +302,7 @@ func (r *Queryer) QueryServerJoinedToRoom(
|
|||
if err != nil {
|
||||
return fmt.Errorf("r.DB.RoomInfo: %w", err)
|
||||
}
|
||||
if info == nil || info.IsStub {
|
||||
if info == nil || info.IsStub() {
|
||||
return nil
|
||||
}
|
||||
response.RoomExists = true
|
||||
|
|
@ -351,8 +351,8 @@ func (r *Queryer) QueryServerAllowedToSeeEvent(
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if info == nil {
|
||||
return fmt.Errorf("QueryServerAllowedToSeeEvent: no room info for room %s", roomID)
|
||||
if info == nil || info.IsStub() {
|
||||
return nil
|
||||
}
|
||||
response.AllowedToSeeEvent, err = helpers.CheckServerAllowedToSeeEvent(
|
||||
ctx, r.DB, info, request.EventID, request.ServerName, inRoomRes.IsInRoom,
|
||||
|
|
@ -390,7 +390,7 @@ func (r *Queryer) QueryMissingEvents(
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if info == nil || info.IsStub {
|
||||
if info == nil || info.IsStub() {
|
||||
return fmt.Errorf("missing RoomInfo for room %s", events[0].RoomID())
|
||||
}
|
||||
|
||||
|
|
@ -429,7 +429,7 @@ func (r *Queryer) QueryStateAndAuthChain(
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if info == nil || info.IsStub {
|
||||
if info == nil || info.IsStub() {
|
||||
return nil
|
||||
}
|
||||
response.RoomExists = true
|
||||
|
|
@ -774,7 +774,7 @@ func (r *Queryer) QueryRestrictedJoinAllowed(ctx context.Context, req *api.Query
|
|||
if err != nil {
|
||||
return fmt.Errorf("r.DB.RoomInfo: %w", err)
|
||||
}
|
||||
if roomInfo == nil || roomInfo.IsStub {
|
||||
if roomInfo == nil || roomInfo.IsStub() {
|
||||
return nil // fmt.Errorf("room %q doesn't exist or is stub room", req.RoomID)
|
||||
}
|
||||
// If the room version doesn't allow restricted joins then don't
|
||||
|
|
@ -837,7 +837,7 @@ func (r *Queryer) QueryRestrictedJoinAllowed(ctx context.Context, req *api.Query
|
|||
// See if the room exists. If it doesn't exist or if it's a stub
|
||||
// room entry then we can't check memberships.
|
||||
targetRoomInfo, err := r.DB.RoomInfo(ctx, rule.RoomID)
|
||||
if err != nil || targetRoomInfo == nil || targetRoomInfo.IsStub {
|
||||
if err != nil || targetRoomInfo == nil || targetRoomInfo.IsStub() {
|
||||
res.Resident = false
|
||||
continue
|
||||
}
|
||||
|
|
|
|||
|
|
@ -124,6 +124,29 @@ func (v *StateResolution) LoadStateAtEvent(
|
|||
return stateEntries, nil
|
||||
}
|
||||
|
||||
// LoadStateAtEvent loads the full state of a room before a particular event.
|
||||
func (v *StateResolution) LoadStateAtEventForHistoryVisibility(
|
||||
ctx context.Context, eventID string,
|
||||
) ([]types.StateEntry, error) {
|
||||
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.LoadStateAtEvent")
|
||||
defer span.Finish()
|
||||
|
||||
snapshotNID, err := v.db.SnapshotNIDFromEventID(ctx, eventID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("LoadStateAtEvent.SnapshotNIDFromEventID failed for event %s : %w", eventID, err)
|
||||
}
|
||||
if snapshotNID == 0 {
|
||||
return nil, fmt.Errorf("LoadStateAtEvent.SnapshotNIDFromEventID(%s) returned 0 NID, was this event stored?", eventID)
|
||||
}
|
||||
|
||||
stateEntries, err := v.LoadStateAtSnapshot(ctx, snapshotNID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return stateEntries, nil
|
||||
}
|
||||
|
||||
// LoadCombinedStateAfterEvents loads a snapshot of the state after each of the events
|
||||
// and combines those snapshots together into a single list. At this point it is
|
||||
// possible to run into duplicate (type, state key) tuples.
|
||||
|
|
|
|||
|
|
@ -166,4 +166,6 @@ type Database interface {
|
|||
GetKnownRooms(ctx context.Context) ([]string, error)
|
||||
// ForgetRoom sets a flag in the membership table, that the user wishes to forget a specific room
|
||||
ForgetRoom(ctx context.Context, userID, roomID string, forget bool) error
|
||||
|
||||
GetHistoryVisibilityState(ctx context.Context, roomInfo *types.RoomInfo, eventID string, domain string) ([]*gomatrixserverlib.Event, error)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -147,14 +147,16 @@ func (s *roomStatements) InsertRoomNID(
|
|||
func (s *roomStatements) SelectRoomInfo(ctx context.Context, txn *sql.Tx, roomID string) (*types.RoomInfo, error) {
|
||||
var info types.RoomInfo
|
||||
var latestNIDs pq.Int64Array
|
||||
var stateSnapshotNID types.StateSnapshotNID
|
||||
stmt := sqlutil.TxStmt(txn, s.selectRoomInfoStmt)
|
||||
err := stmt.QueryRowContext(ctx, roomID).Scan(
|
||||
&info.RoomVersion, &info.RoomNID, &info.StateSnapshotNID, &latestNIDs,
|
||||
&info.RoomVersion, &info.RoomNID, &stateSnapshotNID, &latestNIDs,
|
||||
)
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, nil
|
||||
}
|
||||
info.IsStub = len(latestNIDs) == 0
|
||||
info.SetStateSnapshotNID(stateSnapshotNID)
|
||||
info.SetIsStub(len(latestNIDs) == 0)
|
||||
return &info, err
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -72,9 +72,35 @@ const bulkSelectStateBlockNIDsSQL = "" +
|
|||
"SELECT state_snapshot_nid, state_block_nids FROM roomserver_state_snapshots" +
|
||||
" WHERE state_snapshot_nid = ANY($1) ORDER BY state_snapshot_nid ASC"
|
||||
|
||||
// Looks up both the history visibility event and relevant membership events from
|
||||
// a given domain name from a given state snapshot. This is used to optimise the
|
||||
// helpers.CheckServerAllowedToSeeEvent function.
|
||||
// TODO: There's a sequence scan here because of the hash join strategy, which is
|
||||
// probably O(n) on state key entries, so there must be a way to avoid that somehow.
|
||||
// Event type NIDs are:
|
||||
// - 5: m.room.member as per https://github.com/matrix-org/dendrite/blob/c7f7aec4d07d59120d37d5b16a900f6d608a75c4/roomserver/storage/postgres/event_types_table.go#L40
|
||||
// - 7: m.room.history_visibility as per https://github.com/matrix-org/dendrite/blob/c7f7aec4d07d59120d37d5b16a900f6d608a75c4/roomserver/storage/postgres/event_types_table.go#L42
|
||||
const bulkSelectStateForHistoryVisibilitySQL = `
|
||||
SELECT event_nid FROM (
|
||||
SELECT event_nid, event_type_nid, event_state_key_nid FROM roomserver_events
|
||||
WHERE (event_type_nid = 5 OR event_type_nid = 7)
|
||||
AND event_nid = ANY(
|
||||
SELECT UNNEST(event_nids) FROM roomserver_state_block
|
||||
WHERE state_block_nid = ANY(
|
||||
SELECT UNNEST(state_block_nids) FROM roomserver_state_snapshots
|
||||
WHERE state_snapshot_nid = $1
|
||||
)
|
||||
)
|
||||
) AS roomserver_events
|
||||
INNER JOIN roomserver_event_state_keys
|
||||
ON roomserver_events.event_state_key_nid = roomserver_event_state_keys.event_state_key_nid
|
||||
AND (event_type_nid = 7 OR event_state_key LIKE '%:' || $2);
|
||||
`
|
||||
|
||||
type stateSnapshotStatements struct {
|
||||
insertStateStmt *sql.Stmt
|
||||
bulkSelectStateBlockNIDsStmt *sql.Stmt
|
||||
insertStateStmt *sql.Stmt
|
||||
bulkSelectStateBlockNIDsStmt *sql.Stmt
|
||||
bulkSelectStateForHistoryVisibilityStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func CreateStateSnapshotTable(db *sql.DB) error {
|
||||
|
|
@ -88,6 +114,7 @@ func PrepareStateSnapshotTable(db *sql.DB) (tables.StateSnapshot, error) {
|
|||
return s, sqlutil.StatementList{
|
||||
{&s.insertStateStmt, insertStateSQL},
|
||||
{&s.bulkSelectStateBlockNIDsStmt, bulkSelectStateBlockNIDsSQL},
|
||||
{&s.bulkSelectStateForHistoryVisibilityStmt, bulkSelectStateForHistoryVisibilitySQL},
|
||||
}.Prepare(db)
|
||||
}
|
||||
|
||||
|
|
@ -136,3 +163,23 @@ func (s *stateSnapshotStatements) BulkSelectStateBlockNIDs(
|
|||
}
|
||||
return results, nil
|
||||
}
|
||||
|
||||
func (s *stateSnapshotStatements) BulkSelectStateForHistoryVisibility(
|
||||
ctx context.Context, txn *sql.Tx, stateSnapshotNID types.StateSnapshotNID, domain string,
|
||||
) ([]types.EventNID, error) {
|
||||
stmt := sqlutil.TxStmt(txn, s.bulkSelectStateForHistoryVisibilityStmt)
|
||||
rows, err := stmt.QueryContext(ctx, stateSnapshotNID, domain)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close() // nolint: errcheck
|
||||
results := make([]types.EventNID, 0, 16)
|
||||
for rows.Next() {
|
||||
var eventNID types.EventNID
|
||||
if err = rows.Scan(&eventNID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
results = append(results, eventNID)
|
||||
}
|
||||
return results, rows.Err()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -217,6 +217,14 @@ func (u *RoomUpdater) SetLatestEvents(
|
|||
roomNID types.RoomNID, latest []types.StateAtEventAndReference, lastEventNIDSent types.EventNID,
|
||||
currentStateSnapshotNID types.StateSnapshotNID,
|
||||
) error {
|
||||
switch {
|
||||
case len(latest) == 0:
|
||||
return fmt.Errorf("cannot set latest events with no latest event references")
|
||||
case currentStateSnapshotNID == 0:
|
||||
return fmt.Errorf("cannot set latest events with invalid state snapshot NID")
|
||||
case lastEventNIDSent == 0:
|
||||
return fmt.Errorf("cannot set latest events with invalid latest event NID")
|
||||
}
|
||||
eventNIDs := make([]types.EventNID, len(latest))
|
||||
for i := range latest {
|
||||
eventNIDs[i] = latest[i].EventNID
|
||||
|
|
@ -229,8 +237,10 @@ func (u *RoomUpdater) SetLatestEvents(
|
|||
// Since it's entirely possible that this types.RoomInfo came from the
|
||||
// cache, we should make sure to update that entry so that the next run
|
||||
// works from live data.
|
||||
u.roomInfo.StateSnapshotNID = currentStateSnapshotNID
|
||||
u.roomInfo.IsStub = false
|
||||
if u.roomInfo != nil {
|
||||
u.roomInfo.SetStateSnapshotNID(currentStateSnapshotNID)
|
||||
u.roomInfo.SetIsStub(false)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
|
|
|||
|
|
@ -156,15 +156,30 @@ func (d *Database) RoomInfo(ctx context.Context, roomID string) (*types.RoomInfo
|
|||
}
|
||||
|
||||
func (d *Database) roomInfo(ctx context.Context, txn *sql.Tx, roomID string) (*types.RoomInfo, error) {
|
||||
if roomInfo, ok := d.Cache.GetRoomInfo(roomID); ok && roomInfo != nil {
|
||||
roomInfo, ok := d.Cache.GetRoomInfo(roomID)
|
||||
if ok && roomInfo != nil && !roomInfo.IsStub() {
|
||||
// The data that's in the cache is not stubby, so return it.
|
||||
return roomInfo, nil
|
||||
}
|
||||
roomInfo, err := d.RoomsTable.SelectRoomInfo(ctx, txn, roomID)
|
||||
if err == nil && roomInfo != nil {
|
||||
d.Cache.StoreRoomServerRoomID(roomInfo.RoomNID, roomID)
|
||||
d.Cache.StoreRoomInfo(roomID, roomInfo)
|
||||
// At this point we either don't have an entry in the cache, or
|
||||
// it is stubby, so let's check the roomserver_rooms table again.
|
||||
roomInfoFromDB, err := d.RoomsTable.SelectRoomInfo(ctx, txn, roomID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return roomInfo, err
|
||||
// If we have a stubby cache entry already, update it and return
|
||||
// the reference to the cache entry.
|
||||
if roomInfo != nil {
|
||||
roomInfo.CopyFrom(roomInfoFromDB)
|
||||
return roomInfo, nil
|
||||
}
|
||||
// Otherwise, try to admit the data into the cache and return the
|
||||
// new reference from the database.
|
||||
if roomInfoFromDB != nil {
|
||||
d.Cache.StoreRoomServerRoomID(roomInfoFromDB.RoomNID, roomID)
|
||||
d.Cache.StoreRoomInfo(roomID, roomInfoFromDB)
|
||||
}
|
||||
return roomInfoFromDB, err
|
||||
}
|
||||
|
||||
func (d *Database) AddState(
|
||||
|
|
@ -676,7 +691,7 @@ func (d *Database) storeEvent(
|
|||
succeeded := false
|
||||
if updater == nil {
|
||||
var roomInfo *types.RoomInfo
|
||||
roomInfo, err = d.RoomInfo(ctx, event.RoomID())
|
||||
roomInfo, err = d.roomInfo(ctx, txn, event.RoomID())
|
||||
if err != nil {
|
||||
return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("d.RoomInfo: %w", err)
|
||||
}
|
||||
|
|
@ -988,6 +1003,38 @@ func (d *Database) loadEvent(ctx context.Context, eventID string) *types.Event {
|
|||
return &evs[0]
|
||||
}
|
||||
|
||||
func (d *Database) GetHistoryVisibilityState(ctx context.Context, roomInfo *types.RoomInfo, eventID string, domain string) ([]*gomatrixserverlib.Event, error) {
|
||||
eventStates, err := d.EventsTable.BulkSelectStateAtEventByID(ctx, nil, []string{eventID})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
stateSnapshotNID := eventStates[0].BeforeStateSnapshotNID
|
||||
if stateSnapshotNID == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
eventNIDs, err := d.StateSnapshotTable.BulkSelectStateForHistoryVisibility(ctx, nil, stateSnapshotNID, domain)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
eventIDs, _ := d.EventsTable.BulkSelectEventID(ctx, nil, eventNIDs)
|
||||
if err != nil {
|
||||
eventIDs = map[types.EventNID]string{}
|
||||
}
|
||||
events := make([]*gomatrixserverlib.Event, 0, len(eventNIDs))
|
||||
for _, eventNID := range eventNIDs {
|
||||
data, err := d.EventJSONTable.BulkSelectEventJSON(ctx, nil, []types.EventNID{eventNID})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ev, err := gomatrixserverlib.NewEventFromTrustedJSONWithEventID(eventIDs[eventNID], data[0].EventJSON, false, roomInfo.RoomVersion)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
events = append(events, ev)
|
||||
}
|
||||
return events, nil
|
||||
}
|
||||
|
||||
// GetStateEvent returns the current state event of a given type for a given room with a given state key
|
||||
// If no event could be found, returns nil
|
||||
// If there was an issue during the retrieval, returns an error
|
||||
|
|
@ -1000,7 +1047,7 @@ func (d *Database) GetStateEvent(ctx context.Context, roomID, evType, stateKey s
|
|||
return nil, fmt.Errorf("room %s doesn't exist", roomID)
|
||||
}
|
||||
// e.g invited rooms
|
||||
if roomInfo.IsStub {
|
||||
if roomInfo.IsStub() {
|
||||
return nil, nil
|
||||
}
|
||||
eventTypeNID, err := d.EventTypesTable.SelectEventTypeNID(ctx, nil, evType)
|
||||
|
|
@ -1019,7 +1066,7 @@ func (d *Database) GetStateEvent(ctx context.Context, roomID, evType, stateKey s
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
entries, err := d.loadStateAtSnapshot(ctx, roomInfo.StateSnapshotNID)
|
||||
entries, err := d.loadStateAtSnapshot(ctx, roomInfo.StateSnapshotNID())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -1065,7 +1112,7 @@ func (d *Database) GetStateEventsWithEventType(ctx context.Context, roomID, evTy
|
|||
return nil, fmt.Errorf("room %s doesn't exist", roomID)
|
||||
}
|
||||
// e.g invited rooms
|
||||
if roomInfo.IsStub {
|
||||
if roomInfo.IsStub() {
|
||||
return nil, nil
|
||||
}
|
||||
eventTypeNID, err := d.EventTypesTable.SelectEventTypeNID(ctx, nil, evType)
|
||||
|
|
@ -1076,7 +1123,7 @@ func (d *Database) GetStateEventsWithEventType(ctx context.Context, roomID, evTy
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
entries, err := d.loadStateAtSnapshot(ctx, roomInfo.StateSnapshotNID)
|
||||
entries, err := d.loadStateAtSnapshot(ctx, roomInfo.StateSnapshotNID())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -1193,10 +1240,10 @@ func (d *Database) GetBulkStateContent(ctx context.Context, roomIDs []string, tu
|
|||
return nil, fmt.Errorf("GetBulkStateContent: failed to load room info for room %s : %w", roomID, err2)
|
||||
}
|
||||
// for unknown rooms or rooms which we don't have the current state, skip them.
|
||||
if roomInfo == nil || roomInfo.IsStub {
|
||||
if roomInfo == nil || roomInfo.IsStub() {
|
||||
continue
|
||||
}
|
||||
entries, err2 := d.loadStateAtSnapshot(ctx, roomInfo.StateSnapshotNID)
|
||||
entries, err2 := d.loadStateAtSnapshot(ctx, roomInfo.StateSnapshotNID())
|
||||
if err2 != nil {
|
||||
return nil, fmt.Errorf("GetBulkStateContent: failed to load state for room %s : %w", roomID, err2)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -129,9 +129,10 @@ func (s *roomStatements) SelectRoomIDsWithEvents(ctx context.Context, txn *sql.T
|
|||
func (s *roomStatements) SelectRoomInfo(ctx context.Context, txn *sql.Tx, roomID string) (*types.RoomInfo, error) {
|
||||
var info types.RoomInfo
|
||||
var latestNIDsJSON string
|
||||
var stateSnapshotNID types.StateSnapshotNID
|
||||
stmt := sqlutil.TxStmt(txn, s.selectRoomInfoStmt)
|
||||
err := stmt.QueryRowContext(ctx, roomID).Scan(
|
||||
&info.RoomVersion, &info.RoomNID, &info.StateSnapshotNID, &latestNIDsJSON,
|
||||
&info.RoomVersion, &info.RoomNID, &stateSnapshotNID, &latestNIDsJSON,
|
||||
)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
|
|
@ -143,7 +144,8 @@ func (s *roomStatements) SelectRoomInfo(ctx context.Context, txn *sql.Tx, roomID
|
|||
if err = json.Unmarshal([]byte(latestNIDsJSON), &latestNIDs); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
info.IsStub = len(latestNIDs) == 0
|
||||
info.SetStateSnapshotNID(stateSnapshotNID)
|
||||
info.SetIsStub(len(latestNIDs) == 0)
|
||||
return &info, err
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -140,3 +140,9 @@ func (s *stateSnapshotStatements) BulkSelectStateBlockNIDs(
|
|||
}
|
||||
return results, nil
|
||||
}
|
||||
|
||||
func (s *stateSnapshotStatements) BulkSelectStateForHistoryVisibility(
|
||||
ctx context.Context, txn *sql.Tx, stateSnapshotNID types.StateSnapshotNID, domain string,
|
||||
) ([]types.EventNID, error) {
|
||||
return nil, tables.OptimisationNotSupportedError
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,12 +3,15 @@ package tables
|
|||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/tidwall/gjson"
|
||||
)
|
||||
|
||||
var OptimisationNotSupportedError = errors.New("optimisation not supported")
|
||||
|
||||
type EventJSONPair struct {
|
||||
EventNID types.EventNID
|
||||
EventJSON []byte
|
||||
|
|
@ -80,6 +83,10 @@ type Rooms interface {
|
|||
type StateSnapshot interface {
|
||||
InsertState(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, stateBlockNIDs types.StateBlockNIDs) (stateNID types.StateSnapshotNID, err error)
|
||||
BulkSelectStateBlockNIDs(ctx context.Context, txn *sql.Tx, stateNIDs []types.StateSnapshotNID) ([]types.StateBlockNIDList, error)
|
||||
// BulkSelectStateForHistoryVisibility is a PostgreSQL-only optimisation for finding
|
||||
// which users are in a room faster than having to load the entire room state. In the
|
||||
// case of SQLite, this will return tables.OptimisationNotSupportedError.
|
||||
BulkSelectStateForHistoryVisibility(ctx context.Context, txn *sql.Tx, stateSnapshotNID types.StateSnapshotNID, domain string) ([]types.EventNID, error)
|
||||
}
|
||||
|
||||
type StateBlock interface {
|
||||
|
|
|
|||
|
|
@ -63,12 +63,12 @@ func TestRoomsTable(t *testing.T) {
|
|||
|
||||
roomInfo, err := tab.SelectRoomInfo(ctx, nil, room.ID)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, &types.RoomInfo{
|
||||
RoomNID: wantRoomNID,
|
||||
RoomVersion: room.Version,
|
||||
StateSnapshotNID: 0,
|
||||
IsStub: true, // there are no latestEventNIDs
|
||||
}, roomInfo)
|
||||
expected := &types.RoomInfo{
|
||||
RoomNID: wantRoomNID,
|
||||
RoomVersion: room.Version,
|
||||
}
|
||||
expected.SetIsStub(true) // there are no latestEventNIDs
|
||||
assert.Equal(t, expected, roomInfo)
|
||||
|
||||
roomInfo, err = tab.SelectRoomInfo(ctx, nil, "!doesnotexist:localhost")
|
||||
assert.NoError(t, err)
|
||||
|
|
@ -103,12 +103,12 @@ func TestRoomsTable(t *testing.T) {
|
|||
|
||||
roomInfo, err = tab.SelectRoomInfo(ctx, nil, room.ID)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, &types.RoomInfo{
|
||||
RoomNID: wantRoomNID,
|
||||
RoomVersion: room.Version,
|
||||
StateSnapshotNID: 1,
|
||||
IsStub: false,
|
||||
}, roomInfo)
|
||||
expected = &types.RoomInfo{
|
||||
RoomNID: wantRoomNID,
|
||||
RoomVersion: room.Version,
|
||||
}
|
||||
expected.SetStateSnapshotNID(1)
|
||||
assert.Equal(t, expected, roomInfo)
|
||||
|
||||
eventNIDs, snapshotNID, err := tab.SelectLatestEventNIDs(ctx, nil, wantRoomNID)
|
||||
assert.NoError(t, err)
|
||||
|
|
|
|||
|
|
@ -23,6 +23,15 @@ func mustCreateStateSnapshotTable(t *testing.T, dbType test.DBType) (tab tables.
|
|||
assert.NoError(t, err)
|
||||
switch dbType {
|
||||
case test.DBTypePostgres:
|
||||
// for the PostgreSQL history visibility optimisation to work,
|
||||
// we also need some other tables to exist
|
||||
err = postgres.CreateEventStateKeysTable(db)
|
||||
assert.NoError(t, err)
|
||||
err = postgres.CreateEventsTable(db)
|
||||
assert.NoError(t, err)
|
||||
err = postgres.CreateStateBlockTable(db)
|
||||
assert.NoError(t, err)
|
||||
// ... and then the snapshot table itself
|
||||
err = postgres.CreateStateSnapshotTable(db)
|
||||
assert.NoError(t, err)
|
||||
tab, err = postgres.PrepareStateSnapshotTable(db)
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ import (
|
|||
"encoding/json"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
|
|
@ -279,8 +280,46 @@ func (e RejectedError) Error() string { return string(e) }
|
|||
|
||||
// RoomInfo contains metadata about a room
|
||||
type RoomInfo struct {
|
||||
mu sync.RWMutex
|
||||
RoomNID RoomNID
|
||||
RoomVersion gomatrixserverlib.RoomVersion
|
||||
StateSnapshotNID StateSnapshotNID
|
||||
IsStub bool
|
||||
stateSnapshotNID StateSnapshotNID
|
||||
isStub bool
|
||||
}
|
||||
|
||||
func (r *RoomInfo) StateSnapshotNID() StateSnapshotNID {
|
||||
r.mu.RLock()
|
||||
defer r.mu.RUnlock()
|
||||
return r.stateSnapshotNID
|
||||
}
|
||||
|
||||
func (r *RoomInfo) IsStub() bool {
|
||||
r.mu.RLock()
|
||||
defer r.mu.RUnlock()
|
||||
return r.isStub
|
||||
}
|
||||
|
||||
func (r *RoomInfo) SetStateSnapshotNID(nid StateSnapshotNID) {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
r.stateSnapshotNID = nid
|
||||
}
|
||||
|
||||
func (r *RoomInfo) SetIsStub(isStub bool) {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
r.isStub = isStub
|
||||
}
|
||||
|
||||
func (r *RoomInfo) CopyFrom(r2 *RoomInfo) {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
|
||||
r2.mu.RLock()
|
||||
defer r2.mu.RUnlock()
|
||||
|
||||
r.RoomNID = r2.RoomNID
|
||||
r.RoomVersion = r2.RoomVersion
|
||||
r.stateSnapshotNID = r2.stateSnapshotNID
|
||||
r.isStub = r2.isStub
|
||||
}
|
||||
|
|
|
|||
|
|
@ -369,6 +369,25 @@ func (b *BaseDendrite) CreateFederationClient() *gomatrixserverlib.FederationCli
|
|||
return client
|
||||
}
|
||||
|
||||
func (b *BaseDendrite) configureHTTPErrors() {
|
||||
notAllowedHandler := func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusMethodNotAllowed)
|
||||
_, _ = w.Write([]byte(fmt.Sprintf("405 %s not allowed on this endpoint", r.Method)))
|
||||
}
|
||||
|
||||
notFoundCORSHandler := httputil.WrapHandlerInCORS(http.NotFoundHandler())
|
||||
notAllowedCORSHandler := httputil.WrapHandlerInCORS(http.HandlerFunc(notAllowedHandler))
|
||||
|
||||
for _, router := range []*mux.Router{
|
||||
b.PublicClientAPIMux, b.PublicMediaAPIMux,
|
||||
b.DendriteAdminMux, b.SynapseAdminMux,
|
||||
b.PublicWellKnownAPIMux,
|
||||
} {
|
||||
router.NotFoundHandler = notFoundCORSHandler
|
||||
router.MethodNotAllowedHandler = notAllowedCORSHandler
|
||||
}
|
||||
}
|
||||
|
||||
// SetupAndServeHTTP sets up the HTTP server to serve endpoints registered on
|
||||
// ApiMux under /api/ and adds a prometheus handler under /metrics.
|
||||
func (b *BaseDendrite) SetupAndServeHTTP(
|
||||
|
|
@ -409,6 +428,8 @@ func (b *BaseDendrite) SetupAndServeHTTP(
|
|||
}
|
||||
}
|
||||
|
||||
b.configureHTTPErrors()
|
||||
|
||||
internalRouter.PathPrefix(httputil.InternalPathPrefix).Handler(b.InternalAPIMux)
|
||||
if b.Cfg.Global.Metrics.Enabled {
|
||||
internalRouter.Handle("/metrics", httputil.WrapHandlerInBasicAuth(promhttp.Handler(), b.Cfg.Global.Metrics.BasicAuth))
|
||||
|
|
|
|||
|
|
@ -17,6 +17,10 @@ type JetStream struct {
|
|||
TopicPrefix string `yaml:"topic_prefix"`
|
||||
// Keep all storage in memory. This is mostly useful for unit tests.
|
||||
InMemory bool `yaml:"in_memory"`
|
||||
// Disable logging. This is mostly useful for unit tests.
|
||||
NoLog bool `yaml:"-"`
|
||||
// Disables TLS validation. This should NOT be used in production
|
||||
DisableTLSValidation bool `yaml:"disable_tls_validation"`
|
||||
}
|
||||
|
||||
func (c *JetStream) Prefixed(name string) string {
|
||||
|
|
@ -32,6 +36,8 @@ func (c *JetStream) Defaults(generate bool) {
|
|||
c.TopicPrefix = "Dendrite"
|
||||
if generate {
|
||||
c.StoragePath = Path("./")
|
||||
c.NoLog = true
|
||||
c.DisableTLSValidation = true
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package jetstream
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
|
|
@ -46,6 +47,7 @@ func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetS
|
|||
NoSystemAccount: true,
|
||||
MaxPayload: 16 * 1024 * 1024,
|
||||
NoSigs: true,
|
||||
NoLog: cfg.NoLog,
|
||||
})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
|
@ -75,7 +77,13 @@ func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetS
|
|||
func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStreamContext, *natsclient.Conn) {
|
||||
if nc == nil {
|
||||
var err error
|
||||
nc, err = natsclient.Connect(strings.Join(cfg.Addresses, ","))
|
||||
opts := []natsclient.Option{}
|
||||
if cfg.DisableTLSValidation {
|
||||
opts = append(opts, natsclient.Secure(&tls.Config{
|
||||
InsecureSkipVerify: true,
|
||||
}))
|
||||
}
|
||||
nc, err = natsclient.Connect(strings.Join(cfg.Addresses, ","), opts...)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Panic("Unable to connect to NATS")
|
||||
return nil, nil
|
||||
|
|
|
|||
|
|
@ -109,12 +109,11 @@ func (p *PDUStreamProvider) CompleteSync(
|
|||
p.queue(func() {
|
||||
defer reqWaitGroup.Done()
|
||||
|
||||
var jr *types.JoinResponse
|
||||
jr, err = p.getJoinResponseForCompleteSync(
|
||||
jr, jerr := p.getJoinResponseForCompleteSync(
|
||||
ctx, roomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device,
|
||||
)
|
||||
if err != nil {
|
||||
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
|
||||
if jerr != nil {
|
||||
req.Log.WithError(jerr).Error("p.getJoinResponseForCompleteSync failed")
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -262,9 +261,9 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
|
|||
var pos types.StreamPosition
|
||||
if _, pos, err = p.DB.PositionInTopology(ctx, mostRecentEventID); err == nil {
|
||||
switch {
|
||||
case r.Backwards && pos > latestPosition:
|
||||
case r.Backwards && pos < latestPosition:
|
||||
fallthrough
|
||||
case !r.Backwards && pos < latestPosition:
|
||||
case !r.Backwards && pos > latestPosition:
|
||||
latestPosition = pos
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -128,19 +128,15 @@ func TestRequestPool_updatePresence(t *testing.T) {
|
|||
go rp.cleanPresence(db, time.Millisecond*50)
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
beforeCount := func() int {
|
||||
publisher.lock.Lock()
|
||||
defer publisher.lock.Unlock()
|
||||
return publisher.count
|
||||
}()
|
||||
publisher.lock.Lock()
|
||||
beforeCount := publisher.count
|
||||
publisher.lock.Unlock()
|
||||
rp.updatePresence(db, tt.args.presence, tt.args.userID)
|
||||
func() {
|
||||
publisher.lock.Lock()
|
||||
defer publisher.lock.Unlock()
|
||||
if tt.wantIncrease && publisher.count <= beforeCount {
|
||||
t.Fatalf("expected count to increase: %d <= %d", publisher.count, beforeCount)
|
||||
}
|
||||
}()
|
||||
publisher.lock.Lock()
|
||||
if tt.wantIncrease && publisher.count <= beforeCount {
|
||||
t.Fatalf("expected count to increase: %d <= %d", publisher.count, beforeCount)
|
||||
}
|
||||
publisher.lock.Unlock()
|
||||
time.Sleep(tt.args.sleep)
|
||||
})
|
||||
}
|
||||
|
|
|
|||
|
|
@ -49,3 +49,7 @@ Notifications can be viewed with GET /notifications
|
|||
|
||||
If remote user leaves room we no longer receive device updates
|
||||
Guest users can join guest_access rooms
|
||||
|
||||
# You'll be shocked to discover this is flakey too
|
||||
|
||||
Inbound /v1/send_join rejects joins from other servers
|
||||
|
|
|
|||
Loading…
Reference in a new issue