mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-11 08:03:09 -06:00
Merge branch 'main' of github.com:matrix-org/dendrite into s7evink/syncunreadcount
This commit is contained in:
commit
cff6e944c1
35
CHANGES.md
35
CHANGES.md
|
|
@ -1,5 +1,40 @@
|
|||
# Changelog
|
||||
|
||||
## Dendrite 0.9.8 (2022-09-12)
|
||||
|
||||
### Important
|
||||
|
||||
* This is a **security release** to fix a vulnerability where missing events retrieved from other servers did not have their signatures verified in all cases, affecting all versions of Dendrite before 0.9.8. Upgrading to this version is highly recommended. For more information, [see here](https://github.com/matrix-org/dendrite/security/advisories/GHSA-pfw4-xjgm-267c).
|
||||
|
||||
### Features
|
||||
|
||||
* The built-in NATS Server has been updated to the final 2.9.0 release version
|
||||
|
||||
### Fixes
|
||||
|
||||
* Dendrite will now correctly verify the signatures of events retrieved using `/get_missing_events`
|
||||
|
||||
## Dendrite 0.9.7 (2022-09-09)
|
||||
|
||||
### Features
|
||||
|
||||
* Initial supporting code to enable full-text search has been merged (although not ready for use yet)
|
||||
* Newly created rooms now have higher default power levels for enabling encryption, setting server ACLs or sending tombstone events
|
||||
* Incoming signing key updates over federation are now queued in JetStream for processing, so that they cannot be dropped accidentally
|
||||
|
||||
### Fixes
|
||||
|
||||
* A race condition between the roomserver output events being generated, forward extremities being updated and room info being updated has been fixed
|
||||
* Appservices will no longer receive invite events which they are not interested in, which caused heavy load in some cases or excessive request sizes in others
|
||||
* A bug in state resolution v2 where events could incorrectly be classified as control events has been fixed
|
||||
* A bug in state resolution v2 where some specific events with unexpected non-empty state keys are dropped has been fixed
|
||||
* A bug in state resolution v2 when fetching auth events vs partial state has been fixed
|
||||
* Stale device lists should now be handled correctly for all user IDs, which may help with E2EE reliability
|
||||
* A number of database writer issues have been fixed in the user API and sync API, which should help to reduce `database is locked` errors with SQLite databases
|
||||
* Database migrations should now be detected more reliably to prevent unexpected errors at startup
|
||||
* A number of minor database transaction issues have been fixed, particularly for assigning NIDs in the roomserver, cleaning up device keys and cleaning up notifications
|
||||
* The database query for finding shared users in the sync API has been optimised, using significantly less CPU time as a result
|
||||
|
||||
## Dendrite 0.9.6 (2022-09-01)
|
||||
|
||||
### Features
|
||||
|
|
|
|||
|
|
@ -218,6 +218,9 @@ func (m *DendriteMonolith) Start() {
|
|||
if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil {
|
||||
panic("failed to load PEM key: " + err.Error())
|
||||
}
|
||||
if len(sk) != ed25519.PrivateKeySize {
|
||||
panic("the private key is not long enough")
|
||||
}
|
||||
} else {
|
||||
if sk, err = os.ReadFile(oldkeyfile); err != nil {
|
||||
panic("failed to read the old private key: " + err.Error())
|
||||
|
|
@ -233,8 +236,13 @@ func (m *DendriteMonolith) Start() {
|
|||
if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil {
|
||||
panic("failed to load PEM key: " + err.Error())
|
||||
}
|
||||
if len(sk) != ed25519.PrivateKeySize {
|
||||
panic("the private key is not long enough")
|
||||
}
|
||||
}
|
||||
|
||||
pk = sk.Public().(ed25519.PublicKey)
|
||||
|
||||
var err error
|
||||
m.listener, err = net.Listen("tcp", "localhost:65432")
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -87,6 +87,9 @@ func (m *DendriteMonolith) Start() {
|
|||
if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil {
|
||||
panic("failed to load PEM key: " + err.Error())
|
||||
}
|
||||
if len(sk) != ed25519.PrivateKeySize {
|
||||
panic("the private key is not long enough")
|
||||
}
|
||||
} else {
|
||||
if sk, err = os.ReadFile(oldkeyfile); err != nil {
|
||||
panic("failed to read the old private key: " + err.Error())
|
||||
|
|
@ -103,8 +106,13 @@ func (m *DendriteMonolith) Start() {
|
|||
if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil {
|
||||
panic("failed to load PEM key: " + err.Error())
|
||||
}
|
||||
if len(sk) != ed25519.PrivateKeySize {
|
||||
panic("the private key is not long enough")
|
||||
}
|
||||
}
|
||||
|
||||
pk = sk.Public().(ed25519.PublicKey)
|
||||
|
||||
var err error
|
||||
m.listener, err = net.Listen("tcp", "localhost:65432")
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -21,12 +21,13 @@ import (
|
|||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/nats-io/nats.go"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||
)
|
||||
|
||||
// SyncAPIProducer produces events for the sync API server to consume
|
||||
|
|
@ -61,7 +62,7 @@ func (p *SyncAPIProducer) SendReceipt(
|
|||
|
||||
func (p *SyncAPIProducer) SendToDevice(
|
||||
ctx context.Context, sender, userID, deviceID, eventType string,
|
||||
message interface{},
|
||||
message json.RawMessage,
|
||||
) error {
|
||||
devices := []string{}
|
||||
_, domain, err := gomatrixserverlib.SplitID('@', userID)
|
||||
|
|
@ -89,24 +90,19 @@ func (p *SyncAPIProducer) SendToDevice(
|
|||
devices = append(devices, deviceID)
|
||||
}
|
||||
|
||||
js, err := json.Marshal(message)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"user_id": userID,
|
||||
"num_devices": len(devices),
|
||||
"type": eventType,
|
||||
}).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent)
|
||||
for _, device := range devices {
|
||||
for i, device := range devices {
|
||||
ote := &types.OutputSendToDeviceEvent{
|
||||
UserID: userID,
|
||||
DeviceID: device,
|
||||
SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{
|
||||
Sender: sender,
|
||||
Type: eventType,
|
||||
Content: js,
|
||||
Content: message,
|
||||
},
|
||||
}
|
||||
|
||||
|
|
@ -115,15 +111,17 @@ func (p *SyncAPIProducer) SendToDevice(
|
|||
log.WithError(err).Error("sendToDevice failed json.Marshal")
|
||||
return err
|
||||
}
|
||||
m := &nats.Msg{
|
||||
Subject: p.TopicSendToDeviceEvent,
|
||||
Data: eventJSON,
|
||||
Header: nats.Header{},
|
||||
}
|
||||
m := nats.NewMsg(p.TopicSendToDeviceEvent)
|
||||
m.Data = eventJSON
|
||||
m.Header.Set("sender", sender)
|
||||
m.Header.Set(jetstream.UserID, userID)
|
||||
|
||||
if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil {
|
||||
log.WithError(err).Error("sendToDevice failed t.Producer.SendMessage")
|
||||
if i < len(devices)-1 {
|
||||
log.WithError(err).Warn("sendToDevice failed to PublishMsg, trying further devices")
|
||||
continue
|
||||
}
|
||||
log.WithError(err).Error("sendToDevice failed to PublishMsg for all devices")
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -98,6 +98,9 @@ func main() {
|
|||
if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil {
|
||||
panic("failed to load PEM key: " + err.Error())
|
||||
}
|
||||
if len(sk) != ed25519.PrivateKeySize {
|
||||
panic("the private key is not long enough")
|
||||
}
|
||||
} else {
|
||||
if sk, err = os.ReadFile(oldkeyfile); err != nil {
|
||||
panic("failed to read the old private key: " + err.Error())
|
||||
|
|
@ -114,7 +117,13 @@ func main() {
|
|||
if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil {
|
||||
panic("failed to load PEM key: " + err.Error())
|
||||
}
|
||||
if len(sk) != ed25519.PrivateKeySize {
|
||||
panic("the private key is not long enough")
|
||||
}
|
||||
}
|
||||
|
||||
pk = sk.Public().(ed25519.PublicKey)
|
||||
|
||||
cfg.Defaults(config.DefaultOpts{
|
||||
Generate: true,
|
||||
Monolithic: true,
|
||||
|
|
@ -136,7 +145,6 @@ func main() {
|
|||
}
|
||||
}
|
||||
|
||||
pk = sk.Public().(ed25519.PublicKey)
|
||||
cfg.Global.ServerName = gomatrixserverlib.ServerName(hex.EncodeToString(pk))
|
||||
cfg.Global.KeyID = gomatrixserverlib.KeyID(signing.KeyID)
|
||||
|
||||
|
|
|
|||
|
|
@ -86,6 +86,9 @@ func main() {
|
|||
if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil {
|
||||
panic("failed to load PEM key: " + err.Error())
|
||||
}
|
||||
if len(sk) != ed25519.PrivateKeySize {
|
||||
panic("the private key is not long enough")
|
||||
}
|
||||
} else {
|
||||
if sk, err = os.ReadFile(oldkeyfile); err != nil {
|
||||
panic("failed to read the old private key: " + err.Error())
|
||||
|
|
@ -102,8 +105,13 @@ func main() {
|
|||
if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil {
|
||||
panic("failed to load PEM key: " + err.Error())
|
||||
}
|
||||
if len(sk) != ed25519.PrivateKeySize {
|
||||
panic("the private key is not long enough")
|
||||
}
|
||||
}
|
||||
|
||||
pk = sk.Public().(ed25519.PublicKey)
|
||||
|
||||
// use custom config if config flag is set
|
||||
if configFlagSet {
|
||||
cfg = setup.ParseFlags(true)
|
||||
|
|
@ -129,7 +137,6 @@ func main() {
|
|||
}
|
||||
}
|
||||
|
||||
pk = sk.Public().(ed25519.PublicKey)
|
||||
cfg.Global.ServerName = gomatrixserverlib.ServerName(hex.EncodeToString(pk))
|
||||
cfg.Global.KeyID = gomatrixserverlib.KeyID(signing.KeyID)
|
||||
|
||||
|
|
|
|||
|
|
@ -64,7 +64,7 @@ func (p *SyncAPIProducer) SendReceipt(
|
|||
|
||||
func (p *SyncAPIProducer) SendToDevice(
|
||||
ctx context.Context, sender, userID, deviceID, eventType string,
|
||||
message interface{},
|
||||
message json.RawMessage,
|
||||
) error {
|
||||
devices := []string{}
|
||||
_, domain, err := gomatrixserverlib.SplitID('@', userID)
|
||||
|
|
@ -92,24 +92,19 @@ func (p *SyncAPIProducer) SendToDevice(
|
|||
devices = append(devices, deviceID)
|
||||
}
|
||||
|
||||
js, err := json.Marshal(message)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"user_id": userID,
|
||||
"num_devices": len(devices),
|
||||
"type": eventType,
|
||||
}).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent)
|
||||
for _, device := range devices {
|
||||
for i, device := range devices {
|
||||
ote := &types.OutputSendToDeviceEvent{
|
||||
UserID: userID,
|
||||
DeviceID: device,
|
||||
SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{
|
||||
Sender: sender,
|
||||
Type: eventType,
|
||||
Content: js,
|
||||
Content: message,
|
||||
},
|
||||
}
|
||||
|
||||
|
|
@ -118,16 +113,17 @@ func (p *SyncAPIProducer) SendToDevice(
|
|||
log.WithError(err).Error("sendToDevice failed json.Marshal")
|
||||
return err
|
||||
}
|
||||
m := &nats.Msg{
|
||||
Subject: p.TopicSendToDeviceEvent,
|
||||
Data: eventJSON,
|
||||
Header: nats.Header{},
|
||||
}
|
||||
m := nats.NewMsg(p.TopicSendToDeviceEvent)
|
||||
m.Data = eventJSON
|
||||
m.Header.Set("sender", sender)
|
||||
m.Header.Set(jetstream.UserID, userID)
|
||||
|
||||
if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil {
|
||||
log.WithError(err).Error("sendToDevice failed t.Producer.SendMessage")
|
||||
if i < len(devices)-1 {
|
||||
log.WithError(err).Warn("sendToDevice failed to PublishMsg, trying further devices")
|
||||
continue
|
||||
}
|
||||
log.WithError(err).Error("sendToDevice failed to PublishMsg for all devices")
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
|
|
|||
16
go.mod
16
go.mod
|
|
@ -17,16 +17,17 @@ require (
|
|||
github.com/google/uuid v1.3.0
|
||||
github.com/gorilla/mux v1.8.0
|
||||
github.com/gorilla/websocket v1.5.0
|
||||
github.com/kardianos/minwinsvc v1.0.0
|
||||
github.com/lib/pq v1.10.5
|
||||
github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e
|
||||
github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91
|
||||
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220907081047-637a173a3661
|
||||
github.com/matrix-org/pinecone v0.0.0-20220901133433-565beccfebed
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220912142654-7b96db48888a
|
||||
github.com/matrix-org/pinecone v0.0.0-20220912093434-b215925d5534
|
||||
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
|
||||
github.com/mattn/go-sqlite3 v1.14.13
|
||||
github.com/nats-io/nats-server/v2 v2.8.5-0.20220811224153-d8d25d9b0b1c
|
||||
github.com/nats-io/nats.go v1.16.1-0.20220810192301-fb5ca2cbc995
|
||||
github.com/nats-io/nats-server/v2 v2.9.0
|
||||
github.com/nats-io/nats.go v1.16.1-0.20220906180156-a1017eec10b0
|
||||
github.com/neilalexander/utp v0.1.1-0.20210727203401-54ae7b1cd5f9
|
||||
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646
|
||||
github.com/ngrok/sqlmw v0.0.0-20220520173518-97c9c04efc79
|
||||
|
|
@ -42,7 +43,7 @@ require (
|
|||
github.com/uber/jaeger-lib v2.4.1+incompatible
|
||||
github.com/yggdrasil-network/yggdrasil-go v0.4.5-0.20220901155642-4f2abece817c
|
||||
go.uber.org/atomic v1.9.0
|
||||
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa
|
||||
golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90
|
||||
golang.org/x/image v0.0.0-20220413100746-70e8d0d3baa9
|
||||
golang.org/x/mobile v0.0.0-20220722155234-aaac322e2105
|
||||
golang.org/x/net v0.0.0-20220728211354-c7608f3a8462
|
||||
|
|
@ -92,7 +93,6 @@ require (
|
|||
github.com/json-iterator/go v1.1.12 // indirect
|
||||
github.com/juju/errors v0.0.0-20220203013757-bd733f3c86b9 // indirect
|
||||
github.com/juju/testing v0.0.0-20220203020004-a0ff61f03494 // indirect
|
||||
github.com/kardianos/minwinsvc v1.0.0 // indirect
|
||||
github.com/klauspost/compress v1.15.9 // indirect
|
||||
github.com/lucas-clemente/quic-go v0.28.1 // indirect
|
||||
github.com/marten-seemann/qtls-go1-16 v0.1.5 // indirect
|
||||
|
|
@ -124,9 +124,9 @@ require (
|
|||
github.com/tidwall/pretty v1.2.0 // indirect
|
||||
go.etcd.io/bbolt v1.3.5 // indirect
|
||||
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
|
||||
golang.org/x/sys v0.0.0-20220731174439-a90be440212d // indirect
|
||||
golang.org/x/sys v0.0.0-20220906135438-9e1f76180b77 // indirect
|
||||
golang.org/x/text v0.3.8-0.20211004125949-5bd84dd9b33b // indirect
|
||||
golang.org/x/time v0.0.0-20220411224347-583f2d630306 // indirect
|
||||
golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9 // indirect
|
||||
golang.org/x/tools v0.1.12 // indirect
|
||||
google.golang.org/protobuf v1.27.1 // indirect
|
||||
gopkg.in/macaroon.v2 v2.1.0 // indirect
|
||||
|
|
|
|||
27
go.sum
27
go.sum
|
|
@ -388,10 +388,10 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1
|
|||
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
|
||||
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4=
|
||||
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220907081047-637a173a3661 h1:dww9rH0HVfAO9JOBD1nxq26GHKbEw07thAJTu1DrAQs=
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220907081047-637a173a3661/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk=
|
||||
github.com/matrix-org/pinecone v0.0.0-20220901133433-565beccfebed h1:YMcCnrmTbT5M1LtTiagiFFaj9vEgvC6iVEzWsIb0tQQ=
|
||||
github.com/matrix-org/pinecone v0.0.0-20220901133433-565beccfebed/go.mod h1:K0N1ixHQxXoCyqolDqVxPM3ArrDtcMs8yegOx2Lfv9k=
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220912142654-7b96db48888a h1:XeGBDZZsUe4kgj3myl0EiuDNVWxszJecMTrON3Wn9sI=
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220912142654-7b96db48888a/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk=
|
||||
github.com/matrix-org/pinecone v0.0.0-20220912093434-b215925d5534 h1:XuJYAJNkdG3zj9cO0yQSvL+Sp2xogsTOuZRx7PwdtoA=
|
||||
github.com/matrix-org/pinecone v0.0.0-20220912093434-b215925d5534/go.mod h1:K0N1ixHQxXoCyqolDqVxPM3ArrDtcMs8yegOx2Lfv9k=
|
||||
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
|
||||
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk=
|
||||
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
|
||||
|
|
@ -429,10 +429,10 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW
|
|||
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
|
||||
github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI=
|
||||
github.com/nats-io/jwt/v2 v2.3.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
|
||||
github.com/nats-io/nats-server/v2 v2.8.5-0.20220811224153-d8d25d9b0b1c h1:U5qngWGZ7E/nQxz0544IpIEdKFUUaOJxQN2LHCYLGhg=
|
||||
github.com/nats-io/nats-server/v2 v2.8.5-0.20220811224153-d8d25d9b0b1c/go.mod h1:+f++B/5jpr71JATt7b5KCX+G7bt43iWx1OYWGkpE/Kk=
|
||||
github.com/nats-io/nats.go v1.16.1-0.20220810192301-fb5ca2cbc995 h1:CUcSQR8jwa9//qNgN/t3tW53DObnTPQ/G/K+qnS7yRc=
|
||||
github.com/nats-io/nats.go v1.16.1-0.20220810192301-fb5ca2cbc995/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
|
||||
github.com/nats-io/nats-server/v2 v2.9.0 h1:DLWu+7/VgGOoChcDKytnUZPAmudpv7o/MhKmNrnH1RE=
|
||||
github.com/nats-io/nats-server/v2 v2.9.0/go.mod h1:BWKY6217RvhI+FDoOLZ2BH+hOC37xeKRBlQ1Lz7teKI=
|
||||
github.com/nats-io/nats.go v1.16.1-0.20220906180156-a1017eec10b0 h1:dPUKD6Iv8M1y9MU8PK6H4a4/12yx5/CbaYWz/Z1arY8=
|
||||
github.com/nats-io/nats.go v1.16.1-0.20220906180156-a1017eec10b0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
|
||||
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
|
||||
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
|
||||
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
||||
|
|
@ -627,8 +627,9 @@ golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm
|
|||
golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
|
||||
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||
golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
|
||||
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa h1:zuSxTR4o9y82ebqCUJYNGJbGPo6sKVl54f/TVDObg1c=
|
||||
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
|
||||
golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 h1:Y/gsMcFOcR+6S6f3YeMKl5g+dZMEWqcz5Czj/GWYbkM=
|
||||
golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
|
||||
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
|
|
@ -812,8 +813,8 @@ golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBc
|
|||
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220730100132-1609e554cd39/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220731174439-a90be440212d h1:Sv5ogFZatcgIMMtBSTTAgMYsicp25MXBubjXNDKwm80=
|
||||
golang.org/x/sys v0.0.0-20220731174439-a90be440212d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220906135438-9e1f76180b77 h1:C1tElbkWrsSkn3IRl1GCW/gETw1TywWIPgwZtXTZbYg=
|
||||
golang.org/x/sys v0.0.0-20220906135438-9e1f76180b77/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
|
||||
golang.org/x/term v0.0.0-20220526004731-065cf7ba2467 h1:CBpWXWQpIRjzmkkA+M7q9Fqnwd2mZr3AFqexg8YTfoM=
|
||||
|
|
@ -831,8 +832,8 @@ golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxb
|
|||
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20220411224347-583f2d630306 h1:+gHMid33q6pen7kv9xvT+JRinntgeXO2AeZVd0AWD3w=
|
||||
golang.org/x/time v0.0.0-20220411224347-583f2d630306/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9 h1:ftMN5LMiBFjbzleLqtoBZk7KdJwhuybIU+FckUHgoyQ=
|
||||
golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
|
|
|
|||
|
|
@ -16,6 +16,12 @@ func mRuleContainsUserNameDefinition(localpart string) *Rule {
|
|||
Default: true,
|
||||
Enabled: true,
|
||||
Pattern: localpart,
|
||||
Conditions: []*Condition{
|
||||
{
|
||||
Kind: EventMatchCondition,
|
||||
Key: "content.body",
|
||||
},
|
||||
},
|
||||
Actions: []*Action{
|
||||
{Kind: NotifyAction},
|
||||
{
|
||||
|
|
|
|||
|
|
@ -7,8 +7,9 @@ func defaultOverrideRules(userID string) []*Rule {
|
|||
mRuleInviteForMeDefinition(userID),
|
||||
&mRuleMemberEventDefinition,
|
||||
&mRuleContainsDisplayNameDefinition,
|
||||
&mRuleTombstoneDefinition,
|
||||
&mRuleRoomNotifDefinition,
|
||||
&mRuleTombstoneDefinition,
|
||||
&mRuleReactionDefinition,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -20,6 +21,7 @@ const (
|
|||
MRuleContainsDisplayName = ".m.rule.contains_display_name"
|
||||
MRuleTombstone = ".m.rule.tombstone"
|
||||
MRuleRoomNotif = ".m.rule.roomnotif"
|
||||
MRuleReaction = ".m.rule.reaction"
|
||||
)
|
||||
|
||||
var (
|
||||
|
|
@ -96,7 +98,7 @@ var (
|
|||
{
|
||||
Kind: SetTweakAction,
|
||||
Tweak: HighlightTweak,
|
||||
Value: false,
|
||||
Value: true,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
@ -120,10 +122,25 @@ var (
|
|||
{
|
||||
Kind: SetTweakAction,
|
||||
Tweak: HighlightTweak,
|
||||
Value: false,
|
||||
Value: true,
|
||||
},
|
||||
},
|
||||
}
|
||||
mRuleReactionDefinition = Rule{
|
||||
RuleID: MRuleReaction,
|
||||
Default: true,
|
||||
Enabled: true,
|
||||
Conditions: []*Condition{
|
||||
{
|
||||
Kind: EventMatchCondition,
|
||||
Key: "type",
|
||||
Pattern: "m.reaction",
|
||||
},
|
||||
},
|
||||
Actions: []*Action{
|
||||
{Kind: DontNotifyAction},
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
func mRuleInviteForMeDefinition(userID string) *Rule {
|
||||
|
|
|
|||
|
|
@ -10,8 +10,8 @@ const (
|
|||
|
||||
var defaultUnderrideRules = []*Rule{
|
||||
&mRuleCallDefinition,
|
||||
&mRuleEncryptedRoomOneToOneDefinition,
|
||||
&mRuleRoomOneToOneDefinition,
|
||||
&mRuleEncryptedRoomOneToOneDefinition,
|
||||
&mRuleMessageDefinition,
|
||||
&mRuleEncryptedDefinition,
|
||||
}
|
||||
|
|
@ -59,6 +59,11 @@ var (
|
|||
},
|
||||
Actions: []*Action{
|
||||
{Kind: NotifyAction},
|
||||
{
|
||||
Kind: SetTweakAction,
|
||||
Tweak: SoundTweak,
|
||||
Value: "default",
|
||||
},
|
||||
{
|
||||
Kind: SetTweakAction,
|
||||
Tweak: HighlightTweak,
|
||||
|
|
@ -88,6 +93,11 @@ var (
|
|||
Tweak: HighlightTweak,
|
||||
Value: false,
|
||||
},
|
||||
{
|
||||
Kind: SetTweakAction,
|
||||
Tweak: HighlightTweak,
|
||||
Value: false,
|
||||
},
|
||||
},
|
||||
}
|
||||
mRuleMessageDefinition = Rule{
|
||||
|
|
@ -101,7 +111,14 @@ var (
|
|||
Pattern: "m.room.message",
|
||||
},
|
||||
},
|
||||
Actions: []*Action{{Kind: NotifyAction}},
|
||||
Actions: []*Action{
|
||||
{Kind: NotifyAction},
|
||||
{
|
||||
Kind: SetTweakAction,
|
||||
Tweak: HighlightTweak,
|
||||
Value: false,
|
||||
},
|
||||
},
|
||||
}
|
||||
mRuleEncryptedDefinition = Rule{
|
||||
RuleID: MRuleEncrypted,
|
||||
|
|
@ -114,6 +131,13 @@ var (
|
|||
Pattern: "m.room.encrypted",
|
||||
},
|
||||
},
|
||||
Actions: []*Action{{Kind: NotifyAction}},
|
||||
Actions: []*Action{
|
||||
{Kind: NotifyAction},
|
||||
{
|
||||
Kind: SetTweakAction,
|
||||
Tweak: HighlightTweak,
|
||||
Value: false,
|
||||
},
|
||||
},
|
||||
}
|
||||
)
|
||||
|
|
|
|||
|
|
@ -24,24 +24,28 @@ func TestRuleSetEvaluatorMatchEvent(t *testing.T) {
|
|||
Default: false,
|
||||
Enabled: true,
|
||||
}
|
||||
defaultRuleset := DefaultGlobalRuleSet("test", "test")
|
||||
tsts := []struct {
|
||||
Name string
|
||||
RuleSet RuleSet
|
||||
Want *Rule
|
||||
Event *gomatrixserverlib.Event
|
||||
}{
|
||||
{"empty", RuleSet{}, nil},
|
||||
{"defaultCanWin", RuleSet{Override: []*Rule{defaultEnabled}}, defaultEnabled},
|
||||
{"userWins", RuleSet{Override: []*Rule{defaultEnabled, userEnabled}}, userEnabled},
|
||||
{"defaultOverrideWins", RuleSet{Override: []*Rule{defaultEnabled}, Underride: []*Rule{userEnabled}}, defaultEnabled},
|
||||
{"overrideContent", RuleSet{Override: []*Rule{userEnabled}, Content: []*Rule{userEnabled2}}, userEnabled},
|
||||
{"overrideRoom", RuleSet{Override: []*Rule{userEnabled}, Room: []*Rule{userEnabled2}}, userEnabled},
|
||||
{"overrideSender", RuleSet{Override: []*Rule{userEnabled}, Sender: []*Rule{userEnabled2}}, userEnabled},
|
||||
{"overrideUnderride", RuleSet{Override: []*Rule{userEnabled}, Underride: []*Rule{userEnabled2}}, userEnabled},
|
||||
{"empty", RuleSet{}, nil, ev},
|
||||
{"defaultCanWin", RuleSet{Override: []*Rule{defaultEnabled}}, defaultEnabled, ev},
|
||||
{"userWins", RuleSet{Override: []*Rule{defaultEnabled, userEnabled}}, userEnabled, ev},
|
||||
{"defaultOverrideWins", RuleSet{Override: []*Rule{defaultEnabled}, Underride: []*Rule{userEnabled}}, defaultEnabled, ev},
|
||||
{"overrideContent", RuleSet{Override: []*Rule{userEnabled}, Content: []*Rule{userEnabled2}}, userEnabled, ev},
|
||||
{"overrideRoom", RuleSet{Override: []*Rule{userEnabled}, Room: []*Rule{userEnabled2}}, userEnabled, ev},
|
||||
{"overrideSender", RuleSet{Override: []*Rule{userEnabled}, Sender: []*Rule{userEnabled2}}, userEnabled, ev},
|
||||
{"overrideUnderride", RuleSet{Override: []*Rule{userEnabled}, Underride: []*Rule{userEnabled2}}, userEnabled, ev},
|
||||
{"reactions don't notify", *defaultRuleset, &mRuleReactionDefinition, mustEventFromJSON(t, `{"type":"m.reaction"}`)},
|
||||
{"receipts don't notify", *defaultRuleset, nil, mustEventFromJSON(t, `{"type":"m.receipt"}`)},
|
||||
}
|
||||
for _, tst := range tsts {
|
||||
t.Run(tst.Name, func(t *testing.T) {
|
||||
rse := NewRuleSetEvaluator(nil, &tst.RuleSet)
|
||||
got, err := rse.MatchEvent(ev)
|
||||
rse := NewRuleSetEvaluator(fakeEvaluationContext{3}, &tst.RuleSet)
|
||||
got, err := rse.MatchEvent(tst.Event)
|
||||
if err != nil {
|
||||
t.Fatalf("MatchEvent failed: %v", err)
|
||||
}
|
||||
|
|
@ -128,7 +132,7 @@ func TestConditionMatches(t *testing.T) {
|
|||
}
|
||||
for _, tst := range tsts {
|
||||
t.Run(tst.Name, func(t *testing.T) {
|
||||
got, err := conditionMatches(&tst.Cond, mustEventFromJSON(t, tst.EventJSON), &fakeEvaluationContext{})
|
||||
got, err := conditionMatches(&tst.Cond, mustEventFromJSON(t, tst.EventJSON), &fakeEvaluationContext{2})
|
||||
if err != nil {
|
||||
t.Fatalf("conditionMatches failed: %v", err)
|
||||
}
|
||||
|
|
@ -139,10 +143,10 @@ func TestConditionMatches(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
type fakeEvaluationContext struct{}
|
||||
type fakeEvaluationContext struct{ memberCount int }
|
||||
|
||||
func (fakeEvaluationContext) UserDisplayName() string { return "Dear User" }
|
||||
func (fakeEvaluationContext) RoomMemberCount() (int, error) { return 2, nil }
|
||||
func (fakeEvaluationContext) UserDisplayName() string { return "Dear User" }
|
||||
func (f fakeEvaluationContext) RoomMemberCount() (int, error) { return f.memberCount, nil }
|
||||
func (fakeEvaluationContext) HasPowerLevel(userID, levelKey string) (bool, error) {
|
||||
return userID == "@poweruser:example.com" && levelKey == "powerlevel", nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ import (
|
|||
// kind and a tweaks map. Returns a nil map if it would have been
|
||||
// empty.
|
||||
func ActionsToTweaks(as []*Action) (ActionKind, map[string]interface{}, error) {
|
||||
var kind ActionKind
|
||||
kind := UnknownAction
|
||||
tweaks := map[string]interface{}{}
|
||||
|
||||
for _, a := range as {
|
||||
|
|
|
|||
|
|
@ -21,8 +21,9 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
)
|
||||
|
||||
const createDBMigrationsSQL = "" +
|
||||
|
|
@ -95,11 +96,11 @@ func (m *Migrator) Up(ctx context.Context) error {
|
|||
for i := range m.migrations {
|
||||
now := time.Now().UTC().Format(time.RFC3339)
|
||||
migration := m.migrations[i]
|
||||
logrus.Debugf("Executing database migration '%s'", migration.Version)
|
||||
// Skip migration if it was already executed
|
||||
if _, ok := executedMigrations[migration.Version]; ok {
|
||||
continue
|
||||
}
|
||||
logrus.Debugf("Executing database migration '%s'", migration.Version)
|
||||
err = migration.Up(ctx, txn)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to execute migration '%s': %w", migration.Version, err)
|
||||
|
|
@ -140,3 +141,24 @@ func (m *Migrator) ExecutedMigrations(ctx context.Context) (map[string]struct{},
|
|||
|
||||
return result, rows.Err()
|
||||
}
|
||||
|
||||
// InsertMigration creates the migrations table if it doesn't exist and
|
||||
// inserts a migration given their name to the database.
|
||||
// This should only be used when manually inserting migrations.
|
||||
func InsertMigration(ctx context.Context, db *sql.DB, migrationName string) error {
|
||||
_, err := db.ExecContext(ctx, createDBMigrationsSQL)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to create db_migrations: %w", err)
|
||||
}
|
||||
_, err = db.ExecContext(ctx, insertVersionSQL,
|
||||
migrationName,
|
||||
time.Now().Format(time.RFC3339),
|
||||
internal.VersionString(),
|
||||
)
|
||||
// If the migration was already executed, we'll get a unique constraint error,
|
||||
// return nil instead, to avoid unnecessary logging.
|
||||
if IsUniqueConstraintViolationErr(err) {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -12,12 +12,27 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//go:build wasm
|
||||
// +build wasm
|
||||
//go:build !wasm
|
||||
// +build !wasm
|
||||
|
||||
package sqlutil
|
||||
|
||||
// IsUniqueConstraintViolationErr no-ops for this architecture
|
||||
import (
|
||||
"github.com/lib/pq"
|
||||
"github.com/mattn/go-sqlite3"
|
||||
)
|
||||
|
||||
// IsUniqueConstraintViolationErr returns true if the error is an unique_violation error
|
||||
func IsUniqueConstraintViolationErr(err error) bool {
|
||||
switch e := err.(type) {
|
||||
case *pq.Error:
|
||||
return e.Code == "23505"
|
||||
case pq.Error:
|
||||
return e.Code == "23505"
|
||||
case *sqlite3.Error:
|
||||
return e.Code == sqlite3.ErrConstraint
|
||||
case sqlite3.Error:
|
||||
return e.Code == sqlite3.ErrConstraint
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
|
@ -12,15 +12,20 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//go:build !wasm
|
||||
// +build !wasm
|
||||
//go:build wasm
|
||||
// +build wasm
|
||||
|
||||
package sqlutil
|
||||
|
||||
import "github.com/lib/pq"
|
||||
import "github.com/mattn/go-sqlite3"
|
||||
|
||||
// IsUniqueConstraintViolationErr returns true if the error is a postgresql unique_violation error
|
||||
// IsUniqueConstraintViolationErr returns true if the error is an unique_violation error
|
||||
func IsUniqueConstraintViolationErr(err error) bool {
|
||||
pqErr, ok := err.(*pq.Error)
|
||||
return ok && pqErr.Code == "23505"
|
||||
switch e := err.(type) {
|
||||
case *sqlite3.Error:
|
||||
return e.Code == sqlite3.ErrConstraint
|
||||
case sqlite3.Error:
|
||||
return e.Code == sqlite3.ErrConstraint
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
|
@ -17,7 +17,7 @@ var build string
|
|||
const (
|
||||
VersionMajor = 0
|
||||
VersionMinor = 9
|
||||
VersionPatch = 6
|
||||
VersionPatch = 8
|
||||
VersionTag = "" // example: "rc1"
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -17,8 +17,8 @@ package postgres
|
|||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
|
||||
"github.com/lib/pq"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
|
|
@ -63,30 +63,36 @@ func NewPostgresKeyChangesTable(db *sql.DB) (tables.KeyChanges, error) {
|
|||
return s, err
|
||||
}
|
||||
|
||||
if err = executeMigration(context.Background(), db); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func executeMigration(ctx context.Context, db *sql.DB) error {
|
||||
// TODO: Remove when we are sure we are not having goose artefacts in the db
|
||||
// This forces an error, which indicates the migration is already applied, since the
|
||||
// column partition was removed from the table
|
||||
var count int
|
||||
err = db.QueryRow("SELECT partition FROM keyserver_key_changes LIMIT 1;").Scan(&count)
|
||||
if err == nil {
|
||||
m := sqlutil.NewMigrator(db)
|
||||
m.AddMigrations(sqlutil.Migration{
|
||||
Version: "keyserver: refactor key changes",
|
||||
Up: deltas.UpRefactorKeyChanges,
|
||||
})
|
||||
return s, m.Up(context.Background())
|
||||
} else {
|
||||
switch e := err.(type) {
|
||||
case *pq.Error:
|
||||
// ignore undefined_column (42703) errors, as this is expected at this point
|
||||
if e.Code != "42703" {
|
||||
return nil, err
|
||||
migrationName := "keyserver: refactor key changes"
|
||||
|
||||
var cName string
|
||||
err := db.QueryRowContext(ctx, "select column_name from information_schema.columns where table_name = 'keyserver_key_changes' AND column_name = 'partition'").Scan(&cName)
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed
|
||||
if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil {
|
||||
return fmt.Errorf("unable to manually insert migration '%s': %w", migrationName, err)
|
||||
}
|
||||
default:
|
||||
return nil, err
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
return s, nil
|
||||
m := sqlutil.NewMigrator(db)
|
||||
m.AddMigrations(sqlutil.Migration{
|
||||
Version: migrationName,
|
||||
Up: deltas.UpRefactorKeyChanges,
|
||||
})
|
||||
|
||||
return m.Up(ctx)
|
||||
}
|
||||
|
||||
func (s *keyChangesStatements) Prepare() (err error) {
|
||||
|
|
|
|||
|
|
@ -158,7 +158,7 @@ func (d *Database) MarkDeviceListStale(ctx context.Context, userID string, isSta
|
|||
// DeleteDeviceKeys removes the device keys for a given user/device, and any accompanying
|
||||
// cross-signing signatures relating to that device.
|
||||
func (d *Database) DeleteDeviceKeys(ctx context.Context, userID string, deviceIDs []gomatrixserverlib.KeyID) error {
|
||||
return d.Writer.Do(nil, nil, func(txn *sql.Tx) error {
|
||||
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
for _, deviceID := range deviceIDs {
|
||||
if err := d.CrossSigningSigsTable.DeleteCrossSigningSigsForTarget(ctx, txn, userID, deviceID); err != nil && err != sql.ErrNoRows {
|
||||
return fmt.Errorf("d.CrossSigningSigsTable.DeleteCrossSigningSigsForTarget: %w", err)
|
||||
|
|
|
|||
|
|
@ -17,6 +17,8 @@ package sqlite3
|
|||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
|
|
@ -58,23 +60,39 @@ func NewSqliteKeyChangesTable(db *sql.DB) (tables.KeyChanges, error) {
|
|||
if err != nil {
|
||||
return s, err
|
||||
}
|
||||
// TODO: Remove when we are sure we are not having goose artefacts in the db
|
||||
// This forces an error, which indicates the migration is already applied, since the
|
||||
// column partition was removed from the table
|
||||
var count int
|
||||
err = db.QueryRow("SELECT partition FROM keyserver_key_changes LIMIT 1;").Scan(&count)
|
||||
if err == nil {
|
||||
m := sqlutil.NewMigrator(db)
|
||||
m.AddMigrations(sqlutil.Migration{
|
||||
Version: "keyserver: refactor key changes",
|
||||
Up: deltas.UpRefactorKeyChanges,
|
||||
})
|
||||
return s, m.Up(context.Background())
|
||||
|
||||
if err = executeMigration(context.Background(), db); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func executeMigration(ctx context.Context, db *sql.DB) error {
|
||||
// TODO: Remove when we are sure we are not having goose artefacts in the db
|
||||
// This forces an error, which indicates the migration is already applied, since the
|
||||
// column partition was removed from the table
|
||||
migrationName := "keyserver: refactor key changes"
|
||||
|
||||
var cName string
|
||||
err := db.QueryRowContext(ctx, `SELECT p.name FROM sqlite_master AS m JOIN pragma_table_info(m.name) AS p WHERE m.name = 'keyserver_key_changes' AND p.name = 'partition'`).Scan(&cName)
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed
|
||||
if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil {
|
||||
return fmt.Errorf("unable to manually insert migration '%s': %w", migrationName, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
m := sqlutil.NewMigrator(db)
|
||||
m.AddMigrations(sqlutil.Migration{
|
||||
Version: migrationName,
|
||||
Up: deltas.UpRefactorKeyChanges,
|
||||
})
|
||||
return m.Up(ctx)
|
||||
}
|
||||
|
||||
func (s *keyChangesStatements) Prepare() (err error) {
|
||||
if s.upsertKeyChangeStmt, err = s.db.Prepare(upsertKeyChangeSQL); err != nil {
|
||||
return err
|
||||
|
|
|
|||
|
|
@ -553,11 +553,14 @@ func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserve
|
|||
|
||||
// Make sure events from the missingResp are using the cache - missing events
|
||||
// will be added and duplicates will be removed.
|
||||
logger.Debugf("get_missing_events returned %d events", len(missingResp.Events))
|
||||
missingEvents := make([]*gomatrixserverlib.Event, 0, len(missingResp.Events))
|
||||
for _, ev := range missingResp.Events.UntrustedEvents(roomVersion) {
|
||||
if err = ev.VerifyEventSignatures(ctx, t.keys); err != nil {
|
||||
continue
|
||||
}
|
||||
missingEvents = append(missingEvents, t.cacheAndReturn(ev))
|
||||
}
|
||||
logger.Debugf("get_missing_events returned %d events (%d passed signature checks)", len(missingResp.Events), len(missingEvents))
|
||||
|
||||
// topologically sort and sanity check that we are making forward progress
|
||||
newEvents = gomatrixserverlib.ReverseTopologicalOrdering(missingEvents, gomatrixserverlib.TopologicalOrderByPrevEvents)
|
||||
|
|
|
|||
|
|
@ -16,10 +16,11 @@
|
|||
package postgres
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/lib/pq"
|
||||
// Import the postgres database driver.
|
||||
_ "github.com/lib/pq"
|
||||
|
||||
|
|
@ -52,30 +53,8 @@ func Open(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, cache c
|
|||
|
||||
// Special case, since this migration uses several tables, so it needs to
|
||||
// be sure that all tables are created first.
|
||||
// TODO: Remove when we are sure we are not having goose artefacts in the db
|
||||
// This forces an error, which indicates the migration is already applied, since the
|
||||
// column event_nid was removed from the table
|
||||
var eventNID int
|
||||
err = db.QueryRow("SELECT event_nid FROM roomserver_state_block LIMIT 1;").Scan(&eventNID)
|
||||
if err == nil {
|
||||
m := sqlutil.NewMigrator(db)
|
||||
m.AddMigrations(sqlutil.Migration{
|
||||
Version: "roomserver: state blocks refactor",
|
||||
Up: deltas.UpStateBlocksRefactor,
|
||||
})
|
||||
if err = m.Up(base.Context()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
switch e := err.(type) {
|
||||
case *pq.Error:
|
||||
// ignore undefined_column (42703) errors, as this is expected at this point
|
||||
if e.Code != "42703" {
|
||||
return nil, err
|
||||
}
|
||||
default:
|
||||
return nil, err
|
||||
}
|
||||
if err = executeMigration(base.Context(), db); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Then prepare the statements. Now that the migrations have run, any columns referred
|
||||
|
|
@ -87,6 +66,32 @@ func Open(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, cache c
|
|||
return &d, nil
|
||||
}
|
||||
|
||||
func executeMigration(ctx context.Context, db *sql.DB) error {
|
||||
// TODO: Remove when we are sure we are not having goose artefacts in the db
|
||||
// This forces an error, which indicates the migration is already applied, since the
|
||||
// column event_nid was removed from the table
|
||||
migrationName := "roomserver: state blocks refactor"
|
||||
|
||||
var cName string
|
||||
err := db.QueryRowContext(ctx, "select column_name from information_schema.columns where table_name = 'roomserver_state_block' AND column_name = 'event_nid'").Scan(&cName)
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed
|
||||
if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil {
|
||||
return fmt.Errorf("unable to manually insert migration '%s': %w", migrationName, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
m := sqlutil.NewMigrator(db)
|
||||
m.AddMigrations(sqlutil.Migration{
|
||||
Version: migrationName,
|
||||
Up: deltas.UpStateBlocksRefactor,
|
||||
})
|
||||
|
||||
return m.Up(ctx)
|
||||
}
|
||||
|
||||
func (d *Database) create(db *sql.DB) error {
|
||||
if err := CreateEventStateKeysTable(db); err != nil {
|
||||
return err
|
||||
|
|
|
|||
|
|
@ -26,11 +26,11 @@ func NewMembershipUpdater(
|
|||
var targetUserNID types.EventStateKeyNID
|
||||
var err error
|
||||
err = d.Writer.Do(d.DB, txn, func(txn *sql.Tx) error {
|
||||
roomNID, err = d.assignRoomNID(ctx, roomID, roomVersion)
|
||||
roomNID, err = d.assignRoomNID(ctx, txn, roomID, roomVersion)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
targetUserNID, err = d.assignStateKeyNID(ctx, targetUserID)
|
||||
targetUserNID, err = d.assignStateKeyNID(ctx, txn, targetUserID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -101,7 +101,7 @@ func (u *MembershipUpdater) Update(newMembership tables.MembershipState, event *
|
|||
var inserted bool // Did the query result in a membership change?
|
||||
var retired []string // Did we retire any updates in the process?
|
||||
return inserted, retired, u.d.Writer.Do(u.d.DB, u.txn, func(txn *sql.Tx) error {
|
||||
senderUserNID, err := u.d.assignStateKeyNID(u.ctx, event.Sender())
|
||||
senderUserNID, err := u.d.assignStateKeyNID(u.ctx, u.txn, event.Sender())
|
||||
if err != nil {
|
||||
return fmt.Errorf("u.d.AssignStateKeyNID: %w", err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -402,7 +402,7 @@ func (d *Database) RemoveRoomAlias(ctx context.Context, alias string) error {
|
|||
func (d *Database) GetMembership(ctx context.Context, roomNID types.RoomNID, requestSenderUserID string) (membershipEventNID types.EventNID, stillInRoom, isRoomforgotten bool, err error) {
|
||||
var requestSenderUserNID types.EventStateKeyNID
|
||||
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
requestSenderUserNID, err = d.assignStateKeyNID(ctx, requestSenderUserID)
|
||||
requestSenderUserNID, err = d.assignStateKeyNID(ctx, txn, requestSenderUserID)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
|
|
@ -596,7 +596,9 @@ func (d *Database) storeEvent(
|
|||
if updater != nil && updater.txn != nil {
|
||||
txn = updater.txn
|
||||
}
|
||||
err = d.Writer.Do(d.DB, txn, func(txn *sql.Tx) error {
|
||||
// First writer is with a database-provided transaction, so that NIDs are assigned
|
||||
// globally outside of the updater context, to help avoid races.
|
||||
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
// TODO: Here we should aim to have two different code paths for new rooms
|
||||
// vs existing ones.
|
||||
|
||||
|
|
@ -611,11 +613,11 @@ func (d *Database) storeEvent(
|
|||
return fmt.Errorf("extractRoomVersionFromCreateEvent: %w", err)
|
||||
}
|
||||
|
||||
if roomNID, err = d.assignRoomNID(ctx, event.RoomID(), roomVersion); err != nil {
|
||||
if roomNID, err = d.assignRoomNID(ctx, txn, event.RoomID(), roomVersion); err != nil {
|
||||
return fmt.Errorf("d.assignRoomNID: %w", err)
|
||||
}
|
||||
|
||||
if eventTypeNID, err = d.assignEventTypeNID(ctx, event.Type()); err != nil {
|
||||
if eventTypeNID, err = d.assignEventTypeNID(ctx, txn, event.Type()); err != nil {
|
||||
return fmt.Errorf("d.assignEventTypeNID: %w", err)
|
||||
}
|
||||
|
||||
|
|
@ -623,11 +625,19 @@ func (d *Database) storeEvent(
|
|||
// Assigned a numeric ID for the state_key if there is one present.
|
||||
// Otherwise set the numeric ID for the state_key to 0.
|
||||
if eventStateKey != nil {
|
||||
if eventStateKeyNID, err = d.assignStateKeyNID(ctx, *eventStateKey); err != nil {
|
||||
if eventStateKeyNID, err = d.assignStateKeyNID(ctx, txn, *eventStateKey); err != nil {
|
||||
return fmt.Errorf("d.assignStateKeyNID: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("d.Writer.Do: %w", err)
|
||||
}
|
||||
// Second writer is using the database-provided transaction, probably from the
|
||||
// room updater, for easy roll-back if required.
|
||||
err = d.Writer.Do(d.DB, txn, func(txn *sql.Tx) error {
|
||||
if eventNID, stateNID, err = d.EventsTable.InsertEvent(
|
||||
ctx,
|
||||
txn,
|
||||
|
|
@ -749,48 +759,48 @@ func (d *Database) MissingAuthPrevEvents(
|
|||
}
|
||||
|
||||
func (d *Database) assignRoomNID(
|
||||
ctx context.Context, roomID string, roomVersion gomatrixserverlib.RoomVersion,
|
||||
ctx context.Context, txn *sql.Tx, roomID string, roomVersion gomatrixserverlib.RoomVersion,
|
||||
) (types.RoomNID, error) {
|
||||
// Check if we already have a numeric ID in the database.
|
||||
roomNID, err := d.RoomsTable.SelectRoomNID(ctx, nil, roomID)
|
||||
roomNID, err := d.RoomsTable.SelectRoomNID(ctx, txn, roomID)
|
||||
if err == sql.ErrNoRows {
|
||||
// We don't have a numeric ID so insert one into the database.
|
||||
roomNID, err = d.RoomsTable.InsertRoomNID(ctx, nil, roomID, roomVersion)
|
||||
roomNID, err = d.RoomsTable.InsertRoomNID(ctx, txn, roomID, roomVersion)
|
||||
if err == sql.ErrNoRows {
|
||||
// We raced with another insert so run the select again.
|
||||
roomNID, err = d.RoomsTable.SelectRoomNID(ctx, nil, roomID)
|
||||
roomNID, err = d.RoomsTable.SelectRoomNID(ctx, txn, roomID)
|
||||
}
|
||||
}
|
||||
return roomNID, err
|
||||
}
|
||||
|
||||
func (d *Database) assignEventTypeNID(
|
||||
ctx context.Context, eventType string,
|
||||
ctx context.Context, txn *sql.Tx, eventType string,
|
||||
) (types.EventTypeNID, error) {
|
||||
// Check if we already have a numeric ID in the database.
|
||||
eventTypeNID, err := d.EventTypesTable.SelectEventTypeNID(ctx, nil, eventType)
|
||||
eventTypeNID, err := d.EventTypesTable.SelectEventTypeNID(ctx, txn, eventType)
|
||||
if err == sql.ErrNoRows {
|
||||
// We don't have a numeric ID so insert one into the database.
|
||||
eventTypeNID, err = d.EventTypesTable.InsertEventTypeNID(ctx, nil, eventType)
|
||||
eventTypeNID, err = d.EventTypesTable.InsertEventTypeNID(ctx, txn, eventType)
|
||||
if err == sql.ErrNoRows {
|
||||
// We raced with another insert so run the select again.
|
||||
eventTypeNID, err = d.EventTypesTable.SelectEventTypeNID(ctx, nil, eventType)
|
||||
eventTypeNID, err = d.EventTypesTable.SelectEventTypeNID(ctx, txn, eventType)
|
||||
}
|
||||
}
|
||||
return eventTypeNID, err
|
||||
}
|
||||
|
||||
func (d *Database) assignStateKeyNID(
|
||||
ctx context.Context, eventStateKey string,
|
||||
ctx context.Context, txn *sql.Tx, eventStateKey string,
|
||||
) (types.EventStateKeyNID, error) {
|
||||
// Check if we already have a numeric ID in the database.
|
||||
eventStateKeyNID, err := d.EventStateKeysTable.SelectEventStateKeyNID(ctx, nil, eventStateKey)
|
||||
eventStateKeyNID, err := d.EventStateKeysTable.SelectEventStateKeyNID(ctx, txn, eventStateKey)
|
||||
if err == sql.ErrNoRows {
|
||||
// We don't have a numeric ID so insert one into the database.
|
||||
eventStateKeyNID, err = d.EventStateKeysTable.InsertEventStateKeyNID(ctx, nil, eventStateKey)
|
||||
eventStateKeyNID, err = d.EventStateKeysTable.InsertEventStateKeyNID(ctx, txn, eventStateKey)
|
||||
if err == sql.ErrNoRows {
|
||||
// We raced with another insert so run the select again.
|
||||
eventStateKeyNID, err = d.EventStateKeysTable.SelectEventStateKeyNID(ctx, nil, eventStateKey)
|
||||
eventStateKeyNID, err = d.EventStateKeysTable.SelectEventStateKeyNID(ctx, txn, eventStateKey)
|
||||
}
|
||||
}
|
||||
return eventStateKeyNID, err
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ package sqlite3
|
|||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
|
|
@ -61,20 +62,8 @@ func Open(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, cache c
|
|||
|
||||
// Special case, since this migration uses several tables, so it needs to
|
||||
// be sure that all tables are created first.
|
||||
// TODO: Remove when we are sure we are not having goose artefacts in the db
|
||||
// This forces an error, which indicates the migration is already applied, since the
|
||||
// column event_nid was removed from the table
|
||||
var eventNID int
|
||||
err = db.QueryRow("SELECT event_nid FROM roomserver_state_block LIMIT 1;").Scan(&eventNID)
|
||||
if err == nil {
|
||||
m := sqlutil.NewMigrator(db)
|
||||
m.AddMigrations(sqlutil.Migration{
|
||||
Version: "roomserver: state blocks refactor",
|
||||
Up: deltas.UpStateBlocksRefactor,
|
||||
})
|
||||
if err = m.Up(base.Context()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = executeMigration(base.Context(), db); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Then prepare the statements. Now that the migrations have run, any columns referred
|
||||
|
|
@ -86,6 +75,31 @@ func Open(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, cache c
|
|||
return &d, nil
|
||||
}
|
||||
|
||||
func executeMigration(ctx context.Context, db *sql.DB) error {
|
||||
// TODO: Remove when we are sure we are not having goose artefacts in the db
|
||||
// This forces an error, which indicates the migration is already applied, since the
|
||||
// column event_nid was removed from the table
|
||||
migrationName := "roomserver: state blocks refactor"
|
||||
|
||||
var cName string
|
||||
err := db.QueryRowContext(ctx, `SELECT p.name FROM sqlite_master AS m JOIN pragma_table_info(m.name) AS p WHERE m.name = 'roomserver_state_block' AND p.name = 'event_nid'`).Scan(&cName)
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed
|
||||
if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil {
|
||||
return fmt.Errorf("unable to manually insert migration '%s': %w", migrationName, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
m := sqlutil.NewMigrator(db)
|
||||
m.AddMigrations(sqlutil.Migration{
|
||||
Version: migrationName,
|
||||
Up: deltas.UpStateBlocksRefactor,
|
||||
})
|
||||
return m.Up(ctx)
|
||||
}
|
||||
|
||||
func (d *Database) create(db *sql.DB) error {
|
||||
if err := CreateEventStateKeysTable(db); err != nil {
|
||||
return err
|
||||
|
|
|
|||
|
|
@ -59,6 +59,9 @@ func (f *Fulltext) Defaults(opts DefaultOpts) {
|
|||
}
|
||||
|
||||
func (f *Fulltext) Verify(configErrs *ConfigErrors, isMonolith bool) {
|
||||
if !f.Enabled {
|
||||
return
|
||||
}
|
||||
checkNotEmpty(configErrs, "syncapi.fulltext.index_path", string(f.IndexPath))
|
||||
checkNotEmpty(configErrs, "syncapi.fulltext.language", f.Language)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,16 +19,17 @@ import (
|
|||
"encoding/json"
|
||||
|
||||
"github.com/getsentry/sentry-go"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
"github.com/nats-io/nats.go"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||
"github.com/matrix-org/dendrite/setup/process"
|
||||
"github.com/matrix-org/dendrite/syncapi/notifier"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
"github.com/nats-io/nats.go"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// OutputSendToDeviceEventConsumer consumes events that originated in the EDU server.
|
||||
|
|
@ -79,16 +80,18 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msgs []
|
|||
_, domain, err := gomatrixserverlib.SplitID('@', userID)
|
||||
if err != nil {
|
||||
sentry.CaptureException(err)
|
||||
log.WithError(err).Errorf("send-to-device: failed to split user id, dropping message")
|
||||
return true
|
||||
}
|
||||
if domain != s.serverName {
|
||||
log.Tracef("ignoring send-to-device event with destination %s", domain)
|
||||
return true
|
||||
}
|
||||
|
||||
var output types.OutputSendToDeviceEvent
|
||||
if err = json.Unmarshal(msg.Data, &output); err != nil {
|
||||
// If the message was invalid, log it and move on to the next message in the stream
|
||||
log.WithError(err).Errorf("output log: message parse failure")
|
||||
log.WithError(err).Errorf("send-to-device: message parse failure")
|
||||
sentry.CaptureException(err)
|
||||
return true
|
||||
}
|
||||
|
|
@ -105,7 +108,7 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msgs []
|
|||
)
|
||||
if err != nil {
|
||||
sentry.CaptureException(err)
|
||||
log.WithError(err).Errorf("failed to store send-to-device message")
|
||||
log.WithError(err).Errorf("send-to-device: failed to store message")
|
||||
return false
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -111,8 +111,8 @@ const selectEventsWithEventIDsSQL = "" +
|
|||
|
||||
const selectSharedUsersSQL = "" +
|
||||
"SELECT state_key FROM syncapi_current_room_state WHERE room_id = ANY(" +
|
||||
" SELECT room_id FROM syncapi_current_room_state WHERE state_key = $1 AND membership='join'" +
|
||||
") AND state_key = ANY($2) AND membership IN ('join', 'invite');"
|
||||
" SELECT DISTINCT room_id FROM syncapi_current_room_state WHERE state_key = $1 AND membership='join'" +
|
||||
") AND type = 'm.room.member' AND state_key = ANY($2) AND membership IN ('join', 'invite');"
|
||||
|
||||
type currentRoomStateStatements struct {
|
||||
upsertRoomStateStmt *sql.Stmt
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ import (
|
|||
"database/sql"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
)
|
||||
|
|
@ -61,10 +62,10 @@ func NewPostgresIgnoresTable(db *sql.DB) (tables.Ignores, error) {
|
|||
}
|
||||
|
||||
func (s *ignoresStatements) SelectIgnores(
|
||||
ctx context.Context, userID string,
|
||||
ctx context.Context, txn *sql.Tx, userID string,
|
||||
) (*types.IgnoredUsers, error) {
|
||||
var ignoresData []byte
|
||||
err := s.selectIgnoresStmt.QueryRowContext(ctx, userID).Scan(&ignoresData)
|
||||
err := sqlutil.TxStmt(txn, s.selectIgnoresStmt).QueryRowContext(ctx, userID).Scan(&ignoresData)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -76,12 +77,12 @@ func (s *ignoresStatements) SelectIgnores(
|
|||
}
|
||||
|
||||
func (s *ignoresStatements) UpsertIgnores(
|
||||
ctx context.Context, userID string, ignores *types.IgnoredUsers,
|
||||
ctx context.Context, txn *sql.Tx, userID string, ignores *types.IgnoredUsers,
|
||||
) error {
|
||||
ignoresJSON, err := json.Marshal(ignores)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = s.upsertIgnoresStmt.ExecContext(ctx, userID, ignoresJSON)
|
||||
_, err = sqlutil.TxStmt(txn, s.upsertIgnoresStmt).ExecContext(ctx, userID, ignoresJSON)
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -70,13 +70,13 @@ const selectUserUnreadNotificationCountsSQL = `SELECT
|
|||
|
||||
const selectMaxNotificationIDSQL = `SELECT CASE COUNT(*) WHEN 0 THEN 0 ELSE MAX(id) END FROM syncapi_notification_data`
|
||||
|
||||
func (r *notificationDataStatements) UpsertRoomUnreadCounts(ctx context.Context, userID, roomID string, notificationCount, highlightCount int) (pos types.StreamPosition, err error) {
|
||||
err = r.upsertRoomUnreadCounts.QueryRowContext(ctx, userID, roomID, notificationCount, highlightCount).Scan(&pos)
|
||||
func (r *notificationDataStatements) UpsertRoomUnreadCounts(ctx context.Context, txn *sql.Tx, userID, roomID string, notificationCount, highlightCount int) (pos types.StreamPosition, err error) {
|
||||
err = sqlutil.TxStmt(txn, r.upsertRoomUnreadCounts).QueryRowContext(ctx, userID, roomID, notificationCount, highlightCount).Scan(&pos)
|
||||
return
|
||||
}
|
||||
|
||||
func (r *notificationDataStatements) SelectUserUnreadCounts(ctx context.Context, userID string, fromExcl, toIncl types.StreamPosition) (map[string]*eventutil.NotificationData, error) {
|
||||
rows, err := r.selectUserUnreadCounts.QueryContext(ctx, userID, fromExcl, toIncl)
|
||||
func (r *notificationDataStatements) SelectUserUnreadCounts(ctx context.Context, txn *sql.Tx, userID string, fromExcl, toIncl types.StreamPosition) (map[string]*eventutil.NotificationData, error) {
|
||||
rows, err := sqlutil.TxStmt(txn, r.selectUserUnreadCounts).QueryContext(ctx, userID, fromExcl, toIncl)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -101,8 +101,8 @@ func (r *notificationDataStatements) SelectUserUnreadCounts(ctx context.Context,
|
|||
return roomCounts, rows.Err()
|
||||
}
|
||||
|
||||
func (r *notificationDataStatements) SelectMaxID(ctx context.Context) (int64, error) {
|
||||
func (r *notificationDataStatements) SelectMaxID(ctx context.Context, txn *sql.Tx) (int64, error) {
|
||||
var id int64
|
||||
err := r.selectMaxID.QueryRowContext(ctx).Scan(&id)
|
||||
err := sqlutil.TxStmt(txn, r.selectMaxID).QueryRowContext(ctx).Scan(&id)
|
||||
return id, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -108,7 +108,7 @@ func (d *Database) MaxStreamPositionForAccountData(ctx context.Context) (types.S
|
|||
}
|
||||
|
||||
func (d *Database) MaxStreamPositionForNotificationData(ctx context.Context) (types.StreamPosition, error) {
|
||||
id, err := d.NotificationData.SelectMaxID(ctx)
|
||||
id, err := d.NotificationData.SelectMaxID(ctx, nil)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("d.NotificationData.SelectMaxID: %w", err)
|
||||
}
|
||||
|
|
@ -1029,15 +1029,15 @@ func (d *Database) GetRoomReceipts(ctx context.Context, roomIDs []string, stream
|
|||
}
|
||||
|
||||
func (d *Database) UpsertRoomUnreadNotificationCounts(ctx context.Context, userID, roomID string, notificationCount, highlightCount int) (pos types.StreamPosition, err error) {
|
||||
err = d.Writer.Do(nil, nil, func(_ *sql.Tx) error {
|
||||
pos, err = d.NotificationData.UpsertRoomUnreadCounts(ctx, userID, roomID, notificationCount, highlightCount)
|
||||
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
pos, err = d.NotificationData.UpsertRoomUnreadCounts(ctx, txn, userID, roomID, notificationCount, highlightCount)
|
||||
return err
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
func (d *Database) GetUserUnreadNotificationCounts(ctx context.Context, userID string, from, to types.StreamPosition) (map[string]*eventutil.NotificationData, error) {
|
||||
return d.NotificationData.SelectUserUnreadCounts(ctx, userID, from, to)
|
||||
return d.NotificationData.SelectUserUnreadCounts(ctx, nil, userID, from, to)
|
||||
}
|
||||
|
||||
func (d *Database) SelectContextEvent(ctx context.Context, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error) {
|
||||
|
|
@ -1052,15 +1052,23 @@ func (d *Database) SelectContextAfterEvent(ctx context.Context, id int, roomID s
|
|||
}
|
||||
|
||||
func (d *Database) IgnoresForUser(ctx context.Context, userID string) (*types.IgnoredUsers, error) {
|
||||
return d.Ignores.SelectIgnores(ctx, userID)
|
||||
return d.Ignores.SelectIgnores(ctx, nil, userID)
|
||||
}
|
||||
|
||||
func (d *Database) UpdateIgnoresForUser(ctx context.Context, userID string, ignores *types.IgnoredUsers) error {
|
||||
return d.Ignores.UpsertIgnores(ctx, userID, ignores)
|
||||
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
return d.Ignores.UpsertIgnores(ctx, txn, userID, ignores)
|
||||
})
|
||||
}
|
||||
|
||||
func (d *Database) UpdatePresence(ctx context.Context, userID string, presence types.Presence, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error) {
|
||||
return d.Presence.UpsertPresence(ctx, nil, userID, statusMsg, presence, lastActiveTS, fromSync)
|
||||
var pos types.StreamPosition
|
||||
var err error
|
||||
_ = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
pos, err = d.Presence.UpsertPresence(ctx, txn, userID, statusMsg, presence, lastActiveTS, fromSync)
|
||||
return nil
|
||||
})
|
||||
return pos, err
|
||||
}
|
||||
|
||||
func (d *Database) GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error) {
|
||||
|
|
|
|||
|
|
@ -95,8 +95,8 @@ const selectEventsWithEventIDsSQL = "" +
|
|||
|
||||
const selectSharedUsersSQL = "" +
|
||||
"SELECT state_key FROM syncapi_current_room_state WHERE room_id IN(" +
|
||||
" SELECT room_id FROM syncapi_current_room_state WHERE state_key = $1 AND membership='join'" +
|
||||
") AND state_key IN ($2) AND membership IN ('join', 'invite');"
|
||||
" SELECT DISTINCT room_id FROM syncapi_current_room_state WHERE state_key = $1 AND membership='join'" +
|
||||
") AND type = 'm.room.member' AND state_key IN ($2) AND membership IN ('join', 'invite');"
|
||||
|
||||
type currentRoomStateStatements struct {
|
||||
db *sql.DB
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ import (
|
|||
"database/sql"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
)
|
||||
|
|
@ -61,10 +62,10 @@ func NewSqliteIgnoresTable(db *sql.DB) (tables.Ignores, error) {
|
|||
}
|
||||
|
||||
func (s *ignoresStatements) SelectIgnores(
|
||||
ctx context.Context, userID string,
|
||||
ctx context.Context, txn *sql.Tx, userID string,
|
||||
) (*types.IgnoredUsers, error) {
|
||||
var ignoresData []byte
|
||||
err := s.selectIgnoresStmt.QueryRowContext(ctx, userID).Scan(&ignoresData)
|
||||
err := sqlutil.TxStmt(txn, s.selectIgnoresStmt).QueryRowContext(ctx, userID).Scan(&ignoresData)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -76,12 +77,12 @@ func (s *ignoresStatements) SelectIgnores(
|
|||
}
|
||||
|
||||
func (s *ignoresStatements) UpsertIgnores(
|
||||
ctx context.Context, userID string, ignores *types.IgnoredUsers,
|
||||
ctx context.Context, txn *sql.Tx, userID string, ignores *types.IgnoredUsers,
|
||||
) error {
|
||||
ignoresJSON, err := json.Marshal(ignores)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = s.upsertIgnoresStmt.ExecContext(ctx, userID, ignoresJSON)
|
||||
_, err = sqlutil.TxStmt(txn, s.upsertIgnoresStmt).ExecContext(ctx, userID, ignoresJSON)
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -72,7 +72,7 @@ const selectUserUnreadNotificationCountsSQL = `SELECT
|
|||
|
||||
const selectMaxNotificationIDSQL = `SELECT CASE COUNT(*) WHEN 0 THEN 0 ELSE MAX(id) END FROM syncapi_notification_data`
|
||||
|
||||
func (r *notificationDataStatements) UpsertRoomUnreadCounts(ctx context.Context, userID, roomID string, notificationCount, highlightCount int) (pos types.StreamPosition, err error) {
|
||||
func (r *notificationDataStatements) UpsertRoomUnreadCounts(ctx context.Context, txn *sql.Tx, userID, roomID string, notificationCount, highlightCount int) (pos types.StreamPosition, err error) {
|
||||
pos, err = r.streamIDStatements.nextNotificationID(ctx, nil)
|
||||
if err != nil {
|
||||
return
|
||||
|
|
@ -81,8 +81,8 @@ func (r *notificationDataStatements) UpsertRoomUnreadCounts(ctx context.Context,
|
|||
return
|
||||
}
|
||||
|
||||
func (r *notificationDataStatements) SelectUserUnreadCounts(ctx context.Context, userID string, fromExcl, toIncl types.StreamPosition) (map[string]*eventutil.NotificationData, error) {
|
||||
rows, err := r.selectUserUnreadCounts.QueryContext(ctx, userID, fromExcl, toIncl)
|
||||
func (r *notificationDataStatements) SelectUserUnreadCounts(ctx context.Context, txn *sql.Tx, userID string, fromExcl, toIncl types.StreamPosition) (map[string]*eventutil.NotificationData, error) {
|
||||
rows, err := sqlutil.TxStmt(txn, r.selectUserUnreadCounts).QueryContext(ctx, userID, fromExcl, toIncl)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -107,8 +107,8 @@ func (r *notificationDataStatements) SelectUserUnreadCounts(ctx context.Context,
|
|||
return roomCounts, rows.Err()
|
||||
}
|
||||
|
||||
func (r *notificationDataStatements) SelectMaxID(ctx context.Context) (int64, error) {
|
||||
func (r *notificationDataStatements) SelectMaxID(ctx context.Context, txn *sql.Tx) (int64, error) {
|
||||
var id int64
|
||||
err := r.selectMaxID.QueryRowContext(ctx).Scan(&id)
|
||||
err := sqlutil.TxStmt(txn, r.selectMaxID).QueryRowContext(ctx).Scan(&id)
|
||||
return id, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -189,14 +189,14 @@ type Memberships interface {
|
|||
}
|
||||
|
||||
type NotificationData interface {
|
||||
UpsertRoomUnreadCounts(ctx context.Context, userID, roomID string, notificationCount, highlightCount int) (types.StreamPosition, error)
|
||||
SelectUserUnreadCounts(ctx context.Context, userID string, fromExcl, toIncl types.StreamPosition) (map[string]*eventutil.NotificationData, error)
|
||||
SelectMaxID(ctx context.Context) (int64, error)
|
||||
UpsertRoomUnreadCounts(ctx context.Context, txn *sql.Tx, userID, roomID string, notificationCount, highlightCount int) (types.StreamPosition, error)
|
||||
SelectUserUnreadCounts(ctx context.Context, txn *sql.Tx, userID string, fromExcl, toIncl types.StreamPosition) (map[string]*eventutil.NotificationData, error)
|
||||
SelectMaxID(ctx context.Context, txn *sql.Tx) (int64, error)
|
||||
}
|
||||
|
||||
type Ignores interface {
|
||||
SelectIgnores(ctx context.Context, userID string) (*types.IgnoredUsers, error)
|
||||
UpsertIgnores(ctx context.Context, userID string, ignores *types.IgnoredUsers) error
|
||||
SelectIgnores(ctx context.Context, txn *sql.Tx, userID string) (*types.IgnoredUsers, error)
|
||||
UpsertIgnores(ctx context.Context, txn *sql.Tx, userID string, ignores *types.IgnoredUsers) error
|
||||
}
|
||||
|
||||
type Presence interface {
|
||||
|
|
|
|||
|
|
@ -624,9 +624,7 @@ func testSendToDevice(t *testing.T, dbType test.DBType) {
|
|||
// Send to-device messages of type "m.dendrite.test" with content `{"dummy":"message $counter"}`
|
||||
for i := 0; i < tc.sendMessagesCount; i++ {
|
||||
msgCounter++
|
||||
msg := map[string]string{
|
||||
"dummy": fmt.Sprintf("message %d", msgCounter),
|
||||
}
|
||||
msg := json.RawMessage(fmt.Sprintf(`{"dummy":"message %d"}`, msgCounter))
|
||||
if err := producer.SendToDevice(ctx, user.ID, user.ID, alice.ID, "m.dendrite.test", msg); err != nil {
|
||||
t.Fatalf("unable to send to device message: %v", err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -29,7 +29,6 @@ import (
|
|||
type OutputRoomEventConsumer struct {
|
||||
ctx context.Context
|
||||
cfg *config.UserAPI
|
||||
userAPI api.UserInternalAPI
|
||||
rsAPI rsapi.UserRoomserverAPI
|
||||
jetstream nats.JetStreamContext
|
||||
durable string
|
||||
|
|
@ -45,7 +44,6 @@ func NewOutputRoomEventConsumer(
|
|||
js nats.JetStreamContext,
|
||||
store storage.Database,
|
||||
pgClient pushgateway.Client,
|
||||
userAPI api.UserInternalAPI,
|
||||
rsAPI rsapi.UserRoomserverAPI,
|
||||
syncProducer *producers.SyncAPI,
|
||||
) *OutputRoomEventConsumer {
|
||||
|
|
@ -57,7 +55,6 @@ func NewOutputRoomEventConsumer(
|
|||
durable: cfg.Matrix.JetStream.Durable("UserAPIRoomServerConsumer"),
|
||||
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent),
|
||||
pgClient: pgClient,
|
||||
userAPI: userAPI,
|
||||
rsAPI: rsAPI,
|
||||
syncProducer: syncProducer,
|
||||
}
|
||||
|
|
@ -308,7 +305,7 @@ func (s *OutputRoomEventConsumer) notifyLocal(ctx context.Context, event *gomatr
|
|||
"event_id": event.EventID(),
|
||||
"room_id": event.RoomID(),
|
||||
"localpart": mem.Localpart,
|
||||
}).Tracef("Push rule evaluation rejected the event")
|
||||
}).Debugf("Push rule evaluation rejected the event")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -425,8 +422,8 @@ func (s *OutputRoomEventConsumer) evaluatePushRules(ctx context.Context, event *
|
|||
return nil, fmt.Errorf("user %s is ignored", sender)
|
||||
}
|
||||
}
|
||||
var res api.QueryPushRulesResponse
|
||||
if err = s.userAPI.QueryPushRules(ctx, &api.QueryPushRulesRequest{UserID: mem.UserID}, &res); err != nil {
|
||||
ruleSets, err := s.db.QueryPushRules(ctx, mem.Localpart)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
|
@ -437,7 +434,7 @@ func (s *OutputRoomEventConsumer) evaluatePushRules(ctx context.Context, event *
|
|||
roomID: event.RoomID(),
|
||||
roomSize: roomSize,
|
||||
}
|
||||
eval := pushrules.NewRuleSetEvaluator(ec, &res.RuleSets.Global)
|
||||
eval := pushrules.NewRuleSetEvaluator(ec, &ruleSets.Global)
|
||||
rule, err := eval.MatchEvent(event.Event)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
|||
129
userapi/consumers/roomserver_test.go
Normal file
129
userapi/consumers/roomserver_test.go
Normal file
|
|
@ -0,0 +1,129 @@
|
|||
package consumers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/pushrules"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/test"
|
||||
"github.com/matrix-org/dendrite/userapi/storage"
|
||||
)
|
||||
|
||||
func mustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, func()) {
|
||||
t.Helper()
|
||||
connStr, close := test.PrepareDBConnectionString(t, dbType)
|
||||
db, err := storage.NewUserAPIDatabase(nil, &config.DatabaseOptions{
|
||||
ConnectionString: config.DataSource(connStr),
|
||||
}, "", 4, 0, 0, "")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create new user db: %v", err)
|
||||
}
|
||||
return db, close
|
||||
}
|
||||
|
||||
func mustCreateEvent(t *testing.T, content string) *gomatrixserverlib.HeaderedEvent {
|
||||
t.Helper()
|
||||
ev, err := gomatrixserverlib.NewEventFromTrustedJSON([]byte(content), false, gomatrixserverlib.RoomVersionV10)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create event: %v", err)
|
||||
}
|
||||
return ev.Headered(gomatrixserverlib.RoomVersionV10)
|
||||
}
|
||||
|
||||
func Test_evaluatePushRules(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
|
||||
db, close := mustCreateDatabase(t, dbType)
|
||||
defer close()
|
||||
consumer := OutputRoomEventConsumer{db: db}
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
eventContent string
|
||||
wantAction pushrules.ActionKind
|
||||
wantActions []*pushrules.Action
|
||||
wantNotify bool
|
||||
}{
|
||||
{
|
||||
name: "m.receipt doesn't notify",
|
||||
eventContent: `{"type":"m.receipt"}`,
|
||||
wantAction: pushrules.UnknownAction,
|
||||
wantActions: nil,
|
||||
},
|
||||
{
|
||||
name: "m.reaction doesn't notify",
|
||||
eventContent: `{"type":"m.reaction"}`,
|
||||
wantAction: pushrules.DontNotifyAction,
|
||||
wantActions: []*pushrules.Action{
|
||||
{
|
||||
Kind: pushrules.DontNotifyAction,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "m.room.message notifies",
|
||||
eventContent: `{"type":"m.room.message"}`,
|
||||
wantNotify: true,
|
||||
wantAction: pushrules.NotifyAction,
|
||||
wantActions: []*pushrules.Action{
|
||||
{Kind: pushrules.NotifyAction},
|
||||
{
|
||||
Kind: pushrules.SetTweakAction,
|
||||
Tweak: pushrules.HighlightTweak,
|
||||
Value: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "m.room.message highlights",
|
||||
eventContent: `{"type":"m.room.message", "content": {"body": "test"} }`,
|
||||
wantNotify: true,
|
||||
wantAction: pushrules.NotifyAction,
|
||||
wantActions: []*pushrules.Action{
|
||||
{Kind: pushrules.NotifyAction},
|
||||
{
|
||||
Kind: pushrules.SetTweakAction,
|
||||
Tweak: pushrules.SoundTweak,
|
||||
Value: "default",
|
||||
},
|
||||
{
|
||||
Kind: pushrules.SetTweakAction,
|
||||
Tweak: pushrules.HighlightTweak,
|
||||
Value: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
actions, err := consumer.evaluatePushRules(ctx, mustCreateEvent(t, tc.eventContent), &localMembership{
|
||||
UserID: "@test:localhost",
|
||||
Localpart: "test",
|
||||
Domain: "localhost",
|
||||
}, 10)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to evaluate push rules: %v", err)
|
||||
}
|
||||
assert.Equal(t, tc.wantActions, actions)
|
||||
gotAction, _, err := pushrules.ActionsToTweaks(actions)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to get actions: %v", err)
|
||||
}
|
||||
if gotAction != tc.wantAction {
|
||||
t.Fatalf("expected action to be '%s', got '%s'", tc.wantAction, gotAction)
|
||||
}
|
||||
// this is taken from `notifyLocal`
|
||||
if tc.wantNotify && gotAction != pushrules.NotifyAction && gotAction != pushrules.CoalesceAction {
|
||||
t.Fatalf("expected to notify but didn't")
|
||||
}
|
||||
})
|
||||
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
@ -30,7 +30,6 @@ import (
|
|||
|
||||
"github.com/matrix-org/dendrite/clientapi/userutil"
|
||||
"github.com/matrix-org/dendrite/internal/eventutil"
|
||||
"github.com/matrix-org/dendrite/internal/pushrules"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
keyapi "github.com/matrix-org/dendrite/keyserver/api"
|
||||
rsapi "github.com/matrix-org/dendrite/roomserver/api"
|
||||
|
|
@ -760,57 +759,15 @@ func (a *UserInternalAPI) PerformPushRulesPut(
|
|||
}
|
||||
|
||||
func (a *UserInternalAPI) QueryPushRules(ctx context.Context, req *api.QueryPushRulesRequest, res *api.QueryPushRulesResponse) error {
|
||||
userReq := api.QueryAccountDataRequest{
|
||||
UserID: req.UserID,
|
||||
DataType: pushRulesAccountDataType,
|
||||
localpart, _, err := gomatrixserverlib.SplitID('@', req.UserID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to split user ID %q for push rules", req.UserID)
|
||||
}
|
||||
var userRes api.QueryAccountDataResponse
|
||||
if err := a.QueryAccountData(ctx, &userReq, &userRes); err != nil {
|
||||
return err
|
||||
pushRules, err := a.DB.QueryPushRules(ctx, localpart)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to query push rules: %w", err)
|
||||
}
|
||||
bs, ok := userRes.GlobalAccountData[pushRulesAccountDataType]
|
||||
if ok {
|
||||
// Legacy Dendrite users will have completely empty push rules, so we should
|
||||
// detect that situation and set some defaults.
|
||||
var rules struct {
|
||||
G struct {
|
||||
Content []json.RawMessage `json:"content"`
|
||||
Override []json.RawMessage `json:"override"`
|
||||
Room []json.RawMessage `json:"room"`
|
||||
Sender []json.RawMessage `json:"sender"`
|
||||
Underride []json.RawMessage `json:"underride"`
|
||||
} `json:"global"`
|
||||
}
|
||||
if err := json.Unmarshal([]byte(bs), &rules); err == nil {
|
||||
count := len(rules.G.Content) + len(rules.G.Override) +
|
||||
len(rules.G.Room) + len(rules.G.Sender) + len(rules.G.Underride)
|
||||
ok = count > 0
|
||||
}
|
||||
}
|
||||
if !ok {
|
||||
// If we didn't find any default push rules then we should just generate some
|
||||
// fresh ones.
|
||||
localpart, _, err := gomatrixserverlib.SplitID('@', req.UserID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to split user ID %q for push rules", req.UserID)
|
||||
}
|
||||
pushRuleSets := pushrules.DefaultAccountRuleSets(localpart, a.ServerName)
|
||||
prbs, err := json.Marshal(pushRuleSets)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal default push rules: %w", err)
|
||||
}
|
||||
if err := a.DB.SaveAccountData(ctx, localpart, "", pushRulesAccountDataType, json.RawMessage(prbs)); err != nil {
|
||||
return fmt.Errorf("failed to save default push rules: %w", err)
|
||||
}
|
||||
res.RuleSets = pushRuleSets
|
||||
return nil
|
||||
}
|
||||
var data pushrules.AccountRuleSets
|
||||
if err := json.Unmarshal([]byte(bs), &data); err != nil {
|
||||
util.GetLogger(ctx).WithError(err).Error("json.Unmarshal of push rules failed")
|
||||
return err
|
||||
}
|
||||
res.RuleSets = &data
|
||||
res.RuleSets = pushRules
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@ import (
|
|||
"errors"
|
||||
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||
"github.com/matrix-org/dendrite/internal/pushrules"
|
||||
"github.com/matrix-org/dendrite/userapi/api"
|
||||
"github.com/matrix-org/dendrite/userapi/storage/tables"
|
||||
"github.com/matrix-org/dendrite/userapi/types"
|
||||
|
|
@ -53,6 +54,7 @@ type AccountData interface {
|
|||
// If no account data could be found, returns nil
|
||||
// Returns an error if there was an issue with the retrieval
|
||||
GetAccountDataByType(ctx context.Context, localpart, roomID, dataType string) (data json.RawMessage, err error)
|
||||
QueryPushRules(ctx context.Context, localpart string) (*pushrules.AccountRuleSets, error)
|
||||
}
|
||||
|
||||
type Device interface {
|
||||
|
|
|
|||
|
|
@ -178,6 +178,41 @@ func (d *Database) createAccount(
|
|||
return account, nil
|
||||
}
|
||||
|
||||
func (d *Database) QueryPushRules(
|
||||
ctx context.Context,
|
||||
localpart string,
|
||||
) (*pushrules.AccountRuleSets, error) {
|
||||
data, err := d.AccountDatas.SelectAccountDataByType(ctx, localpart, "", "m.push_rules")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// If we didn't find any default push rules then we should just generate some
|
||||
// fresh ones.
|
||||
if len(data) == 0 {
|
||||
pushRuleSets := pushrules.DefaultAccountRuleSets(localpart, d.ServerName)
|
||||
prbs, err := json.Marshal(pushRuleSets)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to marshal default push rules: %w", err)
|
||||
}
|
||||
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
if dbErr := d.AccountDatas.InsertAccountData(ctx, txn, localpart, "", "m.push_rules", prbs); dbErr != nil {
|
||||
return fmt.Errorf("failed to save default push rules: %w", dbErr)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
return pushRuleSets, err
|
||||
}
|
||||
|
||||
var pushRules pushrules.AccountRuleSets
|
||||
if err := json.Unmarshal(data, &pushRules); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &pushRules, nil
|
||||
}
|
||||
|
||||
// SaveAccountData saves new account data for a given user and a given room.
|
||||
// If the account data is not specific to a room, the room ID should be an empty string
|
||||
// If an account data already exists for a given set (user, room, data type), it will
|
||||
|
|
@ -700,7 +735,9 @@ func (d *Database) GetRoomNotificationCounts(ctx context.Context, localpart, roo
|
|||
}
|
||||
|
||||
func (d *Database) DeleteOldNotifications(ctx context.Context) error {
|
||||
return d.Notifications.Clean(ctx, nil)
|
||||
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
return d.Notifications.Clean(ctx, txn)
|
||||
})
|
||||
}
|
||||
|
||||
func (d *Database) UpsertPusher(
|
||||
|
|
|
|||
|
|
@ -91,7 +91,7 @@ func NewInternalAPI(
|
|||
}
|
||||
|
||||
eventConsumer := consumers.NewOutputRoomEventConsumer(
|
||||
base.ProcessContext, cfg, js, db, pgClient, userAPI, rsAPI, syncProducer,
|
||||
base.ProcessContext, cfg, js, db, pgClient, rsAPI, syncProducer,
|
||||
)
|
||||
if err := eventConsumer.Start(); err != nil {
|
||||
logrus.WithError(err).Panic("failed to start user API streamed event consumer")
|
||||
|
|
|
|||
Loading…
Reference in a new issue