Federation API fixes (#1899)

* Ensure worker has work before starting goroutine

* Revert "Remove processEventWithMissingStateMutex"

This reverts commit 7f02eab47d.

* Use request context when processing transactions

* Keep goroutine count down by not starting work for things where the caller gave up

* Remove mutex, start workers at correct time
This commit is contained in:
Neil Alexander 2021-07-05 12:14:31 +01:00 committed by GitHub
parent 7f02eab47d
commit 99d8e1c107
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -205,7 +205,7 @@ func Send(
util.GetLogger(httpReq.Context()).Infof("Received transaction %q from %q containing %d PDUs, %d EDUs", txnID, request.Origin(), len(t.PDUs), len(t.EDUs)) util.GetLogger(httpReq.Context()).Infof("Received transaction %q from %q containing %d PDUs, %d EDUs", txnID, request.Origin(), len(t.PDUs), len(t.EDUs))
resp, jsonErr := t.processTransaction(context.Background()) resp, jsonErr := t.processTransaction(httpReq.Context())
if jsonErr != nil { if jsonErr != nil {
util.GetLogger(httpReq.Context()).WithField("jsonErr", jsonErr).Error("t.processTransaction failed") util.GetLogger(httpReq.Context()).WithField("jsonErr", jsonErr).Error("t.processTransaction failed")
return *jsonErr return *jsonErr
@ -253,11 +253,8 @@ type txnFederationClient interface {
func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.RespSend, *util.JSONResponse) { func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.RespSend, *util.JSONResponse) {
results := make(map[string]gomatrixserverlib.PDUResult) results := make(map[string]gomatrixserverlib.PDUResult)
//var resultsMutex sync.Mutex
var wg sync.WaitGroup var wg sync.WaitGroup
var tasks []*inputTask var tasks []*inputTask
wg.Add(1) // for processEDUs
for _, pdu := range t.PDUs { for _, pdu := range t.PDUs {
pduCountTotal.WithLabelValues("total").Inc() pduCountTotal.WithLabelValues("total").Inc()
@ -313,9 +310,6 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res
input: newSendFIFOQueue(), input: newSendFIFOQueue(),
}) })
worker := v.(*inputWorker) worker := v.(*inputWorker)
if !worker.running.Load() {
go worker.run()
}
wg.Add(1) wg.Add(1)
task := &inputTask{ task := &inputTask{
ctx: ctx, ctx: ctx,
@ -325,13 +319,12 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res
} }
tasks = append(tasks, task) tasks = append(tasks, task)
worker.input.push(task) worker.input.push(task)
if worker.running.CAS(false, true) {
go worker.run()
}
} }
go func() {
defer wg.Done()
t.processEDUs(ctx) t.processEDUs(ctx)
}()
wg.Wait() wg.Wait()
for _, task := range tasks { for _, task := range tasks {
@ -351,9 +344,6 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res
} }
func (t *inputWorker) run() { func (t *inputWorker) run() {
if !t.running.CAS(false, true) {
return
}
defer t.running.Store(false) defer t.running.Store(false)
for { for {
task, ok := t.input.pop() task, ok := t.input.pop()
@ -371,7 +361,10 @@ func (t *inputWorker) run() {
return return
default: default:
evStart := time.Now() evStart := time.Now()
task.err = task.t.processEvent(task.ctx, task.event) // TODO: Is 5 minutes too long?
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
task.err = task.t.processEvent(ctx, task.event)
cancel()
task.duration = time.Since(evStart) task.duration = time.Since(evStart)
if err := task.err; err != nil { if err := task.err; err != nil {
switch err.(type) { switch err.(type) {