mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-15 10:03:09 -06:00
Merge Matrix main
Signed-off-by: Brian Meek <brian@hntlabs.com>
This commit is contained in:
commit
4accf677ea
14
CHANGES.md
14
CHANGES.md
|
|
@ -1,5 +1,19 @@
|
||||||
# Changelog
|
# Changelog
|
||||||
|
|
||||||
|
## Dendrite 0.9.8 (2022-09-12)
|
||||||
|
|
||||||
|
### Important
|
||||||
|
|
||||||
|
* This is a **security release** to fix a vulnerability where missing events retrieved from other servers did not have their signatures verified in all cases, affecting all versions of Dendrite before 0.9.8. Upgrading to this version is highly recommended. For more information, [see here](https://github.com/matrix-org/dendrite/security/advisories/GHSA-pfw4-xjgm-267c).
|
||||||
|
|
||||||
|
### Features
|
||||||
|
|
||||||
|
* The built-in NATS Server has been updated to the final 2.9.0 release version
|
||||||
|
|
||||||
|
### Fixes
|
||||||
|
|
||||||
|
* Dendrite will now correctly verify the signatures of events retrieved using `/get_missing_events`
|
||||||
|
|
||||||
## Dendrite 0.9.7 (2022-09-09)
|
## Dendrite 0.9.7 (2022-09-09)
|
||||||
|
|
||||||
### Features
|
### Features
|
||||||
|
|
|
||||||
|
|
@ -154,25 +154,23 @@ func (s *OutputRoomEventConsumer) onMessage(
|
||||||
|
|
||||||
txnID := ""
|
txnID := ""
|
||||||
// Try to get the message metadata, if we're able to, use the timestamp as the txnID
|
// Try to get the message metadata, if we're able to, use the timestamp as the txnID
|
||||||
if len(msgs) > 0 {
|
metadata, err := msgs[0].Metadata()
|
||||||
metadata, err := msgs[0].Metadata()
|
if err == nil {
|
||||||
if err == nil {
|
txnID = strconv.Itoa(int(metadata.Timestamp.UnixNano()))
|
||||||
txnID = strconv.Itoa(int(metadata.Timestamp.UnixNano()))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send event to any relevant application services. If we hit
|
// Send event to any relevant application services. If we hit
|
||||||
// an error here, return false, so that we negatively ack.
|
// an error here, return false, so that we negatively ack.
|
||||||
log.WithField("appservice", state.ID).Debugf("Appservice worker sending %d events(s) from roomserver", len(events))
|
log.WithField("appservice", state.ID).Debugf("Appservice worker sending %d events(s) from roomserver", len(events))
|
||||||
return s.sendEvents(txnID, ctx, state, events) == nil
|
return s.sendEvents(ctx, state, events, txnID) == nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// sendEvents passes events to the appservice by using the transactions
|
// sendEvents passes events to the appservice by using the transactions
|
||||||
// endpoint. It will block for the backoff period if necessary.
|
// endpoint. It will block for the backoff period if necessary.
|
||||||
func (s *OutputRoomEventConsumer) sendEvents(
|
func (s *OutputRoomEventConsumer) sendEvents(
|
||||||
txnID string,
|
|
||||||
ctx context.Context, state *appserviceState,
|
ctx context.Context, state *appserviceState,
|
||||||
events []*gomatrixserverlib.HeaderedEvent,
|
events []*gomatrixserverlib.HeaderedEvent,
|
||||||
|
txnID string,
|
||||||
) error {
|
) error {
|
||||||
// Create the transaction body.
|
// Create the transaction body.
|
||||||
transaction, err := json.Marshal(
|
transaction, err := json.Marshal(
|
||||||
|
|
|
||||||
|
|
@ -218,6 +218,9 @@ func (m *DendriteMonolith) Start() {
|
||||||
if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil {
|
if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil {
|
||||||
panic("failed to load PEM key: " + err.Error())
|
panic("failed to load PEM key: " + err.Error())
|
||||||
}
|
}
|
||||||
|
if len(sk) != ed25519.PrivateKeySize {
|
||||||
|
panic("the private key is not long enough")
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
if sk, err = os.ReadFile(oldkeyfile); err != nil {
|
if sk, err = os.ReadFile(oldkeyfile); err != nil {
|
||||||
panic("failed to read the old private key: " + err.Error())
|
panic("failed to read the old private key: " + err.Error())
|
||||||
|
|
@ -233,8 +236,13 @@ func (m *DendriteMonolith) Start() {
|
||||||
if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil {
|
if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil {
|
||||||
panic("failed to load PEM key: " + err.Error())
|
panic("failed to load PEM key: " + err.Error())
|
||||||
}
|
}
|
||||||
|
if len(sk) != ed25519.PrivateKeySize {
|
||||||
|
panic("the private key is not long enough")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pk = sk.Public().(ed25519.PublicKey)
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
m.listener, err = net.Listen("tcp", "localhost:65432")
|
m.listener, err = net.Listen("tcp", "localhost:65432")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -247,7 +255,7 @@ func (m *DendriteMonolith) Start() {
|
||||||
m.logger.SetOutput(BindLogger{})
|
m.logger.SetOutput(BindLogger{})
|
||||||
logrus.SetOutput(BindLogger{})
|
logrus.SetOutput(BindLogger{})
|
||||||
|
|
||||||
m.PineconeRouter = pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk, false)
|
m.PineconeRouter = pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk)
|
||||||
m.PineconeQUIC = pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), m.PineconeRouter, []string{"matrix"})
|
m.PineconeQUIC = pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), m.PineconeRouter, []string{"matrix"})
|
||||||
m.PineconeMulticast = pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), m.PineconeRouter)
|
m.PineconeMulticast = pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), m.PineconeRouter)
|
||||||
m.PineconeManager = pineconeConnections.NewConnectionManager(m.PineconeRouter, nil)
|
m.PineconeManager = pineconeConnections.NewConnectionManager(m.PineconeRouter, nil)
|
||||||
|
|
|
||||||
|
|
@ -87,6 +87,9 @@ func (m *DendriteMonolith) Start() {
|
||||||
if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil {
|
if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil {
|
||||||
panic("failed to load PEM key: " + err.Error())
|
panic("failed to load PEM key: " + err.Error())
|
||||||
}
|
}
|
||||||
|
if len(sk) != ed25519.PrivateKeySize {
|
||||||
|
panic("the private key is not long enough")
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
if sk, err = os.ReadFile(oldkeyfile); err != nil {
|
if sk, err = os.ReadFile(oldkeyfile); err != nil {
|
||||||
panic("failed to read the old private key: " + err.Error())
|
panic("failed to read the old private key: " + err.Error())
|
||||||
|
|
@ -103,8 +106,13 @@ func (m *DendriteMonolith) Start() {
|
||||||
if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil {
|
if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil {
|
||||||
panic("failed to load PEM key: " + err.Error())
|
panic("failed to load PEM key: " + err.Error())
|
||||||
}
|
}
|
||||||
|
if len(sk) != ed25519.PrivateKeySize {
|
||||||
|
panic("the private key is not long enough")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pk = sk.Public().(ed25519.PublicKey)
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
m.listener, err = net.Listen("tcp", "localhost:65432")
|
m.listener, err = net.Listen("tcp", "localhost:65432")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
|
|
@ -21,12 +21,13 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||||
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
)
|
)
|
||||||
|
|
||||||
// SyncAPIProducer produces events for the sync API server to consume
|
// SyncAPIProducer produces events for the sync API server to consume
|
||||||
|
|
@ -61,7 +62,7 @@ func (p *SyncAPIProducer) SendReceipt(
|
||||||
|
|
||||||
func (p *SyncAPIProducer) SendToDevice(
|
func (p *SyncAPIProducer) SendToDevice(
|
||||||
ctx context.Context, sender, userID, deviceID, eventType string,
|
ctx context.Context, sender, userID, deviceID, eventType string,
|
||||||
message interface{},
|
message json.RawMessage,
|
||||||
) error {
|
) error {
|
||||||
devices := []string{}
|
devices := []string{}
|
||||||
_, domain, err := gomatrixserverlib.SplitID('@', userID)
|
_, domain, err := gomatrixserverlib.SplitID('@', userID)
|
||||||
|
|
@ -89,24 +90,19 @@ func (p *SyncAPIProducer) SendToDevice(
|
||||||
devices = append(devices, deviceID)
|
devices = append(devices, deviceID)
|
||||||
}
|
}
|
||||||
|
|
||||||
js, err := json.Marshal(message)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
"user_id": userID,
|
"user_id": userID,
|
||||||
"num_devices": len(devices),
|
"num_devices": len(devices),
|
||||||
"type": eventType,
|
"type": eventType,
|
||||||
}).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent)
|
}).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent)
|
||||||
for _, device := range devices {
|
for i, device := range devices {
|
||||||
ote := &types.OutputSendToDeviceEvent{
|
ote := &types.OutputSendToDeviceEvent{
|
||||||
UserID: userID,
|
UserID: userID,
|
||||||
DeviceID: device,
|
DeviceID: device,
|
||||||
SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{
|
SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{
|
||||||
Sender: sender,
|
Sender: sender,
|
||||||
Type: eventType,
|
Type: eventType,
|
||||||
Content: js,
|
Content: message,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -115,15 +111,17 @@ func (p *SyncAPIProducer) SendToDevice(
|
||||||
log.WithError(err).Error("sendToDevice failed json.Marshal")
|
log.WithError(err).Error("sendToDevice failed json.Marshal")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
m := &nats.Msg{
|
m := nats.NewMsg(p.TopicSendToDeviceEvent)
|
||||||
Subject: p.TopicSendToDeviceEvent,
|
m.Data = eventJSON
|
||||||
Data: eventJSON,
|
|
||||||
Header: nats.Header{},
|
|
||||||
}
|
|
||||||
m.Header.Set("sender", sender)
|
m.Header.Set("sender", sender)
|
||||||
m.Header.Set(jetstream.UserID, userID)
|
m.Header.Set(jetstream.UserID, userID)
|
||||||
|
|
||||||
if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil {
|
if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil {
|
||||||
log.WithError(err).Error("sendToDevice failed t.Producer.SendMessage")
|
if i < len(devices)-1 {
|
||||||
|
log.WithError(err).Warn("sendToDevice failed to PublishMsg, trying further devices")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
log.WithError(err).Error("sendToDevice failed to PublishMsg for all devices")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -98,6 +98,9 @@ func main() {
|
||||||
if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil {
|
if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil {
|
||||||
panic("failed to load PEM key: " + err.Error())
|
panic("failed to load PEM key: " + err.Error())
|
||||||
}
|
}
|
||||||
|
if len(sk) != ed25519.PrivateKeySize {
|
||||||
|
panic("the private key is not long enough")
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
if sk, err = os.ReadFile(oldkeyfile); err != nil {
|
if sk, err = os.ReadFile(oldkeyfile); err != nil {
|
||||||
panic("failed to read the old private key: " + err.Error())
|
panic("failed to read the old private key: " + err.Error())
|
||||||
|
|
@ -114,7 +117,13 @@ func main() {
|
||||||
if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil {
|
if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil {
|
||||||
panic("failed to load PEM key: " + err.Error())
|
panic("failed to load PEM key: " + err.Error())
|
||||||
}
|
}
|
||||||
|
if len(sk) != ed25519.PrivateKeySize {
|
||||||
|
panic("the private key is not long enough")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pk = sk.Public().(ed25519.PublicKey)
|
||||||
|
|
||||||
cfg.Defaults(config.DefaultOpts{
|
cfg.Defaults(config.DefaultOpts{
|
||||||
Generate: true,
|
Generate: true,
|
||||||
Monolithic: true,
|
Monolithic: true,
|
||||||
|
|
@ -136,14 +145,13 @@ func main() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pk = sk.Public().(ed25519.PublicKey)
|
|
||||||
cfg.Global.ServerName = gomatrixserverlib.ServerName(hex.EncodeToString(pk))
|
cfg.Global.ServerName = gomatrixserverlib.ServerName(hex.EncodeToString(pk))
|
||||||
cfg.Global.KeyID = gomatrixserverlib.KeyID(signing.KeyID)
|
cfg.Global.KeyID = gomatrixserverlib.KeyID(signing.KeyID)
|
||||||
|
|
||||||
base := base.NewBaseDendrite(cfg, "Monolith")
|
base := base.NewBaseDendrite(cfg, "Monolith")
|
||||||
defer base.Close() // nolint: errcheck
|
defer base.Close() // nolint: errcheck
|
||||||
|
|
||||||
pRouter := pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk, false)
|
pRouter := pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk)
|
||||||
pQUIC := pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), pRouter, []string{"matrix"})
|
pQUIC := pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), pRouter, []string{"matrix"})
|
||||||
pMulticast := pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), pRouter)
|
pMulticast := pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), pRouter)
|
||||||
pManager := pineconeConnections.NewConnectionManager(pRouter, nil)
|
pManager := pineconeConnections.NewConnectionManager(pRouter, nil)
|
||||||
|
|
|
||||||
|
|
@ -86,6 +86,9 @@ func main() {
|
||||||
if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil {
|
if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil {
|
||||||
panic("failed to load PEM key: " + err.Error())
|
panic("failed to load PEM key: " + err.Error())
|
||||||
}
|
}
|
||||||
|
if len(sk) != ed25519.PrivateKeySize {
|
||||||
|
panic("the private key is not long enough")
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
if sk, err = os.ReadFile(oldkeyfile); err != nil {
|
if sk, err = os.ReadFile(oldkeyfile); err != nil {
|
||||||
panic("failed to read the old private key: " + err.Error())
|
panic("failed to read the old private key: " + err.Error())
|
||||||
|
|
@ -102,8 +105,13 @@ func main() {
|
||||||
if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil {
|
if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil {
|
||||||
panic("failed to load PEM key: " + err.Error())
|
panic("failed to load PEM key: " + err.Error())
|
||||||
}
|
}
|
||||||
|
if len(sk) != ed25519.PrivateKeySize {
|
||||||
|
panic("the private key is not long enough")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pk = sk.Public().(ed25519.PublicKey)
|
||||||
|
|
||||||
// use custom config if config flag is set
|
// use custom config if config flag is set
|
||||||
if configFlagSet {
|
if configFlagSet {
|
||||||
cfg = setup.ParseFlags(true)
|
cfg = setup.ParseFlags(true)
|
||||||
|
|
@ -129,7 +137,6 @@ func main() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pk = sk.Public().(ed25519.PublicKey)
|
|
||||||
cfg.Global.ServerName = gomatrixserverlib.ServerName(hex.EncodeToString(pk))
|
cfg.Global.ServerName = gomatrixserverlib.ServerName(hex.EncodeToString(pk))
|
||||||
cfg.Global.KeyID = gomatrixserverlib.KeyID(signing.KeyID)
|
cfg.Global.KeyID = gomatrixserverlib.KeyID(signing.KeyID)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/matrix-org/gomatrix"
|
"github.com/matrix-org/gomatrix"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
|
@ -82,11 +83,14 @@ func runTests(baseURL, branchName string) error {
|
||||||
client: users[1].client, text: "4: " + branchName,
|
client: users[1].client, text: "4: " + branchName,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
wantEventIDs := make(map[string]struct{}, 8)
|
||||||
for _, msg := range msgs {
|
for _, msg := range msgs {
|
||||||
_, err = msg.client.SendText(dmRoomID, msg.text)
|
var resp *gomatrix.RespSendEvent
|
||||||
|
resp, err = msg.client.SendText(dmRoomID, msg.text)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to send text in dm room: %s", err)
|
return fmt.Errorf("failed to send text in dm room: %s", err)
|
||||||
}
|
}
|
||||||
|
wantEventIDs[resp.EventID] = struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// attempt to create/join the shared public room
|
// attempt to create/join the shared public room
|
||||||
|
|
@ -114,11 +118,48 @@ func runTests(baseURL, branchName string) error {
|
||||||
}
|
}
|
||||||
// send messages
|
// send messages
|
||||||
for _, msg := range msgs {
|
for _, msg := range msgs {
|
||||||
_, err = msg.client.SendText(publicRoomID, "public "+msg.text)
|
resp, err := msg.client.SendText(publicRoomID, "public "+msg.text)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to send text in public room: %s", err)
|
return fmt.Errorf("failed to send text in public room: %s", err)
|
||||||
}
|
}
|
||||||
|
wantEventIDs[resp.EventID] = struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Sync until we have all expected messages
|
||||||
|
doneCh := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
syncClient := users[0].client
|
||||||
|
since := ""
|
||||||
|
for len(wantEventIDs) > 0 {
|
||||||
|
select {
|
||||||
|
case <-doneCh:
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
syncResp, err := syncClient.SyncRequest(1000, since, "1", false, "")
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
for _, room := range syncResp.Rooms.Join {
|
||||||
|
for _, ev := range room.Timeline.Events {
|
||||||
|
if ev.Type != "m.room.message" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
delete(wantEventIDs, ev.ID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
since = syncResp.NextBatch
|
||||||
|
}
|
||||||
|
close(doneCh)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-time.After(time.Second * 10):
|
||||||
|
close(doneCh)
|
||||||
|
return fmt.Errorf("failed to receive all expected messages: %+v", wantEventIDs)
|
||||||
|
case <-doneCh:
|
||||||
|
}
|
||||||
|
|
||||||
log.Printf("OK! rooms(public=%s, dm=%s) users(%s, %s)\n", publicRoomID, dmRoomID, users[0].userID, users[1].userID)
|
log.Printf("OK! rooms(public=%s, dm=%s) users(%s, %s)\n", publicRoomID, dmRoomID, users[0].userID, users[1].userID)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -64,7 +64,7 @@ func (p *SyncAPIProducer) SendReceipt(
|
||||||
|
|
||||||
func (p *SyncAPIProducer) SendToDevice(
|
func (p *SyncAPIProducer) SendToDevice(
|
||||||
ctx context.Context, sender, userID, deviceID, eventType string,
|
ctx context.Context, sender, userID, deviceID, eventType string,
|
||||||
message interface{},
|
message json.RawMessage,
|
||||||
) error {
|
) error {
|
||||||
devices := []string{}
|
devices := []string{}
|
||||||
_, domain, err := gomatrixserverlib.SplitID('@', userID)
|
_, domain, err := gomatrixserverlib.SplitID('@', userID)
|
||||||
|
|
@ -92,24 +92,19 @@ func (p *SyncAPIProducer) SendToDevice(
|
||||||
devices = append(devices, deviceID)
|
devices = append(devices, deviceID)
|
||||||
}
|
}
|
||||||
|
|
||||||
js, err := json.Marshal(message)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
"user_id": userID,
|
"user_id": userID,
|
||||||
"num_devices": len(devices),
|
"num_devices": len(devices),
|
||||||
"type": eventType,
|
"type": eventType,
|
||||||
}).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent)
|
}).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent)
|
||||||
for _, device := range devices {
|
for i, device := range devices {
|
||||||
ote := &types.OutputSendToDeviceEvent{
|
ote := &types.OutputSendToDeviceEvent{
|
||||||
UserID: userID,
|
UserID: userID,
|
||||||
DeviceID: device,
|
DeviceID: device,
|
||||||
SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{
|
SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{
|
||||||
Sender: sender,
|
Sender: sender,
|
||||||
Type: eventType,
|
Type: eventType,
|
||||||
Content: js,
|
Content: message,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -118,16 +113,17 @@ func (p *SyncAPIProducer) SendToDevice(
|
||||||
log.WithError(err).Error("sendToDevice failed json.Marshal")
|
log.WithError(err).Error("sendToDevice failed json.Marshal")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
m := &nats.Msg{
|
m := nats.NewMsg(p.TopicSendToDeviceEvent)
|
||||||
Subject: p.TopicSendToDeviceEvent,
|
m.Data = eventJSON
|
||||||
Data: eventJSON,
|
|
||||||
Header: nats.Header{},
|
|
||||||
}
|
|
||||||
m.Header.Set("sender", sender)
|
m.Header.Set("sender", sender)
|
||||||
m.Header.Set(jetstream.UserID, userID)
|
m.Header.Set(jetstream.UserID, userID)
|
||||||
|
|
||||||
if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil {
|
if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil {
|
||||||
log.WithError(err).Error("sendToDevice failed t.Producer.SendMessage")
|
if i < len(devices)-1 {
|
||||||
|
log.WithError(err).Warn("sendToDevice failed to PublishMsg, trying further devices")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
log.WithError(err).Error("sendToDevice failed to PublishMsg for all devices")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -167,10 +163,10 @@ func (p *SyncAPIProducer) SendPresence(
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *SyncAPIProducer) SendDeviceListUpdate(
|
func (p *SyncAPIProducer) SendDeviceListUpdate(
|
||||||
ctx context.Context, deviceListUpdate gomatrixserverlib.RawJSON, origin string,
|
ctx context.Context, deviceListUpdate gomatrixserverlib.RawJSON, origin gomatrixserverlib.ServerName,
|
||||||
) (err error) {
|
) (err error) {
|
||||||
m := nats.NewMsg(p.TopicDeviceListUpdate)
|
m := nats.NewMsg(p.TopicDeviceListUpdate)
|
||||||
m.Header.Set("origin", origin)
|
m.Header.Set("origin", string(origin))
|
||||||
m.Data = deviceListUpdate
|
m.Data = deviceListUpdate
|
||||||
log.Debugf("Sending device list update: %+v", m.Header)
|
log.Debugf("Sending device list update: %+v", m.Header)
|
||||||
_, err = p.JetStream.PublishMsg(m, nats.Context(ctx))
|
_, err = p.JetStream.PublishMsg(m, nats.Context(ctx))
|
||||||
|
|
|
||||||
|
|
@ -359,7 +359,7 @@ func (t *txnReq) processEDUs(ctx context.Context) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case gomatrixserverlib.MDeviceListUpdate:
|
case gomatrixserverlib.MDeviceListUpdate:
|
||||||
if err := t.producer.SendDeviceListUpdate(ctx, e.Content, e.Origin); err != nil {
|
if err := t.producer.SendDeviceListUpdate(ctx, e.Content, t.Origin); err != nil {
|
||||||
util.GetLogger(ctx).WithError(err).Error("failed to InputDeviceListUpdate")
|
util.GetLogger(ctx).WithError(err).Error("failed to InputDeviceListUpdate")
|
||||||
}
|
}
|
||||||
case gomatrixserverlib.MReceipt:
|
case gomatrixserverlib.MReceipt:
|
||||||
|
|
|
||||||
20
go.mod
20
go.mod
|
|
@ -17,16 +17,17 @@ require (
|
||||||
github.com/google/uuid v1.3.0
|
github.com/google/uuid v1.3.0
|
||||||
github.com/gorilla/mux v1.8.0
|
github.com/gorilla/mux v1.8.0
|
||||||
github.com/gorilla/websocket v1.5.0
|
github.com/gorilla/websocket v1.5.0
|
||||||
|
github.com/kardianos/minwinsvc v1.0.0
|
||||||
github.com/lib/pq v1.10.5
|
github.com/lib/pq v1.10.5
|
||||||
github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e
|
github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e
|
||||||
github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91
|
github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91
|
||||||
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16
|
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16
|
||||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220907081047-637a173a3661
|
github.com/matrix-org/gomatrixserverlib v0.0.0-20220912142654-7b96db48888a
|
||||||
github.com/matrix-org/pinecone v0.0.0-20220901133433-565beccfebed
|
github.com/matrix-org/pinecone v0.0.0-20220914131522-608215eb1b29
|
||||||
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
|
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
|
||||||
github.com/mattn/go-sqlite3 v1.14.13
|
github.com/mattn/go-sqlite3 v1.14.13
|
||||||
github.com/nats-io/nats-server/v2 v2.8.5-0.20220811224153-d8d25d9b0b1c
|
github.com/nats-io/nats-server/v2 v2.9.0
|
||||||
github.com/nats-io/nats.go v1.16.1-0.20220810192301-fb5ca2cbc995
|
github.com/nats-io/nats.go v1.16.1-0.20220906180156-a1017eec10b0
|
||||||
github.com/neilalexander/utp v0.1.1-0.20210727203401-54ae7b1cd5f9
|
github.com/neilalexander/utp v0.1.1-0.20210727203401-54ae7b1cd5f9
|
||||||
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646
|
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646
|
||||||
github.com/ngrok/sqlmw v0.0.0-20220520173518-97c9c04efc79
|
github.com/ngrok/sqlmw v0.0.0-20220520173518-97c9c04efc79
|
||||||
|
|
@ -44,7 +45,7 @@ require (
|
||||||
github.com/uber/jaeger-lib v2.4.1+incompatible
|
github.com/uber/jaeger-lib v2.4.1+incompatible
|
||||||
github.com/yggdrasil-network/yggdrasil-go v0.4.5-0.20220901155642-4f2abece817c
|
github.com/yggdrasil-network/yggdrasil-go v0.4.5-0.20220901155642-4f2abece817c
|
||||||
go.uber.org/atomic v1.9.0
|
go.uber.org/atomic v1.9.0
|
||||||
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa
|
golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90
|
||||||
golang.org/x/image v0.0.0-20220413100746-70e8d0d3baa9
|
golang.org/x/image v0.0.0-20220413100746-70e8d0d3baa9
|
||||||
golang.org/x/mobile v0.0.0-20220722155234-aaac322e2105
|
golang.org/x/mobile v0.0.0-20220722155234-aaac322e2105
|
||||||
golang.org/x/net v0.0.0-20220728211354-c7608f3a8462
|
golang.org/x/net v0.0.0-20220728211354-c7608f3a8462
|
||||||
|
|
@ -54,10 +55,7 @@ require (
|
||||||
nhooyr.io/websocket v1.8.7
|
nhooyr.io/websocket v1.8.7
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require github.com/ethereum/go-ethereum v1.10.15
|
||||||
github.com/ethereum/go-ethereum v1.10.15
|
|
||||||
github.com/kardianos/minwinsvc v1.0.0
|
|
||||||
)
|
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
|
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
|
||||||
|
|
@ -132,9 +130,9 @@ require (
|
||||||
github.com/tidwall/pretty v1.2.0 // indirect
|
github.com/tidwall/pretty v1.2.0 // indirect
|
||||||
go.etcd.io/bbolt v1.3.5 // indirect
|
go.etcd.io/bbolt v1.3.5 // indirect
|
||||||
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
|
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
|
||||||
golang.org/x/sys v0.0.0-20220731174439-a90be440212d // indirect
|
golang.org/x/sys v0.0.0-20220906135438-9e1f76180b77 // indirect
|
||||||
golang.org/x/text v0.3.8-0.20211004125949-5bd84dd9b33b // indirect
|
golang.org/x/text v0.3.8-0.20211004125949-5bd84dd9b33b // indirect
|
||||||
golang.org/x/time v0.0.0-20220411224347-583f2d630306 // indirect
|
golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9 // indirect
|
||||||
golang.org/x/tools v0.1.12 // indirect
|
golang.org/x/tools v0.1.12 // indirect
|
||||||
google.golang.org/protobuf v1.27.1 // indirect
|
google.golang.org/protobuf v1.27.1 // indirect
|
||||||
gopkg.in/macaroon.v2 v2.1.0 // indirect
|
gopkg.in/macaroon.v2 v2.1.0 // indirect
|
||||||
|
|
|
||||||
27
go.sum
27
go.sum
|
|
@ -523,10 +523,10 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1
|
||||||
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
|
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
|
||||||
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4=
|
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4=
|
||||||
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
|
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
|
||||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220907081047-637a173a3661 h1:dww9rH0HVfAO9JOBD1nxq26GHKbEw07thAJTu1DrAQs=
|
github.com/matrix-org/gomatrixserverlib v0.0.0-20220912142654-7b96db48888a h1:XeGBDZZsUe4kgj3myl0EiuDNVWxszJecMTrON3Wn9sI=
|
||||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220907081047-637a173a3661/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk=
|
github.com/matrix-org/gomatrixserverlib v0.0.0-20220912142654-7b96db48888a/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk=
|
||||||
github.com/matrix-org/pinecone v0.0.0-20220901133433-565beccfebed h1:YMcCnrmTbT5M1LtTiagiFFaj9vEgvC6iVEzWsIb0tQQ=
|
github.com/matrix-org/pinecone v0.0.0-20220914131522-608215eb1b29 h1:/AIaqhK1BBi2sMEVQdgZRV8H8sNloAGCgztLZhsPqD0=
|
||||||
github.com/matrix-org/pinecone v0.0.0-20220901133433-565beccfebed/go.mod h1:K0N1ixHQxXoCyqolDqVxPM3ArrDtcMs8yegOx2Lfv9k=
|
github.com/matrix-org/pinecone v0.0.0-20220914131522-608215eb1b29/go.mod h1:K0N1ixHQxXoCyqolDqVxPM3ArrDtcMs8yegOx2Lfv9k=
|
||||||
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
|
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
|
||||||
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk=
|
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk=
|
||||||
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
|
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
|
||||||
|
|
@ -582,10 +582,10 @@ github.com/naoina/go-stringutil v0.1.0/go.mod h1:XJ2SJL9jCtBh+P9q5btrd/Ylo8XwT/h
|
||||||
github.com/naoina/toml v0.1.2-0.20170918210437-9fafd6967416/go.mod h1:NBIhNtsFMo3G2szEBne+bO4gS192HuIYRqfvOWb4i1E=
|
github.com/naoina/toml v0.1.2-0.20170918210437-9fafd6967416/go.mod h1:NBIhNtsFMo3G2szEBne+bO4gS192HuIYRqfvOWb4i1E=
|
||||||
github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI=
|
github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI=
|
||||||
github.com/nats-io/jwt/v2 v2.3.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
|
github.com/nats-io/jwt/v2 v2.3.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
|
||||||
github.com/nats-io/nats-server/v2 v2.8.5-0.20220811224153-d8d25d9b0b1c h1:U5qngWGZ7E/nQxz0544IpIEdKFUUaOJxQN2LHCYLGhg=
|
github.com/nats-io/nats-server/v2 v2.9.0 h1:DLWu+7/VgGOoChcDKytnUZPAmudpv7o/MhKmNrnH1RE=
|
||||||
github.com/nats-io/nats-server/v2 v2.8.5-0.20220811224153-d8d25d9b0b1c/go.mod h1:+f++B/5jpr71JATt7b5KCX+G7bt43iWx1OYWGkpE/Kk=
|
github.com/nats-io/nats-server/v2 v2.9.0/go.mod h1:BWKY6217RvhI+FDoOLZ2BH+hOC37xeKRBlQ1Lz7teKI=
|
||||||
github.com/nats-io/nats.go v1.16.1-0.20220810192301-fb5ca2cbc995 h1:CUcSQR8jwa9//qNgN/t3tW53DObnTPQ/G/K+qnS7yRc=
|
github.com/nats-io/nats.go v1.16.1-0.20220906180156-a1017eec10b0 h1:dPUKD6Iv8M1y9MU8PK6H4a4/12yx5/CbaYWz/Z1arY8=
|
||||||
github.com/nats-io/nats.go v1.16.1-0.20220810192301-fb5ca2cbc995/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
|
github.com/nats-io/nats.go v1.16.1-0.20220906180156-a1017eec10b0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
|
||||||
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
|
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
|
||||||
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
|
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
|
||||||
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
||||||
|
|
@ -828,8 +828,9 @@ golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm
|
||||||
golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
|
golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
|
||||||
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||||
golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
|
golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
|
||||||
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa h1:zuSxTR4o9y82ebqCUJYNGJbGPo6sKVl54f/TVDObg1c=
|
|
||||||
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
|
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
|
||||||
|
golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 h1:Y/gsMcFOcR+6S6f3YeMKl5g+dZMEWqcz5Czj/GWYbkM=
|
||||||
|
golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
|
||||||
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||||
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||||
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||||
|
|
@ -1027,8 +1028,8 @@ golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBc
|
||||||
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
golang.org/x/sys v0.0.0-20220730100132-1609e554cd39/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
golang.org/x/sys v0.0.0-20220730100132-1609e554cd39/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
golang.org/x/sys v0.0.0-20220731174439-a90be440212d h1:Sv5ogFZatcgIMMtBSTTAgMYsicp25MXBubjXNDKwm80=
|
golang.org/x/sys v0.0.0-20220906135438-9e1f76180b77 h1:C1tElbkWrsSkn3IRl1GCW/gETw1TywWIPgwZtXTZbYg=
|
||||||
golang.org/x/sys v0.0.0-20220731174439-a90be440212d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
golang.org/x/sys v0.0.0-20220906135438-9e1f76180b77/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
|
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
|
||||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
|
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
|
||||||
|
|
@ -1051,8 +1052,8 @@ golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxb
|
||||||
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||||
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||||
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||||
golang.org/x/time v0.0.0-20220411224347-583f2d630306 h1:+gHMid33q6pen7kv9xvT+JRinntgeXO2AeZVd0AWD3w=
|
golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9 h1:ftMN5LMiBFjbzleLqtoBZk7KdJwhuybIU+FckUHgoyQ=
|
||||||
golang.org/x/time v0.0.0-20220411224347-583f2d630306/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||||
golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||||
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||||
|
|
|
||||||
|
|
@ -155,5 +155,10 @@ func InsertMigration(ctx context.Context, db *sql.DB, migrationName string) erro
|
||||||
time.Now().Format(time.RFC3339),
|
time.Now().Format(time.RFC3339),
|
||||||
internal.VersionString(),
|
internal.VersionString(),
|
||||||
)
|
)
|
||||||
|
// If the migration was already executed, we'll get a unique constraint error,
|
||||||
|
// return nil instead, to avoid unnecessary logging.
|
||||||
|
if IsUniqueConstraintViolationErr(err) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -12,12 +12,27 @@
|
||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
//go:build wasm
|
//go:build !wasm
|
||||||
// +build wasm
|
// +build !wasm
|
||||||
|
|
||||||
package sqlutil
|
package sqlutil
|
||||||
|
|
||||||
// IsUniqueConstraintViolationErr no-ops for this architecture
|
import (
|
||||||
|
"github.com/lib/pq"
|
||||||
|
"github.com/mattn/go-sqlite3"
|
||||||
|
)
|
||||||
|
|
||||||
|
// IsUniqueConstraintViolationErr returns true if the error is an unique_violation error
|
||||||
func IsUniqueConstraintViolationErr(err error) bool {
|
func IsUniqueConstraintViolationErr(err error) bool {
|
||||||
|
switch e := err.(type) {
|
||||||
|
case *pq.Error:
|
||||||
|
return e.Code == "23505"
|
||||||
|
case pq.Error:
|
||||||
|
return e.Code == "23505"
|
||||||
|
case *sqlite3.Error:
|
||||||
|
return e.Code == sqlite3.ErrConstraint
|
||||||
|
case sqlite3.Error:
|
||||||
|
return e.Code == sqlite3.ErrConstraint
|
||||||
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
@ -12,15 +12,20 @@
|
||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
//go:build !wasm
|
//go:build wasm
|
||||||
// +build !wasm
|
// +build wasm
|
||||||
|
|
||||||
package sqlutil
|
package sqlutil
|
||||||
|
|
||||||
import "github.com/lib/pq"
|
import "github.com/mattn/go-sqlite3"
|
||||||
|
|
||||||
// IsUniqueConstraintViolationErr returns true if the error is a postgresql unique_violation error
|
// IsUniqueConstraintViolationErr returns true if the error is an unique_violation error
|
||||||
func IsUniqueConstraintViolationErr(err error) bool {
|
func IsUniqueConstraintViolationErr(err error) bool {
|
||||||
pqErr, ok := err.(*pq.Error)
|
switch e := err.(type) {
|
||||||
return ok && pqErr.Code == "23505"
|
case *sqlite3.Error:
|
||||||
|
return e.Code == sqlite3.ErrConstraint
|
||||||
|
case sqlite3.Error:
|
||||||
|
return e.Code == sqlite3.ErrConstraint
|
||||||
|
}
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
@ -17,7 +17,7 @@ var build string
|
||||||
const (
|
const (
|
||||||
VersionMajor = 0
|
VersionMajor = 0
|
||||||
VersionMinor = 9
|
VersionMinor = 9
|
||||||
VersionPatch = 7
|
VersionPatch = 8
|
||||||
VersionTag = "" // example: "rc1"
|
VersionTag = "" // example: "rc1"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -18,8 +18,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
|
@ -81,8 +80,7 @@ func executeMigration(ctx context.Context, db *sql.DB) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed
|
if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed
|
||||||
if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil {
|
if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil {
|
||||||
// not a fatal error, log and continue
|
return fmt.Errorf("unable to manually insert migration '%s': %w", migrationName, err)
|
||||||
logrus.WithError(err).Warnf("unable to manually insert migration '%s'", migrationName)
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -18,8 +18,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
|
@ -80,8 +79,7 @@ func executeMigration(ctx context.Context, db *sql.DB) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed
|
if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed
|
||||||
if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil {
|
if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil {
|
||||||
// not a fatal error, log and continue
|
return fmt.Errorf("unable to manually insert migration '%s': %w", migrationName, err)
|
||||||
logrus.WithError(err).Warnf("unable to manually insert migration '%s'", migrationName)
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -439,6 +439,7 @@ type QueryMembershipAtEventRequest struct {
|
||||||
|
|
||||||
// QueryMembershipAtEventResponse is the response to QueryMembershipAtEventRequest.
|
// QueryMembershipAtEventResponse is the response to QueryMembershipAtEventRequest.
|
||||||
type QueryMembershipAtEventResponse struct {
|
type QueryMembershipAtEventResponse struct {
|
||||||
// Memberships is a map from eventID to a list of events (if any).
|
// Memberships is a map from eventID to a list of events (if any). Events that
|
||||||
|
// do not have known state will return an empty array here.
|
||||||
Memberships map[string][]*gomatrixserverlib.HeaderedEvent `json:"memberships"`
|
Memberships map[string][]*gomatrixserverlib.HeaderedEvent `json:"memberships"`
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -176,7 +176,8 @@ func (r *Inputer) Start() error {
|
||||||
},
|
},
|
||||||
nats.HeadersOnly(),
|
nats.HeadersOnly(),
|
||||||
nats.DeliverAll(),
|
nats.DeliverAll(),
|
||||||
nats.AckAll(),
|
nats.AckExplicit(),
|
||||||
|
nats.ReplayInstant(),
|
||||||
nats.BindStream(r.InputRoomEventTopic),
|
nats.BindStream(r.InputRoomEventTopic),
|
||||||
)
|
)
|
||||||
return err
|
return err
|
||||||
|
|
|
||||||
|
|
@ -264,16 +264,27 @@ func (u *latestEventsUpdater) latestState() error {
|
||||||
return fmt.Errorf("roomState.CalculateAndStoreStateAfterEvents: %w", err)
|
return fmt.Errorf("roomState.CalculateAndStoreStateAfterEvents: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now that we have a new state snapshot based on the latest events,
|
// Include information about what changed in the state transition. If the
|
||||||
// we can compare that new snapshot to the previous one and see what
|
// event rewrites the state (i.e. is a federated join) then we will simply
|
||||||
// has changed. This gives us one list of removed state events and
|
// include the entire state snapshot as added events, as the "RewritesState"
|
||||||
// another list of added ones. Replacing a value for a state-key tuple
|
// flag in the output event signals downstream components to purge their
|
||||||
// will result one removed (the old event) and one added (the new event).
|
// room state first. If it doesn't rewrite the state then we will work out
|
||||||
u.removed, u.added, err = roomState.DifferenceBetweeenStateSnapshots(
|
// what the difference is between the state snapshots and send that. In all
|
||||||
ctx, u.oldStateNID, u.newStateNID,
|
// cases where a state event is being replaced, the old state event will
|
||||||
)
|
// appear in "removed" and the replacement will appear in "added".
|
||||||
if err != nil {
|
if u.rewritesState {
|
||||||
return fmt.Errorf("roomState.DifferenceBetweenStateSnapshots: %w", err)
|
u.removed = []types.StateEntry{}
|
||||||
|
u.added, err = roomState.LoadStateAtSnapshot(ctx, u.newStateNID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("roomState.LoadStateAtSnapshot: %w", err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
u.removed, u.added, err = roomState.DifferenceBetweeenStateSnapshots(
|
||||||
|
ctx, u.oldStateNID, u.newStateNID,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("roomState.DifferenceBetweenStateSnapshots: %w", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if removed := len(u.removed) - len(u.added); !u.rewritesState && removed > 0 {
|
if removed := len(u.removed) - len(u.added); !u.rewritesState && removed > 0 {
|
||||||
|
|
|
||||||
|
|
@ -553,11 +553,14 @@ func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserve
|
||||||
|
|
||||||
// Make sure events from the missingResp are using the cache - missing events
|
// Make sure events from the missingResp are using the cache - missing events
|
||||||
// will be added and duplicates will be removed.
|
// will be added and duplicates will be removed.
|
||||||
logger.Debugf("get_missing_events returned %d events", len(missingResp.Events))
|
|
||||||
missingEvents := make([]*gomatrixserverlib.Event, 0, len(missingResp.Events))
|
missingEvents := make([]*gomatrixserverlib.Event, 0, len(missingResp.Events))
|
||||||
for _, ev := range missingResp.Events.UntrustedEvents(roomVersion) {
|
for _, ev := range missingResp.Events.UntrustedEvents(roomVersion) {
|
||||||
|
if err = ev.VerifyEventSignatures(ctx, t.keys); err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
missingEvents = append(missingEvents, t.cacheAndReturn(ev))
|
missingEvents = append(missingEvents, t.cacheAndReturn(ev))
|
||||||
}
|
}
|
||||||
|
logger.Debugf("get_missing_events returned %d events (%d passed signature checks)", len(missingResp.Events), len(missingEvents))
|
||||||
|
|
||||||
// topologically sort and sanity check that we are making forward progress
|
// topologically sort and sanity check that we are making forward progress
|
||||||
newEvents = gomatrixserverlib.ReverseTopologicalOrdering(missingEvents, gomatrixserverlib.TopologicalOrderByPrevEvents)
|
newEvents = gomatrixserverlib.ReverseTopologicalOrdering(missingEvents, gomatrixserverlib.TopologicalOrderByPrevEvents)
|
||||||
|
|
|
||||||
|
|
@ -106,7 +106,7 @@ func (r *Queryer) QueryStateAfterEvents(
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(request.PrevEventIDs) > 1 && len(request.StateToFetch) == 0 {
|
if len(request.PrevEventIDs) > 1 {
|
||||||
var authEventIDs []string
|
var authEventIDs []string
|
||||||
for _, e := range stateEvents {
|
for _, e := range stateEvents {
|
||||||
authEventIDs = append(authEventIDs, e.AuthEventIDs()...)
|
authEventIDs = append(authEventIDs, e.AuthEventIDs()...)
|
||||||
|
|
@ -208,6 +208,9 @@ func (r *Queryer) QueryMembershipForUser(
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// QueryMembershipAtEvent returns the known memberships at a given event.
|
||||||
|
// If the state before an event is not known, an empty list will be returned
|
||||||
|
// for that event instead.
|
||||||
func (r *Queryer) QueryMembershipAtEvent(
|
func (r *Queryer) QueryMembershipAtEvent(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
request *api.QueryMembershipAtEventRequest,
|
request *api.QueryMembershipAtEventRequest,
|
||||||
|
|
@ -237,7 +240,11 @@ func (r *Queryer) QueryMembershipAtEvent(
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, eventID := range request.EventIDs {
|
for _, eventID := range request.EventIDs {
|
||||||
stateEntry := stateEntries[eventID]
|
stateEntry, ok := stateEntries[eventID]
|
||||||
|
if !ok {
|
||||||
|
response.Memberships[eventID] = []*gomatrixserverlib.HeaderedEvent{}
|
||||||
|
continue
|
||||||
|
}
|
||||||
memberships, err := helpers.GetMembershipsAtState(ctx, r.DB, stateEntry, false)
|
memberships, err := helpers.GetMembershipsAtState(ctx, r.DB, stateEntry, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to get memberships at state: %w", err)
|
return fmt.Errorf("unable to get memberships at state: %w", err)
|
||||||
|
|
|
||||||
|
|
@ -18,6 +18,7 @@ package state
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"database/sql"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
@ -134,11 +135,14 @@ func (v *StateResolution) LoadMembershipAtEvent(
|
||||||
for i := range eventIDs {
|
for i := range eventIDs {
|
||||||
eventID := eventIDs[i]
|
eventID := eventIDs[i]
|
||||||
snapshotNID, err := v.db.SnapshotNIDFromEventID(ctx, eventID)
|
snapshotNID, err := v.db.SnapshotNIDFromEventID(ctx, eventID)
|
||||||
if err != nil {
|
if err != nil && err != sql.ErrNoRows {
|
||||||
return nil, fmt.Errorf("LoadStateAtEvent.SnapshotNIDFromEventID failed for event %s : %w", eventID, err)
|
return nil, fmt.Errorf("LoadStateAtEvent.SnapshotNIDFromEventID failed for event %s : %w", eventID, err)
|
||||||
}
|
}
|
||||||
if snapshotNID == 0 {
|
if snapshotNID == 0 {
|
||||||
return nil, fmt.Errorf("LoadStateAtEvent.SnapshotNIDFromEventID(%s) returned 0 NID, was this event stored?", eventID)
|
// If we don't know a state snapshot for this event then we can't calculate
|
||||||
|
// memberships at the time of the event, so skip over it. This means that
|
||||||
|
// it isn't guaranteed that the response map will contain every single event.
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
snapshotNIDMap[snapshotNID] = append(snapshotNIDMap[snapshotNID], eventID)
|
snapshotNIDMap[snapshotNID] = append(snapshotNIDMap[snapshotNID], eventID)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -21,8 +21,6 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
|
|
||||||
// Import the postgres database driver.
|
// Import the postgres database driver.
|
||||||
_ "github.com/lib/pq"
|
_ "github.com/lib/pq"
|
||||||
|
|
||||||
|
|
@ -79,8 +77,7 @@ func executeMigration(ctx context.Context, db *sql.DB) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed
|
if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed
|
||||||
if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil {
|
if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil {
|
||||||
// not a fatal error, log and continue
|
return fmt.Errorf("unable to manually insert migration '%s': %w", migrationName, err)
|
||||||
logrus.WithError(err).Warnf("unable to manually insert migration '%s'", migrationName)
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -22,7 +22,6 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal/caching"
|
"github.com/matrix-org/dendrite/internal/caching"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
|
@ -87,8 +86,7 @@ func executeMigration(ctx context.Context, db *sql.DB) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed
|
if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed
|
||||||
if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil {
|
if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil {
|
||||||
// not a fatal error, log and continue
|
return fmt.Errorf("unable to manually insert migration '%s': %w", migrationName, err)
|
||||||
logrus.WithError(err).Warnf("unable to manually insert migration '%s'", migrationName)
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -19,16 +19,17 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
|
||||||
"github.com/getsentry/sentry-go"
|
"github.com/getsentry/sentry-go"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
"github.com/matrix-org/util"
|
||||||
|
"github.com/nats-io/nats.go"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||||
"github.com/matrix-org/dendrite/setup/process"
|
"github.com/matrix-org/dendrite/setup/process"
|
||||||
"github.com/matrix-org/dendrite/syncapi/notifier"
|
"github.com/matrix-org/dendrite/syncapi/notifier"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
|
||||||
"github.com/matrix-org/util"
|
|
||||||
"github.com/nats-io/nats.go"
|
|
||||||
log "github.com/sirupsen/logrus"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// OutputSendToDeviceEventConsumer consumes events that originated in the EDU server.
|
// OutputSendToDeviceEventConsumer consumes events that originated in the EDU server.
|
||||||
|
|
@ -79,16 +80,18 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msgs []
|
||||||
_, domain, err := gomatrixserverlib.SplitID('@', userID)
|
_, domain, err := gomatrixserverlib.SplitID('@', userID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
sentry.CaptureException(err)
|
sentry.CaptureException(err)
|
||||||
|
log.WithError(err).Errorf("send-to-device: failed to split user id, dropping message")
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
if domain != s.serverName {
|
if domain != s.serverName {
|
||||||
|
log.Tracef("ignoring send-to-device event with destination %s", domain)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
var output types.OutputSendToDeviceEvent
|
var output types.OutputSendToDeviceEvent
|
||||||
if err = json.Unmarshal(msg.Data, &output); err != nil {
|
if err = json.Unmarshal(msg.Data, &output); err != nil {
|
||||||
// If the message was invalid, log it and move on to the next message in the stream
|
// If the message was invalid, log it and move on to the next message in the stream
|
||||||
log.WithError(err).Errorf("output log: message parse failure")
|
log.WithError(err).Errorf("send-to-device: message parse failure")
|
||||||
sentry.CaptureException(err)
|
sentry.CaptureException(err)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
@ -105,7 +108,7 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msgs []
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
sentry.CaptureException(err)
|
sentry.CaptureException(err)
|
||||||
log.WithError(err).Errorf("failed to store send-to-device message")
|
log.WithError(err).Errorf("send-to-device: failed to store message")
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -388,7 +388,7 @@ func applyHistoryVisibilityFilter(
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Not a fatal error, we can continue without the stateEvents,
|
// Not a fatal error, we can continue without the stateEvents,
|
||||||
// they are only needed if there are state events in the timeline.
|
// they are only needed if there are state events in the timeline.
|
||||||
logrus.WithError(err).Warnf("failed to get current room state")
|
logrus.WithError(err).Warnf("Failed to get current room state for history visibility")
|
||||||
}
|
}
|
||||||
alwaysIncludeIDs := make(map[string]struct{}, len(stateEvents))
|
alwaysIncludeIDs := make(map[string]struct{}, len(stateEvents))
|
||||||
for _, ev := range stateEvents {
|
for _, ev := range stateEvents {
|
||||||
|
|
@ -397,7 +397,6 @@ func applyHistoryVisibilityFilter(
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
events, err := internal.ApplyHistoryVisibilityFilter(ctx, db, rsAPI, recentEvents, alwaysIncludeIDs, userID, "sync")
|
events, err := internal.ApplyHistoryVisibilityFilter(ctx, db, rsAPI, recentEvents, alwaysIncludeIDs, userID, "sync")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
logrus.WithFields(logrus.Fields{
|
logrus.WithFields(logrus.Fields{
|
||||||
|
|
@ -405,7 +404,7 @@ func applyHistoryVisibilityFilter(
|
||||||
"room_id": roomID,
|
"room_id": roomID,
|
||||||
"before": len(recentEvents),
|
"before": len(recentEvents),
|
||||||
"after": len(events),
|
"after": len(events),
|
||||||
}).Debug("applied history visibility (sync)")
|
}).Trace("Applied history visibility (sync)")
|
||||||
return events, nil
|
return events, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -624,9 +624,7 @@ func testSendToDevice(t *testing.T, dbType test.DBType) {
|
||||||
// Send to-device messages of type "m.dendrite.test" with content `{"dummy":"message $counter"}`
|
// Send to-device messages of type "m.dendrite.test" with content `{"dummy":"message $counter"}`
|
||||||
for i := 0; i < tc.sendMessagesCount; i++ {
|
for i := 0; i < tc.sendMessagesCount; i++ {
|
||||||
msgCounter++
|
msgCounter++
|
||||||
msg := map[string]string{
|
msg := json.RawMessage(fmt.Sprintf(`{"dummy":"message %d"}`, msgCounter))
|
||||||
"dummy": fmt.Sprintf("message %d", msgCounter),
|
|
||||||
}
|
|
||||||
if err := producer.SendToDevice(ctx, user.ID, user.ID, alice.ID, "m.dendrite.test", msg); err != nil {
|
if err := producer.SendToDevice(ctx, user.ID, user.ID, alice.ID, "m.dendrite.test", msg); err != nil {
|
||||||
t.Fatalf("unable to send to device message: %v", err)
|
t.Fatalf("unable to send to device message: %v", err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue