From 7434715f70d097bf3816af5dd2228c8107841477 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 6 May 2020 09:30:12 +0100 Subject: [PATCH] Tidy up a bit --- federationsender/queue/destinationqueue.go | 35 ++++++++++++++-------- 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index 9b79c487b..405a7dea2 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -33,7 +33,7 @@ const ( // How many times should we tolerate consecutive failures before we // just blacklist the host altogether? Bear in mind that the backoff // is exponential, so the max time here to attempt is 2**failures. - FailuresUntilBlacklist = 16 + FailuresUntilBlacklist = 16 // 16 equates to roughly 18 hours. ) // destinationQueue is a queue of events for a single destination. @@ -41,13 +41,13 @@ const ( // ensures that only one request is in flight to a given destination // at a time. type destinationQueue struct { - rsProducer *producers.RoomserverProducer // - client *gomatrixserverlib.FederationClient // - origin gomatrixserverlib.ServerName // - destination gomatrixserverlib.ServerName // - running atomic.Bool // is the queue worke running? + rsProducer *producers.RoomserverProducer // roomserver producer + client *gomatrixserverlib.FederationClient // federation client + origin gomatrixserverlib.ServerName // origin of requests + destination gomatrixserverlib.ServerName // destination of requests + running atomic.Bool // is the queue worker running? blacklisted atomic.Bool // is the remote side dead? - backoffUntil atomic.Value // time.Time + backoffUntil atomic.Value // time.Time to wait until before sending requests idleCounter atomic.Uint32 // how many ticks have we done nothing? failCounter atomic.Uint32 // how many times have we failed? sentCounter atomic.Uint32 // how many times have we succeeded? @@ -124,11 +124,9 @@ func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.HeaderedEvent) { // If the destination is blacklisted then drop the event. return } - fmt.Println("Queuing event", ev.EventID()) oq.runningMutex.Lock() oq.pendingPDUs = append(oq.pendingPDUs, ev) oq.runningMutex.Unlock() - fmt.Println("Queued event", ev.EventID()) oq.wake() } @@ -206,8 +204,16 @@ func (oq *destinationQueue) backgroundSend() { // the pending events and EDUs. if transaction { oq.runningMutex.Lock() - oq.pendingPDUs = oq.pendingPDUs[len(pendingPDUs):] - oq.pendingEDUs = oq.pendingEDUs[len(pendingEDUs):] + // Reallocate so that the underlying arrays can be GC'd, as + // opposed to growing forever. + oq.pendingPDUs = append( + []*gomatrixserverlib.HeaderedEvent{}, + oq.pendingPDUs[len(pendingPDUs):]..., + ) + oq.pendingEDUs = append( + []*gomatrixserverlib.EDU{}, + oq.pendingEDUs[len(pendingEDUs):]..., + ) oq.runningMutex.Unlock() } } @@ -226,7 +232,12 @@ func (oq *destinationQueue) backgroundSend() { // the pending invites. if invites { oq.runningMutex.Lock() - oq.pendingInvites = oq.pendingInvites[len(pendingInvites):] + // Reallocate so that the underlying array can be GC'd, as + // opposed to growing forever. + oq.pendingInvites = append( + []*gomatrixserverlib.InviteV2Request{}, + oq.pendingInvites[len(pendingInvites):]..., + ) oq.runningMutex.Unlock() } }