From 7d3356f5635809248d1ef81d17a740cfbfc7698b Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 8 Jan 2021 14:00:23 +0000 Subject: [PATCH] Get initial position for account data --- syncapi/storage/interface.go | 1 + syncapi/storage/shared/syncserver.go | 8 ++++++++ syncapi/streams/stream_accountdata.go | 13 +++++++++++++ 3 files changed, 22 insertions(+) diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index db5264ce6..45d724372 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -32,6 +32,7 @@ type Database interface { MaxStreamTokenForPDUs(ctx context.Context) (types.StreamPosition, error) MaxStreamTokenForReceipts(ctx context.Context) (types.StreamPosition, error) MaxStreamTokenForInvites(ctx context.Context) (types.StreamPosition, error) + MaxStreamTokenForAccountData(ctx context.Context) (types.StreamPosition, error) CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) ([]*gomatrixserverlib.HeaderedEvent, error) GetStateDeltasForFullStateSync(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error) diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index af8fc1974..85d1a095f 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -85,6 +85,14 @@ func (d *Database) MaxStreamTokenForInvites(ctx context.Context) (types.StreamPo return types.StreamPosition(id), nil } +func (d *Database) MaxStreamTokenForAccountData(ctx context.Context) (types.StreamPosition, error) { + id, err := d.AccountData.SelectMaxAccountDataID(ctx, nil) + if err != nil { + return 0, fmt.Errorf("d.Invites.SelectMaxAccountDataID: %w", err) + } + return types.StreamPosition(id), nil +} + func (d *Database) CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) ([]*gomatrixserverlib.HeaderedEvent, error) { return d.CurrentRoomState.SelectCurrentState(ctx, nil, roomID, stateFilterPart) } diff --git a/syncapi/streams/stream_accountdata.go b/syncapi/streams/stream_accountdata.go index dd9b520cb..666414f1e 100644 --- a/syncapi/streams/stream_accountdata.go +++ b/syncapi/streams/stream_accountdata.go @@ -13,6 +13,19 @@ type AccountDataStreamProvider struct { userAPI userapi.UserInternalAPI } +func (p *AccountDataStreamProvider) Setup() { + p.StreamProvider.Setup() + + p.latestMutex.Lock() + defer p.latestMutex.Unlock() + + id, err := p.DB.MaxStreamTokenForAccountData(context.Background()) + if err != nil { + panic(err) + } + p.latest = id +} + func (p *AccountDataStreamProvider) CompleteSync( ctx context.Context, req *types.SyncRequest,