Complete sync performance (#1741)
* Parallelise PDU stream fetching for complete sync
* Fixes
* Fixes
* Worker queue
* Workers
* Don't populate device list changes on complete sync
* Don't fast-forward typing notifications either on complete sync
* Revert "Don't fast-forward typing notifications either on complete sync"
  This reverts commit 01471f7843.
* Comments
parent 6d1c6f29e0
commit 62a325ded8
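In short: the PDU stream provider gains a small worker pool so that a complete sync fetches the join response for each room in parallel rather than one room at a time, and the device list stream provider stops fast-forwarding through its change log on complete sync, since an initial sync has no device list changes to report.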
@@ -19,7 +19,7 @@ func (p *DeviceListStreamProvider) CompleteSync(
 	ctx context.Context,
 	req *types.SyncRequest,
 ) types.LogPosition {
-	return p.IncrementalSync(ctx, req, types.LogPosition{}, p.LatestPosition(ctx))
+	return p.LatestPosition(ctx)
 }
 
 func (p *DeviceListStreamProvider) IncrementalSync(
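This is the "Don't populate device list changes on complete sync" change from the commit message: instead of running a full incremental pass over the device list log from position zero, a complete sync now simply records the latest log position, and subsequent incremental syncs pick up changes from there.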
@@ -2,18 +2,54 @@ package streams
 
 import (
 	"context"
+	"sync"
+	"time"
 
 	"github.com/matrix-org/dendrite/syncapi/types"
 	userapi "github.com/matrix-org/dendrite/userapi/api"
 	"github.com/matrix-org/gomatrixserverlib"
+	"go.uber.org/atomic"
 )
 
+// The max number of per-room goroutines to have running.
+// Too high and this will consume lots of CPU, too low and complete
+// sync responses will take longer to process.
+const PDU_STREAM_WORKERS = 256
+
+// The maximum number of tasks that can be queued in total before
+// backpressure will build up and the requests will start to block.
+const PDU_STREAM_QUEUESIZE = PDU_STREAM_WORKERS * 8
+
 type PDUStreamProvider struct {
 	StreamProvider
+
+	tasks   chan func()
+	workers atomic.Int32
+}
+
+func (p *PDUStreamProvider) worker() {
+	defer p.workers.Dec()
+	for {
+		select {
+		case f := <-p.tasks:
+			f()
+		case <-time.After(time.Second * 10):
+			return
+		}
+	}
+}
+
+func (p *PDUStreamProvider) queue(f func()) {
+	if p.workers.Load() < PDU_STREAM_WORKERS {
+		p.workers.Inc()
+		go p.worker()
+	}
+	p.tasks <- f
 }
 
 func (p *PDUStreamProvider) Setup() {
	p.StreamProvider.Setup()
+	p.tasks = make(chan func(), PDU_STREAM_QUEUESIZE)
 
 	p.latestMutex.Lock()
 	defer p.latestMutex.Unlock()
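For reference, here is a minimal, self-contained sketch of the expiring worker pool this hunk introduces. The pool, maxWorkers, and idleTimeout names are illustrative only, not part of Dendrite; workers are spawned lazily up to a cap and exit once idle, so no goroutines linger between bursts of complete syncs.

package main

import (
	"fmt"
	"sync"
	"time"

	"go.uber.org/atomic"
)

const (
	maxWorkers  = 4           // cap on live workers (PDU_STREAM_WORKERS plays this role above)
	idleTimeout = time.Second // how long a worker waits for a task before exiting
)

// pool mirrors the fields added to PDUStreamProvider: a buffered task
// channel plus a counter of currently live workers.
type pool struct {
	tasks   chan func()
	workers atomic.Int32
}

// worker runs queued tasks until it has been idle for idleTimeout,
// then exits and decrements the live-worker count.
func (p *pool) worker() {
	defer p.workers.Dec()
	for {
		select {
		case f := <-p.tasks:
			f()
		case <-time.After(idleTimeout):
			return
		}
	}
}

// queue submits a task, lazily spawning a worker while under the cap.
// Once the channel buffer fills, queue blocks: that is the backpressure
// the PDU_STREAM_QUEUESIZE comment refers to.
func (p *pool) queue(f func()) {
	if p.workers.Load() < maxWorkers {
		p.workers.Inc()
		go p.worker()
	}
	p.tasks <- f
}

func main() {
	p := &pool{tasks: make(chan func(), maxWorkers*8)}
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		i := i // per-iteration copy, as with roomID := room below
		wg.Add(1)
		p.queue(func() {
			defer wg.Done()
			fmt.Println("task", i)
		})
	}
	wg.Wait()
}

Note that the Load-then-Inc check in queue is not atomic as a pair, so the pool can transiently overshoot the cap by a few goroutines; the same pattern appears in the diff above, where it looks like an accepted trade-off for keeping the submission path lock-free.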
@@ -52,19 +88,32 @@ func (p *PDUStreamProvider) CompleteSync(
 	eventFilter := req.Filter.Room.Timeline
 
 	// Build up a /sync response. Add joined rooms.
-	for _, roomID := range joinedRoomIDs {
-		var jr *types.JoinResponse
-		jr, err = p.getJoinResponseForCompleteSync(
-			ctx, roomID, r, &stateFilter, &eventFilter, req.Device,
-		)
-		if err != nil {
-			req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
-			return from
-		}
-		req.Response.Rooms.Join[roomID] = *jr
-		req.Rooms[roomID] = gomatrixserverlib.Join
+	var reqMutex sync.Mutex
+	var reqWaitGroup sync.WaitGroup
+	reqWaitGroup.Add(len(joinedRoomIDs))
+	for _, room := range joinedRoomIDs {
+		roomID := room
+		p.queue(func() {
+			defer reqWaitGroup.Done()
+
+			var jr *types.JoinResponse
+			jr, err = p.getJoinResponseForCompleteSync(
+				ctx, roomID, r, &stateFilter, &eventFilter, req.Device,
+			)
+			if err != nil {
+				req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
+				return
+			}
+
+			reqMutex.Lock()
+			defer reqMutex.Unlock()
+			req.Response.Rooms.Join[roomID] = *jr
+			req.Rooms[roomID] = gomatrixserverlib.Join
+		})
 	}
 
+	reqWaitGroup.Wait()
+
 	// Add peeked rooms.
 	peeks, err := p.DB.PeeksInRange(ctx, req.Device.UserID, req.Device.ID, r)
 	if err != nil {
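The loop above is a standard fan-out/fan-in: the WaitGroup tracks outstanding per-room tasks, and the mutex guards the shared response maps, which is required because Go maps are not safe for concurrent writes. A stripped-down illustration, where fetchAll and its parameters are hypothetical rather than Dendrite API:

package main

import (
	"fmt"
	"sync"
)

// fetchAll fans one task out per key and gathers the results into a
// shared map. The mutex serialises map writes; the WaitGroup makes
// the caller block until every task has finished, mirroring
// reqWaitGroup.Wait() in the diff above.
func fetchAll(keys []string, fetch func(string) string) map[string]string {
	results := make(map[string]string, len(keys))
	var mu sync.Mutex
	var wg sync.WaitGroup
	wg.Add(len(keys))
	for _, k := range keys {
		k := k // per-iteration copy, as with roomID := room above
		go func() {
			defer wg.Done()
			v := fetch(k) // slow work happens outside the lock
			mu.Lock()
			defer mu.Unlock()
			results[k] = v
		}()
	}
	wg.Wait()
	return results
}

func main() {
	rooms := fetchAll([]string{"!a:example.org", "!b:example.org"}, func(roomID string) string {
		return "join response for " + roomID
	})
	fmt.Println(len(rooms), "rooms populated")
}

One behavioural change worth noting in the diff itself: the old loop aborted the whole complete sync with "return from" on the first failing room, whereas each queued task now logs its error and returns, so a single failing room no longer fails the entire response.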