Respond to transaction

This commit is contained in:
Neil Alexander 2021-07-02 11:35:57 +01:00
parent ef41906e49
commit 6582b639f0
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944

View file

@ -16,12 +16,12 @@ package routing
import ( import (
"context" "context"
"database/sql"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"net/http" "net/http"
"sync" "sync"
"time"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
@ -89,6 +89,22 @@ func init() {
) )
} }
type inputTask struct {
ctx context.Context
t *txnReq
event *gomatrixserverlib.Event
wg *sync.WaitGroup
err error // written back by worker, only safe to read when all tasks are done
duration time.Duration // written back by worker, only safe to read when all tasks are done
}
type inputWorker struct {
running atomic.Bool
input *fifoQueue
}
var inputWorkers sync.Map // room ID -> *inputWorker
// Send implements /_matrix/federation/v1/send/{txnID} // Send implements /_matrix/federation/v1/send/{txnID}
func Send( func Send(
httpReq *http.Request, httpReq *http.Request,
@ -190,26 +206,12 @@ type txnFederationClient interface {
roomVersion gomatrixserverlib.RoomVersion) (res gomatrixserverlib.RespMissingEvents, err error) roomVersion gomatrixserverlib.RoomVersion) (res gomatrixserverlib.RespMissingEvents, err error)
} }
type inputTask struct {
ctx context.Context
t *txnReq
event *gomatrixserverlib.Event
wg *sync.WaitGroup
err error // written back by worker, only safe to read when all tasks are done
}
type inputWorker struct {
running atomic.Bool
input *fifoQueue
}
var inputWorkers sync.Map // room ID -> *inputWorker
func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.RespSend, *util.JSONResponse) { func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.RespSend, *util.JSONResponse) {
results := make(map[string]gomatrixserverlib.PDUResult) results := make(map[string]gomatrixserverlib.PDUResult)
//var resultsMutex sync.Mutex //var resultsMutex sync.Mutex
var wg sync.WaitGroup var wg sync.WaitGroup
var tasks []*inputTask
wg.Add(1) // for processEDUs wg.Add(1) // for processEDUs
for _, pdu := range t.PDUs { for _, pdu := range t.PDUs {
@ -270,12 +272,14 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res
go worker.run() go worker.run()
} }
wg.Add(1) wg.Add(1)
worker.input.push(&inputTask{ task := &inputTask{
ctx: ctx, ctx: ctx,
t: t, t: t,
event: event, event: event,
wg: &wg, wg: &wg,
}) }
tasks = append(tasks, task)
worker.input.push(task)
} }
go func() { go func() {
@ -285,6 +289,14 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res
wg.Wait() wg.Wait()
for _, task := range tasks {
if task.err != nil {
results[task.event.EventID()] = gomatrixserverlib.PDUResult{
Error: task.err.Error(),
}
}
}
if c := len(results); c > 0 { if c := len(results); c > 0 {
util.GetLogger(ctx).Infof("Processed %d PDUs from transaction %q", c, t.TransactionID) util.GetLogger(ctx).Infof("Processed %d PDUs from transaction %q", c, t.TransactionID)
} }
@ -311,80 +323,36 @@ func (t *inputWorker) run() {
task.err = context.DeadlineExceeded task.err = context.DeadlineExceeded
return return
default: default:
evStart := time.Now()
task.err = task.t.processEvent(task.ctx, task.event) task.err = task.t.processEvent(task.ctx, task.event)
} task.duration = time.Since(evStart)
}() if err := task.err; err != nil {
//evStart := time.Now() switch err.(type) {
/* case *gomatrixserverlib.NotAllowed:
if task.err = task.t.processEvent(task.ctx, task.event); task.err != nil { processEventSummary.WithLabelValues(task.t.work, MetricsOutcomeRejected).Observe(
err := task.err
// If the error is due to the event itself being bad then we skip
// it and move onto the next event. We report an error so that the
// sender knows that we have skipped processing it.
//
// However if the event is due to a temporary failure in our server
// such as a database being unavailable then we should bail, and
// hope that the sender will retry when we are feeling better.
//
// It is uncertain what we should do if an event fails because
// we failed to fetch more information from the sending server.
// For example if a request to /state fails.
// If we skip the event then we risk missing the event until we
// receive another event referencing it.
// If we bail and stop processing then we risk wedging incoming
// transactions from that server forever.
if isProcessingErrorFatal(err) {
sentry.CaptureException(err)
// Any other error should be the result of a temporary error in
// our server so we should bail processing the transaction entirely.
util.GetLogger(task.ctx).Warnf("Processing %s failed fatally: %s", e.EventID(), err)
processEventSummary.WithLabelValues(t.work, MetricsOutcomeFatal).Observe(
float64(time.Since(evStart).Nanoseconds()) / 1000., float64(time.Since(evStart).Nanoseconds()) / 1000.,
) )
} else { util.GetLogger(task.ctx).WithError(err).WithField("event_id", task.event.EventID()).WithField("rejected", true).Warn(
// Auth errors mean the event is 'rejected' which have to be silent to appease sytest
errMsg := ""
outcome := MetricsOutcomeRejected
_, rejected := err.(*gomatrixserverlib.NotAllowed)
if !rejected {
errMsg = err.Error()
outcome = MetricsOutcomeFail
}
util.GetLogger(ctx).WithError(err).WithField("event_id", e.EventID()).WithField("rejected", rejected).Warn(
"Failed to process incoming federation event, skipping", "Failed to process incoming federation event, skipping",
) )
processEventSummary.WithLabelValues(t.work, outcome).Observe( task.err = nil // make "rejected" failures silent
default:
processEventSummary.WithLabelValues(task.t.work, MetricsOutcomeFail).Observe(
float64(time.Since(evStart).Nanoseconds()) / 1000., float64(time.Since(evStart).Nanoseconds()) / 1000.,
) )
resultsMutex.Lock() util.GetLogger(task.ctx).WithError(err).WithField("event_id", task.event.EventID()).WithField("rejected", false).Warn(
results[e.EventID()] = gomatrixserverlib.PDUResult{ "Failed to process incoming federation event, skipping",
Error: errMsg, )
}
resultsMutex.Unlock()
} }
} else { } else {
resultsMutex.Lock()
results[e.EventID()] = gomatrixserverlib.PDUResult{}
resultsMutex.Unlock()
pduCountTotal.WithLabelValues("success").Inc() pduCountTotal.WithLabelValues("success").Inc()
processEventSummary.WithLabelValues(t.work, MetricsOutcomeOK).Observe( processEventSummary.WithLabelValues(task.t.work, MetricsOutcomeOK).Observe(
float64(time.Since(evStart).Nanoseconds()) / 1000., float64(time.Since(evStart).Nanoseconds()) / 1000.,
) )
} }
*/
} }
} }()
// isProcessingErrorFatal returns true if the error is really bad and
// we should stop processing the transaction, and returns false if it
// is just some less serious error about a specific event.
func isProcessingErrorFatal(err error) bool {
switch err {
case sql.ErrConnDone:
case sql.ErrTxDone:
return true
} }
return false
} }
type roomNotFoundError struct { type roomNotFoundError struct {