mirror of
https://github.com/matrix-org/dendrite.git
synced 2024-11-23 23:01:56 -06:00
b5a8935042
* It's half-alive * Wakeups largely working * Other tweaks, typing works * Fix bugs, add receipt stream * Delete notifier, other tweaks * Dedupe a bit, add a template for the invite stream * Clean up, add templates for other streams * Don't leak channels * Bring forward some more PDU logic, clean up other places * Add some more wakeups * Use addRoomDeltaToResponse * Log tweaks, typing fixed? * Fix timed out syncs * Don't reset next batch position on timeout * Add account data stream/position * End of day * Fix complete sync for receipt, typing * Streams package * Clean up a bit * Complete sync send-to-device * Don't drop errors * More lightweight notifications * Fix typing positions * Don't advance position on remove again unless needed * Device list updates * Advance account data position * Use limit for incremental sync * Limit fixes, amongst other things * Remove some fmt.Println * Tweaks * Re-add notifier * Fix invite position * Fixes * Notify account data without advancing PDU position in notifier * Apply account data position * Get initial position for account data * Fix position update * Fix complete sync positions * Review comments @Kegsay * Room consumer parameters
79 lines
2.6 KiB
Go
79 lines
2.6 KiB
Go
package streams
|
|
|
|
import (
|
|
"context"
|
|
|
|
"github.com/matrix-org/dendrite/eduserver/cache"
|
|
keyapi "github.com/matrix-org/dendrite/keyserver/api"
|
|
rsapi "github.com/matrix-org/dendrite/roomserver/api"
|
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
|
"github.com/matrix-org/dendrite/syncapi/types"
|
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
|
)
|
|
|
|
type Streams struct {
|
|
PDUStreamProvider types.StreamProvider
|
|
TypingStreamProvider types.StreamProvider
|
|
ReceiptStreamProvider types.StreamProvider
|
|
InviteStreamProvider types.StreamProvider
|
|
SendToDeviceStreamProvider types.StreamProvider
|
|
AccountDataStreamProvider types.StreamProvider
|
|
DeviceListStreamProvider types.PartitionedStreamProvider
|
|
}
|
|
|
|
func NewSyncStreamProviders(
|
|
d storage.Database, userAPI userapi.UserInternalAPI,
|
|
rsAPI rsapi.RoomserverInternalAPI, keyAPI keyapi.KeyInternalAPI,
|
|
eduCache *cache.EDUCache,
|
|
) *Streams {
|
|
streams := &Streams{
|
|
PDUStreamProvider: &PDUStreamProvider{
|
|
StreamProvider: StreamProvider{DB: d},
|
|
},
|
|
TypingStreamProvider: &TypingStreamProvider{
|
|
StreamProvider: StreamProvider{DB: d},
|
|
EDUCache: eduCache,
|
|
},
|
|
ReceiptStreamProvider: &ReceiptStreamProvider{
|
|
StreamProvider: StreamProvider{DB: d},
|
|
},
|
|
InviteStreamProvider: &InviteStreamProvider{
|
|
StreamProvider: StreamProvider{DB: d},
|
|
},
|
|
SendToDeviceStreamProvider: &SendToDeviceStreamProvider{
|
|
StreamProvider: StreamProvider{DB: d},
|
|
},
|
|
AccountDataStreamProvider: &AccountDataStreamProvider{
|
|
StreamProvider: StreamProvider{DB: d},
|
|
userAPI: userAPI,
|
|
},
|
|
DeviceListStreamProvider: &DeviceListStreamProvider{
|
|
PartitionedStreamProvider: PartitionedStreamProvider{DB: d},
|
|
rsAPI: rsAPI,
|
|
keyAPI: keyAPI,
|
|
},
|
|
}
|
|
|
|
streams.PDUStreamProvider.Setup()
|
|
streams.TypingStreamProvider.Setup()
|
|
streams.ReceiptStreamProvider.Setup()
|
|
streams.InviteStreamProvider.Setup()
|
|
streams.SendToDeviceStreamProvider.Setup()
|
|
streams.AccountDataStreamProvider.Setup()
|
|
streams.DeviceListStreamProvider.Setup()
|
|
|
|
return streams
|
|
}
|
|
|
|
func (s *Streams) Latest(ctx context.Context) types.StreamingToken {
|
|
return types.StreamingToken{
|
|
PDUPosition: s.PDUStreamProvider.LatestPosition(ctx),
|
|
TypingPosition: s.TypingStreamProvider.LatestPosition(ctx),
|
|
ReceiptPosition: s.PDUStreamProvider.LatestPosition(ctx),
|
|
InvitePosition: s.InviteStreamProvider.LatestPosition(ctx),
|
|
SendToDevicePosition: s.SendToDeviceStreamProvider.LatestPosition(ctx),
|
|
AccountDataPosition: s.AccountDataStreamProvider.LatestPosition(ctx),
|
|
DeviceListPosition: s.DeviceListStreamProvider.LatestPosition(ctx),
|
|
}
|
|
}
|