Ensure worker has work before starting goroutine

This commit is contained in:
Neil Alexander 2021-07-05 09:40:32 +01:00
parent 7f02eab47d
commit da95342c24
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944

View file

@ -313,9 +313,6 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res
input: newSendFIFOQueue(), input: newSendFIFOQueue(),
}) })
worker := v.(*inputWorker) worker := v.(*inputWorker)
if !worker.running.Load() {
go worker.run()
}
wg.Add(1) wg.Add(1)
task := &inputTask{ task := &inputTask{
ctx: ctx, ctx: ctx,
@ -325,6 +322,9 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res
} }
tasks = append(tasks, task) tasks = append(tasks, task)
worker.input.push(task) worker.input.push(task)
if !worker.running.CAS(false, true) {
go worker.run()
}
} }
go func() { go func() {
@ -351,9 +351,6 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res
} }
func (t *inputWorker) run() { func (t *inputWorker) run() {
if !t.running.CAS(false, true) {
return
}
defer t.running.Store(false) defer t.running.Store(false)
for { for {
task, ok := t.input.pop() task, ok := t.input.pop()