mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-15 01:53:09 -06:00
Merge branch 'main' into upgrade-go-1.19
This commit is contained in:
commit
b153b50862
|
|
@ -255,7 +255,7 @@ func (m *DendriteMonolith) Start() {
|
||||||
m.logger.SetOutput(BindLogger{})
|
m.logger.SetOutput(BindLogger{})
|
||||||
logrus.SetOutput(BindLogger{})
|
logrus.SetOutput(BindLogger{})
|
||||||
|
|
||||||
m.PineconeRouter = pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk, false)
|
m.PineconeRouter = pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk)
|
||||||
m.PineconeQUIC = pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), m.PineconeRouter, []string{"matrix"})
|
m.PineconeQUIC = pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), m.PineconeRouter, []string{"matrix"})
|
||||||
m.PineconeMulticast = pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), m.PineconeRouter)
|
m.PineconeMulticast = pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), m.PineconeRouter)
|
||||||
m.PineconeManager = pineconeConnections.NewConnectionManager(m.PineconeRouter, nil)
|
m.PineconeManager = pineconeConnections.NewConnectionManager(m.PineconeRouter, nil)
|
||||||
|
|
|
||||||
|
|
@ -21,12 +21,13 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||||
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
)
|
)
|
||||||
|
|
||||||
// SyncAPIProducer produces events for the sync API server to consume
|
// SyncAPIProducer produces events for the sync API server to consume
|
||||||
|
|
@ -61,7 +62,7 @@ func (p *SyncAPIProducer) SendReceipt(
|
||||||
|
|
||||||
func (p *SyncAPIProducer) SendToDevice(
|
func (p *SyncAPIProducer) SendToDevice(
|
||||||
ctx context.Context, sender, userID, deviceID, eventType string,
|
ctx context.Context, sender, userID, deviceID, eventType string,
|
||||||
message interface{},
|
message json.RawMessage,
|
||||||
) error {
|
) error {
|
||||||
devices := []string{}
|
devices := []string{}
|
||||||
_, domain, err := gomatrixserverlib.SplitID('@', userID)
|
_, domain, err := gomatrixserverlib.SplitID('@', userID)
|
||||||
|
|
@ -89,24 +90,19 @@ func (p *SyncAPIProducer) SendToDevice(
|
||||||
devices = append(devices, deviceID)
|
devices = append(devices, deviceID)
|
||||||
}
|
}
|
||||||
|
|
||||||
js, err := json.Marshal(message)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
"user_id": userID,
|
"user_id": userID,
|
||||||
"num_devices": len(devices),
|
"num_devices": len(devices),
|
||||||
"type": eventType,
|
"type": eventType,
|
||||||
}).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent)
|
}).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent)
|
||||||
for _, device := range devices {
|
for i, device := range devices {
|
||||||
ote := &types.OutputSendToDeviceEvent{
|
ote := &types.OutputSendToDeviceEvent{
|
||||||
UserID: userID,
|
UserID: userID,
|
||||||
DeviceID: device,
|
DeviceID: device,
|
||||||
SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{
|
SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{
|
||||||
Sender: sender,
|
Sender: sender,
|
||||||
Type: eventType,
|
Type: eventType,
|
||||||
Content: js,
|
Content: message,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -115,15 +111,17 @@ func (p *SyncAPIProducer) SendToDevice(
|
||||||
log.WithError(err).Error("sendToDevice failed json.Marshal")
|
log.WithError(err).Error("sendToDevice failed json.Marshal")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
m := &nats.Msg{
|
m := nats.NewMsg(p.TopicSendToDeviceEvent)
|
||||||
Subject: p.TopicSendToDeviceEvent,
|
m.Data = eventJSON
|
||||||
Data: eventJSON,
|
|
||||||
Header: nats.Header{},
|
|
||||||
}
|
|
||||||
m.Header.Set("sender", sender)
|
m.Header.Set("sender", sender)
|
||||||
m.Header.Set(jetstream.UserID, userID)
|
m.Header.Set(jetstream.UserID, userID)
|
||||||
|
|
||||||
if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil {
|
if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil {
|
||||||
log.WithError(err).Error("sendToDevice failed t.Producer.SendMessage")
|
if i < len(devices)-1 {
|
||||||
|
log.WithError(err).Warn("sendToDevice failed to PublishMsg, trying further devices")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
log.WithError(err).Error("sendToDevice failed to PublishMsg for all devices")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -151,7 +151,7 @@ func main() {
|
||||||
base := base.NewBaseDendrite(cfg, "Monolith")
|
base := base.NewBaseDendrite(cfg, "Monolith")
|
||||||
defer base.Close() // nolint: errcheck
|
defer base.Close() // nolint: errcheck
|
||||||
|
|
||||||
pRouter := pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk, false)
|
pRouter := pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk)
|
||||||
pQUIC := pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), pRouter, []string{"matrix"})
|
pQUIC := pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), pRouter, []string{"matrix"})
|
||||||
pMulticast := pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), pRouter)
|
pMulticast := pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), pRouter)
|
||||||
pManager := pineconeConnections.NewConnectionManager(pRouter, nil)
|
pManager := pineconeConnections.NewConnectionManager(pRouter, nil)
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/matrix-org/gomatrix"
|
"github.com/matrix-org/gomatrix"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
|
@ -81,11 +82,14 @@ func runTests(baseURL, branchName string) error {
|
||||||
client: users[1].client, text: "4: " + branchName,
|
client: users[1].client, text: "4: " + branchName,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
wantEventIDs := make(map[string]struct{}, 8)
|
||||||
for _, msg := range msgs {
|
for _, msg := range msgs {
|
||||||
_, err = msg.client.SendText(dmRoomID, msg.text)
|
var resp *gomatrix.RespSendEvent
|
||||||
|
resp, err = msg.client.SendText(dmRoomID, msg.text)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to send text in dm room: %s", err)
|
return fmt.Errorf("failed to send text in dm room: %s", err)
|
||||||
}
|
}
|
||||||
|
wantEventIDs[resp.EventID] = struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// attempt to create/join the shared public room
|
// attempt to create/join the shared public room
|
||||||
|
|
@ -113,11 +117,48 @@ func runTests(baseURL, branchName string) error {
|
||||||
}
|
}
|
||||||
// send messages
|
// send messages
|
||||||
for _, msg := range msgs {
|
for _, msg := range msgs {
|
||||||
_, err = msg.client.SendText(publicRoomID, "public "+msg.text)
|
resp, err := msg.client.SendText(publicRoomID, "public "+msg.text)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to send text in public room: %s", err)
|
return fmt.Errorf("failed to send text in public room: %s", err)
|
||||||
}
|
}
|
||||||
|
wantEventIDs[resp.EventID] = struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Sync until we have all expected messages
|
||||||
|
doneCh := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
syncClient := users[0].client
|
||||||
|
since := ""
|
||||||
|
for len(wantEventIDs) > 0 {
|
||||||
|
select {
|
||||||
|
case <-doneCh:
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
syncResp, err := syncClient.SyncRequest(1000, since, "1", false, "")
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
for _, room := range syncResp.Rooms.Join {
|
||||||
|
for _, ev := range room.Timeline.Events {
|
||||||
|
if ev.Type != "m.room.message" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
delete(wantEventIDs, ev.ID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
since = syncResp.NextBatch
|
||||||
|
}
|
||||||
|
close(doneCh)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-time.After(time.Second * 10):
|
||||||
|
close(doneCh)
|
||||||
|
return fmt.Errorf("failed to receive all expected messages: %+v", wantEventIDs)
|
||||||
|
case <-doneCh:
|
||||||
|
}
|
||||||
|
|
||||||
log.Printf("OK! rooms(public=%s, dm=%s) users(%s, %s)\n", publicRoomID, dmRoomID, users[0].userID, users[1].userID)
|
log.Printf("OK! rooms(public=%s, dm=%s) users(%s, %s)\n", publicRoomID, dmRoomID, users[0].userID, users[1].userID)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -64,7 +64,7 @@ func (p *SyncAPIProducer) SendReceipt(
|
||||||
|
|
||||||
func (p *SyncAPIProducer) SendToDevice(
|
func (p *SyncAPIProducer) SendToDevice(
|
||||||
ctx context.Context, sender, userID, deviceID, eventType string,
|
ctx context.Context, sender, userID, deviceID, eventType string,
|
||||||
message interface{},
|
message json.RawMessage,
|
||||||
) error {
|
) error {
|
||||||
devices := []string{}
|
devices := []string{}
|
||||||
_, domain, err := gomatrixserverlib.SplitID('@', userID)
|
_, domain, err := gomatrixserverlib.SplitID('@', userID)
|
||||||
|
|
@ -92,24 +92,19 @@ func (p *SyncAPIProducer) SendToDevice(
|
||||||
devices = append(devices, deviceID)
|
devices = append(devices, deviceID)
|
||||||
}
|
}
|
||||||
|
|
||||||
js, err := json.Marshal(message)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
"user_id": userID,
|
"user_id": userID,
|
||||||
"num_devices": len(devices),
|
"num_devices": len(devices),
|
||||||
"type": eventType,
|
"type": eventType,
|
||||||
}).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent)
|
}).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent)
|
||||||
for _, device := range devices {
|
for i, device := range devices {
|
||||||
ote := &types.OutputSendToDeviceEvent{
|
ote := &types.OutputSendToDeviceEvent{
|
||||||
UserID: userID,
|
UserID: userID,
|
||||||
DeviceID: device,
|
DeviceID: device,
|
||||||
SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{
|
SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{
|
||||||
Sender: sender,
|
Sender: sender,
|
||||||
Type: eventType,
|
Type: eventType,
|
||||||
Content: js,
|
Content: message,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -118,16 +113,17 @@ func (p *SyncAPIProducer) SendToDevice(
|
||||||
log.WithError(err).Error("sendToDevice failed json.Marshal")
|
log.WithError(err).Error("sendToDevice failed json.Marshal")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
m := &nats.Msg{
|
m := nats.NewMsg(p.TopicSendToDeviceEvent)
|
||||||
Subject: p.TopicSendToDeviceEvent,
|
m.Data = eventJSON
|
||||||
Data: eventJSON,
|
|
||||||
Header: nats.Header{},
|
|
||||||
}
|
|
||||||
m.Header.Set("sender", sender)
|
m.Header.Set("sender", sender)
|
||||||
m.Header.Set(jetstream.UserID, userID)
|
m.Header.Set(jetstream.UserID, userID)
|
||||||
|
|
||||||
if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil {
|
if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil {
|
||||||
log.WithError(err).Error("sendToDevice failed t.Producer.SendMessage")
|
if i < len(devices)-1 {
|
||||||
|
log.WithError(err).Warn("sendToDevice failed to PublishMsg, trying further devices")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
log.WithError(err).Error("sendToDevice failed to PublishMsg for all devices")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
4
go.mod
4
go.mod
|
|
@ -22,8 +22,8 @@ require (
|
||||||
github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e
|
github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e
|
||||||
github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91
|
github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91
|
||||||
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16
|
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16
|
||||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220911125436-dec87dbaa407
|
github.com/matrix-org/gomatrixserverlib v0.0.0-20220912142654-7b96db48888a
|
||||||
github.com/matrix-org/pinecone v0.0.0-20220912093434-b215925d5534
|
github.com/matrix-org/pinecone v0.0.0-20220914131522-608215eb1b29
|
||||||
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
|
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
|
||||||
github.com/mattn/go-sqlite3 v1.14.13
|
github.com/mattn/go-sqlite3 v1.14.13
|
||||||
github.com/nats-io/nats-server/v2 v2.9.0
|
github.com/nats-io/nats-server/v2 v2.9.0
|
||||||
|
|
|
||||||
8
go.sum
8
go.sum
|
|
@ -388,10 +388,10 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1
|
||||||
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
|
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
|
||||||
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4=
|
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4=
|
||||||
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
|
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
|
||||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220911125436-dec87dbaa407 h1:UciyfR3UTWnpqFBvEAMwGmZpjjO2hemFkWmwa/tB+fw=
|
github.com/matrix-org/gomatrixserverlib v0.0.0-20220912142654-7b96db48888a h1:XeGBDZZsUe4kgj3myl0EiuDNVWxszJecMTrON3Wn9sI=
|
||||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220911125436-dec87dbaa407/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk=
|
github.com/matrix-org/gomatrixserverlib v0.0.0-20220912142654-7b96db48888a/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk=
|
||||||
github.com/matrix-org/pinecone v0.0.0-20220912093434-b215925d5534 h1:XuJYAJNkdG3zj9cO0yQSvL+Sp2xogsTOuZRx7PwdtoA=
|
github.com/matrix-org/pinecone v0.0.0-20220914131522-608215eb1b29 h1:/AIaqhK1BBi2sMEVQdgZRV8H8sNloAGCgztLZhsPqD0=
|
||||||
github.com/matrix-org/pinecone v0.0.0-20220912093434-b215925d5534/go.mod h1:K0N1ixHQxXoCyqolDqVxPM3ArrDtcMs8yegOx2Lfv9k=
|
github.com/matrix-org/pinecone v0.0.0-20220914131522-608215eb1b29/go.mod h1:K0N1ixHQxXoCyqolDqVxPM3ArrDtcMs8yegOx2Lfv9k=
|
||||||
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
|
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
|
||||||
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk=
|
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk=
|
||||||
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
|
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
|
||||||
|
|
|
||||||
|
|
@ -155,5 +155,10 @@ func InsertMigration(ctx context.Context, db *sql.DB, migrationName string) erro
|
||||||
time.Now().Format(time.RFC3339),
|
time.Now().Format(time.RFC3339),
|
||||||
internal.VersionString(),
|
internal.VersionString(),
|
||||||
)
|
)
|
||||||
|
// If the migration was already executed, we'll get a unique constraint error,
|
||||||
|
// return nil instead, to avoid unnecessary logging.
|
||||||
|
if IsUniqueConstraintViolationErr(err) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -12,12 +12,27 @@
|
||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
//go:build wasm
|
//go:build !wasm
|
||||||
// +build wasm
|
// +build !wasm
|
||||||
|
|
||||||
package sqlutil
|
package sqlutil
|
||||||
|
|
||||||
// IsUniqueConstraintViolationErr no-ops for this architecture
|
import (
|
||||||
|
"github.com/lib/pq"
|
||||||
|
"github.com/mattn/go-sqlite3"
|
||||||
|
)
|
||||||
|
|
||||||
|
// IsUniqueConstraintViolationErr returns true if the error is an unique_violation error
|
||||||
func IsUniqueConstraintViolationErr(err error) bool {
|
func IsUniqueConstraintViolationErr(err error) bool {
|
||||||
|
switch e := err.(type) {
|
||||||
|
case *pq.Error:
|
||||||
|
return e.Code == "23505"
|
||||||
|
case pq.Error:
|
||||||
|
return e.Code == "23505"
|
||||||
|
case *sqlite3.Error:
|
||||||
|
return e.Code == sqlite3.ErrConstraint
|
||||||
|
case sqlite3.Error:
|
||||||
|
return e.Code == sqlite3.ErrConstraint
|
||||||
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
@ -12,15 +12,20 @@
|
||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
//go:build !wasm
|
//go:build wasm
|
||||||
// +build !wasm
|
// +build wasm
|
||||||
|
|
||||||
package sqlutil
|
package sqlutil
|
||||||
|
|
||||||
import "github.com/lib/pq"
|
import "github.com/mattn/go-sqlite3"
|
||||||
|
|
||||||
// IsUniqueConstraintViolationErr returns true if the error is a postgresql unique_violation error
|
// IsUniqueConstraintViolationErr returns true if the error is an unique_violation error
|
||||||
func IsUniqueConstraintViolationErr(err error) bool {
|
func IsUniqueConstraintViolationErr(err error) bool {
|
||||||
pqErr, ok := err.(*pq.Error)
|
switch e := err.(type) {
|
||||||
return ok && pqErr.Code == "23505"
|
case *sqlite3.Error:
|
||||||
|
return e.Code == sqlite3.ErrConstraint
|
||||||
|
case sqlite3.Error:
|
||||||
|
return e.Code == sqlite3.ErrConstraint
|
||||||
|
}
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
@ -18,8 +18,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
|
@ -81,8 +80,7 @@ func executeMigration(ctx context.Context, db *sql.DB) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed
|
if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed
|
||||||
if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil {
|
if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil {
|
||||||
// not a fatal error, log and continue
|
return fmt.Errorf("unable to manually insert migration '%s': %w", migrationName, err)
|
||||||
logrus.WithError(err).Warnf("unable to manually insert migration '%s'", migrationName)
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -18,8 +18,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
|
@ -80,8 +79,7 @@ func executeMigration(ctx context.Context, db *sql.DB) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed
|
if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed
|
||||||
if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil {
|
if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil {
|
||||||
// not a fatal error, log and continue
|
return fmt.Errorf("unable to manually insert migration '%s': %w", migrationName, err)
|
||||||
logrus.WithError(err).Warnf("unable to manually insert migration '%s'", migrationName)
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -439,6 +439,7 @@ type QueryMembershipAtEventRequest struct {
|
||||||
|
|
||||||
// QueryMembershipAtEventResponse is the response to QueryMembershipAtEventRequest.
|
// QueryMembershipAtEventResponse is the response to QueryMembershipAtEventRequest.
|
||||||
type QueryMembershipAtEventResponse struct {
|
type QueryMembershipAtEventResponse struct {
|
||||||
// Memberships is a map from eventID to a list of events (if any).
|
// Memberships is a map from eventID to a list of events (if any). Events that
|
||||||
|
// do not have known state will return an empty array here.
|
||||||
Memberships map[string][]*gomatrixserverlib.HeaderedEvent `json:"memberships"`
|
Memberships map[string][]*gomatrixserverlib.HeaderedEvent `json:"memberships"`
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -176,7 +176,8 @@ func (r *Inputer) Start() error {
|
||||||
},
|
},
|
||||||
nats.HeadersOnly(),
|
nats.HeadersOnly(),
|
||||||
nats.DeliverAll(),
|
nats.DeliverAll(),
|
||||||
nats.AckAll(),
|
nats.AckExplicit(),
|
||||||
|
nats.ReplayInstant(),
|
||||||
nats.BindStream(r.InputRoomEventTopic),
|
nats.BindStream(r.InputRoomEventTopic),
|
||||||
)
|
)
|
||||||
return err
|
return err
|
||||||
|
|
|
||||||
|
|
@ -106,7 +106,7 @@ func (r *Queryer) QueryStateAfterEvents(
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(request.PrevEventIDs) > 1 && len(request.StateToFetch) == 0 {
|
if len(request.PrevEventIDs) > 1 {
|
||||||
var authEventIDs []string
|
var authEventIDs []string
|
||||||
for _, e := range stateEvents {
|
for _, e := range stateEvents {
|
||||||
authEventIDs = append(authEventIDs, e.AuthEventIDs()...)
|
authEventIDs = append(authEventIDs, e.AuthEventIDs()...)
|
||||||
|
|
@ -208,6 +208,9 @@ func (r *Queryer) QueryMembershipForUser(
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// QueryMembershipAtEvent returns the known memberships at a given event.
|
||||||
|
// If the state before an event is not known, an empty list will be returned
|
||||||
|
// for that event instead.
|
||||||
func (r *Queryer) QueryMembershipAtEvent(
|
func (r *Queryer) QueryMembershipAtEvent(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
request *api.QueryMembershipAtEventRequest,
|
request *api.QueryMembershipAtEventRequest,
|
||||||
|
|
@ -237,7 +240,11 @@ func (r *Queryer) QueryMembershipAtEvent(
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, eventID := range request.EventIDs {
|
for _, eventID := range request.EventIDs {
|
||||||
stateEntry := stateEntries[eventID]
|
stateEntry, ok := stateEntries[eventID]
|
||||||
|
if !ok {
|
||||||
|
response.Memberships[eventID] = []*gomatrixserverlib.HeaderedEvent{}
|
||||||
|
continue
|
||||||
|
}
|
||||||
memberships, err := helpers.GetMembershipsAtState(ctx, r.DB, stateEntry, false)
|
memberships, err := helpers.GetMembershipsAtState(ctx, r.DB, stateEntry, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to get memberships at state: %w", err)
|
return fmt.Errorf("unable to get memberships at state: %w", err)
|
||||||
|
|
|
||||||
|
|
@ -18,6 +18,7 @@ package state
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"database/sql"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
@ -134,11 +135,14 @@ func (v *StateResolution) LoadMembershipAtEvent(
|
||||||
for i := range eventIDs {
|
for i := range eventIDs {
|
||||||
eventID := eventIDs[i]
|
eventID := eventIDs[i]
|
||||||
snapshotNID, err := v.db.SnapshotNIDFromEventID(ctx, eventID)
|
snapshotNID, err := v.db.SnapshotNIDFromEventID(ctx, eventID)
|
||||||
if err != nil {
|
if err != nil && err != sql.ErrNoRows {
|
||||||
return nil, fmt.Errorf("LoadStateAtEvent.SnapshotNIDFromEventID failed for event %s : %w", eventID, err)
|
return nil, fmt.Errorf("LoadStateAtEvent.SnapshotNIDFromEventID failed for event %s : %w", eventID, err)
|
||||||
}
|
}
|
||||||
if snapshotNID == 0 {
|
if snapshotNID == 0 {
|
||||||
return nil, fmt.Errorf("LoadStateAtEvent.SnapshotNIDFromEventID(%s) returned 0 NID, was this event stored?", eventID)
|
// If we don't know a state snapshot for this event then we can't calculate
|
||||||
|
// memberships at the time of the event, so skip over it. This means that
|
||||||
|
// it isn't guaranteed that the response map will contain every single event.
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
snapshotNIDMap[snapshotNID] = append(snapshotNIDMap[snapshotNID], eventID)
|
snapshotNIDMap[snapshotNID] = append(snapshotNIDMap[snapshotNID], eventID)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -21,8 +21,6 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
|
|
||||||
// Import the postgres database driver.
|
// Import the postgres database driver.
|
||||||
_ "github.com/lib/pq"
|
_ "github.com/lib/pq"
|
||||||
|
|
||||||
|
|
@ -79,8 +77,7 @@ func executeMigration(ctx context.Context, db *sql.DB) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed
|
if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed
|
||||||
if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil {
|
if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil {
|
||||||
// not a fatal error, log and continue
|
return fmt.Errorf("unable to manually insert migration '%s': %w", migrationName, err)
|
||||||
logrus.WithError(err).Warnf("unable to manually insert migration '%s'", migrationName)
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -22,7 +22,6 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal/caching"
|
"github.com/matrix-org/dendrite/internal/caching"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
|
@ -87,8 +86,7 @@ func executeMigration(ctx context.Context, db *sql.DB) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed
|
if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed
|
||||||
if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil {
|
if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil {
|
||||||
// not a fatal error, log and continue
|
return fmt.Errorf("unable to manually insert migration '%s': %w", migrationName, err)
|
||||||
logrus.WithError(err).Warnf("unable to manually insert migration '%s'", migrationName)
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -19,16 +19,17 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
|
||||||
"github.com/getsentry/sentry-go"
|
"github.com/getsentry/sentry-go"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
"github.com/matrix-org/util"
|
||||||
|
"github.com/nats-io/nats.go"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||||
"github.com/matrix-org/dendrite/setup/process"
|
"github.com/matrix-org/dendrite/setup/process"
|
||||||
"github.com/matrix-org/dendrite/syncapi/notifier"
|
"github.com/matrix-org/dendrite/syncapi/notifier"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
|
||||||
"github.com/matrix-org/util"
|
|
||||||
"github.com/nats-io/nats.go"
|
|
||||||
log "github.com/sirupsen/logrus"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// OutputSendToDeviceEventConsumer consumes events that originated in the EDU server.
|
// OutputSendToDeviceEventConsumer consumes events that originated in the EDU server.
|
||||||
|
|
@ -79,16 +80,18 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msgs []
|
||||||
_, domain, err := gomatrixserverlib.SplitID('@', userID)
|
_, domain, err := gomatrixserverlib.SplitID('@', userID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
sentry.CaptureException(err)
|
sentry.CaptureException(err)
|
||||||
|
log.WithError(err).Errorf("send-to-device: failed to split user id, dropping message")
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
if domain != s.serverName {
|
if domain != s.serverName {
|
||||||
|
log.Tracef("ignoring send-to-device event with destination %s", domain)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
var output types.OutputSendToDeviceEvent
|
var output types.OutputSendToDeviceEvent
|
||||||
if err = json.Unmarshal(msg.Data, &output); err != nil {
|
if err = json.Unmarshal(msg.Data, &output); err != nil {
|
||||||
// If the message was invalid, log it and move on to the next message in the stream
|
// If the message was invalid, log it and move on to the next message in the stream
|
||||||
log.WithError(err).Errorf("output log: message parse failure")
|
log.WithError(err).Errorf("send-to-device: message parse failure")
|
||||||
sentry.CaptureException(err)
|
sentry.CaptureException(err)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
@ -105,7 +108,7 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msgs []
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
sentry.CaptureException(err)
|
sentry.CaptureException(err)
|
||||||
log.WithError(err).Errorf("failed to store send-to-device message")
|
log.WithError(err).Errorf("send-to-device: failed to store message")
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -388,7 +388,7 @@ func applyHistoryVisibilityFilter(
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Not a fatal error, we can continue without the stateEvents,
|
// Not a fatal error, we can continue without the stateEvents,
|
||||||
// they are only needed if there are state events in the timeline.
|
// they are only needed if there are state events in the timeline.
|
||||||
logrus.WithError(err).Warnf("failed to get current room state")
|
logrus.WithError(err).Warnf("Failed to get current room state for history visibility")
|
||||||
}
|
}
|
||||||
alwaysIncludeIDs := make(map[string]struct{}, len(stateEvents))
|
alwaysIncludeIDs := make(map[string]struct{}, len(stateEvents))
|
||||||
for _, ev := range stateEvents {
|
for _, ev := range stateEvents {
|
||||||
|
|
@ -397,7 +397,6 @@ func applyHistoryVisibilityFilter(
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
events, err := internal.ApplyHistoryVisibilityFilter(ctx, db, rsAPI, recentEvents, alwaysIncludeIDs, userID, "sync")
|
events, err := internal.ApplyHistoryVisibilityFilter(ctx, db, rsAPI, recentEvents, alwaysIncludeIDs, userID, "sync")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
logrus.WithFields(logrus.Fields{
|
logrus.WithFields(logrus.Fields{
|
||||||
|
|
@ -405,7 +404,7 @@ func applyHistoryVisibilityFilter(
|
||||||
"room_id": roomID,
|
"room_id": roomID,
|
||||||
"before": len(recentEvents),
|
"before": len(recentEvents),
|
||||||
"after": len(events),
|
"after": len(events),
|
||||||
}).Debug("applied history visibility (sync)")
|
}).Trace("Applied history visibility (sync)")
|
||||||
return events, nil
|
return events, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -624,9 +624,7 @@ func testSendToDevice(t *testing.T, dbType test.DBType) {
|
||||||
// Send to-device messages of type "m.dendrite.test" with content `{"dummy":"message $counter"}`
|
// Send to-device messages of type "m.dendrite.test" with content `{"dummy":"message $counter"}`
|
||||||
for i := 0; i < tc.sendMessagesCount; i++ {
|
for i := 0; i < tc.sendMessagesCount; i++ {
|
||||||
msgCounter++
|
msgCounter++
|
||||||
msg := map[string]string{
|
msg := json.RawMessage(fmt.Sprintf(`{"dummy":"message %d"}`, msgCounter))
|
||||||
"dummy": fmt.Sprintf("message %d", msgCounter),
|
|
||||||
}
|
|
||||||
if err := producer.SendToDevice(ctx, user.ID, user.ID, alice.ID, "m.dendrite.test", msg); err != nil {
|
if err := producer.SendToDevice(ctx, user.ID, user.ID, alice.ID, "m.dendrite.test", msg); err != nil {
|
||||||
t.Fatalf("unable to send to device message: %v", err)
|
t.Fatalf("unable to send to device message: %v", err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue