Add p2p wakeup broadcast handling to pinecone demos

This commit is contained in:
Devon Hudson 2022-10-28 11:13:57 -06:00
parent 238b6ef2cd
commit ad66299aa3
No known key found for this signature in database
GPG key ID: CD06B18E77F6A628
8 changed files with 161 additions and 9 deletions

View file

@ -40,6 +40,7 @@ import (
"github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/users" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/users"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
"github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/federationapi"
"github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/keyserver" "github.com/matrix-org/dendrite/keyserver"
"github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/roomserver"
@ -58,6 +59,7 @@ import (
pineconeConnections "github.com/matrix-org/pinecone/connections" pineconeConnections "github.com/matrix-org/pinecone/connections"
pineconeMulticast "github.com/matrix-org/pinecone/multicast" pineconeMulticast "github.com/matrix-org/pinecone/multicast"
pineconeRouter "github.com/matrix-org/pinecone/router" pineconeRouter "github.com/matrix-org/pinecone/router"
pineconeEvents "github.com/matrix-org/pinecone/router/events"
pineconeSessions "github.com/matrix-org/pinecone/sessions" pineconeSessions "github.com/matrix-org/pinecone/sessions"
"github.com/matrix-org/pinecone/types" "github.com/matrix-org/pinecone/types"
@ -295,7 +297,12 @@ func (m *DendriteMonolith) Start() {
m.logger.SetOutput(BindLogger{}) m.logger.SetOutput(BindLogger{})
logrus.SetOutput(BindLogger{}) logrus.SetOutput(BindLogger{})
pineconeEventChannel := make(chan pineconeEvents.Event)
m.PineconeRouter = pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk) m.PineconeRouter = pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk)
m.PineconeRouter.EnableHopLimiting()
m.PineconeRouter.EnableWakeupBroadcasts()
m.PineconeRouter.Subscribe(pineconeEventChannel)
m.PineconeQUIC = pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), m.PineconeRouter, []string{"matrix"}) m.PineconeQUIC = pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), m.PineconeRouter, []string{"matrix"})
m.PineconeMulticast = pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), m.PineconeRouter) m.PineconeMulticast = pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), m.PineconeRouter)
m.PineconeManager = pineconeConnections.NewConnectionManager(m.PineconeRouter, nil) m.PineconeManager = pineconeConnections.NewConnectionManager(m.PineconeRouter, nil)
@ -423,6 +430,37 @@ func (m *DendriteMonolith) Start() {
m.logger.Fatal(err) m.logger.Fatal(err)
} }
}() }()
go func(ch <-chan pineconeEvents.Event) {
eLog := logrus.WithField("pinecone", "events")
for {
select {
case event := <-ch:
switch e := event.(type) {
case pineconeEvents.PeerAdded:
case pineconeEvents.PeerRemoved:
case pineconeEvents.TreeParentUpdate:
case pineconeEvents.SnakeDescUpdate:
case pineconeEvents.TreeRootAnnUpdate:
case pineconeEvents.SnakeEntryAdded:
case pineconeEvents.SnakeEntryRemoved:
case pineconeEvents.BroadcastReceived:
eLog.Info("Broadcast received from: ", e.PeerID)
req := &api.PerformWakeupServersRequest{
ServerNames: []gomatrixserverlib.ServerName{gomatrixserverlib.ServerName(e.PeerID)},
}
res := &api.PerformWakeupServersResponse{}
if err := fsAPI.PerformWakeupServers(base.Context(), req, res); err != nil {
logrus.WithError(err).Error("Failed to wakeup destination", e.PeerID)
}
case pineconeEvents.BandwidthReport:
default:
}
}
}
}(pineconeEventChannel)
} }
func (m *DendriteMonolith) Stop() { func (m *DendriteMonolith) Stop() {

View file

@ -37,6 +37,7 @@ import (
"github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/users" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/users"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
"github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/federationapi"
"github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/keyserver" "github.com/matrix-org/dendrite/keyserver"
@ -51,6 +52,7 @@ import (
pineconeConnections "github.com/matrix-org/pinecone/connections" pineconeConnections "github.com/matrix-org/pinecone/connections"
pineconeMulticast "github.com/matrix-org/pinecone/multicast" pineconeMulticast "github.com/matrix-org/pinecone/multicast"
pineconeRouter "github.com/matrix-org/pinecone/router" pineconeRouter "github.com/matrix-org/pinecone/router"
pineconeEvents "github.com/matrix-org/pinecone/router/events"
pineconeSessions "github.com/matrix-org/pinecone/sessions" pineconeSessions "github.com/matrix-org/pinecone/sessions"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -157,7 +159,12 @@ func main() {
base := base.NewBaseDendrite(cfg, "Monolith") base := base.NewBaseDendrite(cfg, "Monolith")
defer base.Close() // nolint: errcheck defer base.Close() // nolint: errcheck
pineconeEventChannel := make(chan pineconeEvents.Event)
pRouter := pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk) pRouter := pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk)
pRouter.EnableHopLimiting()
pRouter.EnableWakeupBroadcasts()
pRouter.Subscribe(pineconeEventChannel)
pQUIC := pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), pRouter, []string{"matrix"}) pQUIC := pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), pRouter, []string{"matrix"})
pMulticast := pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), pRouter) pMulticast := pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), pRouter)
pManager := pineconeConnections.NewConnectionManager(pRouter, nil) pManager := pineconeConnections.NewConnectionManager(pRouter, nil)
@ -295,5 +302,36 @@ func main() {
logrus.Fatal(http.ListenAndServe(httpBindAddr, httpRouter)) logrus.Fatal(http.ListenAndServe(httpBindAddr, httpRouter))
}() }()
go func(ch <-chan pineconeEvents.Event) {
eLog := logrus.WithField("pinecone", "events")
for {
select {
case event := <-ch:
switch e := event.(type) {
case pineconeEvents.PeerAdded:
case pineconeEvents.PeerRemoved:
case pineconeEvents.TreeParentUpdate:
case pineconeEvents.SnakeDescUpdate:
case pineconeEvents.TreeRootAnnUpdate:
case pineconeEvents.SnakeEntryAdded:
case pineconeEvents.SnakeEntryRemoved:
case pineconeEvents.BroadcastReceived:
eLog.Info("Broadcast received from: ", e.PeerID)
req := &api.PerformWakeupServersRequest{
ServerNames: []gomatrixserverlib.ServerName{gomatrixserverlib.ServerName(e.PeerID)},
}
res := &api.PerformWakeupServersResponse{}
if err := fsAPI.PerformWakeupServers(base.Context(), req, res); err != nil {
logrus.WithError(err).Error("Failed to wakeup destination", e.PeerID)
}
case pineconeEvents.BandwidthReport:
default:
}
}
}
}(pineconeEventChannel)
base.WaitForShutdown() base.WaitForShutdown()
} }

