From 6582b639f0467dac5785026a4f6c8fe87b86d3c2 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 2 Jul 2021 11:35:57 +0100 Subject: [PATCH] Respond to transaction --- federationapi/routing/send.go | 144 +++++++++++++--------------------- 1 file changed, 56 insertions(+), 88 deletions(-) diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index a4c88745c..8192b0c12 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -16,12 +16,12 @@ package routing import ( "context" - "database/sql" "encoding/json" "errors" "fmt" "net/http" "sync" + "time" "github.com/matrix-org/dendrite/clientapi/jsonerror" eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" @@ -89,6 +89,22 @@ func init() { ) } +type inputTask struct { + ctx context.Context + t *txnReq + event *gomatrixserverlib.Event + wg *sync.WaitGroup + err error // written back by worker, only safe to read when all tasks are done + duration time.Duration // written back by worker, only safe to read when all tasks are done +} + +type inputWorker struct { + running atomic.Bool + input *fifoQueue +} + +var inputWorkers sync.Map // room ID -> *inputWorker + // Send implements /_matrix/federation/v1/send/{txnID} func Send( httpReq *http.Request, @@ -190,26 +206,12 @@ type txnFederationClient interface { roomVersion gomatrixserverlib.RoomVersion) (res gomatrixserverlib.RespMissingEvents, err error) } -type inputTask struct { - ctx context.Context - t *txnReq - event *gomatrixserverlib.Event - wg *sync.WaitGroup - err error // written back by worker, only safe to read when all tasks are done -} - -type inputWorker struct { - running atomic.Bool - input *fifoQueue -} - -var inputWorkers sync.Map // room ID -> *inputWorker - func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.RespSend, *util.JSONResponse) { results := make(map[string]gomatrixserverlib.PDUResult) //var resultsMutex sync.Mutex var wg sync.WaitGroup + var tasks []*inputTask wg.Add(1) // for processEDUs for _, pdu := range t.PDUs { @@ -270,12 +272,14 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res go worker.run() } wg.Add(1) - worker.input.push(&inputTask{ + task := &inputTask{ ctx: ctx, t: t, event: event, wg: &wg, - }) + } + tasks = append(tasks, task) + worker.input.push(task) } go func() { @@ -285,6 +289,14 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res wg.Wait() + for _, task := range tasks { + if task.err != nil { + results[task.event.EventID()] = gomatrixserverlib.PDUResult{ + Error: task.err.Error(), + } + } + } + if c := len(results); c > 0 { util.GetLogger(ctx).Infof("Processed %d PDUs from transaction %q", c, t.TransactionID) } @@ -311,82 +323,38 @@ func (t *inputWorker) run() { task.err = context.DeadlineExceeded return default: + evStart := time.Now() task.err = task.t.processEvent(task.ctx, task.event) + task.duration = time.Since(evStart) + if err := task.err; err != nil { + switch err.(type) { + case *gomatrixserverlib.NotAllowed: + processEventSummary.WithLabelValues(task.t.work, MetricsOutcomeRejected).Observe( + float64(time.Since(evStart).Nanoseconds()) / 1000., + ) + util.GetLogger(task.ctx).WithError(err).WithField("event_id", task.event.EventID()).WithField("rejected", true).Warn( + "Failed to process incoming federation event, skipping", + ) + task.err = nil // make "rejected" failures silent + default: + processEventSummary.WithLabelValues(task.t.work, MetricsOutcomeFail).Observe( + float64(time.Since(evStart).Nanoseconds()) / 1000., + ) + util.GetLogger(task.ctx).WithError(err).WithField("event_id", task.event.EventID()).WithField("rejected", false).Warn( + "Failed to process incoming federation event, skipping", + ) + } + } else { + pduCountTotal.WithLabelValues("success").Inc() + processEventSummary.WithLabelValues(task.t.work, MetricsOutcomeOK).Observe( + float64(time.Since(evStart).Nanoseconds()) / 1000., + ) + } } }() - //evStart := time.Now() - /* - if task.err = task.t.processEvent(task.ctx, task.event); task.err != nil { - err := task.err - // If the error is due to the event itself being bad then we skip - // it and move onto the next event. We report an error so that the - // sender knows that we have skipped processing it. - // - // However if the event is due to a temporary failure in our server - // such as a database being unavailable then we should bail, and - // hope that the sender will retry when we are feeling better. - // - // It is uncertain what we should do if an event fails because - // we failed to fetch more information from the sending server. - // For example if a request to /state fails. - // If we skip the event then we risk missing the event until we - // receive another event referencing it. - // If we bail and stop processing then we risk wedging incoming - // transactions from that server forever. - if isProcessingErrorFatal(err) { - sentry.CaptureException(err) - // Any other error should be the result of a temporary error in - // our server so we should bail processing the transaction entirely. - util.GetLogger(task.ctx).Warnf("Processing %s failed fatally: %s", e.EventID(), err) - processEventSummary.WithLabelValues(t.work, MetricsOutcomeFatal).Observe( - float64(time.Since(evStart).Nanoseconds()) / 1000., - ) - } else { - // Auth errors mean the event is 'rejected' which have to be silent to appease sytest - errMsg := "" - outcome := MetricsOutcomeRejected - _, rejected := err.(*gomatrixserverlib.NotAllowed) - if !rejected { - errMsg = err.Error() - outcome = MetricsOutcomeFail - } - util.GetLogger(ctx).WithError(err).WithField("event_id", e.EventID()).WithField("rejected", rejected).Warn( - "Failed to process incoming federation event, skipping", - ) - processEventSummary.WithLabelValues(t.work, outcome).Observe( - float64(time.Since(evStart).Nanoseconds()) / 1000., - ) - resultsMutex.Lock() - results[e.EventID()] = gomatrixserverlib.PDUResult{ - Error: errMsg, - } - resultsMutex.Unlock() - } - } else { - resultsMutex.Lock() - results[e.EventID()] = gomatrixserverlib.PDUResult{} - resultsMutex.Unlock() - pduCountTotal.WithLabelValues("success").Inc() - processEventSummary.WithLabelValues(t.work, MetricsOutcomeOK).Observe( - float64(time.Since(evStart).Nanoseconds()) / 1000., - ) - } - */ } } -// isProcessingErrorFatal returns true if the error is really bad and -// we should stop processing the transaction, and returns false if it -// is just some less serious error about a specific event. -func isProcessingErrorFatal(err error) bool { - switch err { - case sql.ErrConnDone: - case sql.ErrTxDone: - return true - } - return false -} - type roomNotFoundError struct { roomID string }