Move into one file

This commit is contained in:
Neil Alexander 2021-07-02 11:59:20 +01:00
parent eb42429b72
commit 59ec573d7c
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
2 changed files with 47 additions and 53 deletions

View file

@ -89,6 +89,51 @@ func init() {
) )
} }
type sendFIFOQueue struct {
tasks []*inputTask
count int
mutex sync.Mutex
notifs chan struct{}
}
func newSendFIFOQueue() *sendFIFOQueue {
q := &sendFIFOQueue{
notifs: make(chan struct{}, 1),
}
return q
}
func (q *sendFIFOQueue) push(frame *inputTask) {
q.mutex.Lock()
defer q.mutex.Unlock()
q.tasks = append(q.tasks, frame)
q.count++
select {
case q.notifs <- struct{}{}:
default:
}
}
// pop returns the first item of the queue, if there is one.
// The second return value will indicate if a task was returned.
func (q *sendFIFOQueue) pop() (*inputTask, bool) {
q.mutex.Lock()
defer q.mutex.Unlock()
if q.count == 0 {
return nil, false
}
frame := q.tasks[0]
q.tasks[0] = nil
q.tasks = q.tasks[1:]
q.count--
if q.count == 0 {
// Force a GC of the underlying array, since it might have
// grown significantly if the queue was hammered for some reason
q.tasks = nil
}
return frame, true
}
type inputTask struct { type inputTask struct {
ctx context.Context ctx context.Context
t *txnReq t *txnReq
@ -100,7 +145,7 @@ type inputTask struct {
type inputWorker struct { type inputWorker struct {
running atomic.Bool running atomic.Bool
input *fifoQueue input *sendFIFOQueue
} }
var inputWorkers sync.Map // room ID -> *inputWorker var inputWorkers sync.Map // room ID -> *inputWorker
@ -265,7 +310,7 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res
continue continue
} }
v, _ := inputWorkers.LoadOrStore(event.RoomID(), &inputWorker{ v, _ := inputWorkers.LoadOrStore(event.RoomID(), &inputWorker{
input: newFIFOQueue(), input: newSendFIFOQueue(),
}) })
worker := v.(*inputWorker) worker := v.(*inputWorker)
if !worker.running.Load() { if !worker.running.Load() {

View file

@ -1,51 +0,0 @@
package routing
import (
"sync"
)
type fifoQueue struct {
tasks []*inputTask
count int
mutex sync.Mutex
notifs chan struct{}
}
func newFIFOQueue() *fifoQueue {
q := &fifoQueue{
notifs: make(chan struct{}, 1),
}
return q
}
func (q *fifoQueue) push(frame *inputTask) {
q.mutex.Lock()
defer q.mutex.Unlock()
q.tasks = append(q.tasks, frame)
q.count++
select {
case q.notifs <- struct{}{}:
default:
}
}
// pop returns the first item of the queue, if there is one.
// The second return value will indicate if a task was returned.
// You must check this value, even after calling wait().
func (q *fifoQueue) pop() (*inputTask, bool) {
q.mutex.Lock()
defer q.mutex.Unlock()
if q.count == 0 {
return nil, false
}
frame := q.tasks[0]
q.tasks[0] = nil
q.tasks = q.tasks[1:]
q.count--
if q.count == 0 {
// Force a GC of the underlying array, since it might have
// grown significantly if the queue was hammered for some reason
q.tasks = nil
}
return frame, true
}