Get initial position for account data

This commit is contained in:
Neil Alexander 2021-01-08 14:00:23 +00:00
parent 12e563bce9
commit 7d3356f563
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
3 changed files with 22 additions and 0 deletions

View file

@ -32,6 +32,7 @@ type Database interface {
MaxStreamTokenForPDUs(ctx context.Context) (types.StreamPosition, error) MaxStreamTokenForPDUs(ctx context.Context) (types.StreamPosition, error)
MaxStreamTokenForReceipts(ctx context.Context) (types.StreamPosition, error) MaxStreamTokenForReceipts(ctx context.Context) (types.StreamPosition, error)
MaxStreamTokenForInvites(ctx context.Context) (types.StreamPosition, error) MaxStreamTokenForInvites(ctx context.Context) (types.StreamPosition, error)
MaxStreamTokenForAccountData(ctx context.Context) (types.StreamPosition, error)
CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) ([]*gomatrixserverlib.HeaderedEvent, error) CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) ([]*gomatrixserverlib.HeaderedEvent, error)
GetStateDeltasForFullStateSync(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error) GetStateDeltasForFullStateSync(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error)

View file

@ -85,6 +85,14 @@ func (d *Database) MaxStreamTokenForInvites(ctx context.Context) (types.StreamPo
return types.StreamPosition(id), nil return types.StreamPosition(id), nil
} }
func (d *Database) MaxStreamTokenForAccountData(ctx context.Context) (types.StreamPosition, error) {
id, err := d.AccountData.SelectMaxAccountDataID(ctx, nil)
if err != nil {
return 0, fmt.Errorf("d.Invites.SelectMaxAccountDataID: %w", err)
}
return types.StreamPosition(id), nil
}
func (d *Database) CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) ([]*gomatrixserverlib.HeaderedEvent, error) { func (d *Database) CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) ([]*gomatrixserverlib.HeaderedEvent, error) {
return d.CurrentRoomState.SelectCurrentState(ctx, nil, roomID, stateFilterPart) return d.CurrentRoomState.SelectCurrentState(ctx, nil, roomID, stateFilterPart)
} }

View file

@ -13,6 +13,19 @@ type AccountDataStreamProvider struct {
userAPI userapi.UserInternalAPI userAPI userapi.UserInternalAPI
} }
func (p *AccountDataStreamProvider) Setup() {
p.StreamProvider.Setup()
p.latestMutex.Lock()
defer p.latestMutex.Unlock()
id, err := p.DB.MaxStreamTokenForAccountData(context.Background())
if err != nil {
panic(err)
}
p.latest = id
}
func (p *AccountDataStreamProvider) CompleteSync( func (p *AccountDataStreamProvider) CompleteSync(
ctx context.Context, ctx context.Context,
req *types.SyncRequest, req *types.SyncRequest,