From 5298dd1133948606172944b7e5ec3805ccc72644 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 26 Oct 2022 14:52:33 +0100 Subject: [PATCH 1/4] Update federation API consumers --- federationapi/consumers/keychange.go | 44 ++++++++++++------------- federationapi/consumers/presence.go | 10 +++--- federationapi/consumers/receipts.go | 34 +++++++++---------- federationapi/consumers/sendtodevice.go | 36 ++++++++++---------- federationapi/consumers/typing.go | 32 +++++++++--------- 5 files changed, 78 insertions(+), 78 deletions(-) diff --git a/federationapi/consumers/keychange.go b/federationapi/consumers/keychange.go index 67dfdc1d3..7d1ae0f81 100644 --- a/federationapi/consumers/keychange.go +++ b/federationapi/consumers/keychange.go @@ -35,14 +35,14 @@ import ( // KeyChangeConsumer consumes events that originate in key server. type KeyChangeConsumer struct { - ctx context.Context - jetstream nats.JetStreamContext - durable string - db storage.Database - queues *queue.OutgoingQueues - serverName gomatrixserverlib.ServerName - rsAPI roomserverAPI.FederationRoomserverAPI - topic string + ctx context.Context + jetstream nats.JetStreamContext + durable string + db storage.Database + queues *queue.OutgoingQueues + isLocalServerName func(gomatrixserverlib.ServerName) bool + rsAPI roomserverAPI.FederationRoomserverAPI + topic string } // NewKeyChangeConsumer creates a new KeyChangeConsumer. Call Start() to begin consuming from key servers. @@ -55,14 +55,14 @@ func NewKeyChangeConsumer( rsAPI roomserverAPI.FederationRoomserverAPI, ) *KeyChangeConsumer { return &KeyChangeConsumer{ - ctx: process.Context(), - jetstream: js, - durable: cfg.Matrix.JetStream.Prefixed("FederationAPIKeyChangeConsumer"), - topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent), - queues: queues, - db: store, - serverName: cfg.Matrix.ServerName, - rsAPI: rsAPI, + ctx: process.Context(), + jetstream: js, + durable: cfg.Matrix.JetStream.Prefixed("FederationAPIKeyChangeConsumer"), + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent), + queues: queues, + db: store, + isLocalServerName: cfg.Matrix.IsLocalServerName, + rsAPI: rsAPI, } } @@ -112,7 +112,7 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) bool { logger.WithError(err).Error("Failed to extract domain from key change event") return true } - if originServerName != t.serverName { + if !t.isLocalServerName(originServerName) { return true } @@ -141,7 +141,7 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) bool { // Pack the EDU and marshal it edu := &gomatrixserverlib.EDU{ Type: gomatrixserverlib.MDeviceListUpdate, - Origin: string(t.serverName), + Origin: string(originServerName), } event := gomatrixserverlib.DeviceListUpdateEvent{ UserID: m.UserID, @@ -159,7 +159,7 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) bool { } logger.Debugf("Sending device list update message to %q", destinations) - err = t.queues.SendEDU(edu, t.serverName, destinations) + err = t.queues.SendEDU(edu, originServerName, destinations) return err == nil } @@ -171,7 +171,7 @@ func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) bool { logrus.WithError(err).Errorf("fedsender key change consumer: user ID parse failure") return true } - if host != gomatrixserverlib.ServerName(t.serverName) { + if !t.isLocalServerName(host) { // Ignore any messages that didn't originate locally, otherwise we'll // end up parroting information we received from other servers. return true @@ -203,7 +203,7 @@ func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) bool { // Pack the EDU and marshal it edu := &gomatrixserverlib.EDU{ Type: types.MSigningKeyUpdate, - Origin: string(t.serverName), + Origin: string(host), } if edu.Content, err = json.Marshal(output); err != nil { sentry.CaptureException(err) @@ -212,7 +212,7 @@ func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) bool { } logger.Debugf("Sending cross-signing update message to %q", destinations) - err = t.queues.SendEDU(edu, t.serverName, destinations) + err = t.queues.SendEDU(edu, host, destinations) return err == nil } diff --git a/federationapi/consumers/presence.go b/federationapi/consumers/presence.go index e76103cd3..3445d34a9 100644 --- a/federationapi/consumers/presence.go +++ b/federationapi/consumers/presence.go @@ -38,7 +38,7 @@ type OutputPresenceConsumer struct { durable string db storage.Database queues *queue.OutgoingQueues - ServerName gomatrixserverlib.ServerName + isLocalServerName func(gomatrixserverlib.ServerName) bool topic string outboundPresenceEnabled bool } @@ -56,7 +56,7 @@ func NewOutputPresenceConsumer( jetstream: js, queues: queues, db: store, - ServerName: cfg.Matrix.ServerName, + isLocalServerName: cfg.Matrix.IsLocalServerName, durable: cfg.Matrix.JetStream.Durable("FederationAPIPresenceConsumer"), topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent), outboundPresenceEnabled: cfg.Matrix.Presence.EnableOutbound, @@ -85,7 +85,7 @@ func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msgs []*nats.Msg log.WithError(err).WithField("user_id", userID).Error("failed to extract domain from receipt sender") return true } - if serverName != t.ServerName { + if !t.isLocalServerName(serverName) { return true } @@ -127,7 +127,7 @@ func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msgs []*nats.Msg edu := &gomatrixserverlib.EDU{ Type: gomatrixserverlib.MPresence, - Origin: string(t.ServerName), + Origin: string(serverName), } if edu.Content, err = json.Marshal(content); err != nil { log.WithError(err).Error("failed to marshal EDU JSON") @@ -135,7 +135,7 @@ func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msgs []*nats.Msg } log.Tracef("sending presence EDU to %d servers", len(joined)) - if err = t.queues.SendEDU(edu, t.ServerName, joined); err != nil { + if err = t.queues.SendEDU(edu, serverName, joined); err != nil { log.WithError(err).Error("failed to send EDU") return false } diff --git a/federationapi/consumers/receipts.go b/federationapi/consumers/receipts.go index 75827cb68..200c06e6c 100644 --- a/federationapi/consumers/receipts.go +++ b/federationapi/consumers/receipts.go @@ -34,13 +34,13 @@ import ( // OutputReceiptConsumer consumes events that originate in the clientapi. type OutputReceiptConsumer struct { - ctx context.Context - jetstream nats.JetStreamContext - durable string - db storage.Database - queues *queue.OutgoingQueues - ServerName gomatrixserverlib.ServerName - topic string + ctx context.Context + jetstream nats.JetStreamContext + durable string + db storage.Database + queues *queue.OutgoingQueues + isLocalServerName func(gomatrixserverlib.ServerName) bool + topic string } // NewOutputReceiptConsumer creates a new OutputReceiptConsumer. Call Start() to begin consuming typing events. @@ -52,13 +52,13 @@ func NewOutputReceiptConsumer( store storage.Database, ) *OutputReceiptConsumer { return &OutputReceiptConsumer{ - ctx: process.Context(), - jetstream: js, - queues: queues, - db: store, - ServerName: cfg.Matrix.ServerName, - durable: cfg.Matrix.JetStream.Durable("FederationAPIReceiptConsumer"), - topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent), + ctx: process.Context(), + jetstream: js, + queues: queues, + db: store, + isLocalServerName: cfg.Matrix.IsLocalServerName, + durable: cfg.Matrix.JetStream.Durable("FederationAPIReceiptConsumer"), + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent), } } @@ -95,7 +95,7 @@ func (t *OutputReceiptConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) log.WithError(err).WithField("user_id", receipt.UserID).Error("failed to extract domain from receipt sender") return true } - if receiptServerName != t.ServerName { + if !t.isLocalServerName(receiptServerName) { return true } @@ -134,14 +134,14 @@ func (t *OutputReceiptConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) edu := &gomatrixserverlib.EDU{ Type: gomatrixserverlib.MReceipt, - Origin: string(t.ServerName), + Origin: string(receiptServerName), } if edu.Content, err = json.Marshal(content); err != nil { log.WithError(err).Error("failed to marshal EDU JSON") return true } - if err := t.queues.SendEDU(edu, t.ServerName, names); err != nil { + if err := t.queues.SendEDU(edu, receiptServerName, names); err != nil { log.WithError(err).Error("failed to send EDU") return false } diff --git a/federationapi/consumers/sendtodevice.go b/federationapi/consumers/sendtodevice.go index 9aec22a3e..9620d1612 100644 --- a/federationapi/consumers/sendtodevice.go +++ b/federationapi/consumers/sendtodevice.go @@ -34,13 +34,13 @@ import ( // OutputSendToDeviceConsumer consumes events that originate in the clientapi. type OutputSendToDeviceConsumer struct { - ctx context.Context - jetstream nats.JetStreamContext - durable string - db storage.Database - queues *queue.OutgoingQueues - ServerName gomatrixserverlib.ServerName - topic string + ctx context.Context + jetstream nats.JetStreamContext + durable string + db storage.Database + queues *queue.OutgoingQueues + isLocalServerName func(gomatrixserverlib.ServerName) bool + topic string } // NewOutputSendToDeviceConsumer creates a new OutputSendToDeviceConsumer. Call Start() to begin consuming send-to-device events. @@ -52,13 +52,13 @@ func NewOutputSendToDeviceConsumer( store storage.Database, ) *OutputSendToDeviceConsumer { return &OutputSendToDeviceConsumer{ - ctx: process.Context(), - jetstream: js, - queues: queues, - db: store, - ServerName: cfg.Matrix.ServerName, - durable: cfg.Matrix.JetStream.Durable("FederationAPIESendToDeviceConsumer"), - topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), + ctx: process.Context(), + jetstream: js, + queues: queues, + db: store, + isLocalServerName: cfg.Matrix.IsLocalServerName, + durable: cfg.Matrix.JetStream.Durable("FederationAPIESendToDeviceConsumer"), + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), } } @@ -82,7 +82,7 @@ func (t *OutputSendToDeviceConsumer) onMessage(ctx context.Context, msgs []*nats log.WithError(err).WithField("user_id", sender).Error("Failed to extract domain from send-to-device sender") return true } - if originServerName != t.ServerName { + if !t.isLocalServerName(originServerName) { return true } // Extract the send-to-device event from msg. @@ -101,14 +101,14 @@ func (t *OutputSendToDeviceConsumer) onMessage(ctx context.Context, msgs []*nats } // The SyncAPI is already handling sendToDevice for the local server - if destServerName == t.ServerName { + if t.isLocalServerName(destServerName) { return true } // Pack the EDU and marshal it edu := &gomatrixserverlib.EDU{ Type: gomatrixserverlib.MDirectToDevice, - Origin: string(t.ServerName), + Origin: string(originServerName), } tdm := gomatrixserverlib.ToDeviceMessage{ Sender: ote.Sender, @@ -127,7 +127,7 @@ func (t *OutputSendToDeviceConsumer) onMessage(ctx context.Context, msgs []*nats } log.Debugf("Sending send-to-device message into %q destination queue", destServerName) - if err := t.queues.SendEDU(edu, t.ServerName, []gomatrixserverlib.ServerName{destServerName}); err != nil { + if err := t.queues.SendEDU(edu, originServerName, []gomatrixserverlib.ServerName{destServerName}); err != nil { log.WithError(err).Error("failed to send EDU") return false } diff --git a/federationapi/consumers/typing.go b/federationapi/consumers/typing.go index 9c7379136..c66f97519 100644 --- a/federationapi/consumers/typing.go +++ b/federationapi/consumers/typing.go @@ -31,13 +31,13 @@ import ( // OutputTypingConsumer consumes events that originate in the clientapi. type OutputTypingConsumer struct { - ctx context.Context - jetstream nats.JetStreamContext - durable string - db storage.Database - queues *queue.OutgoingQueues - ServerName gomatrixserverlib.ServerName - topic string + ctx context.Context + jetstream nats.JetStreamContext + durable string + db storage.Database + queues *queue.OutgoingQueues + isLocalServerName func(gomatrixserverlib.ServerName) bool + topic string } // NewOutputTypingConsumer creates a new OutputTypingConsumer. Call Start() to begin consuming typing events. @@ -49,13 +49,13 @@ func NewOutputTypingConsumer( store storage.Database, ) *OutputTypingConsumer { return &OutputTypingConsumer{ - ctx: process.Context(), - jetstream: js, - queues: queues, - db: store, - ServerName: cfg.Matrix.ServerName, - durable: cfg.Matrix.JetStream.Durable("FederationAPITypingConsumer"), - topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent), + ctx: process.Context(), + jetstream: js, + queues: queues, + db: store, + isLocalServerName: cfg.Matrix.IsLocalServerName, + durable: cfg.Matrix.JetStream.Durable("FederationAPITypingConsumer"), + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent), } } @@ -87,7 +87,7 @@ func (t *OutputTypingConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) _ = msg.Ack() return true } - if typingServerName != t.ServerName { + if !t.isLocalServerName(typingServerName) { return true } @@ -111,7 +111,7 @@ func (t *OutputTypingConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) log.WithError(err).Error("failed to marshal EDU JSON") return true } - if err := t.queues.SendEDU(edu, t.ServerName, names); err != nil { + if err := t.queues.SendEDU(edu, typingServerName, names); err != nil { log.WithError(err).Error("failed to send EDU") return false } From a74aea07144615f20a71f1c2f62f4ba0946d7b54 Mon Sep 17 00:00:00 2001 From: devonh Date: Wed, 26 Oct 2022 16:25:57 +0000 Subject: [PATCH 2/4] Add network interface callback to pinecone build (#2825) Co-authored-by: Neil Alexander --- build/gobind-pinecone/monolith.go | 52 ++++++++++++++++++++++++------- go.mod | 2 +- go.sum | 4 +-- 3 files changed, 43 insertions(+), 15 deletions(-) diff --git a/build/gobind-pinecone/monolith.go b/build/gobind-pinecone/monolith.go index 4a96e4bef..adb4e40a6 100644 --- a/build/gobind-pinecone/monolith.go +++ b/build/gobind-pinecone/monolith.go @@ -101,18 +101,46 @@ func (m *DendriteMonolith) SessionCount() int { return len(m.PineconeQUIC.Protocol("matrix").Sessions()) } -func (m *DendriteMonolith) RegisterNetworkInterface(name string, index int, mtu int, up bool, broadcast bool, loopback bool, pointToPoint bool, multicast bool, addrs string) { - m.PineconeMulticast.RegisterInterface(pineconeMulticast.InterfaceInfo{ - Name: name, - Index: index, - Mtu: mtu, - Up: up, - Broadcast: broadcast, - Loopback: loopback, - PointToPoint: pointToPoint, - Multicast: multicast, - Addrs: addrs, - }) +type InterfaceInfo struct { + Name string + Index int + Mtu int + Up bool + Broadcast bool + Loopback bool + PointToPoint bool + Multicast bool + Addrs string +} + +type InterfaceRetriever interface { + CacheCurrentInterfaces() int + GetCachedInterface(index int) *InterfaceInfo +} + +func (m *DendriteMonolith) RegisterNetworkCallback(intfCallback InterfaceRetriever) { + callback := func() []pineconeMulticast.InterfaceInfo { + count := intfCallback.CacheCurrentInterfaces() + intfs := []pineconeMulticast.InterfaceInfo{} + for i := 0; i < count; i++ { + iface := intfCallback.GetCachedInterface(i) + if iface != nil { + intfs = append(intfs, pineconeMulticast.InterfaceInfo{ + Name: iface.Name, + Index: iface.Index, + Mtu: iface.Mtu, + Up: iface.Up, + Broadcast: iface.Broadcast, + Loopback: iface.Loopback, + PointToPoint: iface.PointToPoint, + Multicast: iface.Multicast, + Addrs: iface.Addrs, + }) + } + } + return intfs + } + m.PineconeMulticast.RegisterNetworkCallback(callback) } func (m *DendriteMonolith) SetMulticastEnabled(enabled bool) { diff --git a/go.mod b/go.mod index 39dfb0fe1..be5099fcf 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91 github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530 github.com/matrix-org/gomatrixserverlib v0.0.0-20221025142407-17b0be811afa - github.com/matrix-org/pinecone v0.0.0-20221007145426-3adc85477dd3 + github.com/matrix-org/pinecone v0.0.0-20221026160848-639feeff74d6 github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 github.com/mattn/go-sqlite3 v1.14.15 github.com/nats-io/nats-server/v2 v2.9.3 diff --git a/go.sum b/go.sum index 5e6253860..c7903b0ce 100644 --- a/go.sum +++ b/go.sum @@ -389,8 +389,8 @@ github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530 h1:kHKxCOLcHH8 github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= github.com/matrix-org/gomatrixserverlib v0.0.0-20221025142407-17b0be811afa h1:S98DShDv3sn7O4n4HjtJOejypseYVpv1R/XPg+cDnfI= github.com/matrix-org/gomatrixserverlib v0.0.0-20221025142407-17b0be811afa/go.mod h1:Mtifyr8q8htcBeugvlDnkBcNUy5LO8OzUoplAf1+mb4= -github.com/matrix-org/pinecone v0.0.0-20221007145426-3adc85477dd3 h1:lzkSQvBv8TuqKJCPoVwOVvEnARTlua5rrNy/Qw2Vxeo= -github.com/matrix-org/pinecone v0.0.0-20221007145426-3adc85477dd3/go.mod h1:K0N1ixHQxXoCyqolDqVxPM3ArrDtcMs8yegOx2Lfv9k= +github.com/matrix-org/pinecone v0.0.0-20221026160848-639feeff74d6 h1:nAT5w41Q9uWTSnpKW55/hBwP91j2IFYPDRs0jJ8TyFI= +github.com/matrix-org/pinecone v0.0.0-20221026160848-639feeff74d6/go.mod h1:K0N1ixHQxXoCyqolDqVxPM3ArrDtcMs8yegOx2Lfv9k= github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk= github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= From 97491a174b7826c5c9058398e54e4a18c7a5a052 Mon Sep 17 00:00:00 2001 From: devonh Date: Wed, 26 Oct 2022 16:35:01 +0000 Subject: [PATCH 3/4] Associate events in db before queueing them to send (#2833) Fixes a race condition between sending federation events and having them fully associated in the database. --- federationapi/queue/destinationqueue.go | 64 ++++++++++++++----------- federationapi/queue/queue.go | 30 +++++++++--- 2 files changed, 60 insertions(+), 34 deletions(-) diff --git a/federationapi/queue/destinationqueue.go b/federationapi/queue/destinationqueue.go index 1b7670e9a..a638a5742 100644 --- a/federationapi/queue/destinationqueue.go +++ b/federationapi/queue/destinationqueue.go @@ -76,21 +76,25 @@ func (oq *destinationQueue) sendEvent(event *gomatrixserverlib.HeaderedEvent, re return } - // If there's room in memory to hold the event then add it to the - // list. - oq.pendingMutex.Lock() - if len(oq.pendingPDUs) < maxPDUsInMemory { - oq.pendingPDUs = append(oq.pendingPDUs, &queuedPDU{ - pdu: event, - receipt: receipt, - }) - } else { - oq.overflowed.Store(true) - } - oq.pendingMutex.Unlock() + // Check if the destination is blacklisted. If it isn't then wake + // up the queue. + if !oq.statistics.Blacklisted() { + // If there's room in memory to hold the event then add it to the + // list. + oq.pendingMutex.Lock() + if len(oq.pendingPDUs) < maxPDUsInMemory { + oq.pendingPDUs = append(oq.pendingPDUs, &queuedPDU{ + pdu: event, + receipt: receipt, + }) + } else { + oq.overflowed.Store(true) + } + oq.pendingMutex.Unlock() - if !oq.backingOff.Load() { - oq.wakeQueueAndNotify() + if !oq.backingOff.Load() { + oq.wakeQueueAndNotify() + } } } @@ -103,21 +107,25 @@ func (oq *destinationQueue) sendEDU(event *gomatrixserverlib.EDU, receipt *share return } - // If there's room in memory to hold the event then add it to the - // list. - oq.pendingMutex.Lock() - if len(oq.pendingEDUs) < maxEDUsInMemory { - oq.pendingEDUs = append(oq.pendingEDUs, &queuedEDU{ - edu: event, - receipt: receipt, - }) - } else { - oq.overflowed.Store(true) - } - oq.pendingMutex.Unlock() + // Check if the destination is blacklisted. If it isn't then wake + // up the queue. + if !oq.statistics.Blacklisted() { + // If there's room in memory to hold the event then add it to the + // list. + oq.pendingMutex.Lock() + if len(oq.pendingEDUs) < maxEDUsInMemory { + oq.pendingEDUs = append(oq.pendingEDUs, &queuedEDU{ + edu: event, + receipt: receipt, + }) + } else { + oq.overflowed.Store(true) + } + oq.pendingMutex.Unlock() - if !oq.backingOff.Load() { - oq.wakeQueueAndNotify() + if !oq.backingOff.Load() { + oq.wakeQueueAndNotify() + } } } diff --git a/federationapi/queue/queue.go b/federationapi/queue/queue.go index 328334379..b5d0552c6 100644 --- a/federationapi/queue/queue.go +++ b/federationapi/queue/queue.go @@ -247,9 +247,10 @@ func (oqs *OutgoingQueues) SendEvent( return fmt.Errorf("sendevent: oqs.db.StoreJSON: %w", err) } + destQueues := make([]*destinationQueue, 0, len(destmap)) for destination := range destmap { - if queue := oqs.getQueue(destination); queue != nil && !queue.statistics.Blacklisted() { - queue.sendEvent(ev, nid) + if queue := oqs.getQueue(destination); queue != nil { + destQueues = append(destQueues, queue) } else { delete(destmap, destination) } @@ -267,6 +268,14 @@ func (oqs *OutgoingQueues) SendEvent( return err } + // NOTE : PDUs should be associated with destinations before sending + // them, otherwise this is technically a race. + // If the send completes before they are associated then they won't + // get properly cleaned up in the database. + for _, queue := range destQueues { + queue.sendEvent(ev, nid) + } + return nil } @@ -335,20 +344,21 @@ func (oqs *OutgoingQueues) SendEDU( return fmt.Errorf("sendevent: oqs.db.StoreJSON: %w", err) } + destQueues := make([]*destinationQueue, 0, len(destmap)) for destination := range destmap { - if queue := oqs.getQueue(destination); queue != nil && !queue.statistics.Blacklisted() { - queue.sendEDU(e, nid) + if queue := oqs.getQueue(destination); queue != nil { + destQueues = append(destQueues, queue) } else { delete(destmap, destination) } } // Create a database entry that associates the given PDU NID with - // this destination queue. We'll then be able to retrieve the PDU + // these destination queues. We'll then be able to retrieve the PDU // later. if err := oqs.db.AssociateEDUWithDestinations( oqs.process.Context(), - destmap, // the destination server name + destmap, // the destination server names nid, // NIDs from federationapi_queue_json table e.Type, nil, // this will use the default expireEDUTypes map @@ -357,6 +367,14 @@ func (oqs *OutgoingQueues) SendEDU( return err } + // NOTE : EDUs should be associated with destinations before sending + // them, otherwise this is technically a race. + // If the send completes before they are associated then they won't + // get properly cleaned up in the database. + for _, queue := range destQueues { + queue.sendEDU(e, nid) + } + return nil } From 238b6ef2cd5077482b46c57e9f44f05a19fcbfc7 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 26 Oct 2022 18:37:01 +0100 Subject: [PATCH 4/4] Update Yggdrasil demo --- build/docker/Dockerfile.demo-yggdrasil | 25 ++++++ cmd/dendrite-demo-yggdrasil/yggconn/node.go | 86 +++++++++++---------- go.mod | 4 +- go.sum | 8 +- 4 files changed, 75 insertions(+), 48 deletions(-) create mode 100644 build/docker/Dockerfile.demo-yggdrasil diff --git a/build/docker/Dockerfile.demo-yggdrasil b/build/docker/Dockerfile.demo-yggdrasil new file mode 100644 index 000000000..76bf35823 --- /dev/null +++ b/build/docker/Dockerfile.demo-yggdrasil @@ -0,0 +1,25 @@ +FROM docker.io/golang:1.19-alpine AS base + +RUN apk --update --no-cache add bash build-base + +WORKDIR /build + +COPY . /build + +RUN mkdir -p bin +RUN go build -trimpath -o bin/ ./cmd/dendrite-demo-yggdrasil +RUN go build -trimpath -o bin/ ./cmd/create-account +RUN go build -trimpath -o bin/ ./cmd/generate-keys + +FROM alpine:latest +LABEL org.opencontainers.image.title="Dendrite (Yggdrasil demo)" +LABEL org.opencontainers.image.description="Next-generation Matrix homeserver written in Go" +LABEL org.opencontainers.image.source="https://github.com/matrix-org/dendrite" +LABEL org.opencontainers.image.licenses="Apache-2.0" + +COPY --from=base /build/bin/* /usr/bin/ + +VOLUME /etc/dendrite +WORKDIR /etc/dendrite + +ENTRYPOINT ["/usr/bin/dendrite-demo-yggdrasil"] diff --git a/cmd/dendrite-demo-yggdrasil/yggconn/node.go b/cmd/dendrite-demo-yggdrasil/yggconn/node.go index 83b4cdf9e..6df5fa879 100644 --- a/cmd/dendrite-demo-yggdrasil/yggconn/node.go +++ b/cmd/dendrite-demo-yggdrasil/yggconn/node.go @@ -20,6 +20,7 @@ import ( "encoding/hex" "fmt" "net" + "regexp" "strings" "github.com/matrix-org/gomatrixserverlib" @@ -27,9 +28,9 @@ import ( "github.com/sirupsen/logrus" ironwoodtypes "github.com/Arceliar/ironwood/types" - yggdrasilconfig "github.com/yggdrasil-network/yggdrasil-go/src/config" + "github.com/yggdrasil-network/yggdrasil-go/src/core" yggdrasilcore "github.com/yggdrasil-network/yggdrasil-go/src/core" - yggdrasildefaults "github.com/yggdrasil-network/yggdrasil-go/src/defaults" + "github.com/yggdrasil-network/yggdrasil-go/src/multicast" yggdrasilmulticast "github.com/yggdrasil-network/yggdrasil-go/src/multicast" gologme "github.com/gologme/log" @@ -37,7 +38,6 @@ import ( type Node struct { core *yggdrasilcore.Core - config *yggdrasilconfig.NodeConfig multicast *yggdrasilmulticast.Multicast log *gologme.Logger utpSocket *utp.Socket @@ -57,43 +57,52 @@ func (n *Node) DialerContext(ctx context.Context, _, address string) (net.Conn, func Setup(sk ed25519.PrivateKey, instanceName, storageDirectory, peerURI, listenURI string) (*Node, error) { n := &Node{ - core: &yggdrasilcore.Core{}, - config: yggdrasildefaults.GenerateConfig(), - multicast: &yggdrasilmulticast.Multicast{}, - log: gologme.New(logrus.StandardLogger().Writer(), "", 0), - incoming: make(chan net.Conn), + log: gologme.New(logrus.StandardLogger().Writer(), "", 0), + incoming: make(chan net.Conn), } - options := []yggdrasilcore.SetupOption{ - yggdrasilcore.AdminListenAddress("none"), - } - if listenURI != "" { - options = append(options, yggdrasilcore.ListenAddress(listenURI)) - } - if peerURI != "" { - for _, uri := range strings.Split(peerURI, ",") { - options = append(options, yggdrasilcore.Peer{ - URI: uri, - }) - } - } - - var err error - if n.core, err = yggdrasilcore.New(sk, options...); err != nil { - panic(err) - } n.log.EnableLevel("error") n.log.EnableLevel("warn") n.log.EnableLevel("info") - n.core.SetLogger(n.log) - if n.utpSocket, err = utp.NewSocketFromPacketConnNoClose(n.core); err != nil { - panic(err) + + { + var err error + options := []yggdrasilcore.SetupOption{} + if listenURI != "" { + options = append(options, yggdrasilcore.ListenAddress(listenURI)) + } + if peerURI != "" { + for _, uri := range strings.Split(peerURI, ",") { + options = append(options, yggdrasilcore.Peer{ + URI: uri, + }) + } + } + if n.core, err = core.New(sk[:], n.log, options...); err != nil { + panic(err) + } + n.core.SetLogger(n.log) + + if n.utpSocket, err = utp.NewSocketFromPacketConnNoClose(n.core); err != nil { + panic(err) + } } - if err = n.multicast.Init(n.core, n.config, n.log, nil); err != nil { - panic(err) - } - if err = n.multicast.Start(); err != nil { - panic(err) + + // Setup the multicast module. + { + var err error + options := []multicast.SetupOption{ + multicast.MulticastInterface{ + Regex: regexp.MustCompile(".*"), + Beacon: true, + Listen: true, + Port: 0, + Priority: 0, + }, + } + if n.multicast, err = multicast.New(n.core, n.log, options...); err != nil { + panic(err) + } } n.log.Printf("Public key: %x", n.core.PublicKey()) @@ -114,14 +123,7 @@ func (n *Node) DerivedServerName() string { } func (n *Node) PrivateKey() ed25519.PrivateKey { - sk := make(ed25519.PrivateKey, ed25519.PrivateKeySize) - sb, err := hex.DecodeString(n.config.PrivateKey) - if err == nil { - copy(sk, sb[:]) - } else { - panic(err) - } - return sk + return n.core.PrivateKey() } func (n *Node) PublicKey() ed25519.PublicKey { diff --git a/go.mod b/go.mod index be5099fcf..7c290d784 100644 --- a/go.mod +++ b/go.mod @@ -1,7 +1,7 @@ module github.com/matrix-org/dendrite require ( - github.com/Arceliar/ironwood v0.0.0-20220903132624-ee60c16bcfcf + github.com/Arceliar/ironwood v0.0.0-20221025225125-45b4281814c2 github.com/Arceliar/phony v0.0.0-20210209235338-dde1a8dca979 github.com/DATA-DOG/go-sqlmock v1.5.0 github.com/MFAshby/stdemuxerhook v1.0.0 @@ -41,7 +41,7 @@ require ( github.com/tidwall/sjson v1.2.5 github.com/uber/jaeger-client-go v2.30.0+incompatible github.com/uber/jaeger-lib v2.4.1+incompatible - github.com/yggdrasil-network/yggdrasil-go v0.4.5-0.20220901155642-4f2abece817c + github.com/yggdrasil-network/yggdrasil-go v0.4.6 go.uber.org/atomic v1.10.0 golang.org/x/crypto v0.0.0-20221012134737-56aed061732a golang.org/x/image v0.0.0-20220902085622-e7cb96979f69 diff --git a/go.sum b/go.sum index c7903b0ce..a0db4823e 100644 --- a/go.sum +++ b/go.sum @@ -38,8 +38,8 @@ dmitri.shuralyov.com/html/belt v0.0.0-20180602232347-f7d459c86be0/go.mod h1:JLBr dmitri.shuralyov.com/service/change v0.0.0-20181023043359-a85b471d5412/go.mod h1:a1inKt/atXimZ4Mv927x+r7UpyzRUf4emIoiiSC2TN4= dmitri.shuralyov.com/state v0.0.0-20180228185332-28bcc343414c/go.mod h1:0PRwlb0D6DFvNNtx+9ybjezNCa8XF0xaYcETyp6rHWU= git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg= -github.com/Arceliar/ironwood v0.0.0-20220903132624-ee60c16bcfcf h1:kjPkmDHUTWUma/4tqDl208bOk3jsUEqOJA6TsMZo5Jk= -github.com/Arceliar/ironwood v0.0.0-20220903132624-ee60c16bcfcf/go.mod h1:RP72rucOFm5udrnEzTmIWLRVGQiV/fSUAQXJ0RST/nk= +github.com/Arceliar/ironwood v0.0.0-20221025225125-45b4281814c2 h1:Usab30pNT2i/vZvpXcN9uOr5IO1RZPcUqoGH0DIAPnU= +github.com/Arceliar/ironwood v0.0.0-20221025225125-45b4281814c2/go.mod h1:RP72rucOFm5udrnEzTmIWLRVGQiV/fSUAQXJ0RST/nk= github.com/Arceliar/phony v0.0.0-20210209235338-dde1a8dca979 h1:WndgpSW13S32VLQ3ugUxx2EnnWmgba1kCqPkd4Gk1yQ= github.com/Arceliar/phony v0.0.0-20210209235338-dde1a8dca979/go.mod h1:6Lkn+/zJilRMsKmbmG1RPoamiArC6HS73xbwRyp3UyI= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8= @@ -592,8 +592,8 @@ github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17 github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= github.com/willf/bitset v1.1.9/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= -github.com/yggdrasil-network/yggdrasil-go v0.4.5-0.20220901155642-4f2abece817c h1:/cTmA6pV2Z20BT/FGSmnb5BmJ8eRbDP0HbCB5IO1aKw= -github.com/yggdrasil-network/yggdrasil-go v0.4.5-0.20220901155642-4f2abece817c/go.mod h1:cIwhYwX9yT9Bcei59O0oOBSaj+kQP+9aVQUMWHh5R00= +github.com/yggdrasil-network/yggdrasil-go v0.4.6 h1:GALUDV9QPz/5FVkbazpkTc9EABHufA556JwUJZr41j4= +github.com/yggdrasil-network/yggdrasil-go v0.4.6/go.mod h1:PBMoAOvQjA9geNEeGyMXA9QgCS6Bu+9V+1VkWM84wpw= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=