From f1a6fb2ece5dc8353bdd5f50de912040ff03fce2 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Thu, 7 Jan 2021 11:19:10 +0000 Subject: [PATCH] Clean up a bit --- ...log_devicelist.go => stream_devicelist.go} | 2 +- syncapi/streams/streams.go | 23 +++++++++++++------ ...plate_streamlog.go => template_pstream.go} | 12 +++++----- syncapi/types/provider.go | 14 +---------- 4 files changed, 24 insertions(+), 27 deletions(-) rename syncapi/streams/{streamlog_devicelist.go => stream_devicelist.go} (94%) rename syncapi/streams/{template_streamlog.go => template_pstream.go} (85%) diff --git a/syncapi/streams/streamlog_devicelist.go b/syncapi/streams/stream_devicelist.go similarity index 94% rename from syncapi/streams/streamlog_devicelist.go rename to syncapi/streams/stream_devicelist.go index 2f67e8da5..97e5da3cb 100644 --- a/syncapi/streams/streamlog_devicelist.go +++ b/syncapi/streams/stream_devicelist.go @@ -7,7 +7,7 @@ import ( ) type DeviceListStreamProvider struct { - StreamLogProvider + PartitionedStreamProvider } func (p *DeviceListStreamProvider) CompleteSync( diff --git a/syncapi/streams/streams.go b/syncapi/streams/streams.go index 966d0f2ae..43d041d91 100644 --- a/syncapi/streams/streams.go +++ b/syncapi/streams/streams.go @@ -9,13 +9,12 @@ import ( type Streams struct { PDUStreamProvider types.StreamProvider - PDUTopologyProvider types.TopologyProvider TypingStreamProvider types.StreamProvider ReceiptStreamProvider types.StreamProvider InviteStreamProvider types.StreamProvider SendToDeviceStreamProvider types.StreamProvider AccountDataStreamProvider types.StreamProvider - DeviceListStreamProvider types.StreamLogProvider + DeviceListStreamProvider types.PartitionedStreamProvider } func NewSyncStreamProviders( @@ -23,19 +22,29 @@ func NewSyncStreamProviders( eduCache *cache.EDUCache, ) *Streams { streams := &Streams{ - PDUStreamProvider: &PDUStreamProvider{StreamProvider{DB: d}}, + PDUStreamProvider: &PDUStreamProvider{ + StreamProvider: StreamProvider{DB: d}, + }, TypingStreamProvider: &TypingStreamProvider{ StreamProvider: StreamProvider{DB: d}, EDUCache: eduCache, }, - ReceiptStreamProvider: &ReceiptStreamProvider{StreamProvider{DB: d}}, - InviteStreamProvider: &InviteStreamProvider{StreamProvider{DB: d}}, - SendToDeviceStreamProvider: &SendToDeviceStreamProvider{StreamProvider{DB: d}}, + ReceiptStreamProvider: &ReceiptStreamProvider{ + StreamProvider: StreamProvider{DB: d}, + }, + InviteStreamProvider: &InviteStreamProvider{ + StreamProvider: StreamProvider{DB: d}, + }, + SendToDeviceStreamProvider: &SendToDeviceStreamProvider{ + StreamProvider: StreamProvider{DB: d}, + }, AccountDataStreamProvider: &AccountDataStreamProvider{ StreamProvider: StreamProvider{DB: d}, userAPI: userAPI, }, - DeviceListStreamProvider: &DeviceListStreamProvider{StreamLogProvider{DB: d}}, + DeviceListStreamProvider: &DeviceListStreamProvider{ + PartitionedStreamProvider: PartitionedStreamProvider{DB: d}, + }, } streams.PDUStreamProvider.Setup() diff --git a/syncapi/streams/template_streamlog.go b/syncapi/streams/template_pstream.go similarity index 85% rename from syncapi/streams/template_streamlog.go rename to syncapi/streams/template_pstream.go index 39566f8b1..230f07997 100644 --- a/syncapi/streams/template_streamlog.go +++ b/syncapi/streams/template_pstream.go @@ -8,19 +8,19 @@ import ( "github.com/matrix-org/dendrite/syncapi/types" ) -type StreamLogProvider struct { +type PartitionedStreamProvider struct { DB storage.Database latest types.LogPosition latestMutex sync.RWMutex update *sync.Cond } -func (p *StreamLogProvider) Setup() { +func (p *PartitionedStreamProvider) Setup() { locker := &sync.Mutex{} p.update = sync.NewCond(locker) } -func (p *StreamLogProvider) Advance( +func (p *PartitionedStreamProvider) Advance( latest types.LogPosition, ) { p.latestMutex.Lock() @@ -32,7 +32,7 @@ func (p *StreamLogProvider) Advance( } } -func (p *StreamLogProvider) LatestPosition( +func (p *PartitionedStreamProvider) LatestPosition( ctx context.Context, ) types.LogPosition { p.latestMutex.RLock() @@ -41,7 +41,7 @@ func (p *StreamLogProvider) LatestPosition( return p.latest } -func (p *StreamLogProvider) NotifyAfter( +func (p *PartitionedStreamProvider) NotifyAfter( ctx context.Context, from types.LogPosition, ) chan struct{} { @@ -67,7 +67,7 @@ func (p *StreamLogProvider) NotifyAfter( // sync.Cond will fire every time the latest position // updates, so we can check and see if we've advanced // past it. - go func(p *StreamLogProvider) { + go func(p *PartitionedStreamProvider) { p.update.L.Lock() defer p.update.L.Unlock() diff --git a/syncapi/types/provider.go b/syncapi/types/provider.go index 018dc16eb..3744c895e 100644 --- a/syncapi/types/provider.go +++ b/syncapi/types/provider.go @@ -48,7 +48,7 @@ type StreamProvider interface { LatestPosition(ctx context.Context) StreamPosition } -type StreamLogProvider interface { +type PartitionedStreamProvider interface { Setup() Advance(latest LogPosition) CompleteSync(ctx context.Context, req *SyncRequest) LogPosition @@ -56,15 +56,3 @@ type StreamLogProvider interface { NotifyAfter(ctx context.Context, from LogPosition) chan struct{} LatestPosition(ctx context.Context) LogPosition } - -type TopologyProvider interface { - // Range will update the response to include all updates between - // the from and to sync positions for the given room. It will always - // return immediately, making no changes if the range contains no - // updates. - TopologyRange(ctx context.Context, res *Response, roomID string, from, to TopologyToken, filter gomatrixserverlib.EventFilter) - - // LatestPosition returns the latest stream position for this stream - // for the given room. - TopologyLatestPosition(ctx context.Context, roomID string) TopologyToken -}