diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index 63c8cbd0f..a9609361b 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -170,7 +170,7 @@ func (oq *destinationQueue) backgroundSend() { // Try sending the next invite and see what happens. if len(pendingInvites) > 0 { - invites, ierr := oq.nextInvites(pendingInvites) + sent, ierr := oq.nextInvites(pendingInvites) if ierr != nil { // We failed to send the transaction so increase the // backoff and give it another go shortly. @@ -180,7 +180,7 @@ func (oq *destinationQueue) backgroundSend() { return } continue - } else if invites { + } else if sent > 0 { // If we successfully sent the invites then clear out // the pending invites. oq.statistics.Success() @@ -189,7 +189,7 @@ func (oq *destinationQueue) backgroundSend() { // opposed to growing forever. oq.pendingInvites = append( []*gomatrixserverlib.InviteV2Request{}, - oq.pendingInvites[len(pendingInvites):]..., + oq.pendingInvites[sent:]..., ) oq.runningMutex.Unlock() } @@ -256,7 +256,8 @@ func (oq *destinationQueue) nextTransaction( // them. Returns true if a transaction was sent or false otherwise. func (oq *destinationQueue) nextInvites( pendingInvites []*gomatrixserverlib.InviteV2Request, -) (bool, error) { +) (int, error) { + done := 0 for _, inviteReq := range pendingInvites { ev, roomVersion := inviteReq.Event(), inviteReq.RoomVersion() @@ -277,7 +278,7 @@ func (oq *destinationQueue) nextInvites( "state_key": ev.StateKey(), "destination": oq.destination, }).WithError(err).Error("failed to send invite") - return false, err + return done, err } if _, err = oq.rsProducer.SendInviteResponse( @@ -290,9 +291,11 @@ func (oq *destinationQueue) nextInvites( "state_key": ev.StateKey(), "destination": oq.destination, }).WithError(err).Error("failed to return signed invite to roomserver") - return false, err + return done, err } + + done++ } - return true, nil + return done, nil } diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go index 6f066efdd..74e6da3a9 100644 --- a/federationsender/queue/queue.go +++ b/federationsender/queue/queue.go @@ -88,7 +88,7 @@ func (oqs *OutgoingQueues) SendEvent( oqs.queuesMutex.Unlock() } - go oq.sendEvent(ev) + oq.sendEvent(ev) } return nil @@ -137,7 +137,7 @@ func (oqs *OutgoingQueues) SendInvite( oqs.queuesMutex.Unlock() } - go oq.sendInvite(inviteReq) + oq.sendInvite(inviteReq) return nil } @@ -181,7 +181,7 @@ func (oqs *OutgoingQueues) SendEDU( oqs.queuesMutex.Unlock() } - go oq.sendEDU(e) + oq.sendEDU(e) } return nil