From 59ec573d7c1b4e4f1e287b49bee87bdb5fc733d6 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 2 Jul 2021 11:59:20 +0100 Subject: [PATCH] Move into one file --- federationapi/routing/send.go | 49 ++++++++++++++++++++++++++-- federationapi/routing/send_fifo.go | 51 ------------------------------ 2 files changed, 47 insertions(+), 53 deletions(-) delete mode 100644 federationapi/routing/send_fifo.go diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 8262c6a88..ae9a63fc2 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -89,6 +89,51 @@ func init() { ) } +type sendFIFOQueue struct { + tasks []*inputTask + count int + mutex sync.Mutex + notifs chan struct{} +} + +func newSendFIFOQueue() *sendFIFOQueue { + q := &sendFIFOQueue{ + notifs: make(chan struct{}, 1), + } + return q +} + +func (q *sendFIFOQueue) push(frame *inputTask) { + q.mutex.Lock() + defer q.mutex.Unlock() + q.tasks = append(q.tasks, frame) + q.count++ + select { + case q.notifs <- struct{}{}: + default: + } +} + +// pop returns the first item of the queue, if there is one. +// The second return value will indicate if a task was returned. +func (q *sendFIFOQueue) pop() (*inputTask, bool) { + q.mutex.Lock() + defer q.mutex.Unlock() + if q.count == 0 { + return nil, false + } + frame := q.tasks[0] + q.tasks[0] = nil + q.tasks = q.tasks[1:] + q.count-- + if q.count == 0 { + // Force a GC of the underlying array, since it might have + // grown significantly if the queue was hammered for some reason + q.tasks = nil + } + return frame, true +} + type inputTask struct { ctx context.Context t *txnReq @@ -100,7 +145,7 @@ type inputTask struct { type inputWorker struct { running atomic.Bool - input *fifoQueue + input *sendFIFOQueue } var inputWorkers sync.Map // room ID -> *inputWorker @@ -265,7 +310,7 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res continue } v, _ := inputWorkers.LoadOrStore(event.RoomID(), &inputWorker{ - input: newFIFOQueue(), + input: newSendFIFOQueue(), }) worker := v.(*inputWorker) if !worker.running.Load() { diff --git a/federationapi/routing/send_fifo.go b/federationapi/routing/send_fifo.go deleted file mode 100644 index d19afa13b..000000000 --- a/federationapi/routing/send_fifo.go +++ /dev/null @@ -1,51 +0,0 @@ -package routing - -import ( - "sync" -) - -type fifoQueue struct { - tasks []*inputTask - count int - mutex sync.Mutex - notifs chan struct{} -} - -func newFIFOQueue() *fifoQueue { - q := &fifoQueue{ - notifs: make(chan struct{}, 1), - } - return q -} - -func (q *fifoQueue) push(frame *inputTask) { - q.mutex.Lock() - defer q.mutex.Unlock() - q.tasks = append(q.tasks, frame) - q.count++ - select { - case q.notifs <- struct{}{}: - default: - } -} - -// pop returns the first item of the queue, if there is one. -// The second return value will indicate if a task was returned. -// You must check this value, even after calling wait(). -func (q *fifoQueue) pop() (*inputTask, bool) { - q.mutex.Lock() - defer q.mutex.Unlock() - if q.count == 0 { - return nil, false - } - frame := q.tasks[0] - q.tasks[0] = nil - q.tasks = q.tasks[1:] - q.count-- - if q.count == 0 { - // Force a GC of the underlying array, since it might have - // grown significantly if the queue was hammered for some reason - q.tasks = nil - } - return frame, true -}