Check HTTP response codes, push new invites to front of queue

This commit is contained in:
Neil Alexander 2020-05-07 10:43:07 +01:00
parent 3a2ae87358
commit b73139e68c

View file

@ -22,6 +22,7 @@ import (
"github.com/matrix-org/dendrite/federationsender/producers" "github.com/matrix-org/dendrite/federationsender/producers"
"github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/gomatrix"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -102,11 +103,26 @@ func (oq *destinationQueue) backgroundSend() {
// idle timeout. // idle timeout.
select { select {
case pdu := <-oq.incomingPDUs: case pdu := <-oq.incomingPDUs:
// Ordering of PDUs is important so we add them to the end
// of the queue and they will all be added to transactions
// in order.
oq.pendingPDUs = append(oq.pendingPDUs, pdu) oq.pendingPDUs = append(oq.pendingPDUs, pdu)
case edu := <-oq.incomingEDUs: case edu := <-oq.incomingEDUs:
// Likewise for EDUs, although we should probably not try
// too hard with some EDUs (like typing notifications) after
// a certain amount of time has passed.
// TODO: think about EDU expiry some more
oq.pendingEDUs = append(oq.pendingEDUs, edu) oq.pendingEDUs = append(oq.pendingEDUs, edu)
case invite := <-oq.incomingInvites: case invite := <-oq.incomingInvites:
oq.pendingInvites = append(oq.pendingInvites, invite) // There's no strict ordering requirement for invites like
// there is for transactions, so we put the invite onto the
// front of the queue. This means that if an invite that is
// stuck failing already, that it won't block our new invite
// from being sent.
oq.pendingInvites = append(
[]*gomatrixserverlib.InviteV2Request{invite},
oq.pendingInvites...,
)
case <-time.After(time.Second * 30): case <-time.After(time.Second * 30):
// The worker is idle so stop the goroutine. It'll // The worker is idle so stop the goroutine. It'll
// get restarted automatically the next time we // get restarted automatically the next time we
@ -221,16 +237,33 @@ func (oq *destinationQueue) nextTransaction(
logrus.WithField("server_name", oq.destination).Infof("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs)) logrus.WithField("server_name", oq.destination).Infof("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs))
// TODO: we should check for 500-ish fails vs 400-ish here,
// since we shouldn't queue things indefinitely in response
// to a 400-ish error
_, err := oq.client.SendTransaction(context.TODO(), t) _, err := oq.client.SendTransaction(context.TODO(), t)
if err != nil { switch e := err.(type) {
case nil:
// No error was returned so the transaction looks to have
// been successfully sent.
return true, nil
case gomatrix.HTTPError:
// We received a HTTP error back. In this instance we only
// should report an error if
if e.Code >= 400 && e.Code <= 499 {
// We tried but the remote side has sent back a client error.
// It's no use retrying because it will happen again.
return true, nil
}
// Otherwise, report that we failed to send the transaction
// and we will retry again.
return false, err
default:
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"destination": oq.destination, "destination": oq.destination,
log.ErrorKey: err, log.ErrorKey: err,
}).Info("problem sending transaction") }).Info("problem sending transaction")
return false, err return false, err
} }
return true, nil
} }
// nextInvite takes pending invite events from the queue and sends // nextInvite takes pending invite events from the queue and sends
@ -253,7 +286,26 @@ func (oq *destinationQueue) nextInvites(
oq.destination, oq.destination,
*inviteReq, *inviteReq,
) )
if err != nil { switch e := err.(type) {
case nil:
done++
case gomatrix.HTTPError:
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"state_key": ev.StateKey(),
"destination": oq.destination,
"status_code": e.Code,
}).WithError(err).Error("failed to send invite due to HTTP error")
// Check whether we should do something about the error or
// just accept it as unavoidable.
if e.Code >= 400 && e.Code <= 499 {
// We tried but the remote side has sent back a client error.
// It's no use retrying because it will happen again.
done++
continue
}
return done, err
default:
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"event_id": ev.EventID(), "event_id": ev.EventID(),
"state_key": ev.StateKey(), "state_key": ev.StateKey(),
@ -274,8 +326,6 @@ func (oq *destinationQueue) nextInvites(
}).WithError(err).Error("failed to return signed invite to roomserver") }).WithError(err).Error("failed to return signed invite to roomserver")
return done, err return done, err
} }
done++
} }
return done, nil return done, nil