Don't retry sucessful invites, don't dispatch sendEvent, sendInvite etc

This commit is contained in:
Neil Alexander 2020-05-06 13:15:32 +01:00
parent 53f690f2d3
commit 9d293caec8
2 changed files with 13 additions and 10 deletions

View file

@ -170,7 +170,7 @@ func (oq *destinationQueue) backgroundSend() {
// Try sending the next invite and see what happens. // Try sending the next invite and see what happens.
if len(pendingInvites) > 0 { if len(pendingInvites) > 0 {
invites, ierr := oq.nextInvites(pendingInvites) sent, ierr := oq.nextInvites(pendingInvites)
if ierr != nil { if ierr != nil {
// We failed to send the transaction so increase the // We failed to send the transaction so increase the
// backoff and give it another go shortly. // backoff and give it another go shortly.
@ -180,7 +180,7 @@ func (oq *destinationQueue) backgroundSend() {
return return
} }
continue continue
} else if invites { } else if sent > 0 {
// If we successfully sent the invites then clear out // If we successfully sent the invites then clear out
// the pending invites. // the pending invites.
oq.statistics.Success() oq.statistics.Success()
@ -189,7 +189,7 @@ func (oq *destinationQueue) backgroundSend() {
// opposed to growing forever. // opposed to growing forever.
oq.pendingInvites = append( oq.pendingInvites = append(
[]*gomatrixserverlib.InviteV2Request{}, []*gomatrixserverlib.InviteV2Request{},
oq.pendingInvites[len(pendingInvites):]..., oq.pendingInvites[sent:]...,
) )
oq.runningMutex.Unlock() oq.runningMutex.Unlock()
} }
@ -256,7 +256,8 @@ func (oq *destinationQueue) nextTransaction(
// them. Returns true if a transaction was sent or false otherwise. // them. Returns true if a transaction was sent or false otherwise.
func (oq *destinationQueue) nextInvites( func (oq *destinationQueue) nextInvites(
pendingInvites []*gomatrixserverlib.InviteV2Request, pendingInvites []*gomatrixserverlib.InviteV2Request,
) (bool, error) { ) (int, error) {
done := 0
for _, inviteReq := range pendingInvites { for _, inviteReq := range pendingInvites {
ev, roomVersion := inviteReq.Event(), inviteReq.RoomVersion() ev, roomVersion := inviteReq.Event(), inviteReq.RoomVersion()
@ -277,7 +278,7 @@ func (oq *destinationQueue) nextInvites(
"state_key": ev.StateKey(), "state_key": ev.StateKey(),
"destination": oq.destination, "destination": oq.destination,
}).WithError(err).Error("failed to send invite") }).WithError(err).Error("failed to send invite")
return false, err return done, err
} }
if _, err = oq.rsProducer.SendInviteResponse( if _, err = oq.rsProducer.SendInviteResponse(
@ -290,9 +291,11 @@ func (oq *destinationQueue) nextInvites(
"state_key": ev.StateKey(), "state_key": ev.StateKey(),
"destination": oq.destination, "destination": oq.destination,
}).WithError(err).Error("failed to return signed invite to roomserver") }).WithError(err).Error("failed to return signed invite to roomserver")
return false, err return done, err
} }
done++
} }
return true, nil return done, nil
} }

View file

@ -88,7 +88,7 @@ func (oqs *OutgoingQueues) SendEvent(
oqs.queuesMutex.Unlock() oqs.queuesMutex.Unlock()
} }
go oq.sendEvent(ev) oq.sendEvent(ev)
} }
return nil return nil
@ -137,7 +137,7 @@ func (oqs *OutgoingQueues) SendInvite(
oqs.queuesMutex.Unlock() oqs.queuesMutex.Unlock()
} }
go oq.sendInvite(inviteReq) oq.sendInvite(inviteReq)
return nil return nil
} }
@ -181,7 +181,7 @@ func (oqs *OutgoingQueues) SendEDU(
oqs.queuesMutex.Unlock() oqs.queuesMutex.Unlock()
} }
go oq.sendEDU(e) oq.sendEDU(e)
} }
return nil return nil