Merge branch 'main' of github.com:matrix-org/dendrite into s7evink/usagestats

This commit is contained in:
Till Faelligen 2022-10-27 06:49:13 +02:00
commit 89429908f6
No known key found for this signature in database
GPG key ID: 3DF82D8AB9211D4E
12 changed files with 256 additions and 175 deletions

View file

@ -0,0 +1,25 @@
FROM docker.io/golang:1.19-alpine AS base
RUN apk --update --no-cache add bash build-base
WORKDIR /build
COPY . /build
RUN mkdir -p bin
RUN go build -trimpath -o bin/ ./cmd/dendrite-demo-yggdrasil
RUN go build -trimpath -o bin/ ./cmd/create-account
RUN go build -trimpath -o bin/ ./cmd/generate-keys
FROM alpine:latest
LABEL org.opencontainers.image.title="Dendrite (Yggdrasil demo)"
LABEL org.opencontainers.image.description="Next-generation Matrix homeserver written in Go"
LABEL org.opencontainers.image.source="https://github.com/matrix-org/dendrite"
LABEL org.opencontainers.image.licenses="Apache-2.0"
COPY --from=base /build/bin/* /usr/bin/
VOLUME /etc/dendrite
WORKDIR /etc/dendrite
ENTRYPOINT ["/usr/bin/dendrite-demo-yggdrasil"]

View file

@ -101,19 +101,47 @@ func (m *DendriteMonolith) SessionCount() int {
return len(m.PineconeQUIC.Protocol("matrix").Sessions()) return len(m.PineconeQUIC.Protocol("matrix").Sessions())
} }
func (m *DendriteMonolith) RegisterNetworkInterface(name string, index int, mtu int, up bool, broadcast bool, loopback bool, pointToPoint bool, multicast bool, addrs string) { type InterfaceInfo struct {
m.PineconeMulticast.RegisterInterface(pineconeMulticast.InterfaceInfo{ Name string
Name: name, Index int
Index: index, Mtu int
Mtu: mtu, Up bool
Up: up, Broadcast bool
Broadcast: broadcast, Loopback bool
Loopback: loopback, PointToPoint bool
PointToPoint: pointToPoint, Multicast bool
Multicast: multicast, Addrs string
Addrs: addrs, }
type InterfaceRetriever interface {
CacheCurrentInterfaces() int
GetCachedInterface(index int) *InterfaceInfo
}
func (m *DendriteMonolith) RegisterNetworkCallback(intfCallback InterfaceRetriever) {
callback := func() []pineconeMulticast.InterfaceInfo {
count := intfCallback.CacheCurrentInterfaces()
intfs := []pineconeMulticast.InterfaceInfo{}
for i := 0; i < count; i++ {
iface := intfCallback.GetCachedInterface(i)
if iface != nil {
intfs = append(intfs, pineconeMulticast.InterfaceInfo{
Name: iface.Name,
Index: iface.Index,
Mtu: iface.Mtu,
Up: iface.Up,
Broadcast: iface.Broadcast,
Loopback: iface.Loopback,
PointToPoint: iface.PointToPoint,
Multicast: iface.Multicast,
Addrs: iface.Addrs,
}) })
} }
}
return intfs
}
m.PineconeMulticast.RegisterNetworkCallback(callback)
}
func (m *DendriteMonolith) SetMulticastEnabled(enabled bool) { func (m *DendriteMonolith) SetMulticastEnabled(enabled bool) {
if enabled { if enabled {

View file

@ -20,6 +20,7 @@ import (
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"net" "net"
"regexp"
"strings" "strings"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -27,9 +28,9 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
ironwoodtypes "github.com/Arceliar/ironwood/types" ironwoodtypes "github.com/Arceliar/ironwood/types"
yggdrasilconfig "github.com/yggdrasil-network/yggdrasil-go/src/config" "github.com/yggdrasil-network/yggdrasil-go/src/core"
yggdrasilcore "github.com/yggdrasil-network/yggdrasil-go/src/core" yggdrasilcore "github.com/yggdrasil-network/yggdrasil-go/src/core"
yggdrasildefaults "github.com/yggdrasil-network/yggdrasil-go/src/defaults" "github.com/yggdrasil-network/yggdrasil-go/src/multicast"
yggdrasilmulticast "github.com/yggdrasil-network/yggdrasil-go/src/multicast" yggdrasilmulticast "github.com/yggdrasil-network/yggdrasil-go/src/multicast"
gologme "github.com/gologme/log" gologme "github.com/gologme/log"
@ -37,7 +38,6 @@ import (
type Node struct { type Node struct {
core *yggdrasilcore.Core core *yggdrasilcore.Core
config *yggdrasilconfig.NodeConfig
multicast *yggdrasilmulticast.Multicast multicast *yggdrasilmulticast.Multicast
log *gologme.Logger log *gologme.Logger
utpSocket *utp.Socket utpSocket *utp.Socket
@ -57,16 +57,17 @@ func (n *Node) DialerContext(ctx context.Context, _, address string) (net.Conn,
func Setup(sk ed25519.PrivateKey, instanceName, storageDirectory, peerURI, listenURI string) (*Node, error) { func Setup(sk ed25519.PrivateKey, instanceName, storageDirectory, peerURI, listenURI string) (*Node, error) {
n := &Node{ n := &Node{
core: &yggdrasilcore.Core{},
config: yggdrasildefaults.GenerateConfig(),
multicast: &yggdrasilmulticast.Multicast{},
log: gologme.New(logrus.StandardLogger().Writer(), "", 0), log: gologme.New(logrus.StandardLogger().Writer(), "", 0),
incoming: make(chan net.Conn), incoming: make(chan net.Conn),
} }
options := []yggdrasilcore.SetupOption{ n.log.EnableLevel("error")
yggdrasilcore.AdminListenAddress("none"), n.log.EnableLevel("warn")
} n.log.EnableLevel("info")
{
var err error
options := []yggdrasilcore.SetupOption{}
if listenURI != "" { if listenURI != "" {
options = append(options, yggdrasilcore.ListenAddress(listenURI)) options = append(options, yggdrasilcore.ListenAddress(listenURI))
} }
@ -77,23 +78,31 @@ func Setup(sk ed25519.PrivateKey, instanceName, storageDirectory, peerURI, liste
}) })
} }
} }
if n.core, err = core.New(sk[:], n.log, options...); err != nil {
var err error
if n.core, err = yggdrasilcore.New(sk, options...); err != nil {
panic(err) panic(err)
} }
n.log.EnableLevel("error")
n.log.EnableLevel("warn")
n.log.EnableLevel("info")
n.core.SetLogger(n.log) n.core.SetLogger(n.log)
if n.utpSocket, err = utp.NewSocketFromPacketConnNoClose(n.core); err != nil { if n.utpSocket, err = utp.NewSocketFromPacketConnNoClose(n.core); err != nil {
panic(err) panic(err)
} }
if err = n.multicast.Init(n.core, n.config, n.log, nil); err != nil { }
// Setup the multicast module.
{
var err error
options := []multicast.SetupOption{
multicast.MulticastInterface{
Regex: regexp.MustCompile(".*"),
Beacon: true,
Listen: true,
Port: 0,
Priority: 0,
},
}
if n.multicast, err = multicast.New(n.core, n.log, options...); err != nil {
panic(err) panic(err)
} }
if err = n.multicast.Start(); err != nil {
panic(err)
} }
n.log.Printf("Public key: %x", n.core.PublicKey()) n.log.Printf("Public key: %x", n.core.PublicKey())
@ -114,14 +123,7 @@ func (n *Node) DerivedServerName() string {
} }
func (n *Node) PrivateKey() ed25519.PrivateKey { func (n *Node) PrivateKey() ed25519.PrivateKey {
sk := make(ed25519.PrivateKey, ed25519.PrivateKeySize) return n.core.PrivateKey()
sb, err := hex.DecodeString(n.config.PrivateKey)
if err == nil {
copy(sk, sb[:])
} else {
panic(err)
}
return sk
} }
func (n *Node) PublicKey() ed25519.PublicKey { func (n *Node) PublicKey() ed25519.PublicKey {

View file

@ -40,7 +40,7 @@ type KeyChangeConsumer struct {
durable string durable string
db storage.Database db storage.Database
queues *queue.OutgoingQueues queues *queue.OutgoingQueues
serverName gomatrixserverlib.ServerName isLocalServerName func(gomatrixserverlib.ServerName) bool
rsAPI roomserverAPI.FederationRoomserverAPI rsAPI roomserverAPI.FederationRoomserverAPI
topic string topic string
} }
@ -61,7 +61,7 @@ func NewKeyChangeConsumer(
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent), topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent),
queues: queues, queues: queues,
db: store, db: store,
serverName: cfg.Matrix.ServerName, isLocalServerName: cfg.Matrix.IsLocalServerName,
rsAPI: rsAPI, rsAPI: rsAPI,
} }
} }
@ -112,7 +112,7 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) bool {
logger.WithError(err).Error("Failed to extract domain from key change event") logger.WithError(err).Error("Failed to extract domain from key change event")
return true return true
} }
if originServerName != t.serverName { if !t.isLocalServerName(originServerName) {
return true return true
} }
@ -141,7 +141,7 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) bool {
// Pack the EDU and marshal it // Pack the EDU and marshal it
edu := &gomatrixserverlib.EDU{ edu := &gomatrixserverlib.EDU{
Type: gomatrixserverlib.MDeviceListUpdate, Type: gomatrixserverlib.MDeviceListUpdate,
Origin: string(t.serverName), Origin: string(originServerName),
} }
event := gomatrixserverlib.DeviceListUpdateEvent{ event := gomatrixserverlib.DeviceListUpdateEvent{
UserID: m.UserID, UserID: m.UserID,
@ -159,7 +159,7 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) bool {
} }
logger.Debugf("Sending device list update message to %q", destinations) logger.Debugf("Sending device list update message to %q", destinations)
err = t.queues.SendEDU(edu, t.serverName, destinations) err = t.queues.SendEDU(edu, originServerName, destinations)
return err == nil return err == nil
} }
@ -171,7 +171,7 @@ func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) bool {
logrus.WithError(err).Errorf("fedsender key change consumer: user ID parse failure") logrus.WithError(err).Errorf("fedsender key change consumer: user ID parse failure")
return true return true
} }
if host != gomatrixserverlib.ServerName(t.serverName) { if !t.isLocalServerName(host) {
// Ignore any messages that didn't originate locally, otherwise we'll // Ignore any messages that didn't originate locally, otherwise we'll
// end up parroting information we received from other servers. // end up parroting information we received from other servers.
return true return true
@ -203,7 +203,7 @@ func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) bool {
// Pack the EDU and marshal it // Pack the EDU and marshal it
edu := &gomatrixserverlib.EDU{ edu := &gomatrixserverlib.EDU{
Type: types.MSigningKeyUpdate, Type: types.MSigningKeyUpdate,
Origin: string(t.serverName), Origin: string(host),
} }
if edu.Content, err = json.Marshal(output); err != nil { if edu.Content, err = json.Marshal(output); err != nil {
sentry.CaptureException(err) sentry.CaptureException(err)
@ -212,7 +212,7 @@ func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) bool {
} }
logger.Debugf("Sending cross-signing update message to %q", destinations) logger.Debugf("Sending cross-signing update message to %q", destinations)
err = t.queues.SendEDU(edu, t.serverName, destinations) err = t.queues.SendEDU(edu, host, destinations)
return err == nil return err == nil
} }

View file

@ -38,7 +38,7 @@ type OutputPresenceConsumer struct {
durable string durable string
db storage.Database db storage.Database
queues *queue.OutgoingQueues queues *queue.OutgoingQueues
ServerName gomatrixserverlib.ServerName isLocalServerName func(gomatrixserverlib.ServerName) bool
topic string topic string
outboundPresenceEnabled bool outboundPresenceEnabled bool
} }
@ -56,7 +56,7 @@ func NewOutputPresenceConsumer(
jetstream: js, jetstream: js,
queues: queues, queues: queues,
db: store, db: store,
ServerName: cfg.Matrix.ServerName, isLocalServerName: cfg.Matrix.IsLocalServerName,
durable: cfg.Matrix.JetStream.Durable("FederationAPIPresenceConsumer"), durable: cfg.Matrix.JetStream.Durable("FederationAPIPresenceConsumer"),
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent), topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent),
outboundPresenceEnabled: cfg.Matrix.Presence.EnableOutbound, outboundPresenceEnabled: cfg.Matrix.Presence.EnableOutbound,
@ -85,7 +85,7 @@ func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msgs []*nats.Msg
log.WithError(err).WithField("user_id", userID).Error("failed to extract domain from receipt sender") log.WithError(err).WithField("user_id", userID).Error("failed to extract domain from receipt sender")
return true return true
} }
if serverName != t.ServerName { if !t.isLocalServerName(serverName) {
return true return true
} }
@ -127,7 +127,7 @@ func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msgs []*nats.Msg
edu := &gomatrixserverlib.EDU{ edu := &gomatrixserverlib.EDU{
Type: gomatrixserverlib.MPresence, Type: gomatrixserverlib.MPresence,
Origin: string(t.ServerName), Origin: string(serverName),
} }
if edu.Content, err = json.Marshal(content); err != nil { if edu.Content, err = json.Marshal(content); err != nil {
log.WithError(err).Error("failed to marshal EDU JSON") log.WithError(err).Error("failed to marshal EDU JSON")
@ -135,7 +135,7 @@ func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msgs []*nats.Msg
} }
log.Tracef("sending presence EDU to %d servers", len(joined)) log.Tracef("sending presence EDU to %d servers", len(joined))
if err = t.queues.SendEDU(edu, t.ServerName, joined); err != nil { if err = t.queues.SendEDU(edu, serverName, joined); err != nil {
log.WithError(err).Error("failed to send EDU") log.WithError(err).Error("failed to send EDU")
return false return false
} }

View file

@ -39,7 +39,7 @@ type OutputReceiptConsumer struct {
durable string durable string
db storage.Database db storage.Database
queues *queue.OutgoingQueues queues *queue.OutgoingQueues
ServerName gomatrixserverlib.ServerName isLocalServerName func(gomatrixserverlib.ServerName) bool
topic string topic string
} }
@ -56,7 +56,7 @@ func NewOutputReceiptConsumer(
jetstream: js, jetstream: js,
queues: queues, queues: queues,
db: store, db: store,
ServerName: cfg.Matrix.ServerName, isLocalServerName: cfg.Matrix.IsLocalServerName,
durable: cfg.Matrix.JetStream.Durable("FederationAPIReceiptConsumer"), durable: cfg.Matrix.JetStream.Durable("FederationAPIReceiptConsumer"),
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent), topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
} }
@ -95,7 +95,7 @@ func (t *OutputReceiptConsumer) onMessage(ctx context.Context, msgs []*nats.Msg)
log.WithError(err).WithField("user_id", receipt.UserID).Error("failed to extract domain from receipt sender") log.WithError(err).WithField("user_id", receipt.UserID).Error("failed to extract domain from receipt sender")
return true return true
} }
if receiptServerName != t.ServerName { if !t.isLocalServerName(receiptServerName) {
return true return true
} }
@ -134,14 +134,14 @@ func (t *OutputReceiptConsumer) onMessage(ctx context.Context, msgs []*nats.Msg)
edu := &gomatrixserverlib.EDU{ edu := &gomatrixserverlib.EDU{
Type: gomatrixserverlib.MReceipt, Type: gomatrixserverlib.MReceipt,
Origin: string(t.ServerName), Origin: string(receiptServerName),
} }
if edu.Content, err = json.Marshal(content); err != nil { if edu.Content, err = json.Marshal(content); err != nil {
log.WithError(err).Error("failed to marshal EDU JSON") log.WithError(err).Error("failed to marshal EDU JSON")
return true return true
} }
if err := t.queues.SendEDU(edu, t.ServerName, names); err != nil { if err := t.queues.SendEDU(edu, receiptServerName, names); err != nil {
log.WithError(err).Error("failed to send EDU") log.WithError(err).Error("failed to send EDU")
return false return false
} }

View file

@ -39,7 +39,7 @@ type OutputSendToDeviceConsumer struct {
durable string durable string
db storage.Database db storage.Database
queues *queue.OutgoingQueues queues *queue.OutgoingQueues
ServerName gomatrixserverlib.ServerName isLocalServerName func(gomatrixserverlib.ServerName) bool
topic string topic string
} }
@ -56,7 +56,7 @@ func NewOutputSendToDeviceConsumer(
jetstream: js, jetstream: js,
queues: queues, queues: queues,
db: store, db: store,
ServerName: cfg.Matrix.ServerName, isLocalServerName: cfg.Matrix.IsLocalServerName,
durable: cfg.Matrix.JetStream.Durable("FederationAPIESendToDeviceConsumer"), durable: cfg.Matrix.JetStream.Durable("FederationAPIESendToDeviceConsumer"),
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
} }
@ -82,7 +82,7 @@ func (t *OutputSendToDeviceConsumer) onMessage(ctx context.Context, msgs []*nats
log.WithError(err).WithField("user_id", sender).Error("Failed to extract domain from send-to-device sender") log.WithError(err).WithField("user_id", sender).Error("Failed to extract domain from send-to-device sender")
return true return true
} }
if originServerName != t.ServerName { if !t.isLocalServerName(originServerName) {
return true return true
} }
// Extract the send-to-device event from msg. // Extract the send-to-device event from msg.
@ -101,14 +101,14 @@ func (t *OutputSendToDeviceConsumer) onMessage(ctx context.Context, msgs []*nats
} }
// The SyncAPI is already handling sendToDevice for the local server // The SyncAPI is already handling sendToDevice for the local server
if destServerName == t.ServerName { if t.isLocalServerName(destServerName) {
return true return true
} }
// Pack the EDU and marshal it // Pack the EDU and marshal it
edu := &gomatrixserverlib.EDU{ edu := &gomatrixserverlib.EDU{
Type: gomatrixserverlib.MDirectToDevice, Type: gomatrixserverlib.MDirectToDevice,
Origin: string(t.ServerName), Origin: string(originServerName),
} }
tdm := gomatrixserverlib.ToDeviceMessage{ tdm := gomatrixserverlib.ToDeviceMessage{
Sender: ote.Sender, Sender: ote.Sender,
@ -127,7 +127,7 @@ func (t *OutputSendToDeviceConsumer) onMessage(ctx context.Context, msgs []*nats
} }
log.Debugf("Sending send-to-device message into %q destination queue", destServerName) log.Debugf("Sending send-to-device message into %q destination queue", destServerName)
if err := t.queues.SendEDU(edu, t.ServerName, []gomatrixserverlib.ServerName{destServerName}); err != nil { if err := t.queues.SendEDU(edu, originServerName, []gomatrixserverlib.ServerName{destServerName}); err != nil {
log.WithError(err).Error("failed to send EDU") log.WithError(err).Error("failed to send EDU")
return false return false
} }

View file

@ -36,7 +36,7 @@ type OutputTypingConsumer struct {
durable string durable string
db storage.Database db storage.Database
queues *queue.OutgoingQueues queues *queue.OutgoingQueues
ServerName gomatrixserverlib.ServerName isLocalServerName func(gomatrixserverlib.ServerName) bool
topic string topic string
} }
@ -53,7 +53,7 @@ func NewOutputTypingConsumer(
jetstream: js, jetstream: js,
queues: queues, queues: queues,
db: store, db: store,
ServerName: cfg.Matrix.ServerName, isLocalServerName: cfg.Matrix.IsLocalServerName,
durable: cfg.Matrix.JetStream.Durable("FederationAPITypingConsumer"), durable: cfg.Matrix.JetStream.Durable("FederationAPITypingConsumer"),
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent), topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
} }
@ -87,7 +87,7 @@ func (t *OutputTypingConsumer) onMessage(ctx context.Context, msgs []*nats.Msg)
_ = msg.Ack() _ = msg.Ack()
return true return true
} }
if typingServerName != t.ServerName { if !t.isLocalServerName(typingServerName) {
return true return true
} }
@ -111,7 +111,7 @@ func (t *OutputTypingConsumer) onMessage(ctx context.Context, msgs []*nats.Msg)
log.WithError(err).Error("failed to marshal EDU JSON") log.WithError(err).Error("failed to marshal EDU JSON")
return true return true
} }
if err := t.queues.SendEDU(edu, t.ServerName, names); err != nil { if err := t.queues.SendEDU(edu, typingServerName, names); err != nil {
log.WithError(err).Error("failed to send EDU") log.WithError(err).Error("failed to send EDU")
return false return false
} }

View file

@ -76,6 +76,9 @@ func (oq *destinationQueue) sendEvent(event *gomatrixserverlib.HeaderedEvent, re
return return
} }
// Check if the destination is blacklisted. If it isn't then wake
// up the queue.
if !oq.statistics.Blacklisted() {
// If there's room in memory to hold the event then add it to the // If there's room in memory to hold the event then add it to the
// list. // list.
oq.pendingMutex.Lock() oq.pendingMutex.Lock()
@ -93,6 +96,7 @@ func (oq *destinationQueue) sendEvent(event *gomatrixserverlib.HeaderedEvent, re
oq.wakeQueueAndNotify() oq.wakeQueueAndNotify()
} }
} }
}
// sendEDU adds the EDU event to the pending queue for the destination. // sendEDU adds the EDU event to the pending queue for the destination.
// If the queue is empty then it starts a background goroutine to // If the queue is empty then it starts a background goroutine to
@ -103,6 +107,9 @@ func (oq *destinationQueue) sendEDU(event *gomatrixserverlib.EDU, receipt *share
return return
} }
// Check if the destination is blacklisted. If it isn't then wake
// up the queue.
if !oq.statistics.Blacklisted() {
// If there's room in memory to hold the event then add it to the // If there's room in memory to hold the event then add it to the
// list. // list.
oq.pendingMutex.Lock() oq.pendingMutex.Lock()
@ -120,6 +127,7 @@ func (oq *destinationQueue) sendEDU(event *gomatrixserverlib.EDU, receipt *share
oq.wakeQueueAndNotify() oq.wakeQueueAndNotify()
} }
} }
}
// handleBackoffNotifier is registered as the backoff notification // handleBackoffNotifier is registered as the backoff notification
// callback with Statistics. It will wakeup and notify the queue // callback with Statistics. It will wakeup and notify the queue

View file

@ -247,9 +247,10 @@ func (oqs *OutgoingQueues) SendEvent(
return fmt.Errorf("sendevent: oqs.db.StoreJSON: %w", err) return fmt.Errorf("sendevent: oqs.db.StoreJSON: %w", err)
} }
destQueues := make([]*destinationQueue, 0, len(destmap))
for destination := range destmap { for destination := range destmap {
if queue := oqs.getQueue(destination); queue != nil && !queue.statistics.Blacklisted() { if queue := oqs.getQueue(destination); queue != nil {
queue.sendEvent(ev, nid) destQueues = append(destQueues, queue)
} else { } else {
delete(destmap, destination) delete(destmap, destination)
} }
@ -267,6 +268,14 @@ func (oqs *OutgoingQueues) SendEvent(
return err return err
} }
// NOTE : PDUs should be associated with destinations before sending
// them, otherwise this is technically a race.
// If the send completes before they are associated then they won't
// get properly cleaned up in the database.
for _, queue := range destQueues {
queue.sendEvent(ev, nid)
}
return nil return nil
} }
@ -335,20 +344,21 @@ func (oqs *OutgoingQueues) SendEDU(
return fmt.Errorf("sendevent: oqs.db.StoreJSON: %w", err) return fmt.Errorf("sendevent: oqs.db.StoreJSON: %w", err)
} }
destQueues := make([]*destinationQueue, 0, len(destmap))
for destination := range destmap { for destination := range destmap {
if queue := oqs.getQueue(destination); queue != nil && !queue.statistics.Blacklisted() { if queue := oqs.getQueue(destination); queue != nil {
queue.sendEDU(e, nid) destQueues = append(destQueues, queue)
} else { } else {
delete(destmap, destination) delete(destmap, destination)
} }
} }
// Create a database entry that associates the given PDU NID with // Create a database entry that associates the given PDU NID with
// this destination queue. We'll then be able to retrieve the PDU // these destination queues. We'll then be able to retrieve the PDU
// later. // later.
if err := oqs.db.AssociateEDUWithDestinations( if err := oqs.db.AssociateEDUWithDestinations(
oqs.process.Context(), oqs.process.Context(),
destmap, // the destination server name destmap, // the destination server names
nid, // NIDs from federationapi_queue_json table nid, // NIDs from federationapi_queue_json table
e.Type, e.Type,
nil, // this will use the default expireEDUTypes map nil, // this will use the default expireEDUTypes map
@ -357,6 +367,14 @@ func (oqs *OutgoingQueues) SendEDU(
return err return err
} }
// NOTE : EDUs should be associated with destinations before sending
// them, otherwise this is technically a race.
// If the send completes before they are associated then they won't
// get properly cleaned up in the database.
for _, queue := range destQueues {
queue.sendEDU(e, nid)
}
return nil return nil
} }

6
go.mod
View file

@ -1,7 +1,7 @@
module github.com/matrix-org/dendrite module github.com/matrix-org/dendrite
require ( require (
github.com/Arceliar/ironwood v0.0.0-20220903132624-ee60c16bcfcf github.com/Arceliar/ironwood v0.0.0-20221025225125-45b4281814c2
github.com/Arceliar/phony v0.0.0-20210209235338-dde1a8dca979 github.com/Arceliar/phony v0.0.0-20210209235338-dde1a8dca979
github.com/DATA-DOG/go-sqlmock v1.5.0 github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/MFAshby/stdemuxerhook v1.0.0 github.com/MFAshby/stdemuxerhook v1.0.0
@ -23,7 +23,7 @@ require (
github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91 github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91
github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530 github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530
github.com/matrix-org/gomatrixserverlib v0.0.0-20221025142407-17b0be811afa github.com/matrix-org/gomatrixserverlib v0.0.0-20221025142407-17b0be811afa
github.com/matrix-org/pinecone v0.0.0-20221007145426-3adc85477dd3 github.com/matrix-org/pinecone v0.0.0-20221026160848-639feeff74d6
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
github.com/mattn/go-sqlite3 v1.14.15 github.com/mattn/go-sqlite3 v1.14.15
github.com/nats-io/nats-server/v2 v2.9.3 github.com/nats-io/nats-server/v2 v2.9.3
@ -41,7 +41,7 @@ require (
github.com/tidwall/sjson v1.2.5 github.com/tidwall/sjson v1.2.5
github.com/uber/jaeger-client-go v2.30.0+incompatible github.com/uber/jaeger-client-go v2.30.0+incompatible
github.com/uber/jaeger-lib v2.4.1+incompatible github.com/uber/jaeger-lib v2.4.1+incompatible
github.com/yggdrasil-network/yggdrasil-go v0.4.5-0.20220901155642-4f2abece817c github.com/yggdrasil-network/yggdrasil-go v0.4.6
go.uber.org/atomic v1.10.0 go.uber.org/atomic v1.10.0
golang.org/x/crypto v0.0.0-20221012134737-56aed061732a golang.org/x/crypto v0.0.0-20221012134737-56aed061732a
golang.org/x/image v0.0.0-20220902085622-e7cb96979f69 golang.org/x/image v0.0.0-20220902085622-e7cb96979f69

12
go.sum
View file

@ -38,8 +38,8 @@ dmitri.shuralyov.com/html/belt v0.0.0-20180602232347-f7d459c86be0/go.mod h1:JLBr
dmitri.shuralyov.com/service/change v0.0.0-20181023043359-a85b471d5412/go.mod h1:a1inKt/atXimZ4Mv927x+r7UpyzRUf4emIoiiSC2TN4= dmitri.shuralyov.com/service/change v0.0.0-20181023043359-a85b471d5412/go.mod h1:a1inKt/atXimZ4Mv927x+r7UpyzRUf4emIoiiSC2TN4=
dmitri.shuralyov.com/state v0.0.0-20180228185332-28bcc343414c/go.mod h1:0PRwlb0D6DFvNNtx+9ybjezNCa8XF0xaYcETyp6rHWU= dmitri.shuralyov.com/state v0.0.0-20180228185332-28bcc343414c/go.mod h1:0PRwlb0D6DFvNNtx+9ybjezNCa8XF0xaYcETyp6rHWU=
git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg= git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg=
github.com/Arceliar/ironwood v0.0.0-20220903132624-ee60c16bcfcf h1:kjPkmDHUTWUma/4tqDl208bOk3jsUEqOJA6TsMZo5Jk= github.com/Arceliar/ironwood v0.0.0-20221025225125-45b4281814c2 h1:Usab30pNT2i/vZvpXcN9uOr5IO1RZPcUqoGH0DIAPnU=
github.com/Arceliar/ironwood v0.0.0-20220903132624-ee60c16bcfcf/go.mod h1:RP72rucOFm5udrnEzTmIWLRVGQiV/fSUAQXJ0RST/nk= github.com/Arceliar/ironwood v0.0.0-20221025225125-45b4281814c2/go.mod h1:RP72rucOFm5udrnEzTmIWLRVGQiV/fSUAQXJ0RST/nk=
github.com/Arceliar/phony v0.0.0-20210209235338-dde1a8dca979 h1:WndgpSW13S32VLQ3ugUxx2EnnWmgba1kCqPkd4Gk1yQ= github.com/Arceliar/phony v0.0.0-20210209235338-dde1a8dca979 h1:WndgpSW13S32VLQ3ugUxx2EnnWmgba1kCqPkd4Gk1yQ=
github.com/Arceliar/phony v0.0.0-20210209235338-dde1a8dca979/go.mod h1:6Lkn+/zJilRMsKmbmG1RPoamiArC6HS73xbwRyp3UyI= github.com/Arceliar/phony v0.0.0-20210209235338-dde1a8dca979/go.mod h1:6Lkn+/zJilRMsKmbmG1RPoamiArC6HS73xbwRyp3UyI=
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8=
@ -389,8 +389,8 @@ github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530 h1:kHKxCOLcHH8
github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
github.com/matrix-org/gomatrixserverlib v0.0.0-20221025142407-17b0be811afa h1:S98DShDv3sn7O4n4HjtJOejypseYVpv1R/XPg+cDnfI= github.com/matrix-org/gomatrixserverlib v0.0.0-20221025142407-17b0be811afa h1:S98DShDv3sn7O4n4HjtJOejypseYVpv1R/XPg+cDnfI=
github.com/matrix-org/gomatrixserverlib v0.0.0-20221025142407-17b0be811afa/go.mod h1:Mtifyr8q8htcBeugvlDnkBcNUy5LO8OzUoplAf1+mb4= github.com/matrix-org/gomatrixserverlib v0.0.0-20221025142407-17b0be811afa/go.mod h1:Mtifyr8q8htcBeugvlDnkBcNUy5LO8OzUoplAf1+mb4=
github.com/matrix-org/pinecone v0.0.0-20221007145426-3adc85477dd3 h1:lzkSQvBv8TuqKJCPoVwOVvEnARTlua5rrNy/Qw2Vxeo= github.com/matrix-org/pinecone v0.0.0-20221026160848-639feeff74d6 h1:nAT5w41Q9uWTSnpKW55/hBwP91j2IFYPDRs0jJ8TyFI=
github.com/matrix-org/pinecone v0.0.0-20221007145426-3adc85477dd3/go.mod h1:K0N1ixHQxXoCyqolDqVxPM3ArrDtcMs8yegOx2Lfv9k= github.com/matrix-org/pinecone v0.0.0-20221026160848-639feeff74d6/go.mod h1:K0N1ixHQxXoCyqolDqVxPM3ArrDtcMs8yegOx2Lfv9k=
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk= github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk=
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U= github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
@ -592,8 +592,8 @@ github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17
github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
github.com/willf/bitset v1.1.9/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= github.com/willf/bitset v1.1.9/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
github.com/yggdrasil-network/yggdrasil-go v0.4.5-0.20220901155642-4f2abece817c h1:/cTmA6pV2Z20BT/FGSmnb5BmJ8eRbDP0HbCB5IO1aKw= github.com/yggdrasil-network/yggdrasil-go v0.4.6 h1:GALUDV9QPz/5FVkbazpkTc9EABHufA556JwUJZr41j4=
github.com/yggdrasil-network/yggdrasil-go v0.4.5-0.20220901155642-4f2abece817c/go.mod h1:cIwhYwX9yT9Bcei59O0oOBSaj+kQP+9aVQUMWHh5R00= github.com/yggdrasil-network/yggdrasil-go v0.4.6/go.mod h1:PBMoAOvQjA9geNEeGyMXA9QgCS6Bu+9V+1VkWM84wpw=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=