Mirror of https://github.com/matrix-org/dendrite.git

Commit ba8e842a72: Merge branch 'main' of github.com:matrix-org/dendrite into s7evink/fts

CHANGES.md (35 changes)
@@ -1,5 +1,40 @@
# Changelog

## Dendrite 0.9.8 (2022-09-12)

### Important

* This is a **security release** to fix a vulnerability where missing events retrieved from other servers did not have their signatures verified in all cases, affecting all versions of Dendrite before 0.9.8. Upgrading to this version is highly recommended. For more information, [see here](https://github.com/matrix-org/dendrite/security/advisories/GHSA-pfw4-xjgm-267c).

### Features

* The built-in NATS Server has been updated to the final 2.9.0 release version

### Fixes

* Dendrite will now correctly verify the signatures of events retrieved using `/get_missing_events`

## Dendrite 0.9.7 (2022-09-09)

### Features

* Initial supporting code to enable full-text search has been merged (although not ready for use yet)
* Newly created rooms now have higher default power levels for enabling encryption, setting server ACLs or sending tombstone events
* Incoming signing key updates over federation are now queued in JetStream for processing, so that they cannot be dropped accidentally

### Fixes

* A race condition between the roomserver output events being generated, forward extremities being updated and room info being updated has been fixed
* Appservices will no longer receive invite events which they are not interested in, which caused heavy load in some cases or excessive request sizes in others
* A bug in state resolution v2 where events could incorrectly be classified as control events has been fixed
* A bug in state resolution v2 where some specific events with unexpected non-empty state keys were dropped has been fixed
* A bug in state resolution v2 when fetching auth events vs partial state has been fixed
* Stale device lists should now be handled correctly for all user IDs, which may help with E2EE reliability
* A number of database writer issues have been fixed in the user API and sync API, which should help to reduce `database is locked` errors with SQLite databases
* Database migrations should now be detected more reliably to prevent unexpected errors at startup
* A number of minor database transaction issues have been fixed, particularly for assigning NIDs in the roomserver, cleaning up device keys and cleaning up notifications
* The database query for finding shared users in the sync API has been optimised, using significantly less CPU time as a result

## Dendrite 0.9.6 (2022-09-01)

### Features

@ -218,6 +218,9 @@ func (m *DendriteMonolith) Start() {
|
|||
if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil {
|
||||
panic("failed to load PEM key: " + err.Error())
|
||||
}
|
||||
if len(sk) != ed25519.PrivateKeySize {
|
||||
panic("the private key is not long enough")
|
||||
}
|
||||
} else {
|
||||
if sk, err = os.ReadFile(oldkeyfile); err != nil {
|
||||
panic("failed to read the old private key: " + err.Error())
|
||||
|
|
@ -233,8 +236,13 @@ func (m *DendriteMonolith) Start() {
|
|||
if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil {
|
||||
panic("failed to load PEM key: " + err.Error())
|
||||
}
|
||||
if len(sk) != ed25519.PrivateKeySize {
|
||||
panic("the private key is not long enough")
|
||||
}
|
||||
}
|
||||
|
||||
pk = sk.Public().(ed25519.PublicKey)
|
||||
|
||||
var err error
|
||||
m.listener, err = net.Listen("tcp", "localhost:65432")
|
||||
if err != nil {
|
||||
|
|
@ -247,7 +255,7 @@ func (m *DendriteMonolith) Start() {
|
|||
m.logger.SetOutput(BindLogger{})
|
||||
logrus.SetOutput(BindLogger{})
|
||||
|
||||
m.PineconeRouter = pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk, false)
|
||||
m.PineconeRouter = pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk)
|
||||
m.PineconeQUIC = pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), m.PineconeRouter, []string{"matrix"})
|
||||
m.PineconeMulticast = pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), m.PineconeRouter)
|
||||
m.PineconeManager = pineconeConnections.NewConnectionManager(m.PineconeRouter, nil)
|
||||
|
|
|
|||
|
|
@ -87,6 +87,9 @@ func (m *DendriteMonolith) Start() {
|
|||
if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil {
|
||||
panic("failed to load PEM key: " + err.Error())
|
||||
}
|
||||
if len(sk) != ed25519.PrivateKeySize {
|
||||
panic("the private key is not long enough")
|
||||
}
|
||||
} else {
|
||||
if sk, err = os.ReadFile(oldkeyfile); err != nil {
|
||||
panic("failed to read the old private key: " + err.Error())
|
||||
|
|
@ -103,8 +106,13 @@ func (m *DendriteMonolith) Start() {
|
|||
if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil {
|
||||
panic("failed to load PEM key: " + err.Error())
|
||||
}
|
||||
if len(sk) != ed25519.PrivateKeySize {
|
||||
panic("the private key is not long enough")
|
||||
}
|
||||
}
|
||||
|
||||
pk = sk.Public().(ed25519.PublicKey)
|
||||
|
||||
var err error
|
||||
m.listener, err = net.Listen("tcp", "localhost:65432")
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -21,12 +21,13 @@ import (
|
|||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/nats-io/nats.go"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||
)
|
||||
|
||||
// SyncAPIProducer produces events for the sync API server to consume
|
||||
|
|
@ -61,7 +62,7 @@ func (p *SyncAPIProducer) SendReceipt(
|
|||
|
||||
func (p *SyncAPIProducer) SendToDevice(
|
||||
ctx context.Context, sender, userID, deviceID, eventType string,
|
||||
message interface{},
|
||||
message json.RawMessage,
|
||||
) error {
|
||||
devices := []string{}
|
||||
_, domain, err := gomatrixserverlib.SplitID('@', userID)
|
||||
|
|
@ -89,24 +90,19 @@ func (p *SyncAPIProducer) SendToDevice(
|
|||
devices = append(devices, deviceID)
|
||||
}
|
||||
|
||||
js, err := json.Marshal(message)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"user_id": userID,
|
||||
"num_devices": len(devices),
|
||||
"type": eventType,
|
||||
}).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent)
|
||||
for _, device := range devices {
|
||||
for i, device := range devices {
|
||||
ote := &types.OutputSendToDeviceEvent{
|
||||
UserID: userID,
|
||||
DeviceID: device,
|
||||
SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{
|
||||
Sender: sender,
|
||||
Type: eventType,
|
||||
Content: js,
|
||||
Content: message,
|
||||
},
|
||||
}
|
||||
|
||||
|
|
@@ -115,15 +111,17 @@ func (p *SyncAPIProducer) SendToDevice(
			log.WithError(err).Error("sendToDevice failed json.Marshal")
			return err
		}
		m := &nats.Msg{
			Subject: p.TopicSendToDeviceEvent,
			Data:    eventJSON,
			Header:  nats.Header{},
		}
		m := nats.NewMsg(p.TopicSendToDeviceEvent)
		m.Data = eventJSON
		m.Header.Set("sender", sender)
		m.Header.Set(jetstream.UserID, userID)

		if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil {
			log.WithError(err).Error("sendToDevice failed t.Producer.SendMessage")
			if i < len(devices)-1 {
				log.WithError(err).Warn("sendToDevice failed to PublishMsg, trying further devices")
				continue
			}
			log.WithError(err).Error("sendToDevice failed to PublishMsg for all devices")
			return err
		}
	}
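
For reference, here is a minimal, hypothetical sketch (not part of this diff) of the `nats.NewMsg`-plus-`Header` publishing pattern used above; the subject name, header value and payload are illustrative assumptions:

```go
package producer

import (
	"context"

	"github.com/nats-io/nats.go"
)

// publishWithHeaders publishes a JetStream message carrying per-message
// headers, mirroring the pattern in the diff above. The subject and the
// header value here are placeholders, not Dendrite's real topic names.
func publishWithHeaders(ctx context.Context, js nats.JetStreamContext, payload []byte) error {
	m := nats.NewMsg("SyncAPI.SendToDeviceEvent") // assumed subject name
	m.Data = payload
	m.Header.Set("sender", "@alice:example.com") // illustrative header value
	_, err := js.PublishMsg(m, nats.Context(ctx))
	return err
}
```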
|
||||
|
|
|
|||
|
|
@ -98,6 +98,9 @@ func main() {
|
|||
if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil {
|
||||
panic("failed to load PEM key: " + err.Error())
|
||||
}
|
||||
if len(sk) != ed25519.PrivateKeySize {
|
||||
panic("the private key is not long enough")
|
||||
}
|
||||
} else {
|
||||
if sk, err = os.ReadFile(oldkeyfile); err != nil {
|
||||
panic("failed to read the old private key: " + err.Error())
|
||||
|
|
@ -114,7 +117,13 @@ func main() {
|
|||
if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil {
|
||||
panic("failed to load PEM key: " + err.Error())
|
||||
}
|
||||
if len(sk) != ed25519.PrivateKeySize {
|
||||
panic("the private key is not long enough")
|
||||
}
|
||||
}
|
||||
|
||||
pk = sk.Public().(ed25519.PublicKey)
|
||||
|
||||
cfg.Defaults(config.DefaultOpts{
|
||||
Generate: true,
|
||||
Monolithic: true,
|
||||
|
|
@ -136,14 +145,13 @@ func main() {
|
|||
}
|
||||
}
|
||||
|
||||
pk = sk.Public().(ed25519.PublicKey)
|
||||
cfg.Global.ServerName = gomatrixserverlib.ServerName(hex.EncodeToString(pk))
|
||||
cfg.Global.KeyID = gomatrixserverlib.KeyID(signing.KeyID)
|
||||
|
||||
base := base.NewBaseDendrite(cfg, "Monolith")
|
||||
defer base.Close() // nolint: errcheck
|
||||
|
||||
pRouter := pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk, false)
|
||||
pRouter := pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk)
|
||||
pQUIC := pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), pRouter, []string{"matrix"})
|
||||
pMulticast := pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), pRouter)
|
||||
pManager := pineconeConnections.NewConnectionManager(pRouter, nil)
|
||||
|
|
|
|||
|
|
@ -86,6 +86,9 @@ func main() {
|
|||
if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil {
|
||||
panic("failed to load PEM key: " + err.Error())
|
||||
}
|
||||
if len(sk) != ed25519.PrivateKeySize {
|
||||
panic("the private key is not long enough")
|
||||
}
|
||||
} else {
|
||||
if sk, err = os.ReadFile(oldkeyfile); err != nil {
|
||||
panic("failed to read the old private key: " + err.Error())
|
||||
|
|
@ -102,8 +105,13 @@ func main() {
|
|||
if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil {
|
||||
panic("failed to load PEM key: " + err.Error())
|
||||
}
|
||||
if len(sk) != ed25519.PrivateKeySize {
|
||||
panic("the private key is not long enough")
|
||||
}
|
||||
}
|
||||
|
||||
pk = sk.Public().(ed25519.PublicKey)
|
||||
|
||||
// use custom config if config flag is set
|
||||
if configFlagSet {
|
||||
cfg = setup.ParseFlags(true)
|
||||
|
|
@ -129,7 +137,6 @@ func main() {
|
|||
}
|
||||
}
|
||||
|
||||
pk = sk.Public().(ed25519.PublicKey)
|
||||
cfg.Global.ServerName = gomatrixserverlib.ServerName(hex.EncodeToString(pk))
|
||||
cfg.Global.KeyID = gomatrixserverlib.KeyID(signing.KeyID)
|
||||
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ import (
|
|||
"fmt"
|
||||
"log"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/gomatrix"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
|
|
@ -81,11 +82,14 @@ func runTests(baseURL, branchName string) error {
|
|||
client: users[1].client, text: "4: " + branchName,
|
||||
},
|
||||
}
|
||||
wantEventIDs := make(map[string]struct{}, 8)
|
||||
for _, msg := range msgs {
|
||||
_, err = msg.client.SendText(dmRoomID, msg.text)
|
||||
var resp *gomatrix.RespSendEvent
|
||||
resp, err = msg.client.SendText(dmRoomID, msg.text)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to send text in dm room: %s", err)
|
||||
}
|
||||
wantEventIDs[resp.EventID] = struct{}{}
|
||||
}
|
||||
|
||||
// attempt to create/join the shared public room
|
||||
|
|
@ -113,11 +117,48 @@ func runTests(baseURL, branchName string) error {
|
|||
}
|
||||
// send messages
|
||||
for _, msg := range msgs {
|
||||
_, err = msg.client.SendText(publicRoomID, "public "+msg.text)
|
||||
resp, err := msg.client.SendText(publicRoomID, "public "+msg.text)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to send text in public room: %s", err)
|
||||
}
|
||||
wantEventIDs[resp.EventID] = struct{}{}
|
||||
}
|
||||
|
||||
// Sync until we have all expected messages
|
||||
doneCh := make(chan struct{})
|
||||
go func() {
|
||||
syncClient := users[0].client
|
||||
since := ""
|
||||
for len(wantEventIDs) > 0 {
|
||||
select {
|
||||
case <-doneCh:
|
||||
return
|
||||
default:
|
||||
}
|
||||
syncResp, err := syncClient.SyncRequest(1000, since, "1", false, "")
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
for _, room := range syncResp.Rooms.Join {
|
||||
for _, ev := range room.Timeline.Events {
|
||||
if ev.Type != "m.room.message" {
|
||||
continue
|
||||
}
|
||||
delete(wantEventIDs, ev.ID)
|
||||
}
|
||||
}
|
||||
since = syncResp.NextBatch
|
||||
}
|
||||
close(doneCh)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-time.After(time.Second * 10):
|
||||
close(doneCh)
|
||||
return fmt.Errorf("failed to receive all expected messages: %+v", wantEventIDs)
|
||||
case <-doneCh:
|
||||
}
|
||||
|
||||
log.Printf("OK! rooms(public=%s, dm=%s) users(%s, %s)\n", publicRoomID, dmRoomID, users[0].userID, users[1].userID)
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -64,7 +64,7 @@ func (p *SyncAPIProducer) SendReceipt(
|
|||
|
||||
func (p *SyncAPIProducer) SendToDevice(
|
||||
ctx context.Context, sender, userID, deviceID, eventType string,
|
||||
message interface{},
|
||||
message json.RawMessage,
|
||||
) error {
|
||||
devices := []string{}
|
||||
_, domain, err := gomatrixserverlib.SplitID('@', userID)
|
||||
|
|
@ -92,24 +92,19 @@ func (p *SyncAPIProducer) SendToDevice(
|
|||
devices = append(devices, deviceID)
|
||||
}
|
||||
|
||||
js, err := json.Marshal(message)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"user_id": userID,
|
||||
"num_devices": len(devices),
|
||||
"type": eventType,
|
||||
}).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent)
|
||||
for _, device := range devices {
|
||||
for i, device := range devices {
|
||||
ote := &types.OutputSendToDeviceEvent{
|
||||
UserID: userID,
|
||||
DeviceID: device,
|
||||
SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{
|
||||
Sender: sender,
|
||||
Type: eventType,
|
||||
Content: js,
|
||||
Content: message,
|
||||
},
|
||||
}
|
||||
|
||||
|
|
@ -118,16 +113,17 @@ func (p *SyncAPIProducer) SendToDevice(
|
|||
log.WithError(err).Error("sendToDevice failed json.Marshal")
|
||||
return err
|
||||
}
|
||||
m := &nats.Msg{
|
||||
Subject: p.TopicSendToDeviceEvent,
|
||||
Data: eventJSON,
|
||||
Header: nats.Header{},
|
||||
}
|
||||
m := nats.NewMsg(p.TopicSendToDeviceEvent)
|
||||
m.Data = eventJSON
|
||||
m.Header.Set("sender", sender)
|
||||
m.Header.Set(jetstream.UserID, userID)
|
||||
|
||||
if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil {
|
||||
log.WithError(err).Error("sendToDevice failed t.Producer.SendMessage")
|
||||
if i < len(devices)-1 {
|
||||
log.WithError(err).Warn("sendToDevice failed to PublishMsg, trying further devices")
|
||||
continue
|
||||
}
|
||||
log.WithError(err).Error("sendToDevice failed to PublishMsg for all devices")
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
|
|
|||
go.mod (16 changes)
|
|
@ -17,16 +17,17 @@ require (
|
|||
github.com/google/uuid v1.3.0
|
||||
github.com/gorilla/mux v1.8.0
|
||||
github.com/gorilla/websocket v1.5.0
|
||||
github.com/kardianos/minwinsvc v1.0.0
|
||||
github.com/lib/pq v1.10.5
|
||||
github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e
|
||||
github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91
|
||||
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220907081047-637a173a3661
|
||||
github.com/matrix-org/pinecone v0.0.0-20220901133433-565beccfebed
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220912142654-7b96db48888a
|
||||
github.com/matrix-org/pinecone v0.0.0-20220914131522-608215eb1b29
|
||||
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
|
||||
github.com/mattn/go-sqlite3 v1.14.13
|
||||
github.com/nats-io/nats-server/v2 v2.8.5-0.20220811224153-d8d25d9b0b1c
|
||||
github.com/nats-io/nats.go v1.16.1-0.20220810192301-fb5ca2cbc995
|
||||
github.com/nats-io/nats-server/v2 v2.9.0
|
||||
github.com/nats-io/nats.go v1.16.1-0.20220906180156-a1017eec10b0
|
||||
github.com/neilalexander/utp v0.1.1-0.20210727203401-54ae7b1cd5f9
|
||||
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646
|
||||
github.com/ngrok/sqlmw v0.0.0-20220520173518-97c9c04efc79
|
||||
|
|
@ -42,7 +43,7 @@ require (
|
|||
github.com/uber/jaeger-lib v2.4.1+incompatible
|
||||
github.com/yggdrasil-network/yggdrasil-go v0.4.5-0.20220901155642-4f2abece817c
|
||||
go.uber.org/atomic v1.9.0
|
||||
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa
|
||||
golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90
|
||||
golang.org/x/image v0.0.0-20220413100746-70e8d0d3baa9
|
||||
golang.org/x/mobile v0.0.0-20220722155234-aaac322e2105
|
||||
golang.org/x/net v0.0.0-20220728211354-c7608f3a8462
|
||||
|
|
@ -92,7 +93,6 @@ require (
|
|||
github.com/json-iterator/go v1.1.12 // indirect
|
||||
github.com/juju/errors v0.0.0-20220203013757-bd733f3c86b9 // indirect
|
||||
github.com/juju/testing v0.0.0-20220203020004-a0ff61f03494 // indirect
|
||||
github.com/kardianos/minwinsvc v1.0.0 // indirect
|
||||
github.com/klauspost/compress v1.15.9 // indirect
|
||||
github.com/lucas-clemente/quic-go v0.28.1 // indirect
|
||||
github.com/marten-seemann/qtls-go1-16 v0.1.5 // indirect
|
||||
|
|
@ -124,9 +124,9 @@ require (
|
|||
github.com/tidwall/pretty v1.2.0 // indirect
|
||||
go.etcd.io/bbolt v1.3.5 // indirect
|
||||
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
|
||||
golang.org/x/sys v0.0.0-20220731174439-a90be440212d // indirect
|
||||
golang.org/x/sys v0.0.0-20220906135438-9e1f76180b77 // indirect
|
||||
golang.org/x/text v0.3.8-0.20211004125949-5bd84dd9b33b // indirect
|
||||
golang.org/x/time v0.0.0-20220411224347-583f2d630306 // indirect
|
||||
golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9 // indirect
|
||||
golang.org/x/tools v0.1.12 // indirect
|
||||
google.golang.org/protobuf v1.27.1 // indirect
|
||||
gopkg.in/macaroon.v2 v2.1.0 // indirect
|
||||
|
|
|
|||
go.sum (27 changes)
|
|
@ -388,10 +388,10 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1
|
|||
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
|
||||
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4=
|
||||
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220907081047-637a173a3661 h1:dww9rH0HVfAO9JOBD1nxq26GHKbEw07thAJTu1DrAQs=
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220907081047-637a173a3661/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk=
|
||||
github.com/matrix-org/pinecone v0.0.0-20220901133433-565beccfebed h1:YMcCnrmTbT5M1LtTiagiFFaj9vEgvC6iVEzWsIb0tQQ=
|
||||
github.com/matrix-org/pinecone v0.0.0-20220901133433-565beccfebed/go.mod h1:K0N1ixHQxXoCyqolDqVxPM3ArrDtcMs8yegOx2Lfv9k=
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220912142654-7b96db48888a h1:XeGBDZZsUe4kgj3myl0EiuDNVWxszJecMTrON3Wn9sI=
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220912142654-7b96db48888a/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk=
|
||||
github.com/matrix-org/pinecone v0.0.0-20220914131522-608215eb1b29 h1:/AIaqhK1BBi2sMEVQdgZRV8H8sNloAGCgztLZhsPqD0=
|
||||
github.com/matrix-org/pinecone v0.0.0-20220914131522-608215eb1b29/go.mod h1:K0N1ixHQxXoCyqolDqVxPM3ArrDtcMs8yegOx2Lfv9k=
|
||||
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
|
||||
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk=
|
||||
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
|
||||
|
|
@ -429,10 +429,10 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW
|
|||
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
|
||||
github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI=
|
||||
github.com/nats-io/jwt/v2 v2.3.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
|
||||
github.com/nats-io/nats-server/v2 v2.8.5-0.20220811224153-d8d25d9b0b1c h1:U5qngWGZ7E/nQxz0544IpIEdKFUUaOJxQN2LHCYLGhg=
|
||||
github.com/nats-io/nats-server/v2 v2.8.5-0.20220811224153-d8d25d9b0b1c/go.mod h1:+f++B/5jpr71JATt7b5KCX+G7bt43iWx1OYWGkpE/Kk=
|
||||
github.com/nats-io/nats.go v1.16.1-0.20220810192301-fb5ca2cbc995 h1:CUcSQR8jwa9//qNgN/t3tW53DObnTPQ/G/K+qnS7yRc=
|
||||
github.com/nats-io/nats.go v1.16.1-0.20220810192301-fb5ca2cbc995/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
|
||||
github.com/nats-io/nats-server/v2 v2.9.0 h1:DLWu+7/VgGOoChcDKytnUZPAmudpv7o/MhKmNrnH1RE=
|
||||
github.com/nats-io/nats-server/v2 v2.9.0/go.mod h1:BWKY6217RvhI+FDoOLZ2BH+hOC37xeKRBlQ1Lz7teKI=
|
||||
github.com/nats-io/nats.go v1.16.1-0.20220906180156-a1017eec10b0 h1:dPUKD6Iv8M1y9MU8PK6H4a4/12yx5/CbaYWz/Z1arY8=
|
||||
github.com/nats-io/nats.go v1.16.1-0.20220906180156-a1017eec10b0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
|
||||
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
|
||||
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
|
||||
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
||||
|
|
@ -627,8 +627,9 @@ golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm
|
|||
golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
|
||||
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||
golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
|
||||
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa h1:zuSxTR4o9y82ebqCUJYNGJbGPo6sKVl54f/TVDObg1c=
|
||||
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
|
||||
golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 h1:Y/gsMcFOcR+6S6f3YeMKl5g+dZMEWqcz5Czj/GWYbkM=
|
||||
golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
|
||||
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
|
|
@ -812,8 +813,8 @@ golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBc
|
|||
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220730100132-1609e554cd39/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220731174439-a90be440212d h1:Sv5ogFZatcgIMMtBSTTAgMYsicp25MXBubjXNDKwm80=
|
||||
golang.org/x/sys v0.0.0-20220731174439-a90be440212d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220906135438-9e1f76180b77 h1:C1tElbkWrsSkn3IRl1GCW/gETw1TywWIPgwZtXTZbYg=
|
||||
golang.org/x/sys v0.0.0-20220906135438-9e1f76180b77/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
|
||||
golang.org/x/term v0.0.0-20220526004731-065cf7ba2467 h1:CBpWXWQpIRjzmkkA+M7q9Fqnwd2mZr3AFqexg8YTfoM=
|
||||
|
|
@ -831,8 +832,8 @@ golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxb
|
|||
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20220411224347-583f2d630306 h1:+gHMid33q6pen7kv9xvT+JRinntgeXO2AeZVd0AWD3w=
|
||||
golang.org/x/time v0.0.0-20220411224347-583f2d630306/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9 h1:ftMN5LMiBFjbzleLqtoBZk7KdJwhuybIU+FckUHgoyQ=
|
||||
golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
|
|
|
|||
|
|
@ -16,6 +16,12 @@ func mRuleContainsUserNameDefinition(localpart string) *Rule {
|
|||
Default: true,
|
||||
Enabled: true,
|
||||
Pattern: localpart,
|
||||
Conditions: []*Condition{
|
||||
{
|
||||
Kind: EventMatchCondition,
|
||||
Key: "content.body",
|
||||
},
|
||||
},
|
||||
Actions: []*Action{
|
||||
{Kind: NotifyAction},
|
||||
{
|
||||
|
|
|
|||
|
|
@ -7,8 +7,9 @@ func defaultOverrideRules(userID string) []*Rule {
|
|||
mRuleInviteForMeDefinition(userID),
|
||||
&mRuleMemberEventDefinition,
|
||||
&mRuleContainsDisplayNameDefinition,
|
||||
&mRuleTombstoneDefinition,
|
||||
&mRuleRoomNotifDefinition,
|
||||
&mRuleTombstoneDefinition,
|
||||
&mRuleReactionDefinition,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -20,6 +21,7 @@ const (
|
|||
MRuleContainsDisplayName = ".m.rule.contains_display_name"
|
||||
MRuleTombstone = ".m.rule.tombstone"
|
||||
MRuleRoomNotif = ".m.rule.roomnotif"
|
||||
MRuleReaction = ".m.rule.reaction"
|
||||
)
|
||||
|
||||
var (
|
||||
|
|
@ -96,7 +98,7 @@ var (
|
|||
{
|
||||
Kind: SetTweakAction,
|
||||
Tweak: HighlightTweak,
|
||||
Value: false,
|
||||
Value: true,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
@ -120,10 +122,25 @@ var (
|
|||
{
|
||||
Kind: SetTweakAction,
|
||||
Tweak: HighlightTweak,
|
||||
Value: false,
|
||||
Value: true,
|
||||
},
|
||||
},
|
||||
}
|
||||
mRuleReactionDefinition = Rule{
|
||||
RuleID: MRuleReaction,
|
||||
Default: true,
|
||||
Enabled: true,
|
||||
Conditions: []*Condition{
|
||||
{
|
||||
Kind: EventMatchCondition,
|
||||
Key: "type",
|
||||
Pattern: "m.reaction",
|
||||
},
|
||||
},
|
||||
Actions: []*Action{
|
||||
{Kind: DontNotifyAction},
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
func mRuleInviteForMeDefinition(userID string) *Rule {
|
||||
|
|
|
|||
|
|
@ -10,8 +10,8 @@ const (
|
|||
|
||||
var defaultUnderrideRules = []*Rule{
|
||||
&mRuleCallDefinition,
|
||||
&mRuleEncryptedRoomOneToOneDefinition,
|
||||
&mRuleRoomOneToOneDefinition,
|
||||
&mRuleEncryptedRoomOneToOneDefinition,
|
||||
&mRuleMessageDefinition,
|
||||
&mRuleEncryptedDefinition,
|
||||
}
|
||||
|
|
@ -59,6 +59,11 @@ var (
|
|||
},
|
||||
Actions: []*Action{
|
||||
{Kind: NotifyAction},
|
||||
{
|
||||
Kind: SetTweakAction,
|
||||
Tweak: SoundTweak,
|
||||
Value: "default",
|
||||
},
|
||||
{
|
||||
Kind: SetTweakAction,
|
||||
Tweak: HighlightTweak,
|
||||
|
|
@ -88,6 +93,11 @@ var (
|
|||
Tweak: HighlightTweak,
|
||||
Value: false,
|
||||
},
|
||||
{
|
||||
Kind: SetTweakAction,
|
||||
Tweak: HighlightTweak,
|
||||
Value: false,
|
||||
},
|
||||
},
|
||||
}
|
||||
mRuleMessageDefinition = Rule{
|
||||
|
|
@ -101,7 +111,14 @@ var (
|
|||
Pattern: "m.room.message",
|
||||
},
|
||||
},
|
||||
Actions: []*Action{{Kind: NotifyAction}},
|
||||
Actions: []*Action{
|
||||
{Kind: NotifyAction},
|
||||
{
|
||||
Kind: SetTweakAction,
|
||||
Tweak: HighlightTweak,
|
||||
Value: false,
|
||||
},
|
||||
},
|
||||
}
|
||||
mRuleEncryptedDefinition = Rule{
|
||||
RuleID: MRuleEncrypted,
|
||||
|
|
@ -114,6 +131,13 @@ var (
|
|||
Pattern: "m.room.encrypted",
|
||||
},
|
||||
},
|
||||
Actions: []*Action{{Kind: NotifyAction}},
|
||||
Actions: []*Action{
|
||||
{Kind: NotifyAction},
|
||||
{
|
||||
Kind: SetTweakAction,
|
||||
Tweak: HighlightTweak,
|
||||
Value: false,
|
||||
},
|
||||
},
|
||||
}
|
||||
)
|
||||
|
|
|
|||
|
|
@ -24,24 +24,28 @@ func TestRuleSetEvaluatorMatchEvent(t *testing.T) {
|
|||
Default: false,
|
||||
Enabled: true,
|
||||
}
|
||||
defaultRuleset := DefaultGlobalRuleSet("test", "test")
|
||||
tsts := []struct {
|
||||
Name string
|
||||
RuleSet RuleSet
|
||||
Want *Rule
|
||||
Event *gomatrixserverlib.Event
|
||||
}{
|
||||
{"empty", RuleSet{}, nil},
|
||||
{"defaultCanWin", RuleSet{Override: []*Rule{defaultEnabled}}, defaultEnabled},
|
||||
{"userWins", RuleSet{Override: []*Rule{defaultEnabled, userEnabled}}, userEnabled},
|
||||
{"defaultOverrideWins", RuleSet{Override: []*Rule{defaultEnabled}, Underride: []*Rule{userEnabled}}, defaultEnabled},
|
||||
{"overrideContent", RuleSet{Override: []*Rule{userEnabled}, Content: []*Rule{userEnabled2}}, userEnabled},
|
||||
{"overrideRoom", RuleSet{Override: []*Rule{userEnabled}, Room: []*Rule{userEnabled2}}, userEnabled},
|
||||
{"overrideSender", RuleSet{Override: []*Rule{userEnabled}, Sender: []*Rule{userEnabled2}}, userEnabled},
|
||||
{"overrideUnderride", RuleSet{Override: []*Rule{userEnabled}, Underride: []*Rule{userEnabled2}}, userEnabled},
|
||||
{"empty", RuleSet{}, nil, ev},
|
||||
{"defaultCanWin", RuleSet{Override: []*Rule{defaultEnabled}}, defaultEnabled, ev},
|
||||
{"userWins", RuleSet{Override: []*Rule{defaultEnabled, userEnabled}}, userEnabled, ev},
|
||||
{"defaultOverrideWins", RuleSet{Override: []*Rule{defaultEnabled}, Underride: []*Rule{userEnabled}}, defaultEnabled, ev},
|
||||
{"overrideContent", RuleSet{Override: []*Rule{userEnabled}, Content: []*Rule{userEnabled2}}, userEnabled, ev},
|
||||
{"overrideRoom", RuleSet{Override: []*Rule{userEnabled}, Room: []*Rule{userEnabled2}}, userEnabled, ev},
|
||||
{"overrideSender", RuleSet{Override: []*Rule{userEnabled}, Sender: []*Rule{userEnabled2}}, userEnabled, ev},
|
||||
{"overrideUnderride", RuleSet{Override: []*Rule{userEnabled}, Underride: []*Rule{userEnabled2}}, userEnabled, ev},
|
||||
{"reactions don't notify", *defaultRuleset, &mRuleReactionDefinition, mustEventFromJSON(t, `{"type":"m.reaction"}`)},
|
||||
{"receipts don't notify", *defaultRuleset, nil, mustEventFromJSON(t, `{"type":"m.receipt"}`)},
|
||||
}
|
||||
for _, tst := range tsts {
|
||||
t.Run(tst.Name, func(t *testing.T) {
|
||||
rse := NewRuleSetEvaluator(nil, &tst.RuleSet)
|
||||
got, err := rse.MatchEvent(ev)
|
||||
rse := NewRuleSetEvaluator(fakeEvaluationContext{3}, &tst.RuleSet)
|
||||
got, err := rse.MatchEvent(tst.Event)
|
||||
if err != nil {
|
||||
t.Fatalf("MatchEvent failed: %v", err)
|
||||
}
|
||||
|
|
@ -128,7 +132,7 @@ func TestConditionMatches(t *testing.T) {
|
|||
}
|
||||
for _, tst := range tsts {
|
||||
t.Run(tst.Name, func(t *testing.T) {
|
||||
got, err := conditionMatches(&tst.Cond, mustEventFromJSON(t, tst.EventJSON), &fakeEvaluationContext{})
|
||||
got, err := conditionMatches(&tst.Cond, mustEventFromJSON(t, tst.EventJSON), &fakeEvaluationContext{2})
|
||||
if err != nil {
|
||||
t.Fatalf("conditionMatches failed: %v", err)
|
||||
}
|
||||
|
|
@ -139,10 +143,10 @@ func TestConditionMatches(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
type fakeEvaluationContext struct{}
|
||||
type fakeEvaluationContext struct{ memberCount int }
|
||||
|
||||
func (fakeEvaluationContext) UserDisplayName() string { return "Dear User" }
|
||||
func (fakeEvaluationContext) RoomMemberCount() (int, error) { return 2, nil }
|
||||
func (fakeEvaluationContext) UserDisplayName() string { return "Dear User" }
|
||||
func (f fakeEvaluationContext) RoomMemberCount() (int, error) { return f.memberCount, nil }
|
||||
func (fakeEvaluationContext) HasPowerLevel(userID, levelKey string) (bool, error) {
|
||||
return userID == "@poweruser:example.com" && levelKey == "powerlevel", nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ import (
|
|||
// kind and a tweaks map. Returns a nil map if it would have been
|
||||
// empty.
|
||||
func ActionsToTweaks(as []*Action) (ActionKind, map[string]interface{}, error) {
|
||||
var kind ActionKind
|
||||
kind := UnknownAction
|
||||
tweaks := map[string]interface{}{}
|
||||
|
||||
for _, a := range as {
|
||||
|
|
|
|||
|
|
@ -21,8 +21,9 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
)
|
||||
|
||||
const createDBMigrationsSQL = "" +
|
||||
|
|
@ -95,11 +96,11 @@ func (m *Migrator) Up(ctx context.Context) error {
|
|||
for i := range m.migrations {
|
||||
now := time.Now().UTC().Format(time.RFC3339)
|
||||
migration := m.migrations[i]
|
||||
logrus.Debugf("Executing database migration '%s'", migration.Version)
|
||||
// Skip migration if it was already executed
|
||||
if _, ok := executedMigrations[migration.Version]; ok {
|
||||
continue
|
||||
}
|
||||
logrus.Debugf("Executing database migration '%s'", migration.Version)
|
||||
err = migration.Up(ctx, txn)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to execute migration '%s': %w", migration.Version, err)
|
||||
|
|
@@ -140,3 +141,24 @@ func (m *Migrator) ExecutedMigrations(ctx context.Context) (map[string]struct{},

	return result, rows.Err()
}

// InsertMigration creates the migrations table if it doesn't exist and
// inserts a migration, given its name, into the database.
// This should only be used when manually inserting migrations.
func InsertMigration(ctx context.Context, db *sql.DB, migrationName string) error {
	_, err := db.ExecContext(ctx, createDBMigrationsSQL)
	if err != nil {
		return fmt.Errorf("unable to create db_migrations: %w", err)
	}
	_, err = db.ExecContext(ctx, insertVersionSQL,
		migrationName,
		time.Now().Format(time.RFC3339),
		internal.VersionString(),
	)
	// If the migration was already executed we'll get a unique constraint error,
	// so return nil instead to avoid unnecessary logging.
	if IsUniqueConstraintViolationErr(err) {
		return nil
	}
	return err
}
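
As a usage note (not from the commit), a hedged sketch of how the new `InsertMigration` helper might be called to record a schema change as already applied; the migration name is a placeholder:

```go
package example

import (
	"context"
	"database/sql"
	"fmt"

	"github.com/matrix-org/dendrite/internal/sqlutil"
)

// markMigrationApplied records a migration as executed without running it,
// e.g. when an out-of-band check shows the schema change already exists.
// The migration name below is an illustrative placeholder.
func markMigrationApplied(ctx context.Context, db *sql.DB) error {
	const name = "example: some refactor"
	if err := sqlutil.InsertMigration(ctx, db, name); err != nil {
		return fmt.Errorf("unable to manually insert migration %q: %w", name, err)
	}
	return nil
}
```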
|
||||
|
|
|
|||
|
|
@@ -12,12 +12,27 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build wasm
// +build wasm
//go:build !wasm
// +build !wasm

package sqlutil

// IsUniqueConstraintViolationErr no-ops for this architecture
import (
	"github.com/lib/pq"
	"github.com/mattn/go-sqlite3"
)

// IsUniqueConstraintViolationErr returns true if the error is a unique_violation error
func IsUniqueConstraintViolationErr(err error) bool {
	switch e := err.(type) {
	case *pq.Error:
		return e.Code == "23505"
	case pq.Error:
		return e.Code == "23505"
	case *sqlite3.Error:
		return e.Code == sqlite3.ErrConstraint
	case sqlite3.Error:
		return e.Code == sqlite3.ErrConstraint
	}
	return false
}
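
For illustration only, a hedged sketch of how a caller could use this helper to treat duplicate inserts as a no-op; the table name and SQL are assumptions, not Dendrite schema:

```go
package example

import (
	"context"
	"database/sql"

	"github.com/matrix-org/dendrite/internal/sqlutil"
)

// insertIgnoringDuplicates inserts a row and swallows unique-constraint
// violations, the same pattern InsertMigration relies on. The table name and
// Postgres-style placeholder are illustrative only.
func insertIgnoringDuplicates(ctx context.Context, db *sql.DB, name string) error {
	_, err := db.ExecContext(ctx, "INSERT INTO example_table (name) VALUES ($1)", name)
	if sqlutil.IsUniqueConstraintViolationErr(err) {
		return nil // the row already exists; nothing to do
	}
	return err
}
```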
|
||||
|
|
@ -12,15 +12,20 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//go:build !wasm
|
||||
// +build !wasm
|
||||
//go:build wasm
|
||||
// +build wasm
|
||||
|
||||
package sqlutil
|
||||
|
||||
import "github.com/lib/pq"
|
||||
import "github.com/mattn/go-sqlite3"
|
||||
|
||||
// IsUniqueConstraintViolationErr returns true if the error is a postgresql unique_violation error
|
||||
// IsUniqueConstraintViolationErr returns true if the error is a unique_violation error
|
||||
func IsUniqueConstraintViolationErr(err error) bool {
|
||||
pqErr, ok := err.(*pq.Error)
|
||||
return ok && pqErr.Code == "23505"
|
||||
switch e := err.(type) {
|
||||
case *sqlite3.Error:
|
||||
return e.Code == sqlite3.ErrConstraint
|
||||
case sqlite3.Error:
|
||||
return e.Code == sqlite3.ErrConstraint
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
|
@@ -17,7 +17,7 @@ var build string
const (
	VersionMajor = 0
	VersionMinor = 9
	VersionPatch = 6
	VersionPatch = 8
	VersionTag = "" // example: "rc1"
)
|
||||
|
||||
|
|
|
|||
|
|
@ -17,8 +17,8 @@ package postgres
|
|||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
|
||||
"github.com/lib/pq"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
|
|
@ -63,30 +63,36 @@ func NewPostgresKeyChangesTable(db *sql.DB) (tables.KeyChanges, error) {
|
|||
return s, err
|
||||
}
|
||||
|
||||
if err = executeMigration(context.Background(), db); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func executeMigration(ctx context.Context, db *sql.DB) error {
|
||||
// TODO: Remove when we are sure we are not having goose artefacts in the db
|
||||
// This forces an error, which indicates the migration is already applied, since the
|
||||
// column partition was removed from the table
|
||||
var count int
|
||||
err = db.QueryRow("SELECT partition FROM keyserver_key_changes LIMIT 1;").Scan(&count)
|
||||
if err == nil {
|
||||
m := sqlutil.NewMigrator(db)
|
||||
m.AddMigrations(sqlutil.Migration{
|
||||
Version: "keyserver: refactor key changes",
|
||||
Up: deltas.UpRefactorKeyChanges,
|
||||
})
|
||||
return s, m.Up(context.Background())
|
||||
} else {
|
||||
switch e := err.(type) {
|
||||
case *pq.Error:
|
||||
// ignore undefined_column (42703) errors, as this is expected at this point
|
||||
if e.Code != "42703" {
|
||||
return nil, err
|
||||
migrationName := "keyserver: refactor key changes"
|
||||
|
||||
var cName string
|
||||
err := db.QueryRowContext(ctx, "select column_name from information_schema.columns where table_name = 'keyserver_key_changes' AND column_name = 'partition'").Scan(&cName)
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed
|
||||
if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil {
|
||||
return fmt.Errorf("unable to manually insert migration '%s': %w", migrationName, err)
|
||||
}
|
||||
default:
|
||||
return nil, err
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
return s, nil
|
||||
m := sqlutil.NewMigrator(db)
|
||||
m.AddMigrations(sqlutil.Migration{
|
||||
Version: migrationName,
|
||||
Up: deltas.UpRefactorKeyChanges,
|
||||
})
|
||||
|
||||
return m.Up(ctx)
|
||||
}
|
||||
|
||||
func (s *keyChangesStatements) Prepare() (err error) {
|
||||
|
|
|
|||
|
|
@ -158,7 +158,7 @@ func (d *Database) MarkDeviceListStale(ctx context.Context, userID string, isSta
|
|||
// DeleteDeviceKeys removes the device keys for a given user/device, and any accompanying
|
||||
// cross-signing signatures relating to that device.
|
||||
func (d *Database) DeleteDeviceKeys(ctx context.Context, userID string, deviceIDs []gomatrixserverlib.KeyID) error {
|
||||
return d.Writer.Do(nil, nil, func(txn *sql.Tx) error {
|
||||
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
for _, deviceID := range deviceIDs {
|
||||
if err := d.CrossSigningSigsTable.DeleteCrossSigningSigsForTarget(ctx, txn, userID, deviceID); err != nil && err != sql.ErrNoRows {
|
||||
return fmt.Errorf("d.CrossSigningSigsTable.DeleteCrossSigningSigsForTarget: %w", err)
|
||||
|
|
|
|||
|
|
@ -17,6 +17,8 @@ package sqlite3
|
|||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
|
|
@ -58,23 +60,39 @@ func NewSqliteKeyChangesTable(db *sql.DB) (tables.KeyChanges, error) {
|
|||
if err != nil {
|
||||
return s, err
|
||||
}
|
||||
// TODO: Remove when we are sure we are not having goose artefacts in the db
|
||||
// This forces an error, which indicates the migration is already applied, since the
|
||||
// column partition was removed from the table
|
||||
var count int
|
||||
err = db.QueryRow("SELECT partition FROM keyserver_key_changes LIMIT 1;").Scan(&count)
|
||||
if err == nil {
|
||||
m := sqlutil.NewMigrator(db)
|
||||
m.AddMigrations(sqlutil.Migration{
|
||||
Version: "keyserver: refactor key changes",
|
||||
Up: deltas.UpRefactorKeyChanges,
|
||||
})
|
||||
return s, m.Up(context.Background())
|
||||
|
||||
if err = executeMigration(context.Background(), db); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func executeMigration(ctx context.Context, db *sql.DB) error {
|
||||
// TODO: Remove when we are sure we are not having goose artefacts in the db
|
||||
// This forces an error, which indicates the migration is already applied, since the
|
||||
// column partition was removed from the table
|
||||
migrationName := "keyserver: refactor key changes"
|
||||
|
||||
var cName string
|
||||
err := db.QueryRowContext(ctx, `SELECT p.name FROM sqlite_master AS m JOIN pragma_table_info(m.name) AS p WHERE m.name = 'keyserver_key_changes' AND p.name = 'partition'`).Scan(&cName)
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed
|
||||
if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil {
|
||||
return fmt.Errorf("unable to manually insert migration '%s': %w", migrationName, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
m := sqlutil.NewMigrator(db)
|
||||
m.AddMigrations(sqlutil.Migration{
|
||||
Version: migrationName,
|
||||
Up: deltas.UpRefactorKeyChanges,
|
||||
})
|
||||
return m.Up(ctx)
|
||||
}
|
||||
|
||||
func (s *keyChangesStatements) Prepare() (err error) {
|
||||
if s.upsertKeyChangeStmt, err = s.db.Prepare(upsertKeyChangeSQL); err != nil {
|
||||
return err
|
||||
|
|
|
|||
|
|
@ -439,6 +439,7 @@ type QueryMembershipAtEventRequest struct {
|
|||
|
||||
// QueryMembershipAtEventResponse is the response to QueryMembershipAtEventRequest.
|
||||
type QueryMembershipAtEventResponse struct {
|
||||
// Memberships is a map from eventID to a list of events (if any).
|
||||
// Memberships is a map from eventID to a list of events (if any). Events that
|
||||
// do not have known state will return an empty array here.
|
||||
Memberships map[string][]*gomatrixserverlib.HeaderedEvent `json:"memberships"`
|
||||
}
|
||||
|
|
|
|||
|
|
@ -176,7 +176,8 @@ func (r *Inputer) Start() error {
|
|||
},
|
||||
nats.HeadersOnly(),
|
||||
nats.DeliverAll(),
|
||||
nats.AckAll(),
|
||||
nats.AckExplicit(),
|
||||
nats.ReplayInstant(),
|
||||
nats.BindStream(r.InputRoomEventTopic),
|
||||
)
|
||||
return err
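
To illustrate the switch from AckAll to AckExplicit above, here is a minimal, hypothetical JetStream subscription that acknowledges each message individually; the subject, stream name and handler are assumptions rather than Dendrite code:

```go
package consumer

import (
	"log"

	"github.com/nats-io/nats.go"
)

// subscribeWithExplicitAck mirrors the consumer options in the diff, but with
// a placeholder subject/stream and manual per-message acknowledgement.
func subscribeWithExplicitAck(js nats.JetStreamContext) (*nats.Subscription, error) {
	return js.Subscribe("InputRoomEvent", func(m *nats.Msg) {
		// ... process the message ...
		if err := m.Ack(); err != nil {
			log.Printf("failed to ack message: %v", err)
		}
	},
		nats.DeliverAll(),
		nats.AckExplicit(),
		nats.ReplayInstant(),
		nats.ManualAck(),                  // we call m.Ack() ourselves in the handler
		nats.BindStream("InputRoomEvent"), // assumed stream name
	)
}
```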
|
||||
|
|
|
|||
|
|
@ -553,11 +553,14 @@ func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserve
|
|||
|
||||
// Make sure events from the missingResp are using the cache - missing events
|
||||
// will be added and duplicates will be removed.
|
||||
logger.Debugf("get_missing_events returned %d events", len(missingResp.Events))
|
||||
missingEvents := make([]*gomatrixserverlib.Event, 0, len(missingResp.Events))
|
||||
for _, ev := range missingResp.Events.UntrustedEvents(roomVersion) {
|
||||
if err = ev.VerifyEventSignatures(ctx, t.keys); err != nil {
|
||||
continue
|
||||
}
|
||||
missingEvents = append(missingEvents, t.cacheAndReturn(ev))
|
||||
}
|
||||
logger.Debugf("get_missing_events returned %d events (%d passed signature checks)", len(missingResp.Events), len(missingEvents))
|
||||
|
||||
// topologically sort and sanity check that we are making forward progress
|
||||
newEvents = gomatrixserverlib.ReverseTopologicalOrdering(missingEvents, gomatrixserverlib.TopologicalOrderByPrevEvents)
|
||||
|
|
|
|||
|
|
@ -106,7 +106,7 @@ func (r *Queryer) QueryStateAfterEvents(
|
|||
return err
|
||||
}
|
||||
|
||||
if len(request.PrevEventIDs) > 1 && len(request.StateToFetch) == 0 {
|
||||
if len(request.PrevEventIDs) > 1 {
|
||||
var authEventIDs []string
|
||||
for _, e := range stateEvents {
|
||||
authEventIDs = append(authEventIDs, e.AuthEventIDs()...)
|
||||
|
|
@ -208,6 +208,9 @@ func (r *Queryer) QueryMembershipForUser(
|
|||
return err
|
||||
}
|
||||
|
||||
// QueryMembershipAtEvent returns the known memberships at a given event.
|
||||
// If the state before an event is not known, an empty list will be returned
|
||||
// for that event instead.
|
||||
func (r *Queryer) QueryMembershipAtEvent(
|
||||
ctx context.Context,
|
||||
request *api.QueryMembershipAtEventRequest,
|
||||
|
|
@ -237,7 +240,11 @@ func (r *Queryer) QueryMembershipAtEvent(
|
|||
}
|
||||
|
||||
for _, eventID := range request.EventIDs {
|
||||
stateEntry := stateEntries[eventID]
|
||||
stateEntry, ok := stateEntries[eventID]
|
||||
if !ok {
|
||||
response.Memberships[eventID] = []*gomatrixserverlib.HeaderedEvent{}
|
||||
continue
|
||||
}
|
||||
memberships, err := helpers.GetMembershipsAtState(ctx, r.DB, stateEntry, false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to get memberships at state: %w", err)
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ package state
|
|||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"sort"
|
||||
"sync"
|
||||
|
|
@ -134,11 +135,14 @@ func (v *StateResolution) LoadMembershipAtEvent(
|
|||
for i := range eventIDs {
|
||||
eventID := eventIDs[i]
|
||||
snapshotNID, err := v.db.SnapshotNIDFromEventID(ctx, eventID)
|
||||
if err != nil {
|
||||
if err != nil && err != sql.ErrNoRows {
|
||||
return nil, fmt.Errorf("LoadStateAtEvent.SnapshotNIDFromEventID failed for event %s : %w", eventID, err)
|
||||
}
|
||||
if snapshotNID == 0 {
|
||||
return nil, fmt.Errorf("LoadStateAtEvent.SnapshotNIDFromEventID(%s) returned 0 NID, was this event stored?", eventID)
|
||||
// If we don't know a state snapshot for this event then we can't calculate
|
||||
// memberships at the time of the event, so skip over it. This means that
|
||||
// it isn't guaranteed that the response map will contain every single event.
|
||||
continue
|
||||
}
|
||||
snapshotNIDMap[snapshotNID] = append(snapshotNIDMap[snapshotNID], eventID)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,10 +16,11 @@
|
|||
package postgres
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/lib/pq"
|
||||
// Import the postgres database driver.
|
||||
_ "github.com/lib/pq"
|
||||
|
||||
|
|
@ -52,30 +53,8 @@ func Open(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, cache c
|
|||
|
||||
// Special case, since this migration uses several tables, so it needs to
|
||||
// be sure that all tables are created first.
|
||||
// TODO: Remove when we are sure we are not having goose artefacts in the db
|
||||
// This forces an error, which indicates the migration is already applied, since the
|
||||
// column event_nid was removed from the table
|
||||
var eventNID int
|
||||
err = db.QueryRow("SELECT event_nid FROM roomserver_state_block LIMIT 1;").Scan(&eventNID)
|
||||
if err == nil {
|
||||
m := sqlutil.NewMigrator(db)
|
||||
m.AddMigrations(sqlutil.Migration{
|
||||
Version: "roomserver: state blocks refactor",
|
||||
Up: deltas.UpStateBlocksRefactor,
|
||||
})
|
||||
if err = m.Up(base.Context()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
switch e := err.(type) {
|
||||
case *pq.Error:
|
||||
// ignore undefined_column (42703) errors, as this is expected at this point
|
||||
if e.Code != "42703" {
|
||||
return nil, err
|
||||
}
|
||||
default:
|
||||
return nil, err
|
||||
}
|
||||
if err = executeMigration(base.Context(), db); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Then prepare the statements. Now that the migrations have run, any columns referred
|
||||
|
|
@ -87,6 +66,32 @@ func Open(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, cache c
|
|||
return &d, nil
|
||||
}
|
||||
|
||||
func executeMigration(ctx context.Context, db *sql.DB) error {
|
||||
// TODO: Remove when we are sure we are not having goose artefacts in the db
|
||||
// This forces an error, which indicates the migration is already applied, since the
|
||||
// column event_nid was removed from the table
|
||||
migrationName := "roomserver: state blocks refactor"
|
||||
|
||||
var cName string
|
||||
err := db.QueryRowContext(ctx, "select column_name from information_schema.columns where table_name = 'roomserver_state_block' AND column_name = 'event_nid'").Scan(&cName)
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed
|
||||
if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil {
|
||||
return fmt.Errorf("unable to manually insert migration '%s': %w", migrationName, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
m := sqlutil.NewMigrator(db)
|
||||
m.AddMigrations(sqlutil.Migration{
|
||||
Version: migrationName,
|
||||
Up: deltas.UpStateBlocksRefactor,
|
||||
})
|
||||
|
||||
return m.Up(ctx)
|
||||
}
|
||||
|
||||
func (d *Database) create(db *sql.DB) error {
|
||||
if err := CreateEventStateKeysTable(db); err != nil {
|
||||
return err
|
||||
|
|
|
|||
|
|
@ -26,11 +26,11 @@ func NewMembershipUpdater(
|
|||
var targetUserNID types.EventStateKeyNID
|
||||
var err error
|
||||
err = d.Writer.Do(d.DB, txn, func(txn *sql.Tx) error {
|
||||
roomNID, err = d.assignRoomNID(ctx, roomID, roomVersion)
|
||||
roomNID, err = d.assignRoomNID(ctx, txn, roomID, roomVersion)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
targetUserNID, err = d.assignStateKeyNID(ctx, targetUserID)
|
||||
targetUserNID, err = d.assignStateKeyNID(ctx, txn, targetUserID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -101,7 +101,7 @@ func (u *MembershipUpdater) Update(newMembership tables.MembershipState, event *
|
|||
var inserted bool // Did the query result in a membership change?
|
||||
var retired []string // Did we retire any updates in the process?
|
||||
return inserted, retired, u.d.Writer.Do(u.d.DB, u.txn, func(txn *sql.Tx) error {
|
||||
senderUserNID, err := u.d.assignStateKeyNID(u.ctx, event.Sender())
|
||||
senderUserNID, err := u.d.assignStateKeyNID(u.ctx, u.txn, event.Sender())
|
||||
if err != nil {
|
||||
return fmt.Errorf("u.d.AssignStateKeyNID: %w", err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -402,7 +402,7 @@ func (d *Database) RemoveRoomAlias(ctx context.Context, alias string) error {
|
|||
func (d *Database) GetMembership(ctx context.Context, roomNID types.RoomNID, requestSenderUserID string) (membershipEventNID types.EventNID, stillInRoom, isRoomforgotten bool, err error) {
|
||||
var requestSenderUserNID types.EventStateKeyNID
|
||||
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
requestSenderUserNID, err = d.assignStateKeyNID(ctx, requestSenderUserID)
|
||||
requestSenderUserNID, err = d.assignStateKeyNID(ctx, txn, requestSenderUserID)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
|
|
@ -596,7 +596,9 @@ func (d *Database) storeEvent(
|
|||
if updater != nil && updater.txn != nil {
|
||||
txn = updater.txn
|
||||
}
|
||||
err = d.Writer.Do(d.DB, txn, func(txn *sql.Tx) error {
|
||||
// First writer is with a database-provided transaction, so that NIDs are assigned
|
||||
// globally outside of the updater context, to help avoid races.
|
||||
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
// TODO: Here we should aim to have two different code paths for new rooms
|
||||
// vs existing ones.
|
||||
|
||||
|
|
@ -611,11 +613,11 @@ func (d *Database) storeEvent(
|
|||
return fmt.Errorf("extractRoomVersionFromCreateEvent: %w", err)
|
||||
}
|
||||
|
||||
if roomNID, err = d.assignRoomNID(ctx, event.RoomID(), roomVersion); err != nil {
|
||||
if roomNID, err = d.assignRoomNID(ctx, txn, event.RoomID(), roomVersion); err != nil {
|
||||
return fmt.Errorf("d.assignRoomNID: %w", err)
|
||||
}
|
||||
|
||||
if eventTypeNID, err = d.assignEventTypeNID(ctx, event.Type()); err != nil {
|
||||
if eventTypeNID, err = d.assignEventTypeNID(ctx, txn, event.Type()); err != nil {
|
||||
return fmt.Errorf("d.assignEventTypeNID: %w", err)
|
||||
}
|
||||
|
||||
|
|
@ -623,11 +625,19 @@ func (d *Database) storeEvent(
|
|||
// Assigned a numeric ID for the state_key if there is one present.
|
||||
// Otherwise set the numeric ID for the state_key to 0.
|
||||
if eventStateKey != nil {
|
||||
if eventStateKeyNID, err = d.assignStateKeyNID(ctx, *eventStateKey); err != nil {
|
||||
if eventStateKeyNID, err = d.assignStateKeyNID(ctx, txn, *eventStateKey); err != nil {
|
||||
return fmt.Errorf("d.assignStateKeyNID: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("d.Writer.Do: %w", err)
|
||||
}
|
||||
// Second writer is using the database-provided transaction, probably from the
|
||||
// room updater, for easy roll-back if required.
|
||||
err = d.Writer.Do(d.DB, txn, func(txn *sql.Tx) error {
|
||||
if eventNID, stateNID, err = d.EventsTable.InsertEvent(
|
||||
ctx,
|
||||
txn,
|
||||
|
|
@@ -749,48 +759,48 @@ func (d *Database) MissingAuthPrevEvents(
 }
 
 func (d *Database) assignRoomNID(
-	ctx context.Context, roomID string, roomVersion gomatrixserverlib.RoomVersion,
+	ctx context.Context, txn *sql.Tx, roomID string, roomVersion gomatrixserverlib.RoomVersion,
 ) (types.RoomNID, error) {
 	// Check if we already have a numeric ID in the database.
-	roomNID, err := d.RoomsTable.SelectRoomNID(ctx, nil, roomID)
+	roomNID, err := d.RoomsTable.SelectRoomNID(ctx, txn, roomID)
 	if err == sql.ErrNoRows {
 		// We don't have a numeric ID so insert one into the database.
-		roomNID, err = d.RoomsTable.InsertRoomNID(ctx, nil, roomID, roomVersion)
+		roomNID, err = d.RoomsTable.InsertRoomNID(ctx, txn, roomID, roomVersion)
 		if err == sql.ErrNoRows {
 			// We raced with another insert so run the select again.
-			roomNID, err = d.RoomsTable.SelectRoomNID(ctx, nil, roomID)
+			roomNID, err = d.RoomsTable.SelectRoomNID(ctx, txn, roomID)
 		}
 	}
 	return roomNID, err
 }
 
 func (d *Database) assignEventTypeNID(
-	ctx context.Context, eventType string,
+	ctx context.Context, txn *sql.Tx, eventType string,
 ) (types.EventTypeNID, error) {
 	// Check if we already have a numeric ID in the database.
-	eventTypeNID, err := d.EventTypesTable.SelectEventTypeNID(ctx, nil, eventType)
+	eventTypeNID, err := d.EventTypesTable.SelectEventTypeNID(ctx, txn, eventType)
 	if err == sql.ErrNoRows {
 		// We don't have a numeric ID so insert one into the database.
-		eventTypeNID, err = d.EventTypesTable.InsertEventTypeNID(ctx, nil, eventType)
+		eventTypeNID, err = d.EventTypesTable.InsertEventTypeNID(ctx, txn, eventType)
 		if err == sql.ErrNoRows {
 			// We raced with another insert so run the select again.
-			eventTypeNID, err = d.EventTypesTable.SelectEventTypeNID(ctx, nil, eventType)
+			eventTypeNID, err = d.EventTypesTable.SelectEventTypeNID(ctx, txn, eventType)
 		}
 	}
 	return eventTypeNID, err
 }
 
 func (d *Database) assignStateKeyNID(
-	ctx context.Context, eventStateKey string,
+	ctx context.Context, txn *sql.Tx, eventStateKey string,
 ) (types.EventStateKeyNID, error) {
 	// Check if we already have a numeric ID in the database.
-	eventStateKeyNID, err := d.EventStateKeysTable.SelectEventStateKeyNID(ctx, nil, eventStateKey)
+	eventStateKeyNID, err := d.EventStateKeysTable.SelectEventStateKeyNID(ctx, txn, eventStateKey)
 	if err == sql.ErrNoRows {
 		// We don't have a numeric ID so insert one into the database.
-		eventStateKeyNID, err = d.EventStateKeysTable.InsertEventStateKeyNID(ctx, nil, eventStateKey)
+		eventStateKeyNID, err = d.EventStateKeysTable.InsertEventStateKeyNID(ctx, txn, eventStateKey)
 		if err == sql.ErrNoRows {
 			// We raced with another insert so run the select again.
-			eventStateKeyNID, err = d.EventStateKeysTable.SelectEventStateKeyNID(ctx, nil, eventStateKey)
+			eventStateKeyNID, err = d.EventStateKeysTable.SelectEventStateKeyNID(ctx, txn, eventStateKey)
 		}
 	}
 	return eventStateKeyNID, err
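All three assign helpers now take the caller's `*sql.Tx` and pass it down, so the whole select → insert → re-select sequence runs inside one transaction. A self-contained sketch of the same idea against a hypothetical `nids(name TEXT UNIQUE, nid BIGSERIAL)` table (the SQL and table are illustrative, not Dendrite's schema):

```go
package example

import (
	"context"
	"database/sql"
)

// assignNID mirrors the pattern above: select the numeric ID, insert one if
// it is missing, and re-select if another writer won the race.
func assignNID(ctx context.Context, txn *sql.Tx, name string) (nid int64, err error) {
	err = txn.QueryRowContext(ctx, `SELECT nid FROM nids WHERE name = $1`, name).Scan(&nid)
	if err == sql.ErrNoRows {
		// No numeric ID yet, so try to insert one. If a racing writer already
		// inserted the row, DO NOTHING returns no row and we see ErrNoRows again.
		err = txn.QueryRowContext(ctx,
			`INSERT INTO nids (name) VALUES ($1) ON CONFLICT (name) DO NOTHING RETURNING nid`, name,
		).Scan(&nid)
		if err == sql.ErrNoRows {
			// We raced with another insert, so the row exists now: select again.
			err = txn.QueryRowContext(ctx, `SELECT nid FROM nids WHERE name = $1`, name).Scan(&nid)
		}
	}
	return nid, err
}
```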
@@ -18,6 +18,7 @@ package sqlite3
 import (
 	"context"
 	"database/sql"
+	"errors"
 	"fmt"
 
 	"github.com/matrix-org/gomatrixserverlib"
@@ -61,20 +62,8 @@ func Open(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, cache c
 
 	// Special case, since this migration uses several tables, so it needs to
 	// be sure that all tables are created first.
-	// TODO: Remove when we are sure we are not having goose artefacts in the db
-	// This forces an error, which indicates the migration is already applied, since the
-	// column event_nid was removed from the table
-	var eventNID int
-	err = db.QueryRow("SELECT event_nid FROM roomserver_state_block LIMIT 1;").Scan(&eventNID)
-	if err == nil {
-		m := sqlutil.NewMigrator(db)
-		m.AddMigrations(sqlutil.Migration{
-			Version: "roomserver: state blocks refactor",
-			Up:      deltas.UpStateBlocksRefactor,
-		})
-		if err = m.Up(base.Context()); err != nil {
-			return nil, err
-		}
-	}
+	if err = executeMigration(base.Context(), db); err != nil {
+		return nil, err
+	}
 
 	// Then prepare the statements. Now that the migrations have run, any columns referred
@@ -86,6 +75,31 @@ func Open(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, cache c
 	return &d, nil
 }
 
+func executeMigration(ctx context.Context, db *sql.DB) error {
+	// TODO: Remove when we are sure we are not having goose artefacts in the db
+	// This forces an error, which indicates the migration is already applied, since the
+	// column event_nid was removed from the table
+	migrationName := "roomserver: state blocks refactor"
+
+	var cName string
+	err := db.QueryRowContext(ctx, `SELECT p.name FROM sqlite_master AS m JOIN pragma_table_info(m.name) AS p WHERE m.name = 'roomserver_state_block' AND p.name = 'event_nid'`).Scan(&cName)
+	if err != nil {
+		if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed
+			if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil {
+				return fmt.Errorf("unable to manually insert migration '%s': %w", migrationName, err)
+			}
+			return nil
+		}
+		return err
+	}
+	m := sqlutil.NewMigrator(db)
+	m.AddMigrations(sqlutil.Migration{
+		Version: migrationName,
+		Up:      deltas.UpStateBlocksRefactor,
+	})
+	return m.Up(ctx)
+}
+
 func (d *Database) create(db *sql.DB) error {
 	if err := CreateEventStateKeysTable(db); err != nil {
 		return err
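For reference, the column probe in `executeMigration` leans on SQLite's `pragma_table_info` table-valued function (available since SQLite 3.16). A generic version of the same existence check, with placeholder table and column names:

```go
package example

import (
	"context"
	"database/sql"
	"errors"
)

// columnExists reports whether a column is still present on a SQLite table,
// in the same way executeMigration detects whether the old event_nid column
// has already been migrated away. Table and column names are placeholders.
func columnExists(ctx context.Context, db *sql.DB, table, column string) (bool, error) {
	var name string
	err := db.QueryRowContext(ctx,
		`SELECT p.name FROM sqlite_master AS m JOIN pragma_table_info(m.name) AS p
		 WHERE m.name = ? AND p.name = ?`, table, column,
	).Scan(&name)
	if errors.Is(err, sql.ErrNoRows) {
		return false, nil // the column (or the whole table) is gone
	}
	if err != nil {
		return false, err
	}
	return true, nil
}
```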
@@ -59,6 +59,9 @@ func (f *Fulltext) Defaults(opts DefaultOpts) {
 }
 
 func (f *Fulltext) Verify(configErrs *ConfigErrors, isMonolith bool) {
+	if !f.Enabled {
+		return
+	}
 	checkNotEmpty(configErrs, "syncapi.fulltext.index_path", string(f.IndexPath))
 	checkNotEmpty(configErrs, "syncapi.fulltext.language", f.Language)
 }
 
@@ -19,16 +19,17 @@ import (
 	"encoding/json"
 
 	"github.com/getsentry/sentry-go"
+	"github.com/matrix-org/gomatrixserverlib"
+	"github.com/matrix-org/util"
+	"github.com/nats-io/nats.go"
+	log "github.com/sirupsen/logrus"
 
 	"github.com/matrix-org/dendrite/setup/config"
 	"github.com/matrix-org/dendrite/setup/jetstream"
 	"github.com/matrix-org/dendrite/setup/process"
 	"github.com/matrix-org/dendrite/syncapi/notifier"
 	"github.com/matrix-org/dendrite/syncapi/storage"
 	"github.com/matrix-org/dendrite/syncapi/types"
-	"github.com/matrix-org/gomatrixserverlib"
-	"github.com/matrix-org/util"
-	"github.com/nats-io/nats.go"
-	log "github.com/sirupsen/logrus"
 )
 
 // OutputSendToDeviceEventConsumer consumes events that originated in the EDU server.
@@ -79,16 +80,18 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msgs []
 	_, domain, err := gomatrixserverlib.SplitID('@', userID)
 	if err != nil {
 		sentry.CaptureException(err)
 		log.WithError(err).Errorf("send-to-device: failed to split user id, dropping message")
 		return true
 	}
 	if domain != s.serverName {
 		log.Tracef("ignoring send-to-device event with destination %s", domain)
 		return true
 	}
 
 	var output types.OutputSendToDeviceEvent
 	if err = json.Unmarshal(msg.Data, &output); err != nil {
 		// If the message was invalid, log it and move on to the next message in the stream
-		log.WithError(err).Errorf("output log: message parse failure")
+		log.WithError(err).Errorf("send-to-device: message parse failure")
 		sentry.CaptureException(err)
 		return true
 	}
@@ -105,7 +108,7 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msgs []
 	)
 	if err != nil {
 		sentry.CaptureException(err)
-		log.WithError(err).Errorf("failed to store send-to-device message")
+		log.WithError(err).Errorf("send-to-device: failed to store message")
 		return false
 	}
 
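The consumer drops messages whose user ID does not parse or whose domain is not local. For clarity, this is roughly how `gomatrixserverlib.SplitID` behaves (a small standalone example, assuming the gomatrixserverlib API as of this commit):

```go
package main

import (
	"fmt"

	"github.com/matrix-org/gomatrixserverlib"
)

func main() {
	// SplitID separates the localpart from the server name, which is what
	// lets the consumer ignore send-to-device events for other servers.
	local, domain, err := gomatrixserverlib.SplitID('@', "@alice:example.org")
	if err != nil {
		fmt.Println("not a valid user ID:", err)
		return
	}
	fmt.Println(local)  // alice
	fmt.Println(domain) // example.org
}
```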
@@ -111,8 +111,8 @@ const selectEventsWithEventIDsSQL = "" +
 
 const selectSharedUsersSQL = "" +
 	"SELECT state_key FROM syncapi_current_room_state WHERE room_id = ANY(" +
-	" SELECT room_id FROM syncapi_current_room_state WHERE state_key = $1 AND membership='join'" +
-	") AND state_key = ANY($2) AND membership IN ('join', 'invite');"
+	" SELECT DISTINCT room_id FROM syncapi_current_room_state WHERE state_key = $1 AND membership='join'" +
+	") AND type = 'm.room.member' AND state_key = ANY($2) AND membership IN ('join', 'invite');"
 
 type currentRoomStateStatements struct {
 	upsertRoomStateStmt *sql.Stmt
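The rewritten query deduplicates the room IDs (`SELECT DISTINCT`) and only scans `m.room.member` rows, which is where the reduced CPU time noted in the changelog comes from. As an illustration of how an `ANY($2)` parameter is usually bound from Go with a lib/pq-style driver (hypothetical helper, not the sync API's real code):

```go
package example

import (
	"context"
	"database/sql"

	"github.com/lib/pq"
)

// sharedUsers shows the call shape only: pq.Array turns a Go slice into a
// Postgres array so it can be matched with "= ANY($2)".
func sharedUsers(ctx context.Context, db *sql.DB, userID string, otherUserIDs []string) ([]string, error) {
	rows, err := db.QueryContext(ctx,
		`SELECT state_key FROM syncapi_current_room_state WHERE room_id = ANY(
		   SELECT DISTINCT room_id FROM syncapi_current_room_state WHERE state_key = $1 AND membership='join'
		 ) AND type = 'm.room.member' AND state_key = ANY($2) AND membership IN ('join', 'invite')`,
		userID, pq.Array(otherUserIDs),
	)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var users []string
	for rows.Next() {
		var u string
		if err := rows.Scan(&u); err != nil {
			return nil, err
		}
		users = append(users, u)
	}
	return users, rows.Err()
}
```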
@@ -19,6 +19,7 @@ import (
 	"database/sql"
 	"encoding/json"
 
+	"github.com/matrix-org/dendrite/internal/sqlutil"
 	"github.com/matrix-org/dendrite/syncapi/storage/tables"
 	"github.com/matrix-org/dendrite/syncapi/types"
 )
@@ -61,10 +62,10 @@ func NewPostgresIgnoresTable(db *sql.DB) (tables.Ignores, error) {
 }
 
 func (s *ignoresStatements) SelectIgnores(
-	ctx context.Context, userID string,
+	ctx context.Context, txn *sql.Tx, userID string,
 ) (*types.IgnoredUsers, error) {
 	var ignoresData []byte
-	err := s.selectIgnoresStmt.QueryRowContext(ctx, userID).Scan(&ignoresData)
+	err := sqlutil.TxStmt(txn, s.selectIgnoresStmt).QueryRowContext(ctx, userID).Scan(&ignoresData)
 	if err != nil {
 		return nil, err
 	}
@@ -76,12 +77,12 @@ func (s *ignoresStatements) SelectIgnores(
 }
 
 func (s *ignoresStatements) UpsertIgnores(
-	ctx context.Context, userID string, ignores *types.IgnoredUsers,
+	ctx context.Context, txn *sql.Tx, userID string, ignores *types.IgnoredUsers,
 ) error {
 	ignoresJSON, err := json.Marshal(ignores)
 	if err != nil {
 		return err
 	}
-	_, err = s.upsertIgnoresStmt.ExecContext(ctx, userID, ignoresJSON)
+	_, err = sqlutil.TxStmt(txn, s.upsertIgnoresStmt).ExecContext(ctx, userID, ignoresJSON)
 	return err
 }
 
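Both ignores methods now funnel their prepared statements through `sqlutil.TxStmt`, so the same statement can run either inside or outside a transaction. Roughly, such a helper amounts to the following (an approximation, assuming it behaves like `(*sql.Tx).Stmt`):

```go
package example

import "database/sql"

// txStmt re-binds a prepared statement to the transaction when one is
// supplied, otherwise it returns the statement unchanged.
func txStmt(txn *sql.Tx, stmt *sql.Stmt) *sql.Stmt {
	if txn != nil {
		return txn.Stmt(stmt) // the statement now executes inside txn
	}
	return stmt
}
```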
@@ -70,13 +70,13 @@ const selectUserUnreadNotificationCountsSQL = `SELECT
 
 const selectMaxNotificationIDSQL = `SELECT CASE COUNT(*) WHEN 0 THEN 0 ELSE MAX(id) END FROM syncapi_notification_data`
 
-func (r *notificationDataStatements) UpsertRoomUnreadCounts(ctx context.Context, userID, roomID string, notificationCount, highlightCount int) (pos types.StreamPosition, err error) {
-	err = r.upsertRoomUnreadCounts.QueryRowContext(ctx, userID, roomID, notificationCount, highlightCount).Scan(&pos)
+func (r *notificationDataStatements) UpsertRoomUnreadCounts(ctx context.Context, txn *sql.Tx, userID, roomID string, notificationCount, highlightCount int) (pos types.StreamPosition, err error) {
+	err = sqlutil.TxStmt(txn, r.upsertRoomUnreadCounts).QueryRowContext(ctx, userID, roomID, notificationCount, highlightCount).Scan(&pos)
 	return
 }
 
-func (r *notificationDataStatements) SelectUserUnreadCounts(ctx context.Context, userID string, fromExcl, toIncl types.StreamPosition) (map[string]*eventutil.NotificationData, error) {
-	rows, err := r.selectUserUnreadCounts.QueryContext(ctx, userID, fromExcl, toIncl)
+func (r *notificationDataStatements) SelectUserUnreadCounts(ctx context.Context, txn *sql.Tx, userID string, fromExcl, toIncl types.StreamPosition) (map[string]*eventutil.NotificationData, error) {
+	rows, err := sqlutil.TxStmt(txn, r.selectUserUnreadCounts).QueryContext(ctx, userID, fromExcl, toIncl)
 	if err != nil {
 		return nil, err
 	}
@@ -101,8 +101,8 @@ func (r *notificationDataStatements) SelectUserUnreadCounts(ctx context.Context,
 	return roomCounts, rows.Err()
 }
 
-func (r *notificationDataStatements) SelectMaxID(ctx context.Context) (int64, error) {
+func (r *notificationDataStatements) SelectMaxID(ctx context.Context, txn *sql.Tx) (int64, error) {
 	var id int64
-	err := r.selectMaxID.QueryRowContext(ctx).Scan(&id)
+	err := sqlutil.TxStmt(txn, r.selectMaxID).QueryRowContext(ctx).Scan(&id)
 	return id, err
 }
 
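As an aside, the `CASE COUNT(*) WHEN 0 THEN 0 ELSE MAX(id) END` construct above avoids returning NULL on an empty table; an equivalent and shorter form that both Postgres and SQLite accept would be:

```go
// Equivalent to selectMaxNotificationIDSQL: COALESCE maps the NULL that
// MAX(id) yields on an empty table to 0.
const selectMaxNotificationIDCoalesceSQL = `SELECT COALESCE(MAX(id), 0) FROM syncapi_notification_data`
```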
@@ -108,7 +108,7 @@ func (d *Database) MaxStreamPositionForAccountData(ctx context.Context) (types.S
 }
 
 func (d *Database) MaxStreamPositionForNotificationData(ctx context.Context) (types.StreamPosition, error) {
-	id, err := d.NotificationData.SelectMaxID(ctx)
+	id, err := d.NotificationData.SelectMaxID(ctx, nil)
 	if err != nil {
 		return 0, fmt.Errorf("d.NotificationData.SelectMaxID: %w", err)
 	}
@@ -1029,15 +1029,15 @@ func (d *Database) GetRoomReceipts(ctx context.Context, roomIDs []string, stream
 }
 
 func (d *Database) UpsertRoomUnreadNotificationCounts(ctx context.Context, userID, roomID string, notificationCount, highlightCount int) (pos types.StreamPosition, err error) {
-	err = d.Writer.Do(nil, nil, func(_ *sql.Tx) error {
-		pos, err = d.NotificationData.UpsertRoomUnreadCounts(ctx, userID, roomID, notificationCount, highlightCount)
+	err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
+		pos, err = d.NotificationData.UpsertRoomUnreadCounts(ctx, txn, userID, roomID, notificationCount, highlightCount)
 		return err
 	})
 	return
 }
 
 func (d *Database) GetUserUnreadNotificationCounts(ctx context.Context, userID string, from, to types.StreamPosition) (map[string]*eventutil.NotificationData, error) {
-	return d.NotificationData.SelectUserUnreadCounts(ctx, userID, from, to)
+	return d.NotificationData.SelectUserUnreadCounts(ctx, nil, userID, from, to)
 }
 
 func (d *Database) SelectContextEvent(ctx context.Context, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error) {
@@ -1052,15 +1052,23 @@ func (d *Database) SelectContextAfterEvent(ctx context.Context, id int, roomID s
 }
 
 func (d *Database) IgnoresForUser(ctx context.Context, userID string) (*types.IgnoredUsers, error) {
-	return d.Ignores.SelectIgnores(ctx, userID)
+	return d.Ignores.SelectIgnores(ctx, nil, userID)
 }
 
 func (d *Database) UpdateIgnoresForUser(ctx context.Context, userID string, ignores *types.IgnoredUsers) error {
-	return d.Ignores.UpsertIgnores(ctx, userID, ignores)
+	return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
+		return d.Ignores.UpsertIgnores(ctx, txn, userID, ignores)
+	})
 }
 
 func (d *Database) UpdatePresence(ctx context.Context, userID string, presence types.Presence, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error) {
-	return d.Presence.UpsertPresence(ctx, nil, userID, statusMsg, presence, lastActiveTS, fromSync)
+	var pos types.StreamPosition
+	var err error
+	_ = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
+		pos, err = d.Presence.UpsertPresence(ctx, txn, userID, statusMsg, presence, lastActiveTS, fromSync)
+		return nil
+	})
+	return pos, err
 }
 
 func (d *Database) GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error) {
@@ -95,8 +95,8 @@ const selectEventsWithEventIDsSQL = "" +
 
 const selectSharedUsersSQL = "" +
 	"SELECT state_key FROM syncapi_current_room_state WHERE room_id IN(" +
-	" SELECT room_id FROM syncapi_current_room_state WHERE state_key = $1 AND membership='join'" +
-	") AND state_key IN ($2) AND membership IN ('join', 'invite');"
+	" SELECT DISTINCT room_id FROM syncapi_current_room_state WHERE state_key = $1 AND membership='join'" +
+	") AND type = 'm.room.member' AND state_key IN ($2) AND membership IN ('join', 'invite');"
 
 type currentRoomStateStatements struct {
 	db *sql.DB
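Unlike the Postgres variant, SQLite has no array type, so an `IN ($2)` list is normally produced by expanding one placeholder per value before the query is prepared. A generic sketch of that expansion (illustrative only; Dendrite has its own helpers for this):

```go
package example

import (
	"fmt"
	"strings"
)

// buildInClause returns a fragment such as "state_key IN ($2, $3, $4)" for n
// values, starting at the given parameter index.
func buildInClause(column string, n, firstParam int) string {
	placeholders := make([]string, 0, n)
	for i := 0; i < n; i++ {
		placeholders = append(placeholders, fmt.Sprintf("$%d", firstParam+i))
	}
	return fmt.Sprintf("%s IN (%s)", column, strings.Join(placeholders, ", "))
}

// Example: buildInClause("state_key", 3, 2) => "state_key IN ($2, $3, $4)"
```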
@@ -19,6 +19,7 @@ import (
 	"database/sql"
 	"encoding/json"
 
+	"github.com/matrix-org/dendrite/internal/sqlutil"
 	"github.com/matrix-org/dendrite/syncapi/storage/tables"
 	"github.com/matrix-org/dendrite/syncapi/types"
 )
@@ -61,10 +62,10 @@ func NewSqliteIgnoresTable(db *sql.DB) (tables.Ignores, error) {
 }
 
 func (s *ignoresStatements) SelectIgnores(
-	ctx context.Context, userID string,
+	ctx context.Context, txn *sql.Tx, userID string,
 ) (*types.IgnoredUsers, error) {
 	var ignoresData []byte
-	err := s.selectIgnoresStmt.QueryRowContext(ctx, userID).Scan(&ignoresData)
+	err := sqlutil.TxStmt(txn, s.selectIgnoresStmt).QueryRowContext(ctx, userID).Scan(&ignoresData)
 	if err != nil {
 		return nil, err
 	}
@@ -76,12 +77,12 @@ func (s *ignoresStatements) SelectIgnores(
 }
 
 func (s *ignoresStatements) UpsertIgnores(
-	ctx context.Context, userID string, ignores *types.IgnoredUsers,
+	ctx context.Context, txn *sql.Tx, userID string, ignores *types.IgnoredUsers,
 ) error {
 	ignoresJSON, err := json.Marshal(ignores)
 	if err != nil {
 		return err
 	}
-	_, err = s.upsertIgnoresStmt.ExecContext(ctx, userID, ignoresJSON)
+	_, err = sqlutil.TxStmt(txn, s.upsertIgnoresStmt).ExecContext(ctx, userID, ignoresJSON)
 	return err
 }
 
@@ -72,7 +72,7 @@ const selectUserUnreadNotificationCountsSQL = `SELECT
 
 const selectMaxNotificationIDSQL = `SELECT CASE COUNT(*) WHEN 0 THEN 0 ELSE MAX(id) END FROM syncapi_notification_data`
 
-func (r *notificationDataStatements) UpsertRoomUnreadCounts(ctx context.Context, userID, roomID string, notificationCount, highlightCount int) (pos types.StreamPosition, err error) {
+func (r *notificationDataStatements) UpsertRoomUnreadCounts(ctx context.Context, txn *sql.Tx, userID, roomID string, notificationCount, highlightCount int) (pos types.StreamPosition, err error) {
 	pos, err = r.streamIDStatements.nextNotificationID(ctx, nil)
 	if err != nil {
 		return
@@ -81,8 +81,8 @@ func (r *notificationDataStatements) UpsertRoomUnreadCounts(ctx context.Context,
 	return
 }
 
-func (r *notificationDataStatements) SelectUserUnreadCounts(ctx context.Context, userID string, fromExcl, toIncl types.StreamPosition) (map[string]*eventutil.NotificationData, error) {
-	rows, err := r.selectUserUnreadCounts.QueryContext(ctx, userID, fromExcl, toIncl)
+func (r *notificationDataStatements) SelectUserUnreadCounts(ctx context.Context, txn *sql.Tx, userID string, fromExcl, toIncl types.StreamPosition) (map[string]*eventutil.NotificationData, error) {
+	rows, err := sqlutil.TxStmt(txn, r.selectUserUnreadCounts).QueryContext(ctx, userID, fromExcl, toIncl)
 	if err != nil {
 		return nil, err
 	}
@@ -107,8 +107,8 @@ func (r *notificationDataStatements) SelectUserUnreadCounts(ctx context.Context,
 	return roomCounts, rows.Err()
 }
 
-func (r *notificationDataStatements) SelectMaxID(ctx context.Context) (int64, error) {
+func (r *notificationDataStatements) SelectMaxID(ctx context.Context, txn *sql.Tx) (int64, error) {
 	var id int64
-	err := r.selectMaxID.QueryRowContext(ctx).Scan(&id)
+	err := sqlutil.TxStmt(txn, r.selectMaxID).QueryRowContext(ctx).Scan(&id)
 	return id, err
 }
 
@@ -190,14 +190,14 @@ type Memberships interface {
 }
 
 type NotificationData interface {
-	UpsertRoomUnreadCounts(ctx context.Context, userID, roomID string, notificationCount, highlightCount int) (types.StreamPosition, error)
-	SelectUserUnreadCounts(ctx context.Context, userID string, fromExcl, toIncl types.StreamPosition) (map[string]*eventutil.NotificationData, error)
-	SelectMaxID(ctx context.Context) (int64, error)
+	UpsertRoomUnreadCounts(ctx context.Context, txn *sql.Tx, userID, roomID string, notificationCount, highlightCount int) (types.StreamPosition, error)
+	SelectUserUnreadCounts(ctx context.Context, txn *sql.Tx, userID string, fromExcl, toIncl types.StreamPosition) (map[string]*eventutil.NotificationData, error)
+	SelectMaxID(ctx context.Context, txn *sql.Tx) (int64, error)
 }
 
 type Ignores interface {
-	SelectIgnores(ctx context.Context, userID string) (*types.IgnoredUsers, error)
-	UpsertIgnores(ctx context.Context, userID string, ignores *types.IgnoredUsers) error
+	SelectIgnores(ctx context.Context, txn *sql.Tx, userID string) (*types.IgnoredUsers, error)
+	UpsertIgnores(ctx context.Context, txn *sql.Tx, userID string, ignores *types.IgnoredUsers) error
 }
 
 type Presence interface {
@@ -388,7 +388,7 @@ func applyHistoryVisibilityFilter(
 	if err != nil {
 		// Not a fatal error, we can continue without the stateEvents,
 		// they are only needed if there are state events in the timeline.
-		logrus.WithError(err).Warnf("failed to get current room state")
+		logrus.WithError(err).Warnf("Failed to get current room state for history visibility")
 	}
 	alwaysIncludeIDs := make(map[string]struct{}, len(stateEvents))
 	for _, ev := range stateEvents {
@@ -397,7 +397,6 @@ func applyHistoryVisibilityFilter(
 	startTime := time.Now()
 	events, err := internal.ApplyHistoryVisibilityFilter(ctx, db, rsAPI, recentEvents, alwaysIncludeIDs, userID, "sync")
 	if err != nil {
-
 		return nil, err
 	}
 	logrus.WithFields(logrus.Fields{
@@ -405,7 +404,7 @@ func applyHistoryVisibilityFilter(
 		"room_id": roomID,
 		"before":  len(recentEvents),
 		"after":   len(events),
-	}).Debug("applied history visibility (sync)")
+	}).Trace("Applied history visibility (sync)")
 	return events, nil
 }
 
@@ -624,9 +624,7 @@ func testSendToDevice(t *testing.T, dbType test.DBType) {
 		// Send to-device messages of type "m.dendrite.test" with content `{"dummy":"message $counter"}`
 		for i := 0; i < tc.sendMessagesCount; i++ {
 			msgCounter++
-			msg := map[string]string{
-				"dummy": fmt.Sprintf("message %d", msgCounter),
-			}
+			msg := json.RawMessage(fmt.Sprintf(`{"dummy":"message %d"}`, msgCounter))
 			if err := producer.SendToDevice(ctx, user.ID, user.ID, alice.ID, "m.dendrite.test", msg); err != nil {
 				t.Fatalf("unable to send to device message: %v", err)
 			}
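The test now hands the producer a pre-encoded `json.RawMessage` rather than a map that would be re-marshalled. A tiny standalone illustration of why that is convenient for fixed JSON payloads:

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// A json.RawMessage is embedded as-is when the surrounding structure is
	// marshalled, whereas a map would be re-encoded field by field.
	raw := json.RawMessage(`{"dummy":"message 1"}`)
	wrapped, _ := json.Marshal(map[string]interface{}{"content": raw})
	fmt.Println(string(wrapped)) // {"content":{"dummy":"message 1"}}
}
```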
@@ -29,7 +29,6 @@ import (
 type OutputStreamEventConsumer struct {
 	ctx       context.Context
 	cfg       *config.UserAPI
-	userAPI   api.UserInternalAPI
 	rsAPI     rsapi.UserRoomserverAPI
 	jetstream nats.JetStreamContext
 	durable   string
@@ -45,7 +44,6 @@ func NewOutputStreamEventConsumer(
 	js nats.JetStreamContext,
 	store storage.Database,
 	pgClient pushgateway.Client,
-	userAPI api.UserInternalAPI,
 	rsAPI rsapi.UserRoomserverAPI,
 	syncProducer *producers.SyncAPI,
 ) *OutputStreamEventConsumer {
@@ -57,7 +55,6 @@ func NewOutputStreamEventConsumer(
 		durable:      cfg.Matrix.JetStream.Durable("UserAPISyncAPIStreamEventConsumer"),
 		topic:        cfg.Matrix.JetStream.Prefixed(jetstream.OutputStreamEvent),
 		pgClient:     pgClient,
-		userAPI:      userAPI,
 		rsAPI:        rsAPI,
 		syncProducer: syncProducer,
 	}
@@ -305,7 +302,7 @@ func (s *OutputStreamEventConsumer) notifyLocal(ctx context.Context, event *goma
 		"event_id":  event.EventID(),
 		"room_id":   event.RoomID(),
 		"localpart": mem.Localpart,
-	}).Tracef("Push rule evaluation rejected the event")
+	}).Debugf("Push rule evaluation rejected the event")
 	return nil
 }
 
@@ -348,7 +345,7 @@ func (s *OutputStreamEventConsumer) notifyLocal(ctx context.Context, event *goma
 		"localpart":  mem.Localpart,
 		"num_urls":   len(devicesByURLAndFormat),
 		"num_unread": userNumUnreadNotifs,
-	}).Tracef("Notifying single member")
+	}).Debugf("Notifying single member")
 
 	// Push gateways are out of our control, and we cannot risk
 	// looking up the server on a misbehaving push gateway. Each user
@@ -422,8 +419,8 @@ func (s *OutputStreamEventConsumer) evaluatePushRules(ctx context.Context, event
 			return nil, fmt.Errorf("user %s is ignored", sender)
 		}
 	}
-	var res api.QueryPushRulesResponse
-	if err = s.userAPI.QueryPushRules(ctx, &api.QueryPushRulesRequest{UserID: mem.UserID}, &res); err != nil {
+	ruleSets, err := s.db.QueryPushRules(ctx, mem.Localpart)
+	if err != nil {
 		return nil, err
 	}
 
@@ -434,7 +431,7 @@ func (s *OutputStreamEventConsumer) evaluatePushRules(ctx context.Context, event
 		roomID:   event.RoomID(),
 		roomSize: roomSize,
 	}
-	eval := pushrules.NewRuleSetEvaluator(ec, &res.RuleSets.Global)
+	eval := pushrules.NewRuleSetEvaluator(ec, &ruleSets.Global)
 	rule, err := eval.MatchEvent(event.Event)
 	if err != nil {
 		return nil, err
userapi/consumers/syncapi_streamevent_test.go (new file, 129 lines)
@@ -0,0 +1,129 @@
+package consumers
+
+import (
+	"context"
+	"testing"
+
+	"github.com/matrix-org/gomatrixserverlib"
+	"github.com/stretchr/testify/assert"
+
+	"github.com/matrix-org/dendrite/internal/pushrules"
+	"github.com/matrix-org/dendrite/setup/config"
+	"github.com/matrix-org/dendrite/test"
+	"github.com/matrix-org/dendrite/userapi/storage"
+)
+
+func mustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, func()) {
+	t.Helper()
+	connStr, close := test.PrepareDBConnectionString(t, dbType)
+	db, err := storage.NewUserAPIDatabase(nil, &config.DatabaseOptions{
+		ConnectionString: config.DataSource(connStr),
+	}, "", 4, 0, 0, "")
+	if err != nil {
+		t.Fatalf("failed to create new user db: %v", err)
+	}
+	return db, close
+}
+
+func mustCreateEvent(t *testing.T, content string) *gomatrixserverlib.HeaderedEvent {
+	t.Helper()
+	ev, err := gomatrixserverlib.NewEventFromTrustedJSON([]byte(content), false, gomatrixserverlib.RoomVersionV10)
+	if err != nil {
+		t.Fatalf("failed to create event: %v", err)
+	}
+	return ev.Headered(gomatrixserverlib.RoomVersionV10)
+}
+
+func Test_evaluatePushRules(t *testing.T) {
+	ctx := context.Background()
+
+	test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
+		db, close := mustCreateDatabase(t, dbType)
+		defer close()
+		consumer := OutputStreamEventConsumer{db: db}
+
+		testCases := []struct {
+			name         string
+			eventContent string
+			wantAction   pushrules.ActionKind
+			wantActions  []*pushrules.Action
+			wantNotify   bool
+		}{
+			{
+				name:         "m.receipt doesn't notify",
+				eventContent: `{"type":"m.receipt"}`,
+				wantAction:   pushrules.UnknownAction,
+				wantActions:  nil,
+			},
+			{
+				name:         "m.reaction doesn't notify",
+				eventContent: `{"type":"m.reaction"}`,
+				wantAction:   pushrules.DontNotifyAction,
+				wantActions: []*pushrules.Action{
+					{
+						Kind: pushrules.DontNotifyAction,
+					},
+				},
+			},
+			{
+				name:         "m.room.message notifies",
+				eventContent: `{"type":"m.room.message"}`,
+				wantNotify:   true,
+				wantAction:   pushrules.NotifyAction,
+				wantActions: []*pushrules.Action{
+					{Kind: pushrules.NotifyAction},
+					{
+						Kind:  pushrules.SetTweakAction,
+						Tweak: pushrules.HighlightTweak,
+						Value: false,
+					},
+				},
+			},
+			{
+				name:         "m.room.message highlights",
+				eventContent: `{"type":"m.room.message", "content": {"body": "test"} }`,
+				wantNotify:   true,
+				wantAction:   pushrules.NotifyAction,
+				wantActions: []*pushrules.Action{
+					{Kind: pushrules.NotifyAction},
+					{
+						Kind:  pushrules.SetTweakAction,
+						Tweak: pushrules.SoundTweak,
+						Value: "default",
+					},
+					{
+						Kind:  pushrules.SetTweakAction,
+						Tweak: pushrules.HighlightTweak,
+						Value: true,
+					},
+				},
+			},
+		}
+
+		for _, tc := range testCases {
+			t.Run(tc.name, func(t *testing.T) {
+				actions, err := consumer.evaluatePushRules(ctx, mustCreateEvent(t, tc.eventContent), &localMembership{
+					UserID:    "@test:localhost",
+					Localpart: "test",
+					Domain:    "localhost",
+				}, 10)
+				if err != nil {
+					t.Fatalf("failed to evaluate push rules: %v", err)
+				}
+				assert.Equal(t, tc.wantActions, actions)
+				gotAction, _, err := pushrules.ActionsToTweaks(actions)
+				if err != nil {
+					t.Fatalf("failed to get actions: %v", err)
+				}
+				if gotAction != tc.wantAction {
+					t.Fatalf("expected action to be '%s', got '%s'", tc.wantAction, gotAction)
+				}
+				// this is taken from `notifyLocal`
+				if tc.wantNotify && gotAction != pushrules.NotifyAction && gotAction != pushrules.CoalesceAction {
+					t.Fatalf("expected to notify but didn't")
+				}
+			})
+
+		}
+	})
+}
@@ -30,7 +30,6 @@ import (
 
 	"github.com/matrix-org/dendrite/clientapi/userutil"
 	"github.com/matrix-org/dendrite/internal/eventutil"
 	"github.com/matrix-org/dendrite/internal/pushrules"
 	"github.com/matrix-org/dendrite/internal/sqlutil"
 	keyapi "github.com/matrix-org/dendrite/keyserver/api"
 	rsapi "github.com/matrix-org/dendrite/roomserver/api"
@@ -760,57 +759,15 @@ func (a *UserInternalAPI) PerformPushRulesPut(
 }
 
 func (a *UserInternalAPI) QueryPushRules(ctx context.Context, req *api.QueryPushRulesRequest, res *api.QueryPushRulesResponse) error {
-	userReq := api.QueryAccountDataRequest{
-		UserID:   req.UserID,
-		DataType: pushRulesAccountDataType,
+	localpart, _, err := gomatrixserverlib.SplitID('@', req.UserID)
+	if err != nil {
+		return fmt.Errorf("failed to split user ID %q for push rules", req.UserID)
 	}
-	var userRes api.QueryAccountDataResponse
-	if err := a.QueryAccountData(ctx, &userReq, &userRes); err != nil {
-		return err
+	pushRules, err := a.DB.QueryPushRules(ctx, localpart)
+	if err != nil {
+		return fmt.Errorf("failed to query push rules: %w", err)
 	}
-	bs, ok := userRes.GlobalAccountData[pushRulesAccountDataType]
-	if ok {
-		// Legacy Dendrite users will have completely empty push rules, so we should
-		// detect that situation and set some defaults.
-		var rules struct {
-			G struct {
-				Content   []json.RawMessage `json:"content"`
-				Override  []json.RawMessage `json:"override"`
-				Room      []json.RawMessage `json:"room"`
-				Sender    []json.RawMessage `json:"sender"`
-				Underride []json.RawMessage `json:"underride"`
-			} `json:"global"`
-		}
-		if err := json.Unmarshal([]byte(bs), &rules); err == nil {
-			count := len(rules.G.Content) + len(rules.G.Override) +
-				len(rules.G.Room) + len(rules.G.Sender) + len(rules.G.Underride)
-			ok = count > 0
-		}
-	}
-	if !ok {
-		// If we didn't find any default push rules then we should just generate some
-		// fresh ones.
-		localpart, _, err := gomatrixserverlib.SplitID('@', req.UserID)
-		if err != nil {
-			return fmt.Errorf("failed to split user ID %q for push rules", req.UserID)
-		}
-		pushRuleSets := pushrules.DefaultAccountRuleSets(localpart, a.ServerName)
-		prbs, err := json.Marshal(pushRuleSets)
-		if err != nil {
-			return fmt.Errorf("failed to marshal default push rules: %w", err)
-		}
-		if err := a.DB.SaveAccountData(ctx, localpart, "", pushRulesAccountDataType, json.RawMessage(prbs)); err != nil {
-			return fmt.Errorf("failed to save default push rules: %w", err)
-		}
-		res.RuleSets = pushRuleSets
-		return nil
-	}
-	var data pushrules.AccountRuleSets
-	if err := json.Unmarshal([]byte(bs), &data); err != nil {
-		util.GetLogger(ctx).WithError(err).Error("json.Unmarshal of push rules failed")
-		return err
-	}
-	res.RuleSets = &data
+	res.RuleSets = pushRules
 	return nil
 }
 
@@ -20,6 +20,7 @@ import (
 	"errors"
 
 	"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
+	"github.com/matrix-org/dendrite/internal/pushrules"
 	"github.com/matrix-org/dendrite/userapi/api"
 	"github.com/matrix-org/dendrite/userapi/storage/tables"
 	"github.com/matrix-org/dendrite/userapi/types"
@@ -53,6 +54,7 @@ type AccountData interface {
 	// If no account data could be found, returns nil
 	// Returns an error if there was an issue with the retrieval
 	GetAccountDataByType(ctx context.Context, localpart, roomID, dataType string) (data json.RawMessage, err error)
+	QueryPushRules(ctx context.Context, localpart string) (*pushrules.AccountRuleSets, error)
 }
 
 type Device interface {
@@ -26,10 +26,11 @@ import (
 	"strings"
 	"time"
 
-	"github.com/matrix-org/dendrite/userapi/types"
 	"github.com/matrix-org/gomatrixserverlib"
 	"golang.org/x/crypto/bcrypt"
 
+	"github.com/matrix-org/dendrite/userapi/types"
 
 	"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
+	"github.com/matrix-org/dendrite/internal/pushrules"
 	"github.com/matrix-org/dendrite/internal/sqlutil"
@@ -177,6 +178,41 @@ func (d *Database) createAccount(
 	return account, nil
 }
 
+func (d *Database) QueryPushRules(
+	ctx context.Context,
+	localpart string,
+) (*pushrules.AccountRuleSets, error) {
+	data, err := d.AccountDatas.SelectAccountDataByType(ctx, localpart, "", "m.push_rules")
+	if err != nil {
+		return nil, err
+	}
+
+	// If we didn't find any default push rules then we should just generate some
+	// fresh ones.
+	if len(data) == 0 {
+		pushRuleSets := pushrules.DefaultAccountRuleSets(localpart, d.ServerName)
+		prbs, err := json.Marshal(pushRuleSets)
+		if err != nil {
+			return nil, fmt.Errorf("failed to marshal default push rules: %w", err)
+		}
+		err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
+			if dbErr := d.AccountDatas.InsertAccountData(ctx, txn, localpart, "", "m.push_rules", prbs); dbErr != nil {
+				return fmt.Errorf("failed to save default push rules: %w", dbErr)
+			}
+			return nil
+		})
+
+		return pushRuleSets, err
+	}
+
+	var pushRules pushrules.AccountRuleSets
+	if err := json.Unmarshal(data, &pushRules); err != nil {
+		return nil, err
+	}
+
+	return &pushRules, nil
+}
+
 // SaveAccountData saves new account data for a given user and a given room.
 // If the account data is not specific to a room, the room ID should be an empty string
 // If an account data already exists for a given set (user, room, data type), it will
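With the default-rules fallback now living in the storage layer, callers only need this one method. A hypothetical caller might look like the following (assuming the userapi `storage.Database` interface exposes the new `AccountData` method, and that `AccountRuleSets` keeps the `Global.Override` field referenced elsewhere in this diff):

```go
package example

import (
	"context"
	"fmt"

	"github.com/matrix-org/dendrite/userapi/storage"
)

// printOverrideRuleCount fetches the user's push rules; QueryPushRules itself
// creates and persists the defaults on first use, so there is no empty case
// for the caller to handle.
func printOverrideRuleCount(ctx context.Context, db storage.Database, localpart string) error {
	ruleSets, err := db.QueryPushRules(ctx, localpart)
	if err != nil {
		return fmt.Errorf("failed to query push rules: %w", err)
	}
	fmt.Println("override rules:", len(ruleSets.Global.Override))
	return nil
}
```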
@@ -699,7 +735,9 @@ func (d *Database) GetRoomNotificationCounts(ctx context.Context, localpart, roo
 }
 
 func (d *Database) DeleteOldNotifications(ctx context.Context) error {
-	return d.Notifications.Clean(ctx, nil)
+	return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
+		return d.Notifications.Clean(ctx, txn)
+	})
 }
 
 func (d *Database) UpsertPusher(
@@ -18,6 +18,8 @@ import (
 	"time"
 
+	"github.com/gorilla/mux"
+	"github.com/sirupsen/logrus"
 
 	"github.com/matrix-org/dendrite/internal/pushgateway"
 	keyapi "github.com/matrix-org/dendrite/keyserver/api"
 	rsapi "github.com/matrix-org/dendrite/roomserver/api"
@@ -31,7 +33,6 @@ import (
 	"github.com/matrix-org/dendrite/userapi/producers"
 	"github.com/matrix-org/dendrite/userapi/storage"
 	"github.com/matrix-org/dendrite/userapi/util"
-	"github.com/sirupsen/logrus"
 )
 
 // AddInternalRoutes registers HTTP handlers for the internal API. Invokes functions
@@ -90,7 +91,7 @@ func NewInternalAPI(
 	}
 
 	eventConsumer := consumers.NewOutputStreamEventConsumer(
-		base.ProcessContext, cfg, js, db, pgClient, userAPI, rsAPI, syncProducer,
+		base.ProcessContext, cfg, js, db, pgClient, rsAPI, syncProducer,
 	)
 	if err := eventConsumer.Start(); err != nil {
 		logrus.WithError(err).Panic("failed to start user API streamed event consumer")