From dc52d9ae1c5360853c6aabbfe2fd030c47866577 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Thu, 14 May 2020 17:45:51 +0100 Subject: [PATCH] Add Range --- syncapi/storage/interface.go | 2 +- .../storage/postgres/account_data_table.go | 11 ++------ syncapi/storage/shared/syncserver.go | 4 +-- syncapi/storage/sqlite3/account_data_table.go | 11 ++------ syncapi/storage/tables/interface.go | 4 +-- syncapi/sync/requestpool.go | 15 ++++++++-- syncapi/types/types.go | 28 +++++++++++++++++++ 7 files changed, 49 insertions(+), 26 deletions(-) diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 227167899..eba008b34 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -64,7 +64,7 @@ type Database interface { // Returns a map following the format data[roomID] = []dataTypes // If no data is retrieved, returns an empty map // If there was an issue with the retrieval, returns an error - GetAccountDataInRange(ctx context.Context, userID string, oldPos, newPos types.StreamPosition, accountDataFilterPart *gomatrixserverlib.EventFilter) (map[string][]string, error) + GetAccountDataInRange(ctx context.Context, userID string, r types.Range, accountDataFilterPart *gomatrixserverlib.EventFilter) (map[string][]string, error) // UpsertAccountData keeps track of new or updated account data, by saving the type // of the new/updated data, and the user ID and room ID the data is related to (empty) // room ID means the data isn't specific to any room) diff --git a/syncapi/storage/postgres/account_data_table.go b/syncapi/storage/postgres/account_data_table.go index 58fb21983..a5e0c1214 100644 --- a/syncapi/storage/postgres/account_data_table.go +++ b/syncapi/storage/postgres/account_data_table.go @@ -100,19 +100,12 @@ func (s *accountDataStatements) InsertAccountData( func (s *accountDataStatements) SelectAccountDataInRange( ctx context.Context, userID string, - oldPos, newPos types.StreamPosition, + r types.Range, accountDataEventFilter *gomatrixserverlib.EventFilter, ) (data map[string][]string, err error) { data = make(map[string][]string) - // If both positions are the same, it means that the data was saved after the - // latest room event. In that case, we need to decrement the old position as - // it would prevent the SQL request from returning anything. - if oldPos == newPos { - oldPos-- - } - - rows, err := s.selectAccountDataInRangeStmt.QueryContext(ctx, userID, oldPos, newPos, + rows, err := s.selectAccountDataInRangeStmt.QueryContext(ctx, userID, r.Low(), r.High(), pq.StringArray(filterConvertTypeWildcardToSQL(accountDataEventFilter.Types)), pq.StringArray(filterConvertTypeWildcardToSQL(accountDataEventFilter.NotTypes)), accountDataEventFilter.Limit, diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index e03a6b9ff..077de8227 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -167,10 +167,10 @@ func (d *Database) RetireInviteEvent( // If no data is retrieved, returns an empty map // If there was an issue with the retrieval, returns an error func (d *Database) GetAccountDataInRange( - ctx context.Context, userID string, oldPos, newPos types.StreamPosition, + ctx context.Context, userID string, r types.Range, accountDataFilterPart *gomatrixserverlib.EventFilter, ) (map[string][]string, error) { - return d.AccountData.SelectAccountDataInRange(ctx, userID, oldPos, newPos, accountDataFilterPart) + return d.AccountData.SelectAccountDataInRange(ctx, userID, r, accountDataFilterPart) } // UpsertAccountData keeps track of new or updated account data, by saving the type diff --git a/syncapi/storage/sqlite3/account_data_table.go b/syncapi/storage/sqlite3/account_data_table.go index e5f2417b8..435a399c3 100644 --- a/syncapi/storage/sqlite3/account_data_table.go +++ b/syncapi/storage/sqlite3/account_data_table.go @@ -91,19 +91,12 @@ func (s *accountDataStatements) InsertAccountData( func (s *accountDataStatements) SelectAccountDataInRange( ctx context.Context, userID string, - oldPos, newPos types.StreamPosition, + r types.Range, accountDataFilterPart *gomatrixserverlib.EventFilter, ) (data map[string][]string, err error) { data = make(map[string][]string) - // If both positions are the same, it means that the data was saved after the - // latest room event. In that case, we need to decrement the old position as - // it would prevent the SQL request from returning anything. - if oldPos == newPos { - oldPos-- - } - - rows, err := s.selectAccountDataInRangeStmt.QueryContext(ctx, userID, oldPos, newPos) + rows, err := s.selectAccountDataInRangeStmt.QueryContext(ctx, userID, r.Low(), r.High()) if err != nil { return } diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index 1e5351b5b..d50786548 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -11,8 +11,8 @@ import ( type AccountData interface { InsertAccountData(ctx context.Context, txn *sql.Tx, userID, roomID, dataType string) (pos types.StreamPosition, err error) - // SelectAccountDataInRange returns a map of room ID to a list of `dataType`. The range is exclusive of `lowPos` and inclusive of `hiPos`. - SelectAccountDataInRange(ctx context.Context, userID string, lowPos, hiPos types.StreamPosition, accountDataEventFilter *gomatrixserverlib.EventFilter) (data map[string][]string, err error) + // SelectAccountDataInRange returns a map of room ID to a list of `dataType`. The range is exclusive of low and inclusive of high. + SelectAccountDataInRange(ctx context.Context, userID string, r types.Range, accountDataEventFilter *gomatrixserverlib.EventFilter) (data map[string][]string, err error) SelectMaxAccountDataID(ctx context.Context, txn *sql.Tx) (id int64, err error) } diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 126e76f5b..6e0f44e9a 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -184,11 +184,20 @@ func (rp *RequestPool) appendAccountData( return data, nil } + r := types.Range{ + From: req.since.PDUPosition(), + To: currentPos, + } + // If both positions are the same, it means that the data was saved after the + // latest room event. In that case, we need to decrement the old position as + // results are exclusive of Low. + if r.Low() == r.High() { + r.From-- + } + // Sync is not initial, get all account data since the latest sync dataTypes, err := rp.db.GetAccountDataInRange( - req.ctx, userID, - types.StreamPosition(req.since.PDUPosition()), types.StreamPosition(currentPos), - accountDataFilter, + req.ctx, userID, r, accountDataFilter, ) if err != nil { return nil, err diff --git a/syncapi/types/types.go b/syncapi/types/types.go index 8a79ccd43..caa1b3ade 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -47,6 +47,34 @@ type StreamEvent struct { ExcludeFromSync bool } +// Range represents a range between two stream positions. +type Range struct { + // From is the position the client has already received. + From StreamPosition + // To is the position the client is going towards. + To StreamPosition + // True if the client is going backwards + Backwards bool +} + +// Low returns the low number of the range. +// This represents the position the client already has and hence is exclusive. +func (r *Range) Low() StreamPosition { + if !r.Backwards { + return r.From + } + return r.To +} + +// High returns the high number of the range +// This represents the position the client is going towards and hence is inclusive. +func (r *Range) High() StreamPosition { + if !r.Backwards { + return r.To + } + return r.From +} + // SyncTokenType represents the type of a sync token. // It can be either "s" (representing a position in the whole stream of events) // or "t" (representing a position in a room's topology/depth).