Clean up a bit

This commit is contained in:
Neil Alexander 2021-01-07 11:19:10 +00:00
parent 6929b8a4ec
commit f1a6fb2ece
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
4 changed files with 24 additions and 27 deletions

View file

@ -7,7 +7,7 @@ import (
) )
type DeviceListStreamProvider struct { type DeviceListStreamProvider struct {
StreamLogProvider PartitionedStreamProvider
} }
func (p *DeviceListStreamProvider) CompleteSync( func (p *DeviceListStreamProvider) CompleteSync(

View file

@ -9,13 +9,12 @@ import (
type Streams struct { type Streams struct {
PDUStreamProvider types.StreamProvider PDUStreamProvider types.StreamProvider
PDUTopologyProvider types.TopologyProvider
TypingStreamProvider types.StreamProvider TypingStreamProvider types.StreamProvider
ReceiptStreamProvider types.StreamProvider ReceiptStreamProvider types.StreamProvider
InviteStreamProvider types.StreamProvider InviteStreamProvider types.StreamProvider
SendToDeviceStreamProvider types.StreamProvider SendToDeviceStreamProvider types.StreamProvider
AccountDataStreamProvider types.StreamProvider AccountDataStreamProvider types.StreamProvider
DeviceListStreamProvider types.StreamLogProvider DeviceListStreamProvider types.PartitionedStreamProvider
} }
func NewSyncStreamProviders( func NewSyncStreamProviders(
@ -23,19 +22,29 @@ func NewSyncStreamProviders(
eduCache *cache.EDUCache, eduCache *cache.EDUCache,
) *Streams { ) *Streams {
streams := &Streams{ streams := &Streams{
PDUStreamProvider: &PDUStreamProvider{StreamProvider{DB: d}}, PDUStreamProvider: &PDUStreamProvider{
StreamProvider: StreamProvider{DB: d},
},
TypingStreamProvider: &TypingStreamProvider{ TypingStreamProvider: &TypingStreamProvider{
StreamProvider: StreamProvider{DB: d}, StreamProvider: StreamProvider{DB: d},
EDUCache: eduCache, EDUCache: eduCache,
}, },
ReceiptStreamProvider: &ReceiptStreamProvider{StreamProvider{DB: d}}, ReceiptStreamProvider: &ReceiptStreamProvider{
InviteStreamProvider: &InviteStreamProvider{StreamProvider{DB: d}}, StreamProvider: StreamProvider{DB: d},
SendToDeviceStreamProvider: &SendToDeviceStreamProvider{StreamProvider{DB: d}}, },
InviteStreamProvider: &InviteStreamProvider{
StreamProvider: StreamProvider{DB: d},
},
SendToDeviceStreamProvider: &SendToDeviceStreamProvider{
StreamProvider: StreamProvider{DB: d},
},
AccountDataStreamProvider: &AccountDataStreamProvider{ AccountDataStreamProvider: &AccountDataStreamProvider{
StreamProvider: StreamProvider{DB: d}, StreamProvider: StreamProvider{DB: d},
userAPI: userAPI, userAPI: userAPI,
}, },
DeviceListStreamProvider: &DeviceListStreamProvider{StreamLogProvider{DB: d}}, DeviceListStreamProvider: &DeviceListStreamProvider{
PartitionedStreamProvider: PartitionedStreamProvider{DB: d},
},
} }
streams.PDUStreamProvider.Setup() streams.PDUStreamProvider.Setup()

View file

@ -8,19 +8,19 @@ import (
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
) )
type StreamLogProvider struct { type PartitionedStreamProvider struct {
DB storage.Database DB storage.Database
latest types.LogPosition latest types.LogPosition
latestMutex sync.RWMutex latestMutex sync.RWMutex
update *sync.Cond update *sync.Cond
} }
func (p *StreamLogProvider) Setup() { func (p *PartitionedStreamProvider) Setup() {
locker := &sync.Mutex{} locker := &sync.Mutex{}
p.update = sync.NewCond(locker) p.update = sync.NewCond(locker)
} }
func (p *StreamLogProvider) Advance( func (p *PartitionedStreamProvider) Advance(
latest types.LogPosition, latest types.LogPosition,
) { ) {
p.latestMutex.Lock() p.latestMutex.Lock()
@ -32,7 +32,7 @@ func (p *StreamLogProvider) Advance(
} }
} }
func (p *StreamLogProvider) LatestPosition( func (p *PartitionedStreamProvider) LatestPosition(
ctx context.Context, ctx context.Context,
) types.LogPosition { ) types.LogPosition {
p.latestMutex.RLock() p.latestMutex.RLock()
@ -41,7 +41,7 @@ func (p *StreamLogProvider) LatestPosition(
return p.latest return p.latest
} }
func (p *StreamLogProvider) NotifyAfter( func (p *PartitionedStreamProvider) NotifyAfter(
ctx context.Context, ctx context.Context,
from types.LogPosition, from types.LogPosition,
) chan struct{} { ) chan struct{} {
@ -67,7 +67,7 @@ func (p *StreamLogProvider) NotifyAfter(
// sync.Cond will fire every time the latest position // sync.Cond will fire every time the latest position
// updates, so we can check and see if we've advanced // updates, so we can check and see if we've advanced
// past it. // past it.
go func(p *StreamLogProvider) { go func(p *PartitionedStreamProvider) {
p.update.L.Lock() p.update.L.Lock()
defer p.update.L.Unlock() defer p.update.L.Unlock()

View file

@ -48,7 +48,7 @@ type StreamProvider interface {
LatestPosition(ctx context.Context) StreamPosition LatestPosition(ctx context.Context) StreamPosition
} }
type StreamLogProvider interface { type PartitionedStreamProvider interface {
Setup() Setup()
Advance(latest LogPosition) Advance(latest LogPosition)
CompleteSync(ctx context.Context, req *SyncRequest) LogPosition CompleteSync(ctx context.Context, req *SyncRequest) LogPosition
@ -56,15 +56,3 @@ type StreamLogProvider interface {
NotifyAfter(ctx context.Context, from LogPosition) chan struct{} NotifyAfter(ctx context.Context, from LogPosition) chan struct{}
LatestPosition(ctx context.Context) LogPosition LatestPosition(ctx context.Context) LogPosition
} }
type TopologyProvider interface {
// Range will update the response to include all updates between
// the from and to sync positions for the given room. It will always
// return immediately, making no changes if the range contains no
// updates.
TopologyRange(ctx context.Context, res *Response, roomID string, from, to TopologyToken, filter gomatrixserverlib.EventFilter)
// LatestPosition returns the latest stream position for this stream
// for the given room.
TopologyLatestPosition(ctx context.Context, roomID string) TopologyToken
}