diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go index 9a02e71e9..72008aa9a 100644 --- a/cmd/dendritejs/main.go +++ b/cmd/dendritejs/main.go @@ -39,6 +39,7 @@ import ( "github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/syncapi" go_http_js_libp2p "github.com/matrix-org/go-http-js-libp2p" + "github.com/matrix-org/gomatrixserverlib" "github.com/sirupsen/logrus" @@ -204,7 +205,6 @@ func main() { }, KeyDatabase: fetcher, } - p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node) rsAPI := roomserver.SetupRoomServerComponent(base, keyRing, federation) eduInputAPI := eduserver.SetupEDUServerComponent(base, cache.New(), deviceDB) @@ -213,6 +213,7 @@ func main() { ) fedSenderAPI := federationsender.SetupFederationSenderComponent(base, federation, rsAPI, &keyRing) rsAPI.SetFederationSenderAPI(fedSenderAPI) + p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI) clientapi.SetupClientAPIComponent( base, deviceDB, accountDB, diff --git a/cmd/dendritejs/publicrooms.go b/cmd/dendritejs/publicrooms.go index 5246cf535..5032bc15f 100644 --- a/cmd/dendritejs/publicrooms.go +++ b/cmd/dendritejs/publicrooms.go @@ -17,23 +17,48 @@ package main import ( + "context" + + "github.com/matrix-org/dendrite/federationsender/api" go_http_js_libp2p "github.com/matrix-org/go-http-js-libp2p" + "github.com/matrix-org/gomatrixserverlib" ) type libp2pPublicRoomsProvider struct { node *go_http_js_libp2p.P2pLocalNode providers []go_http_js_libp2p.PeerInfo + fedSender api.FederationSenderInternalAPI } -func NewLibP2PPublicRoomsProvider(node *go_http_js_libp2p.P2pLocalNode) *libp2pPublicRoomsProvider { +func NewLibP2PPublicRoomsProvider(node *go_http_js_libp2p.P2pLocalNode, fedSender api.FederationSenderInternalAPI) *libp2pPublicRoomsProvider { p := &libp2pPublicRoomsProvider{ - node: node, + node: node, + fedSender: fedSender, } node.RegisterFoundProviders(p.foundProviders) return p } func (p *libp2pPublicRoomsProvider) foundProviders(peerInfos []go_http_js_libp2p.PeerInfo) { + // work out the diff then poke for new ones + seen := make(map[string]bool, len(p.providers)) + for _, pr := range p.providers { + seen[pr.Id] = true + } + var newPeers []gomatrixserverlib.ServerName + for _, pi := range peerInfos { + if !seen[pi.Id] { + newPeers = append(newPeers, gomatrixserverlib.ServerName(pi.Id)) + } + } + if len(newPeers) > 0 { + var res api.PerformServersAliveResponse + // ignore errors, we don't care. + p.fedSender.PerformServersAlive(context.Background(), &api.PerformServersAliveRequest{ + Servers: newPeers, + }, &res) + } + p.providers = peerInfos } diff --git a/federationsender/api/api.go b/federationsender/api/api.go index 678f02e68..4eb20cb67 100644 --- a/federationsender/api/api.go +++ b/federationsender/api/api.go @@ -42,6 +42,12 @@ type FederationSenderInternalAPI interface { request *PerformLeaveRequest, response *PerformLeaveResponse, ) error + // Notifies the federation sender that these servers may be online and to retry sending messages. + PerformServersAlive( + ctx context.Context, + request *PerformServersAliveRequest, + response *PerformServersAliveResponse, + ) error } // NewFederationSenderInternalAPIHTTP creates a FederationSenderInternalAPI implemented by talking to a HTTP POST API. diff --git a/federationsender/api/perform.go b/federationsender/api/perform.go index 2a1834b61..5e4d7fe97 100644 --- a/federationsender/api/perform.go +++ b/federationsender/api/perform.go @@ -18,6 +18,9 @@ const ( // FederationSenderPerformLeaveRequestPath is the HTTP path for the PerformLeaveRequest API. FederationSenderPerformLeaveRequestPath = "/federationsender/performLeaveRequest" + + // FederationSenderPerformServersAlivePath is the HTTP path for the PerformServersAlive API. + FederationSenderPerformServersAlivePath = "/federationsender/performServersAlive" ) type PerformDirectoryLookupRequest struct { @@ -88,3 +91,22 @@ func (h *httpFederationSenderInternalAPI) PerformLeave( apiURL := h.federationSenderURL + FederationSenderPerformLeaveRequestPath return internalHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response) } + +type PerformServersAliveRequest struct { + Servers []gomatrixserverlib.ServerName +} + +type PerformServersAliveResponse struct { +} + +func (h *httpFederationSenderInternalAPI) PerformServersAlive( + ctx context.Context, + request *PerformServersAliveRequest, + response *PerformServersAliveResponse, +) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "PerformServersAlive") + defer span.Finish() + + apiURL := h.federationSenderURL + FederationSenderPerformServersAlivePath + return internalHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response) +} diff --git a/federationsender/federationsender.go b/federationsender/federationsender.go index cca847a51..9e5cc8dd1 100644 --- a/federationsender/federationsender.go +++ b/federationsender/federationsender.go @@ -67,7 +67,7 @@ func SetupFederationSenderComponent( queryAPI := internal.NewFederationSenderInternalAPI( federationSenderDB, base.Cfg, roomserverProducer, federation, keyRing, - statistics, + statistics, queues, ) queryAPI.SetupHTTP(base.InternalAPIMux) diff --git a/federationsender/internal/api.go b/federationsender/internal/api.go index dd3942583..edf8fb4e9 100644 --- a/federationsender/internal/api.go +++ b/federationsender/internal/api.go @@ -7,6 +7,7 @@ import ( "github.com/gorilla/mux" "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/federationsender/producers" + "github.com/matrix-org/dendrite/federationsender/queue" "github.com/matrix-org/dendrite/federationsender/storage" "github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/dendrite/internal" @@ -24,6 +25,7 @@ type FederationSenderInternalAPI struct { producer *producers.RoomserverProducer federation *gomatrixserverlib.FederationClient keyRing *gomatrixserverlib.KeyRing + queues *queue.OutgoingQueues } func NewFederationSenderInternalAPI( @@ -32,6 +34,7 @@ func NewFederationSenderInternalAPI( federation *gomatrixserverlib.FederationClient, keyRing *gomatrixserverlib.KeyRing, statistics *types.Statistics, + queues *queue.OutgoingQueues, ) *FederationSenderInternalAPI { return &FederationSenderInternalAPI{ db: db, @@ -40,6 +43,7 @@ func NewFederationSenderInternalAPI( federation: federation, keyRing: keyRing, statistics: statistics, + queues: queues, } } @@ -112,4 +116,17 @@ func (f *FederationSenderInternalAPI) SetupHTTP(internalAPIMux *mux.Router) { return util.JSONResponse{Code: http.StatusOK, JSON: &response} }), ) + internalAPIMux.Handle(api.FederationSenderPerformServersAlivePath, + internal.MakeInternalAPI("PerformServersAliveRequest", func(req *http.Request) util.JSONResponse { + var request api.PerformServersAliveRequest + var response api.PerformServersAliveResponse + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + if err := f.PerformServersAlive(req.Context(), &request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) } diff --git a/federationsender/internal/perform.go b/federationsender/internal/perform.go index 383ce4888..c601e9604 100644 --- a/federationsender/internal/perform.go +++ b/federationsender/internal/perform.go @@ -275,3 +275,16 @@ func (r *FederationSenderInternalAPI) PerformLeave( request.RoomID, len(request.ServerNames), ) } + +// PerformServersAlive implements api.FederationSenderInternalAPI +func (r *FederationSenderInternalAPI) PerformServersAlive( + ctx context.Context, + request *api.PerformServersAliveRequest, + response *api.PerformServersAliveResponse, +) (err error) { + for _, srv := range request.Servers { + r.queues.RetryServer(srv) + } + + return nil +} diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index 09dac464f..4ab610de7 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -39,6 +39,7 @@ type destinationQueue struct { origin gomatrixserverlib.ServerName // origin of requests destination gomatrixserverlib.ServerName // destination of requests running atomic.Bool // is the queue worker running? + backingOff atomic.Bool // true if we're backing off statistics *types.ServerStatistics // statistics about this remote server incomingPDUs chan *gomatrixserverlib.HeaderedEvent // PDUs to send incomingEDUs chan *gomatrixserverlib.EDU // EDUs to send @@ -47,6 +48,28 @@ type destinationQueue struct { pendingPDUs []*gomatrixserverlib.HeaderedEvent // owned by backgroundSend pendingEDUs []*gomatrixserverlib.EDU // owned by backgroundSend pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend + retryServerCh chan bool // interrupts backoff +} + +// retry will clear the blacklist state and attempt to send built up events to the server, +// resetting and interrupting any backoff timers. +func (oq *destinationQueue) retry() { + // TODO: We don't send all events in the case where the server has been blacklisted as we + // drop events instead then. This means we will send the oldest N events (chan size, currently 128) + // and then skip ahead a lot which feels non-ideal but equally we can't persist thousands of events + // in-memory to maybe-send it one day. Ideally we would just shove these pending events in a database + // so we can send a lot of events. + oq.statistics.Success() + // if we were backing off, swap to not backing off and interrupt the select. + // We need to use an atomic bool here to prevent multiple calls to retry() blocking on the channel + // as it is unbuffered. + if oq.backingOff.CAS(true, false) { + oq.retryServerCh <- true + } + if !oq.running.Load() { + log.Infof("Restarting queue for %s", oq.destination) + go oq.backgroundSend() + } } // Send event adds the event to the pending queue for the destination. @@ -155,9 +178,15 @@ func (oq *destinationQueue) backgroundSend() { } // If we are backing off this server then wait for the - // backoff duration to complete first. + // backoff duration to complete first, or until explicitly + // told to retry. if backoff, duration := oq.statistics.BackoffDuration(); backoff { - <-time.After(duration) + oq.backingOff.Store(true) + select { + case <-time.After(duration): + case <-oq.retryServerCh: + } + oq.backingOff.Store(false) } // How many things do we have waiting? diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go index aae6c53a0..386a3397f 100644 --- a/federationsender/queue/queue.go +++ b/federationsender/queue/queue.go @@ -52,6 +52,12 @@ func NewOutgoingQueues( } } +func (oqs *OutgoingQueues) getQueueIfExists(destination gomatrixserverlib.ServerName) *destinationQueue { + oqs.queuesMutex.Lock() + defer oqs.queuesMutex.Unlock() + return oqs.queues[destination] +} + func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *destinationQueue { oqs.queuesMutex.Lock() defer oqs.queuesMutex.Unlock() @@ -66,6 +72,7 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d incomingPDUs: make(chan *gomatrixserverlib.HeaderedEvent, 128), incomingEDUs: make(chan *gomatrixserverlib.EDU, 128), incomingInvites: make(chan *gomatrixserverlib.InviteV2Request, 128), + retryServerCh: make(chan bool), } oqs.queues[destination] = oq } @@ -160,6 +167,15 @@ func (oqs *OutgoingQueues) SendEDU( return nil } +// RetryServer attempts to resend events to the given server if we had given up. +func (oqs *OutgoingQueues) RetryServer(srv gomatrixserverlib.ServerName) { + q := oqs.getQueueIfExists(srv) + if q == nil { + return + } + q.retry() +} + // filterAndDedupeDests removes our own server from the list of destinations // and deduplicates any servers in the list that may appear more than once. func filterAndDedupeDests(origin gomatrixserverlib.ServerName, destinations []gomatrixserverlib.ServerName) (