Try refactoring /send concurrency

Neil Alexander 2021-07-02 10:47:09 +01:00
parent 39afdcfdd3
commit 6f129ca0ee
2 changed files with 172 additions and 88 deletions


@@ -22,9 +22,7 @@ import (
     "fmt"
     "net/http"
     "sync"
-    "time"
 
-    "github.com/getsentry/sentry-go"
     "github.com/matrix-org/dendrite/clientapi/jsonerror"
     eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
     federationAPI "github.com/matrix-org/dendrite/federationapi/api"
@@ -36,6 +34,7 @@ import (
     "github.com/matrix-org/util"
     "github.com/prometheus/client_golang/prometheus"
     "github.com/sirupsen/logrus"
+    "go.uber.org/atomic"
 )
 
 const (
@@ -191,11 +190,26 @@ type txnFederationClient interface {
         roomVersion gomatrixserverlib.RoomVersion) (res gomatrixserverlib.RespMissingEvents, err error)
 }
 
+type inputTask struct {
+    ctx   context.Context
+    t     *txnReq
+    event *gomatrixserverlib.Event
+    wg    *sync.WaitGroup
+    err   error // written back by worker, only safe to read when all tasks are done
+}
+
+type inputWorker struct {
+    running atomic.Bool
+    input   *fifoQueue
+}
+
+var inputWorkers sync.Map // room ID -> *inputWorker
+
 func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.RespSend, *util.JSONResponse) {
     results := make(map[string]gomatrixserverlib.PDUResult)
-    var resultsMutex sync.Mutex
-    pdus := []*gomatrixserverlib.HeaderedEvent{}
+    //var resultsMutex sync.Mutex
+    var wg sync.WaitGroup
     for _, pdu := range t.PDUs {
         pduCountTotal.WithLabelValues("total").Inc()
         var header struct {
@@ -246,106 +260,112 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.RespSend, *util.JSONResponse) {
             }
             continue
         }
-        pdus = append(pdus, event.Headered(verRes.RoomVersion))
+        v, _ := inputWorkers.LoadOrStore(event.RoomID(), &inputWorker{
+            input: newFIFOQueue(),
+        })
+        worker := v.(*inputWorker)
+        if !worker.running.Load() {
+            go worker.run()
+        }
+        wg.Add(1)
+        worker.input.push(&inputTask{
+            ctx:   ctx,
+            t:     t,
+            event: event,
+            wg:    &wg,
+        })
     }
-    perRoom := map[string]chan *gomatrixserverlib.Event{}
-    perCount := map[string]int{}
-    for _, e := range pdus {
-        perCount[e.RoomID()]++
-    }
-    for s, c := range perCount {
-        perRoom[s] = make(chan *gomatrixserverlib.Event, c)
-    }
-    for _, e := range pdus {
-        perRoom[e.RoomID()] <- e.Unwrap()
-    }
-    pdus = nil // nolint:ineffassign
-    var wg sync.WaitGroup
-    wg.Add(len(perRoom) + 1)
+    wg.Wait()
+
+    wg.Add(1) // pair with the Done() in the EDU goroutine below
     go func() {
         defer wg.Done()
         t.processEDUs(ctx)
     }()
-    for _, q := range perRoom {
-        go func(q chan *gomatrixserverlib.Event) {
-            defer wg.Done()
-            for e := range q {
-                evStart := time.Now()
-                if err := t.processEvent(ctx, e); err != nil {
-                    // If the error is due to the event itself being bad then we skip
-                    // it and move onto the next event. We report an error so that the
-                    // sender knows that we have skipped processing it.
-                    //
-                    // However if the event is due to a temporary failure in our server
-                    // such as a database being unavailable then we should bail, and
-                    // hope that the sender will retry when we are feeling better.
-                    //
-                    // It is uncertain what we should do if an event fails because
-                    // we failed to fetch more information from the sending server.
-                    // For example if a request to /state fails.
-                    // If we skip the event then we risk missing the event until we
-                    // receive another event referencing it.
-                    // If we bail and stop processing then we risk wedging incoming
-                    // transactions from that server forever.
-                    if isProcessingErrorFatal(err) {
-                        sentry.CaptureException(err)
-                        // Any other error should be the result of a temporary error in
-                        // our server so we should bail processing the transaction entirely.
-                        util.GetLogger(ctx).Warnf("Processing %s failed fatally: %s", e.EventID(), err)
-                        processEventSummary.WithLabelValues(t.work, MetricsOutcomeFatal).Observe(
-                            float64(time.Since(evStart).Nanoseconds()) / 1000.,
-                        )
-                    } else {
-                        // Auth errors mean the event is 'rejected' which have to be silent to appease sytest
-                        errMsg := ""
-                        outcome := MetricsOutcomeRejected
-                        _, rejected := err.(*gomatrixserverlib.NotAllowed)
-                        if !rejected {
-                            errMsg = err.Error()
-                            outcome = MetricsOutcomeFail
-                        }
-                        util.GetLogger(ctx).WithError(err).WithField("event_id", e.EventID()).WithField("rejected", rejected).Warn(
-                            "Failed to process incoming federation event, skipping",
-                        )
-                        processEventSummary.WithLabelValues(t.work, outcome).Observe(
-                            float64(time.Since(evStart).Nanoseconds()) / 1000.,
-                        )
-                        resultsMutex.Lock()
-                        results[e.EventID()] = gomatrixserverlib.PDUResult{
-                            Error: errMsg,
-                        }
-                        resultsMutex.Unlock()
-                    }
-                } else {
-                    resultsMutex.Lock()
-                    results[e.EventID()] = gomatrixserverlib.PDUResult{}
-                    resultsMutex.Unlock()
-                    pduCountTotal.WithLabelValues("success").Inc()
-                    processEventSummary.WithLabelValues(t.work, MetricsOutcomeOK).Observe(
-                        float64(time.Since(evStart).Nanoseconds()) / 1000.,
-                    )
-                }
-            }
-        }(q)
-    }
     wg.Wait()
-    for k := range perRoom {
-        close(perRoom[k])
-        perRoom[k] = nil
-    }
     if c := len(results); c > 0 {
         util.GetLogger(ctx).Infof("Processed %d PDUs from transaction %q", c, t.TransactionID)
     }
     return &gomatrixserverlib.RespSend{PDUs: results}, nil
 }
+
+func (t *inputWorker) run() {
+    if !t.running.CAS(false, true) {
+        return
+    }
+    defer t.running.Store(false)
+    for {
+        task, ok := t.input.pop()
+        if !ok {
+            return
+        }
+        if task == nil {
+            continue
+        }
+        //evStart := time.Now()
+        task.err = task.t.processEvent(task.ctx, task.event)
+        task.wg.Done() // unblock the transaction waiting on this task
+        /*
+            if task.err = task.t.processEvent(task.ctx, task.event); task.err != nil {
+                err := task.err
+                // If the error is due to the event itself being bad then we skip
+                // it and move onto the next event. We report an error so that the
+                // sender knows that we have skipped processing it.
+                //
+                // However if the event is due to a temporary failure in our server
+                // such as a database being unavailable then we should bail, and
+                // hope that the sender will retry when we are feeling better.
+                //
+                // It is uncertain what we should do if an event fails because
+                // we failed to fetch more information from the sending server.
+                // For example if a request to /state fails.
+                // If we skip the event then we risk missing the event until we
+                // receive another event referencing it.
+                // If we bail and stop processing then we risk wedging incoming
+                // transactions from that server forever.
+                if isProcessingErrorFatal(err) {
+                    sentry.CaptureException(err)
+                    // Any other error should be the result of a temporary error in
+                    // our server so we should bail processing the transaction entirely.
+                    util.GetLogger(task.ctx).Warnf("Processing %s failed fatally: %s", e.EventID(), err)
+                    processEventSummary.WithLabelValues(t.work, MetricsOutcomeFatal).Observe(
+                        float64(time.Since(evStart).Nanoseconds()) / 1000.,
+                    )
+                } else {
+                    // Auth errors mean the event is 'rejected' which have to be silent to appease sytest
+                    errMsg := ""
+                    outcome := MetricsOutcomeRejected
+                    _, rejected := err.(*gomatrixserverlib.NotAllowed)
+                    if !rejected {
+                        errMsg = err.Error()
+                        outcome = MetricsOutcomeFail
+                    }
+                    util.GetLogger(ctx).WithError(err).WithField("event_id", e.EventID()).WithField("rejected", rejected).Warn(
+                        "Failed to process incoming federation event, skipping",
+                    )
+                    processEventSummary.WithLabelValues(t.work, outcome).Observe(
+                        float64(time.Since(evStart).Nanoseconds()) / 1000.,
+                    )
+                    resultsMutex.Lock()
+                    results[e.EventID()] = gomatrixserverlib.PDUResult{
+                        Error: errMsg,
+                    }
+                    resultsMutex.Unlock()
+                }
+            } else {
+                resultsMutex.Lock()
+                results[e.EventID()] = gomatrixserverlib.PDUResult{}
+                resultsMutex.Unlock()
+                pduCountTotal.WithLabelValues("success").Inc()
+                processEventSummary.WithLabelValues(t.work, MetricsOutcomeOK).Observe(
+                    float64(time.Since(evStart).Nanoseconds()) / 1000.,
+                )
+            }
+        */
+    }
+}
 
 // isProcessingErrorFatal returns true if the error is really bad and
 // we should stop processing the transaction, and returns false if it
 // is just some less serious error about a specific event.
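
The dispatch pattern above is easier to see in isolation: one lazily-started worker per room ID, so events for a single room stay ordered while different rooms are processed in parallel. Below is a minimal, self-contained sketch of that pattern, not the commit's code: a buffered channel stands in for the commit's fifoQueue, the task type is simplified, and all names here are illustrative. The drain/re-check in run() is an added safeguard so a task pushed while the worker is exiting is not stranded.

package main

import (
    "fmt"
    "sync"

    "go.uber.org/atomic"
)

type task struct {
    payload string
    wg      *sync.WaitGroup
}

type worker struct {
    running atomic.Bool
    input   chan *task
}

var workers sync.Map // key -> *worker, like the commit's room ID -> *inputWorker

func (w *worker) run() {
    for {
        select {
        case t := <-w.input:
            fmt.Println("processing:", t.payload) // stand-in for processEvent
            t.wg.Done()
        default:
            // Queue looks empty: drop the flag, then re-check in case a
            // task was pushed between the empty check and the Store.
            w.running.Store(false)
            if len(w.input) == 0 || !w.running.CAS(false, true) {
                return
            }
        }
    }
}

func dispatch(key, payload string, wg *sync.WaitGroup) {
    v, _ := workers.LoadOrStore(key, &worker{input: make(chan *task, 64)})
    w := v.(*worker)
    wg.Add(1)
    w.input <- &task{payload: payload, wg: wg}
    // Start a drain goroutine unless one is already running; the CAS
    // guarantees at most one per worker.
    if w.running.CAS(false, true) {
        go w.run()
    }
}

func main() {
    var wg sync.WaitGroup
    for _, room := range []string{"!a:x", "!b:x", "!a:x"} {
        dispatch(room, "event in "+room, &wg)
    }
    wg.Wait() // all tasks processed, as in processTransaction
}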


@@ -0,0 +1,64 @@
+package routing
+
+import (
+    "sync"
+)
+
+type fifoQueue struct {
+    tasks  []*inputTask
+    count  int
+    mutex  sync.Mutex
+    notifs chan struct{}
+}
+
+func newFIFOQueue() *fifoQueue {
+    q := &fifoQueue{
+        notifs: make(chan struct{}, 1),
+    }
+    return q
+}
+
+func (q *fifoQueue) push(frame *inputTask) {
+    q.mutex.Lock()
+    defer q.mutex.Unlock()
+    q.tasks = append(q.tasks, frame)
+    q.count++
+    select {
+    case q.notifs <- struct{}{}:
+    default:
+    }
+}
+
+// pop returns the first item of the queue, if there is one.
+// The second return value will indicate if a task was returned.
+// You must check this value, even after calling wait().
+func (q *fifoQueue) pop() (*inputTask, bool) {
+    q.mutex.Lock()
+    defer q.mutex.Unlock()
+    if q.count == 0 {
+        return nil, false
+    }
+    frame := q.tasks[0]
+    q.tasks[0] = nil
+    q.tasks = q.tasks[1:]
+    q.count--
+    if q.count == 0 {
+        // Force a GC of the underlying array, since it might have
+        // grown significantly if the queue was hammered for some reason
+        q.tasks = nil
+    }
+    return frame, true
+}
+
+// wait returns a channel which can be used to detect when an
+// item is waiting in the queue.
+func (q *fifoQueue) wait() <-chan struct{} {
+    q.mutex.Lock()
+    defer q.mutex.Unlock()
+    if q.count > 0 && len(q.notifs) == 0 {
+        ch := make(chan struct{})
+        close(ch)
+        return ch
+    }
+    return q.notifs
+}
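
Nothing in this commit calls wait() yet; the worker above simply pops until the queue reports empty. For illustration only, a hypothetical consumer that does use wait() would look like the sketch below, blocking until something has been queued and then draining. The boolean from pop() still has to be checked, as the doc comment warns, because a notification can outlive the task that triggered it.

// Hypothetical consumer (not part of this commit) built on fifoQueue.
func drain(q *fifoQueue, process func(*inputTask)) {
    for {
        <-q.wait() // blocks until at least one push has happened
        for {
            task, ok := q.pop()
            if !ok {
                break // spurious wakeup or already drained; wait again
            }
            process(task)
        }
    }
}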