diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 0d2378f14..220629cfb 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -11,8 +11,8 @@ import ( "go.uber.org/atomic" ) -const COMPLETE_SYNC_QUEUE = 2048 -const COMPLETE_SYNC_WORKERS = 256 +const PDU_STREAM_QUEUESIZE = 2048 +const PDU_STREAM_WORKERS = 256 type PDUStreamProvider struct { StreamProvider @@ -34,17 +34,16 @@ func (p *PDUStreamProvider) worker() { } func (p *PDUStreamProvider) queue(f func()) { - p.tasks <- f - if p.workers.Load() < COMPLETE_SYNC_WORKERS { + if p.workers.Load() < PDU_STREAM_WORKERS { p.workers.Inc() go p.worker() } + p.tasks <- f } func (p *PDUStreamProvider) Setup() { p.StreamProvider.Setup() - - p.tasks = make(chan func(), COMPLETE_SYNC_QUEUE) + p.tasks = make(chan func(), PDU_STREAM_QUEUESIZE) p.latestMutex.Lock() defer p.latestMutex.Unlock()