From a6549669cafe054411c0697faf48910a494121b0 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Thu, 7 Jan 2021 13:48:32 +0000 Subject: [PATCH] More lightweight notifications --- syncapi/streams/template_pstream.go | 70 +++++++++++++++-------------- syncapi/streams/template_stream.go | 70 +++++++++++++++-------------- syncapi/sync/requestpool.go | 14 +++--- syncapi/types/provider.go | 4 +- 4 files changed, 83 insertions(+), 75 deletions(-) diff --git a/syncapi/streams/template_pstream.go b/syncapi/streams/template_pstream.go index 230f07997..56f374a1d 100644 --- a/syncapi/streams/template_pstream.go +++ b/syncapi/streams/template_pstream.go @@ -6,18 +6,25 @@ import ( "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" + userapi "github.com/matrix-org/dendrite/userapi/api" ) type PartitionedStreamProvider struct { - DB storage.Database - latest types.LogPosition - latestMutex sync.RWMutex - update *sync.Cond + DB storage.Database + latest types.LogPosition + latestMutex sync.RWMutex + subscriptions map[string]*partitionedStreamSubscription // userid+deviceid + subscriptionsMutex sync.Mutex +} + +type partitionedStreamSubscription struct { + ctx context.Context + from types.LogPosition + ch chan struct{} } func (p *PartitionedStreamProvider) Setup() { - locker := &sync.Mutex{} - p.update = sync.NewCond(locker) + p.subscriptions = make(map[string]*partitionedStreamSubscription) } func (p *PartitionedStreamProvider) Advance( @@ -28,7 +35,22 @@ func (p *PartitionedStreamProvider) Advance( if latest.IsAfter(&p.latest) { p.latest = latest - p.update.Broadcast() + + p.subscriptionsMutex.Lock() + for id, s := range p.subscriptions { + select { + case <-s.ctx.Done(): + close(s.ch) + delete(p.subscriptions, id) + continue + default: + if latest.IsAfter(&s.from) { + close(s.ch) + delete(p.subscriptions, id) + } + } + } + p.subscriptionsMutex.Unlock() } } @@ -43,6 +65,7 @@ func (p *PartitionedStreamProvider) LatestPosition( func (p *PartitionedStreamProvider) NotifyAfter( ctx context.Context, + device *userapi.Device, from types.LogPosition, ) chan struct{} { ch := make(chan struct{}) @@ -63,32 +86,13 @@ func (p *PartitionedStreamProvider) NotifyAfter( return ch } - // If we haven't, then we'll subscribe to updates. The - // sync.Cond will fire every time the latest position - // updates, so we can check and see if we've advanced - // past it. - go func(p *PartitionedStreamProvider) { - p.update.L.Lock() - defer p.update.L.Unlock() - - for { - select { - case <-ctx.Done(): - // The context has expired, so there's no point - // in continuing to wait for the update. - close(ch) - return - default: - // The latest position has been advanced. Let's - // see if it's advanced to the position we care - // about. If it has then we'll return. - p.update.Wait() - if check() { - return - } - } - } - }(p) + id := device.UserID + device.ID + p.subscriptionsMutex.Lock() + if s, ok := p.subscriptions[id]; ok { + close(s.ch) + } + p.subscriptions[id] = &partitionedStreamSubscription{ctx, from, ch} + p.subscriptionsMutex.Unlock() return ch } diff --git a/syncapi/streams/template_stream.go b/syncapi/streams/template_stream.go index e85529f62..9ce3ef852 100644 --- a/syncapi/streams/template_stream.go +++ b/syncapi/streams/template_stream.go @@ -6,18 +6,25 @@ import ( "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" + userapi "github.com/matrix-org/dendrite/userapi/api" ) type StreamProvider struct { - DB storage.Database - latest types.StreamPosition - latestMutex sync.RWMutex - update *sync.Cond + DB storage.Database + latest types.StreamPosition + latestMutex sync.RWMutex + subscriptions map[string]*streamSubscription // userid+deviceid + subscriptionsMutex sync.Mutex +} + +type streamSubscription struct { + ctx context.Context + from types.StreamPosition + ch chan struct{} } func (p *StreamProvider) Setup() { - locker := &sync.Mutex{} - p.update = sync.NewCond(locker) + p.subscriptions = make(map[string]*streamSubscription) } func (p *StreamProvider) Advance( @@ -28,7 +35,22 @@ func (p *StreamProvider) Advance( if latest > p.latest { p.latest = latest - p.update.Broadcast() + + p.subscriptionsMutex.Lock() + for id, s := range p.subscriptions { + select { + case <-s.ctx.Done(): + close(s.ch) + delete(p.subscriptions, id) + continue + default: + if latest > s.from { + close(s.ch) + delete(p.subscriptions, id) + } + } + } + p.subscriptionsMutex.Unlock() } } @@ -43,6 +65,7 @@ func (p *StreamProvider) LatestPosition( func (p *StreamProvider) NotifyAfter( ctx context.Context, + device *userapi.Device, from types.StreamPosition, ) chan struct{} { ch := make(chan struct{}) @@ -63,32 +86,13 @@ func (p *StreamProvider) NotifyAfter( return ch } - // If we haven't, then we'll subscribe to updates. The - // sync.Cond will fire every time the latest position - // updates, so we can check and see if we've advanced - // past it. - go func(p *StreamProvider) { - p.update.L.Lock() - defer p.update.L.Unlock() - - for { - select { - case <-ctx.Done(): - // The context has expired, so there's no point - // in continuing to wait for the update. - close(ch) - return - default: - // The latest position has been advanced. Let's - // see if it's advanced to the position we care - // about. If it has then we'll return. - p.update.Wait() - if check() { - return - } - } - } - }(p) + id := device.UserID + device.ID + p.subscriptionsMutex.Lock() + if s, ok := p.subscriptions[id]; ok { + close(s.ch) + } + p.subscriptions[id] = &streamSubscription{ctx, from, ch} + p.subscriptionsMutex.Unlock() return ch } diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index ff84d688a..7096347b5 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -176,13 +176,13 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. case <-timer.C: // Timeout reached return giveup() - case <-rp.streams.PDUStreamProvider.NotifyAfter(waitctx, syncReq.Since.PDUPosition): - case <-rp.streams.TypingStreamProvider.NotifyAfter(waitctx, syncReq.Since.TypingPosition): - case <-rp.streams.ReceiptStreamProvider.NotifyAfter(waitctx, syncReq.Since.ReceiptPosition): - case <-rp.streams.InviteStreamProvider.NotifyAfter(waitctx, syncReq.Since.InvitePosition): - case <-rp.streams.SendToDeviceStreamProvider.NotifyAfter(waitctx, syncReq.Since.SendToDevicePosition): - case <-rp.streams.AccountDataStreamProvider.NotifyAfter(waitctx, syncReq.Since.AccountDataPosition): - case <-rp.streams.DeviceListStreamProvider.NotifyAfter(waitctx, syncReq.Since.DeviceListPosition): + case <-rp.streams.PDUStreamProvider.NotifyAfter(waitctx, device, syncReq.Since.PDUPosition): + case <-rp.streams.TypingStreamProvider.NotifyAfter(waitctx, device, syncReq.Since.TypingPosition): + case <-rp.streams.ReceiptStreamProvider.NotifyAfter(waitctx, device, syncReq.Since.ReceiptPosition): + case <-rp.streams.InviteStreamProvider.NotifyAfter(waitctx, device, syncReq.Since.InvitePosition): + case <-rp.streams.SendToDeviceStreamProvider.NotifyAfter(waitctx, device, syncReq.Since.SendToDevicePosition): + case <-rp.streams.AccountDataStreamProvider.NotifyAfter(waitctx, device, syncReq.Since.AccountDataPosition): + case <-rp.streams.DeviceListStreamProvider.NotifyAfter(waitctx, device, syncReq.Since.DeviceListPosition): } syncReq.Log.Println("Responding to sync after wakeup") diff --git a/syncapi/types/provider.go b/syncapi/types/provider.go index 3744c895e..eb6087905 100644 --- a/syncapi/types/provider.go +++ b/syncapi/types/provider.go @@ -42,7 +42,7 @@ type StreamProvider interface { // NotifyAfter returns a channel which will be closed once the // stream advances past the "from" position. - NotifyAfter(ctx context.Context, from StreamPosition) chan struct{} + NotifyAfter(ctx context.Context, device *userapi.Device, from StreamPosition) chan struct{} // LatestPosition returns the latest stream position for this stream. LatestPosition(ctx context.Context) StreamPosition @@ -53,6 +53,6 @@ type PartitionedStreamProvider interface { Advance(latest LogPosition) CompleteSync(ctx context.Context, req *SyncRequest) LogPosition IncrementalSync(ctx context.Context, req *SyncRequest, from, to LogPosition) LogPosition - NotifyAfter(ctx context.Context, from LogPosition) chan struct{} + NotifyAfter(ctx context.Context, device *userapi.Device, from LogPosition) chan struct{} LatestPosition(ctx context.Context) LogPosition }