View file

@ -30,6 +30,12 @@ type FederationInternalAPI interface {
request *PerformBroadcastEDURequest, request *PerformBroadcastEDURequest,
response *PerformBroadcastEDUResponse, response *PerformBroadcastEDUResponse,
) error ) error
PerformWakeupServers(
ctx context.Context,
request *PerformWakeupServersRequest,
response *PerformWakeupServersResponse,
) error
} }
type ClientFederationAPI interface { type ClientFederationAPI interface {
@ -213,6 +219,13 @@ type PerformBroadcastEDURequest struct {
type PerformBroadcastEDUResponse struct { type PerformBroadcastEDUResponse struct {
} }
type PerformWakeupServersRequest struct {
ServerNames []gomatrixserverlib.ServerName `json:"server_names"`
}
type PerformWakeupServersResponse struct {
}
type InputPublicKeysRequest struct { type InputPublicKeysRequest struct {
Keys map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult `json:"keys"` Keys map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult `json:"keys"`
} }

View file

@ -648,9 +648,23 @@ func (r *FederationInternalAPI) PerformBroadcastEDU(
return nil return nil
} }
// PerformWakeupServers implements api.FederationInternalAPI
func (r *FederationInternalAPI) PerformWakeupServers(
ctx context.Context,
request *api.PerformWakeupServersRequest,
response *api.PerformWakeupServersResponse,
) (err error) {
r.MarkServersAlive(request.ServerNames)
return nil
}
func (r *FederationInternalAPI) MarkServersAlive(destinations []gomatrixserverlib.ServerName) { func (r *FederationInternalAPI) MarkServersAlive(destinations []gomatrixserverlib.ServerName) {
for _, srv := range destinations { for _, srv := range destinations {
_ = r.db.RemoveServerFromBlacklist(srv) // Check the statistics cache for the blacklist status to prevent hitting
// the database unnecessarily.
if r.queues.IsServerBlacklisted(srv) {
_ = r.db.RemoveServerFromBlacklist(srv)
}
r.queues.RetryServer(srv) r.queues.RetryServer(srv)
} }
} }

View file

@ -23,6 +23,7 @@ const (
FederationAPIPerformInviteRequestPath = "/federationapi/performInviteRequest" FederationAPIPerformInviteRequestPath = "/federationapi/performInviteRequest"
FederationAPIPerformOutboundPeekRequestPath = "/federationapi/performOutboundPeekRequest" FederationAPIPerformOutboundPeekRequestPath = "/federationapi/performOutboundPeekRequest"
FederationAPIPerformBroadcastEDUPath = "/federationapi/performBroadcastEDU" FederationAPIPerformBroadcastEDUPath = "/federationapi/performBroadcastEDU"
FederationAPIPerformWakeupServers = "/federationapi/performWakeupServers"
FederationAPIGetUserDevicesPath = "/federationapi/client/getUserDevices" FederationAPIGetUserDevicesPath = "/federationapi/client/getUserDevices"
FederationAPIClaimKeysPath = "/federationapi/client/claimKeys" FederationAPIClaimKeysPath = "/federationapi/client/claimKeys"
@ -150,6 +151,18 @@ func (h *httpFederationInternalAPI) PerformBroadcastEDU(
) )
} }
// Handle an instruction to remove the respective servers from being blacklisted.
func (h *httpFederationInternalAPI) PerformWakeupServers(
ctx context.Context,
request *api.PerformWakeupServersRequest,
response *api.PerformWakeupServersResponse,
) error {
return httputil.CallInternalRPCAPI(
"PerformWakeupServers", h.federationAPIURL+FederationAPIPerformWakeupServers,
h.httpClient, ctx, request, response,
)
}
type getUserDevices struct { type getUserDevices struct {
S gomatrixserverlib.ServerName S gomatrixserverlib.ServerName
UserID string UserID string

View file

@ -43,6 +43,11 @@ func AddRoutes(intAPI api.FederationInternalAPI, internalAPIMux *mux.Router) {
httputil.MakeInternalRPCAPI("FederationAPIPerformBroadcastEDU", intAPI.PerformBroadcastEDU), httputil.MakeInternalRPCAPI("FederationAPIPerformBroadcastEDU", intAPI.PerformBroadcastEDU),
) )
internalAPIMux.Handle(
FederationAPIPerformWakeupServers,
httputil.MakeInternalRPCAPI("FederationAPIPerformWakeupServers", intAPI.PerformWakeupServers),
)
internalAPIMux.Handle( internalAPIMux.Handle(
FederationAPIPerformJoinRequestPath, FederationAPIPerformJoinRequestPath,
httputil.MakeInternalRPCAPI( httputil.MakeInternalRPCAPI(

View file

@ -141,23 +141,44 @@ func (oq *destinationQueue) handleBackoffNotifier() {
} }
} }
// wakeQueueIfEventsPending calls wakeQueueAndNotify only if there are
// pending events or if forceWakeup is true. This prevents starting the
// queue unnecessarily.
func (oq *destinationQueue) wakeQueueIfEventsPending(forceWakeup bool) {
eventsPending := func() bool {
oq.pendingMutex.Lock()
defer oq.pendingMutex.Unlock()
return len(oq.pendingPDUs) > 0 || len(oq.pendingEDUs) > 0
}
// NOTE : Only wakeup and notify the queue if there are pending events
// or if forceWakeup is true. Otherwise there is no reason to start the
// queue goroutine and waste resources.
if forceWakeup || eventsPending() {
logrus.Info("Starting queue due to pending events or forceWakeup")
oq.wakeQueueAndNotify()
}
}
// wakeQueueAndNotify ensures the destination queue is running and notifies it // wakeQueueAndNotify ensures the destination queue is running and notifies it
// that there is pending work. // that there is pending work.
func (oq *destinationQueue) wakeQueueAndNotify() { func (oq *destinationQueue) wakeQueueAndNotify() {
// Wake up the queue if it's asleep. // NOTE : Send notification before waking queue to prevent a race
oq.wakeQueueIfNeeded() // where the queue was running and stops due to a timeout in between
// checking it and sending the notification.
// Notify the queue that there are events ready to send. // Notify the queue that there are events ready to send.
select { select {
case oq.notify <- struct{}{}: case oq.notify <- struct{}{}:
default: default:
} }
// Wake up the queue if it's asleep.
oq.wakeQueueIfNeeded()
} }
// wakeQueueIfNeeded will wake up the destination queue if it is // wakeQueueIfNeeded will wake up the destination queue if it is
// not already running. If it is running but it is backing off // not already running.
// then we will interrupt the backoff, causing any federation
// requests to retry.
func (oq *destinationQueue) wakeQueueIfNeeded() { func (oq *destinationQueue) wakeQueueIfNeeded() {
// Clear the backingOff flag and update the backoff metrics if it was set. // Clear the backingOff flag and update the backoff metrics if it was set.
if oq.backingOff.CompareAndSwap(true, false) { if oq.backingOff.CompareAndSwap(true, false) {

View file

@ -378,14 +378,24 @@ func (oqs *OutgoingQueues) SendEDU(
return nil return nil
} }
// IsServerBlacklisted returns whether or not the provided server is currently
// blacklisted.
func (oqs *OutgoingQueues) IsServerBlacklisted(srv gomatrixserverlib.ServerName) bool {
return oqs.statistics.ForServer(srv).Blacklisted()
}
// RetryServer attempts to resend events to the given server if we had given up. // RetryServer attempts to resend events to the given server if we had given up.
func (oqs *OutgoingQueues) RetryServer(srv gomatrixserverlib.ServerName) { func (oqs *OutgoingQueues) RetryServer(srv gomatrixserverlib.ServerName) {
if oqs.disabled { if oqs.disabled {
return return
} }
oqs.statistics.ForServer(srv).RemoveBlacklist()
serverStatistics := oqs.statistics.ForServer(srv)
forceWakeup := serverStatistics.Blacklisted()
serverStatistics.RemoveBlacklist()
serverStatistics.ClearBackoff()
if queue := oqs.getQueue(srv); queue != nil { if queue := oqs.getQueue(srv); queue != nil {
queue.statistics.ClearBackoff() queue.wakeQueueIfEventsPending(forceWakeup)
queue.wakeQueueIfNeeded()
} }
} }