Add account data stream/position

This commit is contained in:
Neil Alexander 2021-01-06 17:17:57 +00:00
parent c31a1767f7
commit af6b07e1b4
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
6 changed files with 66 additions and 10 deletions

View file

@ -37,6 +37,7 @@ type Database interface {
ReceiptStream() types.StreamProvider ReceiptStream() types.StreamProvider
InviteStream() types.StreamProvider InviteStream() types.StreamProvider
SendToDeviceStream() types.StreamProvider SendToDeviceStream() types.StreamProvider
AccountDataStream() types.StreamProvider
DeviceListStream() types.StreamLogProvider DeviceListStream() types.StreamLogProvider
// AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs. // AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs.

View file

@ -0,0 +1,31 @@
package shared
import (
"context"
"github.com/matrix-org/dendrite/syncapi/types"
)
type AccountDataStreamProvider struct {
StreamProvider
}
func (p *AccountDataStreamProvider) Setup() {
p.StreamProvider.Setup()
}
func (p *AccountDataStreamProvider) CompleteSync(
ctx context.Context,
req *types.SyncRequest,
) types.StreamPosition {
return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx))
}
func (p *AccountDataStreamProvider) IncrementalSync(
ctx context.Context,
req *types.SyncRequest,
from, to types.StreamPosition,
) types.StreamPosition {
return to
}

View file

