Remove concurrency

This commit is contained in:
Neil Alexander 2022-09-28 13:51:29 +01:00
parent 09a3c807f9
commit 232d69512c
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944

View file

@ -5,7 +5,6 @@ import (
"database/sql" "database/sql"
"fmt" "fmt"
"sort" "sort"
"sync"
"time" "time"
"github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/caching"
@ -18,7 +17,6 @@ import (
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/tidwall/gjson" "github.com/tidwall/gjson"
"go.uber.org/atomic"
"github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/notifier"
) )
@ -35,39 +33,16 @@ const PDU_STREAM_QUEUESIZE = PDU_STREAM_WORKERS * 8
type PDUStreamProvider struct { type PDUStreamProvider struct {
DefaultStreamProvider DefaultStreamProvider
tasks chan func()
workers atomic.Int32
// userID+deviceID -> lazy loading cache // userID+deviceID -> lazy loading cache
lazyLoadCache caching.LazyLoadCache lazyLoadCache caching.LazyLoadCache
rsAPI roomserverAPI.SyncRoomserverAPI rsAPI roomserverAPI.SyncRoomserverAPI
notifier *notifier.Notifier notifier *notifier.Notifier
} }
func (p *PDUStreamProvider) worker() {
defer p.workers.Dec()
for {
select {
case f := <-p.tasks:
f()
case <-time.After(time.Second * 10):
return
}
}
}
func (p *PDUStreamProvider) queue(f func()) {
if p.workers.Load() < PDU_STREAM_WORKERS {
p.workers.Inc()
go p.worker()
}
p.tasks <- f
}
func (p *PDUStreamProvider) Setup( func (p *PDUStreamProvider) Setup(
ctx context.Context, snapshot storage.DatabaseSnapshot, ctx context.Context, snapshot storage.DatabaseSnapshot,
) { ) {
p.DefaultStreamProvider.Setup(ctx, snapshot) p.DefaultStreamProvider.Setup(ctx, snapshot)
p.tasks = make(chan func(), PDU_STREAM_QUEUESIZE)
p.latestMutex.Lock() p.latestMutex.Lock()
defer p.latestMutex.Unlock() defer p.latestMutex.Unlock()
@ -120,31 +95,18 @@ func (p *PDUStreamProvider) CompleteSync(
} }
// Build up a /sync response. Add joined rooms. // Build up a /sync response. Add joined rooms.
var reqMutex sync.Mutex for _, roomID := range joinedRoomIDs {
var reqWaitGroup sync.WaitGroup jr, jerr := p.getJoinResponseForCompleteSync(
reqWaitGroup.Add(len(joinedRoomIDs)) ctx, snapshot, roomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, false,
for _, room := range joinedRoomIDs { )
roomID := room if jerr != nil {
p.queue(func() { req.Log.WithError(jerr).Error("p.getJoinResponseForCompleteSync failed")
defer reqWaitGroup.Done() return from
}
jr, jerr := p.getJoinResponseForCompleteSync( req.Response.Rooms.Join[roomID] = *jr
ctx, snapshot, roomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, false, req.Rooms[roomID] = gomatrixserverlib.Join
)
if jerr != nil {
req.Log.WithError(jerr).Error("p.getJoinResponseForCompleteSync failed")
return
}
reqMutex.Lock()
defer reqMutex.Unlock()
req.Response.Rooms.Join[roomID] = *jr
req.Rooms[roomID] = gomatrixserverlib.Join
})
} }
reqWaitGroup.Wait()
// Add peeked rooms. // Add peeked rooms.
peeks, err := snapshot.PeeksInRange(ctx, req.Device.UserID, req.Device.ID, r) peeks, err := snapshot.PeeksInRange(ctx, req.Device.UserID, req.Device.ID, r)
if err != nil { if err != nil {