From 6f129ca0eef4c0c7200805240eaa8dabced8fb15 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 2 Jul 2021 10:47:09 +0100 Subject: [PATCH] Try refactoring /send concurrency --- federationapi/routing/send.go | 196 ++++++++++++++++------------- federationapi/routing/send_fifo.go | 64 ++++++++++ 2 files changed, 172 insertions(+), 88 deletions(-) create mode 100644 federationapi/routing/send_fifo.go diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index f3dd62e8c..b7341dd32 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -22,9 +22,7 @@ import ( "fmt" "net/http" "sync" - "time" - "github.com/getsentry/sentry-go" "github.com/matrix-org/dendrite/clientapi/jsonerror" eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" federationAPI "github.com/matrix-org/dendrite/federationapi/api" @@ -36,6 +34,7 @@ import ( "github.com/matrix-org/util" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" + "go.uber.org/atomic" ) const ( @@ -191,11 +190,26 @@ type txnFederationClient interface { roomVersion gomatrixserverlib.RoomVersion) (res gomatrixserverlib.RespMissingEvents, err error) } +type inputTask struct { + ctx context.Context + t *txnReq + event *gomatrixserverlib.Event + wg *sync.WaitGroup + err error // written back by worker, only safe to read when all tasks are done +} + +type inputWorker struct { + running atomic.Bool + input *fifoQueue +} + +var inputWorkers sync.Map // room ID -> *inputWorker + func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.RespSend, *util.JSONResponse) { results := make(map[string]gomatrixserverlib.PDUResult) - var resultsMutex sync.Mutex + //var resultsMutex sync.Mutex - pdus := []*gomatrixserverlib.HeaderedEvent{} + var wg sync.WaitGroup for _, pdu := range t.PDUs { pduCountTotal.WithLabelValues("total").Inc() var header struct { @@ -246,106 +260,112 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res } continue } - pdus = append(pdus, event.Headered(verRes.RoomVersion)) + v, _ := inputWorkers.LoadOrStore(event.RoomID(), &inputWorker{ + input: newFIFOQueue(), + }) + worker := v.(*inputWorker) + if !worker.running.Load() { + go worker.run() + } + wg.Add(1) + worker.input.push(&inputTask{ + ctx: ctx, + t: t, + event: event, + wg: &wg, + }) } - perRoom := map[string]chan *gomatrixserverlib.Event{} - perCount := map[string]int{} - for _, e := range pdus { - perCount[e.RoomID()]++ - } - for s, c := range perCount { - perRoom[s] = make(chan *gomatrixserverlib.Event, c) - } - for _, e := range pdus { - perRoom[e.RoomID()] <- e.Unwrap() - } - pdus = nil // nolint:ineffassign - - var wg sync.WaitGroup - wg.Add(len(perRoom) + 1) + wg.Wait() go func() { defer wg.Done() t.processEDUs(ctx) }() - for _, q := range perRoom { - go func(q chan *gomatrixserverlib.Event) { - defer wg.Done() - for e := range q { - evStart := time.Now() - if err := t.processEvent(ctx, e); err != nil { - // If the error is due to the event itself being bad then we skip - // it and move onto the next event. We report an error so that the - // sender knows that we have skipped processing it. - // - // However if the event is due to a temporary failure in our server - // such as a database being unavailable then we should bail, and - // hope that the sender will retry when we are feeling better. - // - // It is uncertain what we should do if an event fails because - // we failed to fetch more information from the sending server. - // For example if a request to /state fails. - // If we skip the event then we risk missing the event until we - // receive another event referencing it. - // If we bail and stop processing then we risk wedging incoming - // transactions from that server forever. - if isProcessingErrorFatal(err) { - sentry.CaptureException(err) - // Any other error should be the result of a temporary error in - // our server so we should bail processing the transaction entirely. - util.GetLogger(ctx).Warnf("Processing %s failed fatally: %s", e.EventID(), err) - processEventSummary.WithLabelValues(t.work, MetricsOutcomeFatal).Observe( - float64(time.Since(evStart).Nanoseconds()) / 1000., - ) - } else { - // Auth errors mean the event is 'rejected' which have to be silent to appease sytest - errMsg := "" - outcome := MetricsOutcomeRejected - _, rejected := err.(*gomatrixserverlib.NotAllowed) - if !rejected { - errMsg = err.Error() - outcome = MetricsOutcomeFail - } - util.GetLogger(ctx).WithError(err).WithField("event_id", e.EventID()).WithField("rejected", rejected).Warn( - "Failed to process incoming federation event, skipping", - ) - processEventSummary.WithLabelValues(t.work, outcome).Observe( - float64(time.Since(evStart).Nanoseconds()) / 1000., - ) - resultsMutex.Lock() - results[e.EventID()] = gomatrixserverlib.PDUResult{ - Error: errMsg, - } - resultsMutex.Unlock() - } - } else { - resultsMutex.Lock() - results[e.EventID()] = gomatrixserverlib.PDUResult{} - resultsMutex.Unlock() - pduCountTotal.WithLabelValues("success").Inc() - processEventSummary.WithLabelValues(t.work, MetricsOutcomeOK).Observe( - float64(time.Since(evStart).Nanoseconds()) / 1000., - ) - } - } - }(q) - } - wg.Wait() - for k := range perRoom { - close(perRoom[k]) - perRoom[k] = nil - } - if c := len(results); c > 0 { util.GetLogger(ctx).Infof("Processed %d PDUs from transaction %q", c, t.TransactionID) } return &gomatrixserverlib.RespSend{PDUs: results}, nil } +func (t *inputWorker) run() { + if !t.running.CAS(false, true) { + return + } + defer t.running.Store(false) + for { + task, ok := t.input.pop() + if !ok { + return + } + if task == nil { + continue + } + //evStart := time.Now() + task.err = task.t.processEvent(task.ctx, task.event) + /* + if task.err = task.t.processEvent(task.ctx, task.event); task.err != nil { + err := task.err + // If the error is due to the event itself being bad then we skip + // it and move onto the next event. We report an error so that the + // sender knows that we have skipped processing it. + // + // However if the event is due to a temporary failure in our server + // such as a database being unavailable then we should bail, and + // hope that the sender will retry when we are feeling better. + // + // It is uncertain what we should do if an event fails because + // we failed to fetch more information from the sending server. + // For example if a request to /state fails. + // If we skip the event then we risk missing the event until we + // receive another event referencing it. + // If we bail and stop processing then we risk wedging incoming + // transactions from that server forever. + if isProcessingErrorFatal(err) { + sentry.CaptureException(err) + // Any other error should be the result of a temporary error in + // our server so we should bail processing the transaction entirely. + util.GetLogger(task.ctx).Warnf("Processing %s failed fatally: %s", e.EventID(), err) + processEventSummary.WithLabelValues(t.work, MetricsOutcomeFatal).Observe( + float64(time.Since(evStart).Nanoseconds()) / 1000., + ) + } else { + // Auth errors mean the event is 'rejected' which have to be silent to appease sytest + errMsg := "" + outcome := MetricsOutcomeRejected + _, rejected := err.(*gomatrixserverlib.NotAllowed) + if !rejected { + errMsg = err.Error() + outcome = MetricsOutcomeFail + } + util.GetLogger(ctx).WithError(err).WithField("event_id", e.EventID()).WithField("rejected", rejected).Warn( + "Failed to process incoming federation event, skipping", + ) + processEventSummary.WithLabelValues(t.work, outcome).Observe( + float64(time.Since(evStart).Nanoseconds()) / 1000., + ) + resultsMutex.Lock() + results[e.EventID()] = gomatrixserverlib.PDUResult{ + Error: errMsg, + } + resultsMutex.Unlock() + } + } else { + resultsMutex.Lock() + results[e.EventID()] = gomatrixserverlib.PDUResult{} + resultsMutex.Unlock() + pduCountTotal.WithLabelValues("success").Inc() + processEventSummary.WithLabelValues(t.work, MetricsOutcomeOK).Observe( + float64(time.Since(evStart).Nanoseconds()) / 1000., + ) + } + */ + } +} + // isProcessingErrorFatal returns true if the error is really bad and // we should stop processing the transaction, and returns false if it // is just some less serious error about a specific event. diff --git a/federationapi/routing/send_fifo.go b/federationapi/routing/send_fifo.go new file mode 100644 index 000000000..81335a8ac --- /dev/null +++ b/federationapi/routing/send_fifo.go @@ -0,0 +1,64 @@ +package routing + +import ( + "sync" +) + +type fifoQueue struct { + tasks []*inputTask + count int + mutex sync.Mutex + notifs chan struct{} +} + +func newFIFOQueue() *fifoQueue { + q := &fifoQueue{ + notifs: make(chan struct{}, 1), + } + return q +} + +func (q *fifoQueue) push(frame *inputTask) { + q.mutex.Lock() + defer q.mutex.Unlock() + q.tasks = append(q.tasks, frame) + q.count++ + select { + case q.notifs <- struct{}{}: + default: + } +} + +// pop returns the first item of the queue, if there is one. +// The second return value will indicate if a task was returned. +// You must check this value, even after calling wait(). +func (q *fifoQueue) pop() (*inputTask, bool) { + q.mutex.Lock() + defer q.mutex.Unlock() + if q.count == 0 { + return nil, false + } + frame := q.tasks[0] + q.tasks[0] = nil + q.tasks = q.tasks[1:] + q.count-- + if q.count == 0 { + // Force a GC of the underlying array, since it might have + // grown significantly if the queue was hammered for some reason + q.tasks = nil + } + return frame, true +} + +// wait returns a channel which can be used to detect when an +// item is waiting in the queue. +func (q *fifoQueue) wait() <-chan struct{} { + q.mutex.Lock() + defer q.mutex.Unlock() + if q.count > 0 && len(q.notifs) == 0 { + ch := make(chan struct{}) + close(ch) + return ch + } + return q.notifs +}