@ -57,6 +57,7 @@ type Database struct {
ReceiptStreamProvider types.StreamProvider ReceiptStreamProvider types.StreamProvider
InviteStreamProvider types.StreamProvider InviteStreamProvider types.StreamProvider
SendToDeviceStreamProvider types.StreamProvider SendToDeviceStreamProvider types.StreamProvider
AccountDataStreamProvider types.StreamProvider
DeviceListStreamProvider types.StreamLogProvider DeviceListStreamProvider types.StreamLogProvider
} }
@ -69,6 +70,7 @@ func (d *Database) ConfigureProviders() {
d.ReceiptStreamProvider = &ReceiptStreamProvider{StreamProvider{DB: d}} d.ReceiptStreamProvider = &ReceiptStreamProvider{StreamProvider{DB: d}}
d.InviteStreamProvider = &InviteStreamProvider{StreamProvider{DB: d}} d.InviteStreamProvider = &InviteStreamProvider{StreamProvider{DB: d}}
d.SendToDeviceStreamProvider = &SendToDeviceStreamProvider{StreamProvider{DB: d}} d.SendToDeviceStreamProvider = &SendToDeviceStreamProvider{StreamProvider{DB: d}}
d.AccountDataStreamProvider = &AccountDataStreamProvider{StreamProvider{DB: d}}
d.DeviceListStreamProvider = &DeviceListStreamProvider{StreamLogProvider{DB: d}} d.DeviceListStreamProvider = &DeviceListStreamProvider{StreamLogProvider{DB: d}}
d.PDUStreamProvider.Setup() d.PDUStreamProvider.Setup()
@ -76,6 +78,7 @@ func (d *Database) ConfigureProviders() {
d.ReceiptStreamProvider.Setup() d.ReceiptStreamProvider.Setup()
d.InviteStreamProvider.Setup() d.InviteStreamProvider.Setup()
d.SendToDeviceStreamProvider.Setup() d.SendToDeviceStreamProvider.Setup()
d.AccountDataStreamProvider.Setup()
d.DeviceListStreamProvider.Setup() d.DeviceListStreamProvider.Setup()
d.PDUTopologyProvider = &PDUTopologyProvider{DB: d} d.PDUTopologyProvider = &PDUTopologyProvider{DB: d}
@ -105,6 +108,10 @@ func (d *Database) SendToDeviceStream() types.StreamProvider {
return d.SendToDeviceStreamProvider return d.SendToDeviceStreamProvider
} }
func (d *Database) AccountDataStream() types.StreamProvider {
return d.AccountDataStreamProvider
}
func (d *Database) DeviceListStream() types.StreamLogProvider { func (d *Database) DeviceListStream() types.StreamLogProvider {
return d.DeviceListStreamProvider return d.DeviceListStreamProvider
} }

View file

@ -50,6 +50,7 @@ type RequestPool struct {
receiptStream types.StreamProvider receiptStream types.StreamProvider
sendToDeviceStream types.StreamProvider sendToDeviceStream types.StreamProvider
inviteStream types.StreamProvider inviteStream types.StreamProvider
accountDataStream types.StreamProvider
deviceListStream types.StreamLogProvider deviceListStream types.StreamLogProvider
} }
@ -71,6 +72,7 @@ func NewRequestPool(
receiptStream: db.ReceiptStream(), receiptStream: db.ReceiptStream(),
sendToDeviceStream: db.SendToDeviceStream(), sendToDeviceStream: db.SendToDeviceStream(),
inviteStream: db.InviteStream(), inviteStream: db.InviteStream(),
accountDataStream: db.AccountDataStream(),
deviceListStream: db.DeviceListStream(), deviceListStream: db.DeviceListStream(),
} }
go rp.cleanLastSeen() go rp.cleanLastSeen()
@ -189,6 +191,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
case <-rp.receiptStream.NotifyAfter(waitctx, syncReq.Since.ReceiptPosition): case <-rp.receiptStream.NotifyAfter(waitctx, syncReq.Since.ReceiptPosition):
case <-rp.inviteStream.NotifyAfter(waitctx, syncReq.Since.InvitePosition): case <-rp.inviteStream.NotifyAfter(waitctx, syncReq.Since.InvitePosition):
case <-rp.sendToDeviceStream.NotifyAfter(waitctx, syncReq.Since.SendToDevicePosition): case <-rp.sendToDeviceStream.NotifyAfter(waitctx, syncReq.Since.SendToDevicePosition):
case <-rp.accountDataStream.NotifyAfter(waitctx, syncReq.Since.AccountDataPosition):
case <-rp.deviceListStream.NotifyAfter(waitctx, syncReq.Since.DeviceListPosition): case <-rp.deviceListStream.NotifyAfter(waitctx, syncReq.Since.DeviceListPosition):
} }
@ -216,6 +219,9 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
SendToDevicePosition: rp.sendToDeviceStream.CompleteSync( SendToDevicePosition: rp.sendToDeviceStream.CompleteSync(
syncReq.Context, syncReq, syncReq.Context, syncReq,
), ),
AccountDataPosition: rp.accountDataStream.CompleteSync(
syncReq.Context, syncReq,
),
DeviceListPosition: rp.deviceListStream.CompleteSync( DeviceListPosition: rp.deviceListStream.CompleteSync(
syncReq.Context, syncReq, syncReq.Context, syncReq,
), ),
@ -243,6 +249,10 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
syncReq.Context, syncReq, syncReq.Since.SendToDevicePosition, syncReq.Context, syncReq, syncReq.Since.SendToDevicePosition,
rp.sendToDeviceStream.LatestPosition(syncReq.Context), rp.sendToDeviceStream.LatestPosition(syncReq.Context),
), ),
AccountDataPosition: rp.accountDataStream.IncrementalSync(
syncReq.Context, syncReq, syncReq.Since.AccountDataPosition,
rp.accountDataStream.LatestPosition(syncReq.Context),
),
DeviceListPosition: rp.deviceListStream.IncrementalSync( DeviceListPosition: rp.deviceListStream.IncrementalSync(
syncReq.Context, syncReq, syncReq.Since.DeviceListPosition, syncReq.Context, syncReq, syncReq.Since.DeviceListPosition,
rp.db.DeviceListStream().LatestPosition(syncReq.Context), rp.db.DeviceListStream().LatestPosition(syncReq.Context),

View file

@ -114,6 +114,7 @@ type StreamingToken struct {
ReceiptPosition StreamPosition ReceiptPosition StreamPosition
SendToDevicePosition StreamPosition SendToDevicePosition StreamPosition
InvitePosition StreamPosition InvitePosition StreamPosition
AccountDataPosition StreamPosition
DeviceListPosition LogPosition DeviceListPosition LogPosition
} }
@ -130,10 +131,10 @@ func (s *StreamingToken) UnmarshalText(text []byte) (err error) {
func (t StreamingToken) String() string { func (t StreamingToken) String() string {
posStr := fmt.Sprintf( posStr := fmt.Sprintf(
"s%d_%d_%d_%d_%d", "s%d_%d_%d_%d_%d_%d",
t.PDUPosition, t.TypingPosition, t.PDUPosition, t.TypingPosition,
t.ReceiptPosition, t.SendToDevicePosition, t.ReceiptPosition, t.SendToDevicePosition,
t.InvitePosition, t.InvitePosition, t.AccountDataPosition,
) )
if dl := t.DeviceListPosition; !dl.IsEmpty() { if dl := t.DeviceListPosition; !dl.IsEmpty() {
posStr += fmt.Sprintf(".dl-%d-%d", dl.Partition, dl.Offset) posStr += fmt.Sprintf(".dl-%d-%d", dl.Partition, dl.Offset)
@ -154,6 +155,8 @@ func (t *StreamingToken) IsAfter(other StreamingToken) bool {
return true return true
case t.InvitePosition > other.InvitePosition: case t.InvitePosition > other.InvitePosition:
return true return true
case t.AccountDataPosition > other.AccountDataPosition:
return true
case t.DeviceListPosition.IsAfter(&other.DeviceListPosition): case t.DeviceListPosition.IsAfter(&other.DeviceListPosition):
return true return true
} }
@ -161,7 +164,7 @@ func (t *StreamingToken) IsAfter(other StreamingToken) bool {
} }
func (t *StreamingToken) IsEmpty() bool { func (t *StreamingToken) IsEmpty() bool {
return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition+t.InvitePosition == 0 && t.DeviceListPosition.IsEmpty() return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition+t.InvitePosition+t.AccountDataPosition == 0 && t.DeviceListPosition.IsEmpty()
} }
// WithUpdates returns a copy of the StreamingToken with updates applied from another StreamingToken. // WithUpdates returns a copy of the StreamingToken with updates applied from another StreamingToken.
@ -193,6 +196,9 @@ func (t *StreamingToken) ApplyUpdates(other StreamingToken) {
if other.InvitePosition > 0 { if other.InvitePosition > 0 {
t.InvitePosition = other.InvitePosition t.InvitePosition = other.InvitePosition
} }
if other.AccountDataPosition > 0 {
t.AccountDataPosition = other.AccountDataPosition
}
if other.DeviceListPosition.Offset > 0 { if other.DeviceListPosition.Offset > 0 {
t.DeviceListPosition = other.DeviceListPosition t.DeviceListPosition = other.DeviceListPosition
} }
@ -286,7 +292,7 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) {
} }
categories := strings.Split(tok[1:], ".") categories := strings.Split(tok[1:], ".")
parts := strings.Split(categories[0], "_") parts := strings.Split(categories[0], "_")
var positions [5]StreamPosition var positions [6]StreamPosition
for i, p := range parts { for i, p := range parts {
if i > len(positions) { if i > len(positions) {
break break
@ -304,6 +310,7 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) {
ReceiptPosition: positions[2], ReceiptPosition: positions[2],
SendToDevicePosition: positions[3], SendToDevicePosition: positions[3],
InvitePosition: positions[4], InvitePosition: positions[4],
AccountDataPosition: positions[5],
} }
// dl-0-1234 // dl-0-1234
// $log_name-$partition-$offset // $log_name-$partition-$offset

View file

@ -10,10 +10,10 @@ import (
func TestNewSyncTokenWithLogs(t *testing.T) { func TestNewSyncTokenWithLogs(t *testing.T) {
tests := map[string]*StreamingToken{ tests := map[string]*StreamingToken{
"s4_0_0_0_0": { "s4_0_0_0_0_0": {
PDUPosition: 4, PDUPosition: 4,
}, },
"s4_0_0_0_0.dl-0-123": { "s4_0_0_0_0_0.dl-0-123": {
PDUPosition: 4, PDUPosition: 4,
DeviceListPosition: LogPosition{ DeviceListPosition: LogPosition{
Partition: 0, Partition: 0,
@ -42,10 +42,10 @@ func TestNewSyncTokenWithLogs(t *testing.T) {
func TestSyncTokens(t *testing.T) { func TestSyncTokens(t *testing.T) {
shouldPass := map[string]string{ shouldPass := map[string]string{
"s4_0_0_0_0": StreamingToken{4, 0, 0, 0, 0, LogPosition{}}.String(), "s4_0_0_0_0_0": StreamingToken{4, 0, 0, 0, 0, 0, LogPosition{}}.String(),
"s3_1_0_0_0.dl-1-2": StreamingToken{3, 1, 0, 0, 0, LogPosition{1, 2}}.String(), "s3_1_0_0_0_0.dl-1-2": StreamingToken{3, 1, 0, 0, 0, 0, LogPosition{1, 2}}.String(),
"s3_1_2_3_5": StreamingToken{3, 1, 2, 3, 5, LogPosition{}}.String(), "s3_1_2_3_5_0": StreamingToken{3, 1, 2, 3, 5, 0, LogPosition{}}.String(),
"t3_1": TopologyToken{3, 1}.String(), "t3_1": TopologyToken{3, 1}.String(),
} }
for a, b := range shouldPass { for a, b := range shouldPass {