diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 086e315c2..0d2378f14 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -3,19 +3,49 @@ package streams import ( "context" "sync" + "time" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" + "go.uber.org/atomic" ) +const COMPLETE_SYNC_QUEUE = 2048 +const COMPLETE_SYNC_WORKERS = 256 + type PDUStreamProvider struct { StreamProvider + + tasks chan func() + workers atomic.Int32 +} + +func (p *PDUStreamProvider) worker() { + defer p.workers.Dec() + for { + select { + case f := <-p.tasks: + f() + case <-time.After(time.Second * 10): + return + } + } +} + +func (p *PDUStreamProvider) queue(f func()) { + p.tasks <- f + if p.workers.Load() < COMPLETE_SYNC_WORKERS { + p.workers.Inc() + go p.worker() + } } func (p *PDUStreamProvider) Setup() { p.StreamProvider.Setup() + p.tasks = make(chan func(), COMPLETE_SYNC_QUEUE) + p.latestMutex.Lock() defer p.latestMutex.Unlock() @@ -56,24 +86,29 @@ func (p *PDUStreamProvider) CompleteSync( var reqMutex sync.Mutex var reqWaitGroup sync.WaitGroup reqWaitGroup.Add(len(joinedRoomIDs)) - for _, roomID := range joinedRoomIDs { - go func(roomID string) { + for _, room := range joinedRoomIDs { + roomID := room + p.queue(func() { defer reqWaitGroup.Done() + var jr *types.JoinResponse jr, err = p.getJoinResponseForCompleteSync( ctx, roomID, r, &stateFilter, &eventFilter, req.Device, ) if err != nil { req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed") - return //from + return } + reqMutex.Lock() defer reqMutex.Unlock() req.Response.Rooms.Join[roomID] = *jr req.Rooms[roomID] = gomatrixserverlib.Join - }(roomID) + }) } + reqWaitGroup.Wait() + // Add peeked rooms. peeks, err := p.DB.PeeksInRange(ctx, req.Device.UserID, req.Device.ID, r) if err != nil {