diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 477436196..93372f7a2 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -37,6 +37,7 @@ type Database interface { ReceiptStream() types.StreamProvider InviteStream() types.StreamProvider SendToDeviceStream() types.StreamProvider + AccountDataStream() types.StreamProvider DeviceListStream() types.StreamLogProvider // AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs. diff --git a/syncapi/storage/shared/stream_accountdata.go b/syncapi/storage/shared/stream_accountdata.go new file mode 100644 index 000000000..8487dd683 --- /dev/null +++ b/syncapi/storage/shared/stream_accountdata.go @@ -0,0 +1,31 @@ +package shared + +import ( + "context" + + "github.com/matrix-org/dendrite/syncapi/types" +) + +type AccountDataStreamProvider struct { + StreamProvider +} + +func (p *AccountDataStreamProvider) Setup() { + p.StreamProvider.Setup() +} + +func (p *AccountDataStreamProvider) CompleteSync( + ctx context.Context, + req *types.SyncRequest, +) types.StreamPosition { + return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx)) +} + +func (p *AccountDataStreamProvider) IncrementalSync( + ctx context.Context, + req *types.SyncRequest, + from, to types.StreamPosition, +) types.StreamPosition { + + return to +} diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index cb25818c0..130c3f4b2 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -57,6 +57,7 @@ type Database struct { ReceiptStreamProvider types.StreamProvider InviteStreamProvider types.StreamProvider SendToDeviceStreamProvider types.StreamProvider + AccountDataStreamProvider types.StreamProvider DeviceListStreamProvider types.StreamLogProvider } @@ -69,6 +70,7 @@ func (d *Database) ConfigureProviders() { d.ReceiptStreamProvider = &ReceiptStreamProvider{StreamProvider{DB: d}} d.InviteStreamProvider = &InviteStreamProvider{StreamProvider{DB: d}} d.SendToDeviceStreamProvider = &SendToDeviceStreamProvider{StreamProvider{DB: d}} + d.AccountDataStreamProvider = &AccountDataStreamProvider{StreamProvider{DB: d}} d.DeviceListStreamProvider = &DeviceListStreamProvider{StreamLogProvider{DB: d}} d.PDUStreamProvider.Setup() @@ -76,6 +78,7 @@ func (d *Database) ConfigureProviders() { d.ReceiptStreamProvider.Setup() d.InviteStreamProvider.Setup() d.SendToDeviceStreamProvider.Setup() + d.AccountDataStreamProvider.Setup() d.DeviceListStreamProvider.Setup() d.PDUTopologyProvider = &PDUTopologyProvider{DB: d} @@ -105,6 +108,10 @@ func (d *Database) SendToDeviceStream() types.StreamProvider { return d.SendToDeviceStreamProvider } +func (d *Database) AccountDataStream() types.StreamProvider { + return d.AccountDataStreamProvider +} + func (d *Database) DeviceListStream() types.StreamLogProvider { return d.DeviceListStreamProvider } diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index d03748248..2827271fd 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -50,6 +50,7 @@ type RequestPool struct { receiptStream types.StreamProvider sendToDeviceStream types.StreamProvider inviteStream types.StreamProvider + accountDataStream types.StreamProvider deviceListStream types.StreamLogProvider } @@ -71,6 +72,7 @@ func NewRequestPool( receiptStream: db.ReceiptStream(), sendToDeviceStream: db.SendToDeviceStream(), inviteStream: db.InviteStream(), + accountDataStream: db.AccountDataStream(), deviceListStream: db.DeviceListStream(), } go rp.cleanLastSeen() @@ -189,6 +191,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. case <-rp.receiptStream.NotifyAfter(waitctx, syncReq.Since.ReceiptPosition): case <-rp.inviteStream.NotifyAfter(waitctx, syncReq.Since.InvitePosition): case <-rp.sendToDeviceStream.NotifyAfter(waitctx, syncReq.Since.SendToDevicePosition): + case <-rp.accountDataStream.NotifyAfter(waitctx, syncReq.Since.AccountDataPosition): case <-rp.deviceListStream.NotifyAfter(waitctx, syncReq.Since.DeviceListPosition): } @@ -216,6 +219,9 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. SendToDevicePosition: rp.sendToDeviceStream.CompleteSync( syncReq.Context, syncReq, ), + AccountDataPosition: rp.accountDataStream.CompleteSync( + syncReq.Context, syncReq, + ), DeviceListPosition: rp.deviceListStream.CompleteSync( syncReq.Context, syncReq, ), @@ -243,6 +249,10 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. syncReq.Context, syncReq, syncReq.Since.SendToDevicePosition, rp.sendToDeviceStream.LatestPosition(syncReq.Context), ), + AccountDataPosition: rp.accountDataStream.IncrementalSync( + syncReq.Context, syncReq, syncReq.Since.AccountDataPosition, + rp.accountDataStream.LatestPosition(syncReq.Context), + ), DeviceListPosition: rp.deviceListStream.IncrementalSync( syncReq.Context, syncReq, syncReq.Since.DeviceListPosition, rp.db.DeviceListStream().LatestPosition(syncReq.Context), diff --git a/syncapi/types/types.go b/syncapi/types/types.go index 8e5260326..a9a4d40c1 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -114,6 +114,7 @@ type StreamingToken struct { ReceiptPosition StreamPosition SendToDevicePosition StreamPosition InvitePosition StreamPosition + AccountDataPosition StreamPosition DeviceListPosition LogPosition } @@ -130,10 +131,10 @@ func (s *StreamingToken) UnmarshalText(text []byte) (err error) { func (t StreamingToken) String() string { posStr := fmt.Sprintf( - "s%d_%d_%d_%d_%d", + "s%d_%d_%d_%d_%d_%d", t.PDUPosition, t.TypingPosition, t.ReceiptPosition, t.SendToDevicePosition, - t.InvitePosition, + t.InvitePosition, t.AccountDataPosition, ) if dl := t.DeviceListPosition; !dl.IsEmpty() { posStr += fmt.Sprintf(".dl-%d-%d", dl.Partition, dl.Offset) @@ -154,6 +155,8 @@ func (t *StreamingToken) IsAfter(other StreamingToken) bool { return true case t.InvitePosition > other.InvitePosition: return true + case t.AccountDataPosition > other.AccountDataPosition: + return true case t.DeviceListPosition.IsAfter(&other.DeviceListPosition): return true } @@ -161,7 +164,7 @@ func (t *StreamingToken) IsAfter(other StreamingToken) bool { } func (t *StreamingToken) IsEmpty() bool { - return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition+t.InvitePosition == 0 && t.DeviceListPosition.IsEmpty() + return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition+t.InvitePosition+t.AccountDataPosition == 0 && t.DeviceListPosition.IsEmpty() } // WithUpdates returns a copy of the StreamingToken with updates applied from another StreamingToken. @@ -193,6 +196,9 @@ func (t *StreamingToken) ApplyUpdates(other StreamingToken) { if other.InvitePosition > 0 { t.InvitePosition = other.InvitePosition } + if other.AccountDataPosition > 0 { + t.AccountDataPosition = other.AccountDataPosition + } if other.DeviceListPosition.Offset > 0 { t.DeviceListPosition = other.DeviceListPosition } @@ -286,7 +292,7 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) { } categories := strings.Split(tok[1:], ".") parts := strings.Split(categories[0], "_") - var positions [5]StreamPosition + var positions [6]StreamPosition for i, p := range parts { if i > len(positions) { break @@ -304,6 +310,7 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) { ReceiptPosition: positions[2], SendToDevicePosition: positions[3], InvitePosition: positions[4], + AccountDataPosition: positions[5], } // dl-0-1234 // $log_name-$partition-$offset diff --git a/syncapi/types/types_test.go b/syncapi/types/types_test.go index 3698fbeea..3e5777888 100644 --- a/syncapi/types/types_test.go +++ b/syncapi/types/types_test.go @@ -10,10 +10,10 @@ import ( func TestNewSyncTokenWithLogs(t *testing.T) { tests := map[string]*StreamingToken{ - "s4_0_0_0_0": { + "s4_0_0_0_0_0": { PDUPosition: 4, }, - "s4_0_0_0_0.dl-0-123": { + "s4_0_0_0_0_0.dl-0-123": { PDUPosition: 4, DeviceListPosition: LogPosition{ Partition: 0, @@ -42,10 +42,10 @@ func TestNewSyncTokenWithLogs(t *testing.T) { func TestSyncTokens(t *testing.T) { shouldPass := map[string]string{ - "s4_0_0_0_0": StreamingToken{4, 0, 0, 0, 0, LogPosition{}}.String(), - "s3_1_0_0_0.dl-1-2": StreamingToken{3, 1, 0, 0, 0, LogPosition{1, 2}}.String(), - "s3_1_2_3_5": StreamingToken{3, 1, 2, 3, 5, LogPosition{}}.String(), - "t3_1": TopologyToken{3, 1}.String(), + "s4_0_0_0_0_0": StreamingToken{4, 0, 0, 0, 0, 0, LogPosition{}}.String(), + "s3_1_0_0_0_0.dl-1-2": StreamingToken{3, 1, 0, 0, 0, 0, LogPosition{1, 2}}.String(), + "s3_1_2_3_5_0": StreamingToken{3, 1, 2, 3, 5, 0, LogPosition{}}.String(), + "t3_1": TopologyToken{3, 1}.String(), } for a, b := range shouldPass {