This commit is contained in:
Neil Alexander 2021-06-30 15:58:59 +01:00
parent 205b2516db
commit 671ac8ac42
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944

View file

@ -270,6 +270,21 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res
for e := range q {
evStart := time.Now()
if err := t.processEvent(ctx, e); err != nil {
// If the error is due to the event itself being bad then we skip
// it and move onto the next event. We report an error so that the
// sender knows that we have skipped processing it.
//
// However if the event is due to a temporary failure in our server
// such as a database being unavailable then we should bail, and
// hope that the sender will retry when we are feeling better.
//
// It is uncertain what we should do if an event fails because
// we failed to fetch more information from the sending server.
// For example if a request to /state fails.
// If we skip the event then we risk missing the event until we
// receive another event referencing it.
// If we bail and stop processing then we risk wedging incoming
// transactions from that server forever.
if isProcessingErrorFatal(err) {
sentry.CaptureException(err)
// Any other error should be the result of a temporary error in
@ -314,64 +329,10 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res
wg.Wait()
/*
// Process the events.
for _, e := range pdus {
evStart := time.Now()
if err := t.processEvent(ctx, e.Unwrap()); err != nil {
// If the error is due to the event itself being bad then we skip
// it and move onto the next event. We report an error so that the
// sender knows that we have skipped processing it.
//
// However if the event is due to a temporary failure in our server
// such as a database being unavailable then we should bail, and
// hope that the sender will retry when we are feeling better.
//
// It is uncertain what we should do if an event fails because
// we failed to fetch more information from the sending server.
// For example if a request to /state fails.
// If we skip the event then we risk missing the event until we
// receive another event referencing it.
// If we bail and stop processing then we risk wedging incoming
// transactions from that server forever.
if isProcessingErrorFatal(err) {
sentry.CaptureException(err)
// Any other error should be the result of a temporary error in
// our server so we should bail processing the transaction entirely.
util.GetLogger(ctx).Warnf("Processing %s failed fatally: %s", e.EventID(), err)
jsonErr := util.ErrorResponse(err)
processEventSummary.WithLabelValues(t.work, MetricsOutcomeFatal).Observe(
float64(time.Since(evStart).Nanoseconds()) / 1000.,
)
return nil, &jsonErr
} else {
// Auth errors mean the event is 'rejected' which have to be silent to appease sytest
errMsg := ""
outcome := MetricsOutcomeRejected
_, rejected := err.(*gomatrixserverlib.NotAllowed)
if !rejected {
errMsg = err.Error()
outcome = MetricsOutcomeFail
}
util.GetLogger(ctx).WithError(err).WithField("event_id", e.EventID()).WithField("rejected", rejected).Warn(
"Failed to process incoming federation event, skipping",
)
processEventSummary.WithLabelValues(t.work, outcome).Observe(
float64(time.Since(evStart).Nanoseconds()) / 1000.,
)
results[e.EventID()] = gomatrixserverlib.PDUResult{
Error: errMsg,
}
}
} else {
results[e.EventID()] = gomatrixserverlib.PDUResult{}
pduCountTotal.WithLabelValues("success").Inc()
processEventSummary.WithLabelValues(t.work, MetricsOutcomeOK).Observe(
float64(time.Since(evStart).Nanoseconds()) / 1000.,
)
}
}
*/
for k := range perRoom {
close(perRoom[k])
perRoom[k] = nil
}
t.processEDUs(ctx)
if c := len(results); c > 0 {