diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index 7ab6f1f16..a574950cd 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -22,6 +22,7 @@ import ( "github.com/matrix-org/dendrite/federationsender/producers" "github.com/matrix-org/dendrite/federationsender/types" + "github.com/matrix-org/gomatrix" "github.com/matrix-org/gomatrixserverlib" "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus" @@ -102,11 +103,26 @@ func (oq *destinationQueue) backgroundSend() { // idle timeout. select { case pdu := <-oq.incomingPDUs: + // Ordering of PDUs is important so we add them to the end + // of the queue and they will all be added to transactions + // in order. oq.pendingPDUs = append(oq.pendingPDUs, pdu) case edu := <-oq.incomingEDUs: + // Likewise for EDUs, although we should probably not try + // too hard with some EDUs (like typing notifications) after + // a certain amount of time has passed. + // TODO: think about EDU expiry some more oq.pendingEDUs = append(oq.pendingEDUs, edu) case invite := <-oq.incomingInvites: - oq.pendingInvites = append(oq.pendingInvites, invite) + // There's no strict ordering requirement for invites like + // there is for transactions, so we put the invite onto the + // front of the queue. This means that if an invite that is + // stuck failing already, that it won't block our new invite + // from being sent. + oq.pendingInvites = append( + []*gomatrixserverlib.InviteV2Request{invite}, + oq.pendingInvites..., + ) case <-time.After(time.Second * 30): // The worker is idle so stop the goroutine. It'll // get restarted automatically the next time we @@ -221,16 +237,33 @@ func (oq *destinationQueue) nextTransaction( logrus.WithField("server_name", oq.destination).Infof("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs)) + // TODO: we should check for 500-ish fails vs 400-ish here, + // since we shouldn't queue things indefinitely in response + // to a 400-ish error _, err := oq.client.SendTransaction(context.TODO(), t) - if err != nil { + switch e := err.(type) { + case nil: + // No error was returned so the transaction looks to have + // been successfully sent. + return true, nil + case gomatrix.HTTPError: + // We received a HTTP error back. In this instance we only + // should report an error if + if e.Code >= 400 && e.Code <= 499 { + // We tried but the remote side has sent back a client error. + // It's no use retrying because it will happen again. + return true, nil + } + // Otherwise, report that we failed to send the transaction + // and we will retry again. + return false, err + default: log.WithFields(log.Fields{ "destination": oq.destination, log.ErrorKey: err, }).Info("problem sending transaction") return false, err } - - return true, nil } // nextInvite takes pending invite events from the queue and sends @@ -253,7 +286,26 @@ func (oq *destinationQueue) nextInvites( oq.destination, *inviteReq, ) - if err != nil { + switch e := err.(type) { + case nil: + done++ + case gomatrix.HTTPError: + log.WithFields(log.Fields{ + "event_id": ev.EventID(), + "state_key": ev.StateKey(), + "destination": oq.destination, + "status_code": e.Code, + }).WithError(err).Error("failed to send invite due to HTTP error") + // Check whether we should do something about the error or + // just accept it as unavoidable. + if e.Code >= 400 && e.Code <= 499 { + // We tried but the remote side has sent back a client error. + // It's no use retrying because it will happen again. + done++ + continue + } + return done, err + default: log.WithFields(log.Fields{ "event_id": ev.EventID(), "state_key": ev.StateKey(), @@ -274,8 +326,6 @@ func (oq *destinationQueue) nextInvites( }).WithError(err).Error("failed to return signed invite to roomserver") return done, err } - - done++ } return done, nil