Worker queue

This commit is contained in:
Neil Alexander 2021-01-29 14:23:03 +00:00
parent 2410ed58a4
commit 972c848730
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944

View file

@ -3,19 +3,49 @@ package streams
import ( import (
"context" "context"
"sync" "sync"
"time"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"go.uber.org/atomic"
) )
const COMPLETE_SYNC_QUEUE = 2048
const COMPLETE_SYNC_WORKERS = 256
type PDUStreamProvider struct { type PDUStreamProvider struct {
StreamProvider StreamProvider
tasks chan func()
workers atomic.Int32
}
func (p *PDUStreamProvider) worker() {
defer p.workers.Dec()
for {
select {
case f := <-p.tasks:
f()
case <-time.After(time.Second * 10):
return
}
}
}
func (p *PDUStreamProvider) queue(f func()) {
p.tasks <- f
if p.workers.Load() < COMPLETE_SYNC_WORKERS {
p.workers.Inc()
go p.worker()
}
} }
func (p *PDUStreamProvider) Setup() { func (p *PDUStreamProvider) Setup() {
p.StreamProvider.Setup() p.StreamProvider.Setup()
p.tasks = make(chan func(), COMPLETE_SYNC_QUEUE)
p.latestMutex.Lock() p.latestMutex.Lock()
defer p.latestMutex.Unlock() defer p.latestMutex.Unlock()
@ -56,24 +86,29 @@ func (p *PDUStreamProvider) CompleteSync(
var reqMutex sync.Mutex var reqMutex sync.Mutex
var reqWaitGroup sync.WaitGroup var reqWaitGroup sync.WaitGroup
reqWaitGroup.Add(len(joinedRoomIDs)) reqWaitGroup.Add(len(joinedRoomIDs))
for _, roomID := range joinedRoomIDs { for _, room := range joinedRoomIDs {
go func(roomID string) { roomID := room
p.queue(func() {
defer reqWaitGroup.Done() defer reqWaitGroup.Done()
var jr *types.JoinResponse var jr *types.JoinResponse
jr, err = p.getJoinResponseForCompleteSync( jr, err = p.getJoinResponseForCompleteSync(
ctx, roomID, r, &stateFilter, &eventFilter, req.Device, ctx, roomID, r, &stateFilter, &eventFilter, req.Device,
) )
if err != nil { if err != nil {
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed") req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
return //from return
} }
reqMutex.Lock() reqMutex.Lock()
defer reqMutex.Unlock() defer reqMutex.Unlock()
req.Response.Rooms.Join[roomID] = *jr req.Response.Rooms.Join[roomID] = *jr
req.Rooms[roomID] = gomatrixserverlib.Join req.Rooms[roomID] = gomatrixserverlib.Join
}(roomID) })
} }
reqWaitGroup.Wait()
// Add peeked rooms. // Add peeked rooms.
peeks, err := p.DB.PeeksInRange(ctx, req.Device.UserID, req.Device.ID, r) peeks, err := p.DB.PeeksInRange(ctx, req.Device.UserID, req.Device.ID, r)
if err != nil { if err != nil {