From 232d69512cce9c9709ab346aee1877217ad3b6c0 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 28 Sep 2022 13:51:29 +0100 Subject: [PATCH] Remove concurrency --- syncapi/streams/stream_pdu.go | 58 ++++++----------------------------- 1 file changed, 10 insertions(+), 48 deletions(-) diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 485bab00a..e19276631 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -5,7 +5,6 @@ import ( "database/sql" "fmt" "sort" - "sync" "time" "github.com/matrix-org/dendrite/internal/caching" @@ -18,7 +17,6 @@ import ( "github.com/matrix-org/gomatrixserverlib" "github.com/sirupsen/logrus" "github.com/tidwall/gjson" - "go.uber.org/atomic" "github.com/matrix-org/dendrite/syncapi/notifier" ) @@ -35,39 +33,16 @@ const PDU_STREAM_QUEUESIZE = PDU_STREAM_WORKERS * 8 type PDUStreamProvider struct { DefaultStreamProvider - tasks chan func() - workers atomic.Int32 // userID+deviceID -> lazy loading cache lazyLoadCache caching.LazyLoadCache rsAPI roomserverAPI.SyncRoomserverAPI notifier *notifier.Notifier } -func (p *PDUStreamProvider) worker() { - defer p.workers.Dec() - for { - select { - case f := <-p.tasks: - f() - case <-time.After(time.Second * 10): - return - } - } -} - -func (p *PDUStreamProvider) queue(f func()) { - if p.workers.Load() < PDU_STREAM_WORKERS { - p.workers.Inc() - go p.worker() - } - p.tasks <- f -} - func (p *PDUStreamProvider) Setup( ctx context.Context, snapshot storage.DatabaseSnapshot, ) { p.DefaultStreamProvider.Setup(ctx, snapshot) - p.tasks = make(chan func(), PDU_STREAM_QUEUESIZE) p.latestMutex.Lock() defer p.latestMutex.Unlock() @@ -120,31 +95,18 @@ func (p *PDUStreamProvider) CompleteSync( } // Build up a /sync response. Add joined rooms. - var reqMutex sync.Mutex - var reqWaitGroup sync.WaitGroup - reqWaitGroup.Add(len(joinedRoomIDs)) - for _, room := range joinedRoomIDs { - roomID := room - p.queue(func() { - defer reqWaitGroup.Done() - - jr, jerr := p.getJoinResponseForCompleteSync( - ctx, snapshot, roomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, false, - ) - if jerr != nil { - req.Log.WithError(jerr).Error("p.getJoinResponseForCompleteSync failed") - return - } - - reqMutex.Lock() - defer reqMutex.Unlock() - req.Response.Rooms.Join[roomID] = *jr - req.Rooms[roomID] = gomatrixserverlib.Join - }) + for _, roomID := range joinedRoomIDs { + jr, jerr := p.getJoinResponseForCompleteSync( + ctx, snapshot, roomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, false, + ) + if jerr != nil { + req.Log.WithError(jerr).Error("p.getJoinResponseForCompleteSync failed") + return from + } + req.Response.Rooms.Join[roomID] = *jr + req.Rooms[roomID] = gomatrixserverlib.Join } - reqWaitGroup.Wait() - // Add peeked rooms. peeks, err := snapshot.PeeksInRange(ctx, req.Device.UserID, req.Device.ID, r) if err != nil {