Try to process rooms concurrently in FS /send

This commit is contained in:
Neil Alexander 2021-06-30 15:52:57 +01:00
parent 2647f6e9c5
commit 205b2516db
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944

View file

@ -193,6 +193,7 @@ type txnFederationClient interface {
func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.RespSend, *util.JSONResponse) { func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.RespSend, *util.JSONResponse) {
results := make(map[string]gomatrixserverlib.PDUResult) results := make(map[string]gomatrixserverlib.PDUResult)
var resultsMutex sync.Mutex
pdus := []*gomatrixserverlib.HeaderedEvent{} pdus := []*gomatrixserverlib.HeaderedEvent{}
for _, pdu := range t.PDUs { for _, pdu := range t.PDUs {
@ -248,63 +249,130 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res
pdus = append(pdus, event.Headered(verRes.RoomVersion)) pdus = append(pdus, event.Headered(verRes.RoomVersion))
} }
// Process the events. perRoom := map[string]chan *gomatrixserverlib.Event{}
perCount := map[string]int{}
for _, e := range pdus { for _, e := range pdus {
evStart := time.Now() perCount[e.RoomID()]++
if err := t.processEvent(ctx, e.Unwrap()); err != nil { }
// If the error is due to the event itself being bad then we skip for s, c := range perCount {
// it and move onto the next event. We report an error so that the perRoom[s] = make(chan *gomatrixserverlib.Event, c)
// sender knows that we have skipped processing it. }
// for _, e := range pdus {
// However if the event is due to a temporary failure in our server perRoom[e.RoomID()] <- e.Unwrap()
// such as a database being unavailable then we should bail, and }
// hope that the sender will retry when we are feeling better.
// var wg sync.WaitGroup
// It is uncertain what we should do if an event fails because wg.Add(len(perRoom))
// we failed to fetch more information from the sending server.
// For example if a request to /state fails. for _, q := range perRoom {
// If we skip the event then we risk missing the event until we go func(q chan *gomatrixserverlib.Event) {
// receive another event referencing it. defer wg.Done()
// If we bail and stop processing then we risk wedging incoming for e := range q {
// transactions from that server forever. evStart := time.Now()
if isProcessingErrorFatal(err) { if err := t.processEvent(ctx, e); err != nil {
sentry.CaptureException(err) if isProcessingErrorFatal(err) {
// Any other error should be the result of a temporary error in sentry.CaptureException(err)
// our server so we should bail processing the transaction entirely. // Any other error should be the result of a temporary error in
util.GetLogger(ctx).Warnf("Processing %s failed fatally: %s", e.EventID(), err) // our server so we should bail processing the transaction entirely.
jsonErr := util.ErrorResponse(err) util.GetLogger(ctx).Warnf("Processing %s failed fatally: %s", e.EventID(), err)
processEventSummary.WithLabelValues(t.work, MetricsOutcomeFatal).Observe( processEventSummary.WithLabelValues(t.work, MetricsOutcomeFatal).Observe(
float64(time.Since(evStart).Nanoseconds()) / 1000., float64(time.Since(evStart).Nanoseconds()) / 1000.,
) )
return nil, &jsonErr } else {
} else { // Auth errors mean the event is 'rejected' which have to be silent to appease sytest
// Auth errors mean the event is 'rejected' which have to be silent to appease sytest errMsg := ""
errMsg := "" outcome := MetricsOutcomeRejected
outcome := MetricsOutcomeRejected _, rejected := err.(*gomatrixserverlib.NotAllowed)
_, rejected := err.(*gomatrixserverlib.NotAllowed) if !rejected {
if !rejected { errMsg = err.Error()
errMsg = err.Error() outcome = MetricsOutcomeFail
outcome = MetricsOutcomeFail }
} util.GetLogger(ctx).WithError(err).WithField("event_id", e.EventID()).WithField("rejected", rejected).Warn(
util.GetLogger(ctx).WithError(err).WithField("event_id", e.EventID()).WithField("rejected", rejected).Warn( "Failed to process incoming federation event, skipping",
"Failed to process incoming federation event, skipping", )
) processEventSummary.WithLabelValues(t.work, outcome).Observe(
processEventSummary.WithLabelValues(t.work, outcome).Observe( float64(time.Since(evStart).Nanoseconds()) / 1000.,
float64(time.Since(evStart).Nanoseconds()) / 1000., )
) resultsMutex.Lock()
results[e.EventID()] = gomatrixserverlib.PDUResult{ results[e.EventID()] = gomatrixserverlib.PDUResult{
Error: errMsg, Error: errMsg,
}
resultsMutex.Unlock()
}
} else {
resultsMutex.Lock()
results[e.EventID()] = gomatrixserverlib.PDUResult{}
resultsMutex.Unlock()
pduCountTotal.WithLabelValues("success").Inc()
processEventSummary.WithLabelValues(t.work, MetricsOutcomeOK).Observe(
float64(time.Since(evStart).Nanoseconds()) / 1000.,
)
} }
} }
} else { }(q)
results[e.EventID()] = gomatrixserverlib.PDUResult{}
pduCountTotal.WithLabelValues("success").Inc()
processEventSummary.WithLabelValues(t.work, MetricsOutcomeOK).Observe(
float64(time.Since(evStart).Nanoseconds()) / 1000.,
)
}
} }
wg.Wait()
/*
// Process the events.
for _, e := range pdus {
evStart := time.Now()
if err := t.processEvent(ctx, e.Unwrap()); err != nil {
// If the error is due to the event itself being bad then we skip
// it and move onto the next event. We report an error so that the
// sender knows that we have skipped processing it.
//
// However if the event is due to a temporary failure in our server
// such as a database being unavailable then we should bail, and
// hope that the sender will retry when we are feeling better.
//
// It is uncertain what we should do if an event fails because
// we failed to fetch more information from the sending server.
// For example if a request to /state fails.
// If we skip the event then we risk missing the event until we
// receive another event referencing it.
// If we bail and stop processing then we risk wedging incoming
// transactions from that server forever.
if isProcessingErrorFatal(err) {
sentry.CaptureException(err)
// Any other error should be the result of a temporary error in
// our server so we should bail processing the transaction entirely.
util.GetLogger(ctx).Warnf("Processing %s failed fatally: %s", e.EventID(), err)
jsonErr := util.ErrorResponse(err)
processEventSummary.WithLabelValues(t.work, MetricsOutcomeFatal).Observe(
float64(time.Since(evStart).Nanoseconds()) / 1000.,
)
return nil, &jsonErr
} else {
// Auth errors mean the event is 'rejected' which have to be silent to appease sytest
errMsg := ""
outcome := MetricsOutcomeRejected
_, rejected := err.(*gomatrixserverlib.NotAllowed)
if !rejected {
errMsg = err.Error()
outcome = MetricsOutcomeFail
}
util.GetLogger(ctx).WithError(err).WithField("event_id", e.EventID()).WithField("rejected", rejected).Warn(
"Failed to process incoming federation event, skipping",
)
processEventSummary.WithLabelValues(t.work, outcome).Observe(
float64(time.Since(evStart).Nanoseconds()) / 1000.,
)
results[e.EventID()] = gomatrixserverlib.PDUResult{
Error: errMsg,
}
}
} else {
results[e.EventID()] = gomatrixserverlib.PDUResult{}
pduCountTotal.WithLabelValues("success").Inc()
processEventSummary.WithLabelValues(t.work, MetricsOutcomeOK).Observe(
float64(time.Since(evStart).Nanoseconds()) / 1000.,
)
}
}
*/
t.processEDUs(ctx) t.processEDUs(ctx)
if c := len(results); c > 0 { if c := len(results); c > 0 {
util.GetLogger(ctx).Infof("Processed %d PDUs from transaction %q", c, t.TransactionID) util.GetLogger(ctx).Infof("Processed %d PDUs from transaction %q", c, t.TransactionID)