mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-11 16:13:10 -06:00
Merge commit '4accf677ea3467de48aae9dd355a245dd13668a9'
This commit is contained in:
commit
8e801aa37f
14
CHANGES.md
14
CHANGES.md
|
|
@ -1,5 +1,19 @@
|
|||
# Changelog
|
||||
|
||||
## Dendrite 0.9.8 (2022-09-12)
|
||||
|
||||
### Important
|
||||
|
||||
* This is a **security release** to fix a vulnerability where missing events retrieved from other servers did not have their signatures verified in all cases, affecting all versions of Dendrite before 0.9.8. Upgrading to this version is highly recommended. For more information, [see here](https://github.com/matrix-org/dendrite/security/advisories/GHSA-pfw4-xjgm-267c).
|
||||
|
||||
### Features
|
||||
|
||||
* The built-in NATS Server has been updated to the final 2.9.0 release version
|
||||
|
||||
### Fixes
|
||||
|
||||
* Dendrite will now correctly verify the signatures of events retrieved using `/get_missing_events`
|
||||
|
||||
## Dendrite 0.9.7 (2022-09-09)
|
||||
|
||||
### Features
|
||||
|
|
|
|||
|
|
@ -154,25 +154,23 @@ func (s *OutputRoomEventConsumer) onMessage(
|
|||
|
||||
txnID := ""
|
||||
// Try to get the message metadata, if we're able to, use the timestamp as the txnID
|
||||
if len(msgs) > 0 {
|
||||
metadata, err := msgs[0].Metadata()
|
||||
if err == nil {
|
||||
txnID = strconv.Itoa(int(metadata.Timestamp.UnixNano()))
|
||||
}
|
||||
metadata, err := msgs[0].Metadata()
|
||||
if err == nil {
|
||||
txnID = strconv.Itoa(int(metadata.Timestamp.UnixNano()))
|
||||
}
|
||||
|
||||
// Send event to any relevant application services. If we hit
|
||||
// an error here, return false, so that we negatively ack.
|
||||
log.WithField("appservice", state.ID).Debugf("Appservice worker sending %d events(s) from roomserver", len(events))
|
||||
return s.sendEvents(txnID, ctx, state, events) == nil
|
||||
return s.sendEvents(ctx, state, events, txnID) == nil
|
||||
}
|
||||
|
||||
// sendEvents passes events to the appservice by using the transactions
|
||||
// endpoint. It will block for the backoff period if necessary.
|
||||
func (s *OutputRoomEventConsumer) sendEvents(
|
||||
txnID string,
|
||||
ctx context.Context, state *appserviceState,
|
||||
events []*gomatrixserverlib.HeaderedEvent,
|
||||
txnID string,
|
||||
) error {
|
||||
// Create the transaction body.
|
||||
transaction, err := json.Marshal(
|
||||
|
|
|
|||
|
|
@ -218,6 +218,9 @@ func (m *DendriteMonolith) Start() {
|
|||
if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil {
|
||||
panic("failed to load PEM key: " + err.Error())
|
||||
}
|
||||
if len(sk) != ed25519.PrivateKeySize {
|
||||
panic("the private key is not long enough")
|
||||
}
|
||||
} else {
|
||||
if sk, err = os.ReadFile(oldkeyfile); err != nil {
|
||||
panic("failed to read the old private key: " + err.Error())
|
||||
|
|
@ -233,8 +236,13 @@ func (m *DendriteMonolith) Start() {
|
|||
if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil {
|
||||
panic("failed to load PEM key: " + err.Error())
|
||||
}
|
||||
if len(sk) != ed25519.PrivateKeySize {
|
||||
panic("the private key is not long enough")
|
||||
}
|
||||
}
|
||||
|
||||
pk = sk.Public().(ed25519.PublicKey)
|
||||
|
||||
var err error
|
||||
m.listener, err = net.Listen("tcp", "localhost:65432")
|
||||
if err != nil {
|
||||
|
|
@ -247,7 +255,7 @@ func (m *DendriteMonolith) Start() {
|
|||
m.logger.SetOutput(BindLogger{})
|
||||
logrus.SetOutput(BindLogger{})
|
||||
|
||||
m.PineconeRouter = pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk, false)
|
||||
m.PineconeRouter = pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk)
|
||||
m.PineconeQUIC = pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), m.PineconeRouter, []string{"matrix"})
|
||||
m.PineconeMulticast = pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), m.PineconeRouter)
|
||||
m.PineconeManager = pineconeConnections.NewConnectionManager(m.PineconeRouter, nil)
|
||||
|
|
|
|||
|
|
@ -87,6 +87,9 @@ func (m *DendriteMonolith) Start() {
|
|||
if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil {
|
||||
panic("failed to load PEM key: " + err.Error())
|
||||
}
|
||||
if len(sk) != ed25519.PrivateKeySize {
|
||||
panic("the private key is not long enough")
|
||||
}
|
||||
} else {
|
||||
if sk, err = os.ReadFile(oldkeyfile); err != nil {
|
||||
panic("failed to read the old private key: " + err.Error())
|
||||
|
|
@ -103,8 +106,13 @@ func (m *DendriteMonolith) Start() {
|
|||
if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil {
|
||||
panic("failed to load PEM key: " + err.Error())
|
||||
}
|
||||
if len(sk) != ed25519.PrivateKeySize {
|
||||
panic("the private key is not long enough")
|
||||
}
|
||||
}
|
||||
|
||||
pk = sk.Public().(ed25519.PublicKey)
|
||||
|
||||
var err error
|
||||
m.listener, err = net.Listen("tcp", "localhost:65432")
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -21,12 +21,13 @@ import (
|
|||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/nats-io/nats.go"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||
)
|
||||
|
||||
// SyncAPIProducer produces events for the sync API server to consume
|
||||
|
|
@ -61,7 +62,7 @@ func (p *SyncAPIProducer) SendReceipt(
|
|||
|
||||
func (p *SyncAPIProducer) SendToDevice(
|
||||
ctx context.Context, sender, userID, deviceID, eventType string,
|
||||
message interface{},
|
||||
message json.RawMessage,
|
||||
) error {
|
||||
devices := []string{}
|
||||
_, domain, err := gomatrixserverlib.SplitID('@', userID)
|
||||
|
|
@ -89,24 +90,19 @@ func (p *SyncAPIProducer) SendToDevice(
|
|||
devices = append(devices, deviceID)
|
||||
}
|
||||
|
||||
js, err := json.Marshal(message)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"user_id": userID,
|
||||
"num_devices": len(devices),
|
||||
"type": eventType,
|
||||
}).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent)
|
||||
for _, device := range devices {
|
||||
for i, device := range devices {
|
||||
ote := &types.OutputSendToDeviceEvent{
|
||||
UserID: userID,
|
||||
DeviceID: device,
|
||||
SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{
|
||||
Sender: sender,
|
||||
Type: eventType,
|
||||
Content: js,
|
||||
Content: message,
|
||||
},
|
||||
}
|
||||
|
||||
|
|
@ -115,15 +111,17 @@ func (p *SyncAPIProducer) SendToDevice(
|
|||
log.WithError(err).Error("sendToDevice failed json.Marshal")
|
||||
return err
|
||||
}
|
||||
m := &nats.Msg{
|
||||
Subject: p.TopicSendToDeviceEvent,
|
||||
Data: eventJSON,
|
||||
Header: nats.Header{},
|
||||
}
|
||||
m := nats.NewMsg(p.TopicSendToDeviceEvent)
|
||||
m.Data = eventJSON
|
||||
m.Header.Set("sender", sender)
|
||||
m.Header.Set(jetstream.UserID, userID)
|
||||
|
||||
if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil {
|
||||
log.WithError(err).Error("sendToDevice failed t.Producer.SendMessage")
|
||||
if i < len(devices)-1 {
|
||||
log.WithError(err).Warn("sendToDevice failed to PublishMsg, trying further devices")
|
||||
continue
|
||||
}
|
||||
log.WithError(err).Error("sendToDevice failed to PublishMsg for all devices")
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -98,6 +98,9 @@ func main() {
|
|||
if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil {
|
||||
panic("failed to load PEM key: " + err.Error())
|
||||
}
|
||||
if len(sk) != ed25519.PrivateKeySize {
|
||||
panic("the private key is not long enough")
|
||||
}
|
||||
} else {
|
||||
if sk, err = os.ReadFile(oldkeyfile); err != nil {
|
||||
panic("failed to read the old private key: " + err.Error())
|
||||
|
|
@ -114,7 +117,13 @@ func main() {
|
|||
if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil {
|
||||
panic("failed to load PEM key: " + err.Error())
|
||||
}
|
||||
if len(sk) != ed25519.PrivateKeySize {
|
||||
panic("the private key is not long enough")
|
||||
}
|
||||
}
|
||||
|
||||
pk = sk.Public().(ed25519.PublicKey)
|
||||
|
||||
cfg.Defaults(config.DefaultOpts{
|
||||
Generate: true,
|
||||
Monolithic: true,
|
||||
|
|
@ -136,14 +145,13 @@ func main() {
|
|||
}
|
||||
}
|
||||
|
||||
pk = sk.Public().(ed25519.PublicKey)
|
||||
cfg.Global.ServerName = gomatrixserverlib.ServerName(hex.EncodeToString(pk))
|
||||
cfg.Global.KeyID = gomatrixserverlib.KeyID(signing.KeyID)
|
||||
|
||||
base := base.NewBaseDendrite(cfg, "Monolith")
|
||||
defer base.Close() // nolint: errcheck
|
||||
|
||||
pRouter := pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk, false)
|
||||
pRouter := pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk)
|
||||
pQUIC := pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), pRouter, []string{"matrix"})
|
||||
pMulticast := pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), pRouter)
|
||||
pManager := pineconeConnections.NewConnectionManager(pRouter, nil)
|
||||
|
|
|
|||
|
|
@ -86,6 +86,9 @@ func main() {
|
|||
if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil {
|
||||
panic("failed to load PEM key: " + err.Error())
|
||||
}
|
||||
if len(sk) != ed25519.PrivateKeySize {
|
||||
panic("the private key is not long enough")
|
||||
}
|
||||
} else {
|
||||
if sk, err = os.ReadFile(oldkeyfile); err != nil {
|
||||
panic("failed to read the old private key: " + err.Error())
|
||||
|
|
@ -102,8 +105,13 @@ func main() {
|
|||
if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil {
|
||||
panic("failed to load PEM key: " + err.Error())
|
||||
}
|
||||
if len(sk) != ed25519.PrivateKeySize {
|
||||
panic("the private key is not long enough")
|
||||
}
|
||||
}
|
||||
|
||||
pk = sk.Public().(ed25519.PublicKey)
|
||||
|
||||
// use custom config if config flag is set
|
||||
if configFlagSet {
|
||||
cfg = setup.ParseFlags(true)
|
||||
|
|
@ -129,7 +137,6 @@ func main() {
|
|||
}
|
||||
}
|
||||
|
||||
pk = sk.Public().(ed25519.PublicKey)
|
||||
cfg.Global.ServerName = gomatrixserverlib.ServerName(hex.EncodeToString(pk))
|
||||
cfg.Global.KeyID = gomatrixserverlib.KeyID(signing.KeyID)
|
||||
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import (
|
|||
"fmt"
|
||||
"log"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/gomatrix"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
|
|
@ -82,11 +83,14 @@ func runTests(baseURL, branchName string) error {
|
|||
client: users[1].client, text: "4: " + branchName,
|
||||
},
|
||||
}
|
||||
wantEventIDs := make(map[string]struct{}, 8)
|
||||
for _, msg := range msgs {
|
||||
_, err = msg.client.SendText(dmRoomID, msg.text)
|
||||
var resp *gomatrix.RespSendEvent
|
||||
resp, err = msg.client.SendText(dmRoomID, msg.text)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to send text in dm room: %s", err)
|
||||
}
|
||||
wantEventIDs[resp.EventID] = struct{}{}
|
||||
}
|
||||
|
||||
// attempt to create/join the shared public room
|
||||
|
|
@ -114,11 +118,48 @@ func runTests(baseURL, branchName string) error {
|
|||
}
|
||||
// send messages
|
||||
for _, msg := range msgs {
|
||||
_, err = msg.client.SendText(publicRoomID, "public "+msg.text)
|
||||
resp, err := msg.client.SendText(publicRoomID, "public "+msg.text)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to send text in public room: %s", err)
|
||||
}
|
||||
wantEventIDs[resp.EventID] = struct{}{}
|
||||
}
|
||||
|
||||
// Sync until we have all expected messages
|
||||
doneCh := make(chan struct{})
|
||||
go func() {
|
||||
syncClient := users[0].client
|
||||
since := ""
|
||||
for len(wantEventIDs) > 0 {
|
||||
select {
|
||||
case <-doneCh:
|
||||
return
|
||||
default:
|
||||
}
|
||||
syncResp, err := syncClient.SyncRequest(1000, since, "1", false, "")
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
for _, room := range syncResp.Rooms.Join {
|
||||
for _, ev := range room.Timeline.Events {
|
||||
if ev.Type != "m.room.message" {
|
||||
continue
|
||||
}
|
||||
delete(wantEventIDs, ev.ID)
|
||||
}
|
||||
}
|
||||
since = syncResp.NextBatch
|
||||
}
|
||||
close(doneCh)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-time.After(time.Second * 10):
|
||||
close(doneCh)
|
||||
return fmt.Errorf("failed to receive all expected messages: %+v", wantEventIDs)
|
||||
case <-doneCh:
|
||||
}
|
||||
|
||||
log.Printf("OK! rooms(public=%s, dm=%s) users(%s, %s)\n", publicRoomID, dmRoomID, users[0].userID, users[1].userID)
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -64,7 +64,7 @@ func (p *SyncAPIProducer) SendReceipt(
|
|||
|
||||
func (p *SyncAPIProducer) SendToDevice(
|
||||
ctx context.Context, sender, userID, deviceID, eventType string,
|
||||
message interface{},
|
||||
message json.RawMessage,
|
||||
) error {
|
||||
devices := []string{}
|
||||
_, domain, err := gomatrixserverlib.SplitID('@', userID)
|
||||
|
|
@ -92,24 +92,19 @@ func (p *SyncAPIProducer) SendToDevice(
|
|||
devices = append(devices, deviceID)
|
||||
}
|
||||
|
||||
js, err := json.Marshal(message)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"user_id": userID,
|
||||
"num_devices": len(devices),
|
||||
"type": eventType,
|
||||
}).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent)
|
||||
for _, device := range devices {
|
||||
for i, device := range devices {
|
||||
ote := &types.OutputSendToDeviceEvent{
|
||||
UserID: userID,
|
||||
DeviceID: device,
|
||||
SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{
|
||||
Sender: sender,
|
||||
Type: eventType,
|
||||
Content: js,
|
||||
Content: message,
|
||||
},
|
||||
}
|
||||
|
||||
|
|
@ -118,16 +113,17 @@ func (p *SyncAPIProducer) SendToDevice(
|
|||
log.WithError(err).Error("sendToDevice failed json.Marshal")
|
||||
return err
|
||||
}
|
||||
m := &nats.Msg{
|
||||
Subject: p.TopicSendToDeviceEvent,
|
||||
Data: eventJSON,
|
||||
Header: nats.Header{},
|
||||
}
|
||||
m := nats.NewMsg(p.TopicSendToDeviceEvent)
|
||||
m.Data = eventJSON
|
||||
m.Header.Set("sender", sender)
|
||||
m.Header.Set(jetstream.UserID, userID)
|
||||
|
||||
if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil {
|
||||
log.WithError(err).Error("sendToDevice failed t.Producer.SendMessage")
|
||||
if i < len(devices)-1 {
|
||||
log.WithError(err).Warn("sendToDevice failed to PublishMsg, trying further devices")
|
||||
continue
|
||||
}
|
||||
log.WithError(err).Error("sendToDevice failed to PublishMsg for all devices")
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
|
@ -167,10 +163,10 @@ func (p *SyncAPIProducer) SendPresence(
|
|||
}
|
||||
|
||||
func (p *SyncAPIProducer) SendDeviceListUpdate(
|
||||
ctx context.Context, deviceListUpdate gomatrixserverlib.RawJSON, origin string,
|
||||
ctx context.Context, deviceListUpdate gomatrixserverlib.RawJSON, origin gomatrixserverlib.ServerName,
|
||||
) (err error) {
|
||||
m := nats.NewMsg(p.TopicDeviceListUpdate)
|
||||
m.Header.Set("origin", origin)
|
||||
m.Header.Set("origin", string(origin))
|
||||
m.Data = deviceListUpdate
|
||||
log.Debugf("Sending device list update: %+v", m.Header)
|
||||
_, err = p.JetStream.PublishMsg(m, nats.Context(ctx))
|
||||
|
|
|
|||
|
|
@ -359,7 +359,7 @@ func (t *txnReq) processEDUs(ctx context.Context) {
|
|||
}
|
||||
}
|
||||
case gomatrixserverlib.MDeviceListUpdate:
|
||||
if err := t.producer.SendDeviceListUpdate(ctx, e.Content, e.Origin); err != nil {
|
||||
if err := t.producer.SendDeviceListUpdate(ctx, e.Content, t.Origin); err != nil {
|
||||
util.GetLogger(ctx).WithError(err).Error("failed to InputDeviceListUpdate")
|
||||
}
|
||||
case gomatrixserverlib.MReceipt:
|
||||
|
|
|
|||
20
go.mod
20
go.mod
|
|
@ -17,16 +17,17 @@ require (
|
|||
github.com/google/uuid v1.3.0
|
||||
github.com/gorilla/mux v1.8.0
|
||||
github.com/gorilla/websocket v1.5.0
|
||||
github.com/kardianos/minwinsvc v1.0.0
|
||||
github.com/lib/pq v1.10.5
|
||||
github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e
|
||||
github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91
|
||||
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220907081047-637a173a3661
|
||||
github.com/matrix-org/pinecone v0.0.0-20220901133433-565beccfebed
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220912142654-7b96db48888a
|
||||
github.com/matrix-org/pinecone v0.0.0-20220914131522-608215eb1b29
|
||||
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
|
||||
github.com/mattn/go-sqlite3 v1.14.13
|
||||
github.com/nats-io/nats-server/v2 v2.8.5-0.20220811224153-d8d25d9b0b1c
|
||||
github.com/nats-io/nats.go v1.16.1-0.20220810192301-fb5ca2cbc995
|
||||
github.com/nats-io/nats-server/v2 v2.9.0
|
||||
github.com/nats-io/nats.go v1.16.1-0.20220906180156-a1017eec10b0
|
||||
github.com/neilalexander/utp v0.1.1-0.20210727203401-54ae7b1cd5f9
|
||||
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646
|
||||
github.com/ngrok/sqlmw v0.0.0-20220520173518-97c9c04efc79
|
||||
|
|
@ -44,7 +45,7 @@ require (
|
|||
github.com/uber/jaeger-lib v2.4.1+incompatible
|
||||
github.com/yggdrasil-network/yggdrasil-go v0.4.5-0.20220901155642-4f2abece817c
|
||||
go.uber.org/atomic v1.9.0
|
||||
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa
|
||||
golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90
|
||||
golang.org/x/image v0.0.0-20220413100746-70e8d0d3baa9
|
||||
golang.org/x/mobile v0.0.0-20220722155234-aaac322e2105
|
||||
golang.org/x/net v0.0.0-20220728211354-c7608f3a8462
|
||||
|
|
@ -54,10 +55,7 @@ require (
|
|||
nhooyr.io/websocket v1.8.7
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/ethereum/go-ethereum v1.10.15
|
||||
github.com/kardianos/minwinsvc v1.0.0
|
||||
)
|
||||
require github.com/ethereum/go-ethereum v1.10.15
|
||||
|
||||
require (
|
||||
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
|
||||
|
|
@ -132,9 +130,9 @@ require (
|
|||
github.com/tidwall/pretty v1.2.0 // indirect
|
||||
go.etcd.io/bbolt v1.3.5 // indirect
|
||||
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
|
||||
golang.org/x/sys v0.0.0-20220731174439-a90be440212d // indirect
|
||||
golang.org/x/sys v0.0.0-20220906135438-9e1f76180b77 // indirect
|
||||
golang.org/x/text v0.3.8-0.20211004125949-5bd84dd9b33b // indirect
|
||||
golang.org/x/time v0.0.0-20220411224347-583f2d630306 // indirect
|
||||
golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9 // indirect
|
||||
golang.org/x/tools v0.1.12 // indirect
|
||||
google.golang.org/protobuf v1.27.1 // indirect
|
||||
gopkg.in/macaroon.v2 v2.1.0 // indirect
|
||||
|
|
|
|||
27
go.sum
27
go.sum
|
|
@ -523,10 +523,10 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1
|
|||
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
|
||||
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4=
|
||||
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220907081047-637a173a3661 h1:dww9rH0HVfAO9JOBD1nxq26GHKbEw07thAJTu1DrAQs=
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220907081047-637a173a3661/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk=
|
||||
github.com/matrix-org/pinecone v0.0.0-20220901133433-565beccfebed h1:YMcCnrmTbT5M1LtTiagiFFaj9vEgvC6iVEzWsIb0tQQ=
|
||||
github.com/matrix-org/pinecone v0.0.0-20220901133433-565beccfebed/go.mod h1:K0N1ixHQxXoCyqolDqVxPM3ArrDtcMs8yegOx2Lfv9k=
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220912142654-7b96db48888a h1:XeGBDZZsUe4kgj3myl0EiuDNVWxszJecMTrON3Wn9sI=
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220912142654-7b96db48888a/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk=
|
||||
github.com/matrix-org/pinecone v0.0.0-20220914131522-608215eb1b29 h1:/AIaqhK1BBi2sMEVQdgZRV8H8sNloAGCgztLZhsPqD0=
|
||||
github.com/matrix-org/pinecone v0.0.0-20220914131522-608215eb1b29/go.mod h1:K0N1ixHQxXoCyqolDqVxPM3ArrDtcMs8yegOx2Lfv9k=
|
||||
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
|
||||
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk=
|
||||
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
|
||||
|
|
@ -582,10 +582,10 @@ github.com/naoina/go-stringutil v0.1.0/go.mod h1:XJ2SJL9jCtBh+P9q5btrd/Ylo8XwT/h
|
|||
github.com/naoina/toml v0.1.2-0.20170918210437-9fafd6967416/go.mod h1:NBIhNtsFMo3G2szEBne+bO4gS192HuIYRqfvOWb4i1E=
|
||||
github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI=
|
||||
github.com/nats-io/jwt/v2 v2.3.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
|
||||
github.com/nats-io/nats-server/v2 v2.8.5-0.20220811224153-d8d25d9b0b1c h1:U5qngWGZ7E/nQxz0544IpIEdKFUUaOJxQN2LHCYLGhg=
|
||||
github.com/nats-io/nats-server/v2 v2.8.5-0.20220811224153-d8d25d9b0b1c/go.mod h1:+f++B/5jpr71JATt7b5KCX+G7bt43iWx1OYWGkpE/Kk=
|
||||
github.com/nats-io/nats.go v1.16.1-0.20220810192301-fb5ca2cbc995 h1:CUcSQR8jwa9//qNgN/t3tW53DObnTPQ/G/K+qnS7yRc=
|
||||
github.com/nats-io/nats.go v1.16.1-0.20220810192301-fb5ca2cbc995/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
|
||||
github.com/nats-io/nats-server/v2 v2.9.0 h1:DLWu+7/VgGOoChcDKytnUZPAmudpv7o/MhKmNrnH1RE=
|
||||
github.com/nats-io/nats-server/v2 v2.9.0/go.mod h1:BWKY6217RvhI+FDoOLZ2BH+hOC37xeKRBlQ1Lz7teKI=
|
||||
github.com/nats-io/nats.go v1.16.1-0.20220906180156-a1017eec10b0 h1:dPUKD6Iv8M1y9MU8PK6H4a4/12yx5/CbaYWz/Z1arY8=
|
||||
github.com/nats-io/nats.go v1.16.1-0.20220906180156-a1017eec10b0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
|
||||
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
|
||||
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
|
||||
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
||||
|
|
@ -828,8 +828,9 @@ golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm
|
|||
golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
|
||||
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||
golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
|
||||
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa h1:zuSxTR4o9y82ebqCUJYNGJbGPo6sKVl54f/TVDObg1c=
|
||||
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
|
||||
golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 h1:Y/gsMcFOcR+6S6f3YeMKl5g+dZMEWqcz5Czj/GWYbkM=
|
||||
golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
|
||||
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
|
|
@ -1027,8 +1028,8 @@ golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBc
|
|||
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220730100132-1609e554cd39/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220731174439-a90be440212d h1:Sv5ogFZatcgIMMtBSTTAgMYsicp25MXBubjXNDKwm80=
|
||||
golang.org/x/sys v0.0.0-20220731174439-a90be440212d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220906135438-9e1f76180b77 h1:C1tElbkWrsSkn3IRl1GCW/gETw1TywWIPgwZtXTZbYg=
|
||||
golang.org/x/sys v0.0.0-20220906135438-9e1f76180b77/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
|
||||
|
|
@ -1051,8 +1052,8 @@ golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxb
|
|||
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20220411224347-583f2d630306 h1:+gHMid33q6pen7kv9xvT+JRinntgeXO2AeZVd0AWD3w=
|
||||
golang.org/x/time v0.0.0-20220411224347-583f2d630306/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9 h1:ftMN5LMiBFjbzleLqtoBZk7KdJwhuybIU+FckUHgoyQ=
|
||||
golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
|
|
|
|||
|
|
@ -155,5 +155,10 @@ func InsertMigration(ctx context.Context, db *sql.DB, migrationName string) erro
|
|||
time.Now().Format(time.RFC3339),
|
||||
internal.VersionString(),
|
||||
)
|
||||
// If the migration was already executed, we'll get a unique constraint error,
|
||||
// return nil instead, to avoid unnecessary logging.
|
||||
if IsUniqueConstraintViolationErr(err) {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -12,12 +12,27 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//go:build wasm
|
||||
// +build wasm
|
||||
//go:build !wasm
|
||||
// +build !wasm
|
||||
|
||||
package sqlutil
|
||||
|
||||
// IsUniqueConstraintViolationErr no-ops for this architecture
|
||||
import (
|
||||
"github.com/lib/pq"
|
||||
"github.com/mattn/go-sqlite3"
|
||||
)
|
||||
|
||||
// IsUniqueConstraintViolationErr returns true if the error is an unique_violation error
|
||||
func IsUniqueConstraintViolationErr(err error) bool {
|
||||
switch e := err.(type) {
|
||||
case *pq.Error:
|
||||
return e.Code == "23505"
|
||||
case pq.Error:
|
||||
return e.Code == "23505"
|
||||
case *sqlite3.Error:
|
||||
return e.Code == sqlite3.ErrConstraint
|
||||
case sqlite3.Error:
|
||||
return e.Code == sqlite3.ErrConstraint
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
|
@ -12,15 +12,20 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//go:build !wasm
|
||||
// +build !wasm
|
||||
//go:build wasm
|
||||
// +build wasm
|
||||
|
||||
package sqlutil
|
||||
|
||||
import "github.com/lib/pq"
|
||||
import "github.com/mattn/go-sqlite3"
|
||||
|
||||
// IsUniqueConstraintViolationErr returns true if the error is a postgresql unique_violation error
|
||||
// IsUniqueConstraintViolationErr returns true if the error is an unique_violation error
|
||||
func IsUniqueConstraintViolationErr(err error) bool {
|
||||
pqErr, ok := err.(*pq.Error)
|
||||
return ok && pqErr.Code == "23505"
|
||||
switch e := err.(type) {
|
||||
case *sqlite3.Error:
|
||||
return e.Code == sqlite3.ErrConstraint
|
||||
case sqlite3.Error:
|
||||
return e.Code == sqlite3.ErrConstraint
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
|
@ -17,7 +17,7 @@ var build string
|
|||
const (
|
||||
VersionMajor = 0
|
||||
VersionMinor = 9
|
||||
VersionPatch = 7
|
||||
VersionPatch = 8
|
||||
VersionTag = "" // example: "rc1"
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -18,8 +18,7 @@ import (
|
|||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
"fmt"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
|
|
@ -81,8 +80,7 @@ func executeMigration(ctx context.Context, db *sql.DB) error {
|
|||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed
|
||||
if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil {
|
||||
// not a fatal error, log and continue
|
||||
logrus.WithError(err).Warnf("unable to manually insert migration '%s'", migrationName)
|
||||
return fmt.Errorf("unable to manually insert migration '%s': %w", migrationName, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,8 +18,7 @@ import (
|
|||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
"fmt"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
|
|
@ -80,8 +79,7 @@ func executeMigration(ctx context.Context, db *sql.DB) error {
|
|||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed
|
||||
if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil {
|
||||
// not a fatal error, log and continue
|
||||
logrus.WithError(err).Warnf("unable to manually insert migration '%s'", migrationName)
|
||||
return fmt.Errorf("unable to manually insert migration '%s': %w", migrationName, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -439,6 +439,7 @@ type QueryMembershipAtEventRequest struct {
|
|||
|
||||
// QueryMembershipAtEventResponse is the response to QueryMembershipAtEventRequest.
|
||||
type QueryMembershipAtEventResponse struct {
|
||||
// Memberships is a map from eventID to a list of events (if any).
|
||||
// Memberships is a map from eventID to a list of events (if any). Events that
|
||||
// do not have known state will return an empty array here.
|
||||
Memberships map[string][]*gomatrixserverlib.HeaderedEvent `json:"memberships"`
|
||||
}
|
||||
|
|
|
|||
|
|
@ -176,7 +176,8 @@ func (r *Inputer) Start() error {
|
|||
},
|
||||
nats.HeadersOnly(),
|
||||
nats.DeliverAll(),
|
||||
nats.AckAll(),
|
||||
nats.AckExplicit(),
|
||||
nats.ReplayInstant(),
|
||||
nats.BindStream(r.InputRoomEventTopic),
|
||||
)
|
||||
return err
|
||||
|
|
|
|||
|
|
@ -264,16 +264,27 @@ func (u *latestEventsUpdater) latestState() error {
|
|||
return fmt.Errorf("roomState.CalculateAndStoreStateAfterEvents: %w", err)
|
||||
}
|
||||
|
||||
// Now that we have a new state snapshot based on the latest events,
|
||||
// we can compare that new snapshot to the previous one and see what
|
||||
// has changed. This gives us one list of removed state events and
|
||||
// another list of added ones. Replacing a value for a state-key tuple
|
||||
// will result one removed (the old event) and one added (the new event).
|
||||
u.removed, u.added, err = roomState.DifferenceBetweeenStateSnapshots(
|
||||
ctx, u.oldStateNID, u.newStateNID,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("roomState.DifferenceBetweenStateSnapshots: %w", err)
|
||||
// Include information about what changed in the state transition. If the
|
||||
// event rewrites the state (i.e. is a federated join) then we will simply
|
||||
// include the entire state snapshot as added events, as the "RewritesState"
|
||||
// flag in the output event signals downstream components to purge their
|
||||
// room state first. If it doesn't rewrite the state then we will work out
|
||||
// what the difference is between the state snapshots and send that. In all
|
||||
// cases where a state event is being replaced, the old state event will
|
||||
// appear in "removed" and the replacement will appear in "added".
|
||||
if u.rewritesState {
|
||||
u.removed = []types.StateEntry{}
|
||||
u.added, err = roomState.LoadStateAtSnapshot(ctx, u.newStateNID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("roomState.LoadStateAtSnapshot: %w", err)
|
||||
}
|
||||
} else {
|
||||
u.removed, u.added, err = roomState.DifferenceBetweeenStateSnapshots(
|
||||
ctx, u.oldStateNID, u.newStateNID,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("roomState.DifferenceBetweenStateSnapshots: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if removed := len(u.removed) - len(u.added); !u.rewritesState && removed > 0 {
|
||||
|
|
|
|||
|
|
@ -553,11 +553,14 @@ func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserve
|
|||
|
||||
// Make sure events from the missingResp are using the cache - missing events
|
||||
// will be added and duplicates will be removed.
|
||||
logger.Debugf("get_missing_events returned %d events", len(missingResp.Events))
|
||||
missingEvents := make([]*gomatrixserverlib.Event, 0, len(missingResp.Events))
|
||||
for _, ev := range missingResp.Events.UntrustedEvents(roomVersion) {
|
||||
if err = ev.VerifyEventSignatures(ctx, t.keys); err != nil {
|
||||
continue
|
||||
}
|
||||
missingEvents = append(missingEvents, t.cacheAndReturn(ev))
|
||||
}
|
||||
logger.Debugf("get_missing_events returned %d events (%d passed signature checks)", len(missingResp.Events), len(missingEvents))
|
||||
|
||||
// topologically sort and sanity check that we are making forward progress
|
||||
newEvents = gomatrixserverlib.ReverseTopologicalOrdering(missingEvents, gomatrixserverlib.TopologicalOrderByPrevEvents)
|
||||
|
|
|
|||
|
|
@ -106,7 +106,7 @@ func (r *Queryer) QueryStateAfterEvents(
|
|||
return err
|
||||
}
|
||||
|
||||
if len(request.PrevEventIDs) > 1 && len(request.StateToFetch) == 0 {
|
||||
if len(request.PrevEventIDs) > 1 {
|
||||
var authEventIDs []string
|
||||
for _, e := range stateEvents {
|
||||
authEventIDs = append(authEventIDs, e.AuthEventIDs()...)
|
||||
|
|
@ -208,6 +208,9 @@ func (r *Queryer) QueryMembershipForUser(
|
|||
return err
|
||||
}
|
||||
|
||||
// QueryMembershipAtEvent returns the known memberships at a given event.
|
||||
// If the state before an event is not known, an empty list will be returned
|
||||
// for that event instead.
|
||||
func (r *Queryer) QueryMembershipAtEvent(
|
||||
ctx context.Context,
|
||||
request *api.QueryMembershipAtEventRequest,
|
||||
|
|
@ -237,7 +240,11 @@ func (r *Queryer) QueryMembershipAtEvent(
|
|||
}
|
||||
|
||||
for _, eventID := range request.EventIDs {
|
||||
stateEntry := stateEntries[eventID]
|
||||
stateEntry, ok := stateEntries[eventID]
|
||||
if !ok {
|
||||
response.Memberships[eventID] = []*gomatrixserverlib.HeaderedEvent{}
|
||||
continue
|
||||
}
|
||||
memberships, err := helpers.GetMembershipsAtState(ctx, r.DB, stateEntry, false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to get memberships at state: %w", err)
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ package state
|
|||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"sort"
|
||||
"sync"
|
||||
|
|
@ -134,11 +135,14 @@ func (v *StateResolution) LoadMembershipAtEvent(
|
|||
for i := range eventIDs {
|
||||
eventID := eventIDs[i]
|
||||
snapshotNID, err := v.db.SnapshotNIDFromEventID(ctx, eventID)
|
||||
if err != nil {
|
||||
if err != nil && err != sql.ErrNoRows {
|
||||
return nil, fmt.Errorf("LoadStateAtEvent.SnapshotNIDFromEventID failed for event %s : %w", eventID, err)
|
||||
}
|
||||
if snapshotNID == 0 {
|
||||
return nil, fmt.Errorf("LoadStateAtEvent.SnapshotNIDFromEventID(%s) returned 0 NID, was this event stored?", eventID)
|
||||
// If we don't know a state snapshot for this event then we can't calculate
|
||||
// memberships at the time of the event, so skip over it. This means that
|
||||
// it isn't guaranteed that the response map will contain every single event.
|
||||
continue
|
||||
}
|
||||
snapshotNIDMap[snapshotNID] = append(snapshotNIDMap[snapshotNID], eventID)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,8 +21,6 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
// Import the postgres database driver.
|
||||
_ "github.com/lib/pq"
|
||||
|
||||
|
|
@ -79,8 +77,7 @@ func executeMigration(ctx context.Context, db *sql.DB) error {
|
|||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed
|
||||
if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil {
|
||||
// not a fatal error, log and continue
|
||||
logrus.WithError(err).Warnf("unable to manually insert migration '%s'", migrationName)
|
||||
return fmt.Errorf("unable to manually insert migration '%s': %w", migrationName, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,7 +22,6 @@ import (
|
|||
"fmt"
|
||||
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/caching"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
|
|
@ -87,8 +86,7 @@ func executeMigration(ctx context.Context, db *sql.DB) error {
|
|||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed
|
||||
if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil {
|
||||
// not a fatal error, log and continue
|
||||
logrus.WithError(err).Warnf("unable to manually insert migration '%s'", migrationName)
|
||||
return fmt.Errorf("unable to manually insert migration '%s': %w", migrationName, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,16 +19,17 @@ import (
|
|||
"encoding/json"
|
||||
|
||||
"github.com/getsentry/sentry-go"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
"github.com/nats-io/nats.go"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||
"github.com/matrix-org/dendrite/setup/process"
|
||||
"github.com/matrix-org/dendrite/syncapi/notifier"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
"github.com/nats-io/nats.go"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// OutputSendToDeviceEventConsumer consumes events that originated in the EDU server.
|
||||
|
|
@ -79,16 +80,18 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msgs []
|
|||
_, domain, err := gomatrixserverlib.SplitID('@', userID)
|
||||
if err != nil {
|
||||
sentry.CaptureException(err)
|
||||
log.WithError(err).Errorf("send-to-device: failed to split user id, dropping message")
|
||||
return true
|
||||
}
|
||||
if domain != s.serverName {
|
||||
log.Tracef("ignoring send-to-device event with destination %s", domain)
|
||||
return true
|
||||
}
|
||||
|
||||
var output types.OutputSendToDeviceEvent
|
||||
if err = json.Unmarshal(msg.Data, &output); err != nil {
|
||||
// If the message was invalid, log it and move on to the next message in the stream
|
||||
log.WithError(err).Errorf("output log: message parse failure")
|
||||
log.WithError(err).Errorf("send-to-device: message parse failure")
|
||||
sentry.CaptureException(err)
|
||||
return true
|
||||
}
|
||||
|
|
@ -105,7 +108,7 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msgs []
|
|||
)
|
||||
if err != nil {
|
||||
sentry.CaptureException(err)
|
||||
log.WithError(err).Errorf("failed to store send-to-device message")
|
||||
log.WithError(err).Errorf("send-to-device: failed to store message")
|
||||
return false
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -388,7 +388,7 @@ func applyHistoryVisibilityFilter(
|
|||
if err != nil {
|
||||
// Not a fatal error, we can continue without the stateEvents,
|
||||
// they are only needed if there are state events in the timeline.
|
||||
logrus.WithError(err).Warnf("failed to get current room state")
|
||||
logrus.WithError(err).Warnf("Failed to get current room state for history visibility")
|
||||
}
|
||||
alwaysIncludeIDs := make(map[string]struct{}, len(stateEvents))
|
||||
for _, ev := range stateEvents {
|
||||
|
|
@ -397,7 +397,6 @@ func applyHistoryVisibilityFilter(
|
|||
startTime := time.Now()
|
||||
events, err := internal.ApplyHistoryVisibilityFilter(ctx, db, rsAPI, recentEvents, alwaysIncludeIDs, userID, "sync")
|
||||
if err != nil {
|
||||
|
||||
return nil, err
|
||||
}
|
||||
logrus.WithFields(logrus.Fields{
|
||||
|
|
@ -405,7 +404,7 @@ func applyHistoryVisibilityFilter(
|
|||
"room_id": roomID,
|
||||
"before": len(recentEvents),
|
||||
"after": len(events),
|
||||
}).Debug("applied history visibility (sync)")
|
||||
}).Trace("Applied history visibility (sync)")
|
||||
return events, nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -624,9 +624,7 @@ func testSendToDevice(t *testing.T, dbType test.DBType) {
|
|||
// Send to-device messages of type "m.dendrite.test" with content `{"dummy":"message $counter"}`
|
||||
for i := 0; i < tc.sendMessagesCount; i++ {
|
||||
msgCounter++
|
||||
msg := map[string]string{
|
||||
"dummy": fmt.Sprintf("message %d", msgCounter),
|
||||
}
|
||||
msg := json.RawMessage(fmt.Sprintf(`{"dummy":"message %d"}`, msgCounter))
|
||||
if err := producer.SendToDevice(ctx, user.ID, user.ID, alice.ID, "m.dendrite.test", msg); err != nil {
|
||||
t.Fatalf("unable to send to device message: %v", err)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue