From cc56e0b88e7b85114b2318856f4a5609ad75b1de Mon Sep 17 00:00:00 2001 From: "Crom (Thibaut CHARLES)" Date: Sun, 14 Jan 2018 23:28:41 +0100 Subject: [PATCH] Account data filtering Signed-off-by: Thibaut CHARLES cromfr@gmail.com --- .../matrix-org/dendrite/common/types.go | 1 + .../dendrite/syncapi/consumers/clientapi.go | 2 +- .../syncapi/storage/account_data_table.go | 27 ++++++++++++++----- .../dendrite/syncapi/storage/syncserver.go | 8 +++--- .../dendrite/syncapi/sync/requestpool.go | 2 +- 5 files changed, 27 insertions(+), 13 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/common/types.go b/src/github.com/matrix-org/dendrite/common/types.go index d8c5c5a7e..a17e87b4d 100644 --- a/src/github.com/matrix-org/dendrite/common/types.go +++ b/src/github.com/matrix-org/dendrite/common/types.go @@ -19,6 +19,7 @@ package common type AccountData struct { RoomID string `json:"room_id"` Type string `json:"type"` + Sender string `json:"sender"` } // ProfileResponse is a struct containing all known user profile data diff --git a/src/github.com/matrix-org/dendrite/syncapi/consumers/clientapi.go b/src/github.com/matrix-org/dendrite/syncapi/consumers/clientapi.go index d05a76920..b1c4d700b 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/consumers/clientapi.go +++ b/src/github.com/matrix-org/dendrite/syncapi/consumers/clientapi.go @@ -79,7 +79,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error }).Info("received data from client API server") syncStreamPos, err := s.db.UpsertAccountData( - context.TODO(), string(msg.Key), output.RoomID, output.Type, + context.TODO(), string(msg.Key), output.RoomID, output.Type, output.Sender, ) if err != nil { log.WithFields(log.Fields{ diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/account_data_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/account_data_table.go index d4d74d158..30ccc26b8 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/account_data_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/account_data_table.go @@ -18,7 +18,9 @@ import ( "context" "database/sql" + "github.com/lib/pq" "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/gomatrix" "github.com/matrix-org/dendrite/syncapi/types" ) @@ -38,16 +40,17 @@ CREATE TABLE IF NOT EXISTS syncapi_account_data_type ( room_id TEXT NOT NULL, -- Type of the data type TEXT NOT NULL, + sender TEXT NOT NULL, -- We don't want two entries of the same type for the same user CONSTRAINT syncapi_account_data_unique UNIQUE (user_id, room_id, type) ); -CREATE UNIQUE INDEX IF NOT EXISTS syncapi_account_data_id_idx ON syncapi_account_data_type(id); +CREATE UNIQUE INDEX IF NOT EXISTS syncapi_account_data_id_idx ON syncapi_account_data_type(id, type, sender); ` const insertAccountDataSQL = "" + - "INSERT INTO syncapi_account_data_type (user_id, room_id, type) VALUES ($1, $2, $3)" + + "INSERT INTO syncapi_account_data_type (user_id, room_id, type, sender) VALUES ($1, $2, $3, $4)" + " ON CONFLICT ON CONSTRAINT syncapi_account_data_unique" + " DO UPDATE SET id = EXCLUDED.id" + " RETURNING id" @@ -55,7 +58,11 @@ const insertAccountDataSQL = "" + const selectAccountDataInRangeSQL = "" + "SELECT room_id, type FROM syncapi_account_data_type" + " WHERE user_id = $1 AND id > $2 AND id <= $3" + - " ORDER BY id ASC" + " AND ( $4::text[] IS NULL OR sender = ANY($4) )" + + " AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" + + " AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" + + " AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" + + " ORDER BY id ASC LIMIT $8" const selectMaxAccountDataIDSQL = "" + "SELECT MAX(id) FROM syncapi_account_data_type" @@ -85,16 +92,16 @@ func (s *accountDataStatements) prepare(db *sql.DB) (err error) { func (s *accountDataStatements) insertAccountData( ctx context.Context, - userID, roomID, dataType string, + userID, roomID, dataType, sender string, ) (pos int64, err error) { - err = s.insertAccountDataStmt.QueryRowContext(ctx, userID, roomID, dataType).Scan(&pos) + err = s.insertAccountDataStmt.QueryRowContext(ctx, userID, roomID, dataType, sender).Scan(&pos) return } func (s *accountDataStatements) selectAccountDataInRange( ctx context.Context, userID string, - oldPos, newPos types.StreamPosition, + oldPos, newPos types.StreamPosition, accountDataFilterPart *gomatrix.FilterPart, ) (data map[string][]string, err error) { data = make(map[string][]string) @@ -105,7 +112,13 @@ func (s *accountDataStatements) selectAccountDataInRange( oldPos-- } - rows, err := s.selectAccountDataInRangeStmt.QueryContext(ctx, userID, oldPos, newPos) + rows, err := s.selectAccountDataInRangeStmt.QueryContext(ctx, userID, oldPos, newPos, + pq.StringArray(filterConvertWildcardToSQL(accountDataFilterPart.Types)), + pq.StringArray(filterConvertWildcardToSQL(accountDataFilterPart.NotTypes)), + pq.StringArray(accountDataFilterPart.Senders), + pq.StringArray(accountDataFilterPart.NotSenders), + accountDataFilterPart.Limit, + ) if err != nil { return } diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index bfec0be30..9742b0388 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -364,9 +364,9 @@ var txReadOnlySnapshot = sql.TxOptions{ // If no data is retrieved, returns an empty map // If there was an issue with the retrieval, returns an error func (d *SyncServerDatabase) GetAccountDataInRange( - ctx context.Context, userID string, oldPos, newPos types.StreamPosition, + ctx context.Context, userID string, oldPos, newPos types.StreamPosition, accountDataFilterPart *gomatrix.FilterPart, ) (map[string][]string, error) { - return d.accountData.selectAccountDataInRange(ctx, userID, oldPos, newPos) + return d.accountData.selectAccountDataInRange(ctx, userID, oldPos, newPos, accountDataFilterPart) } // UpsertAccountData keeps track of new or updated account data, by saving the type @@ -376,9 +376,9 @@ func (d *SyncServerDatabase) GetAccountDataInRange( // creates a new row, else update the existing one // Returns an error if there was an issue with the upsert func (d *SyncServerDatabase) UpsertAccountData( - ctx context.Context, userID, roomID, dataType string, + ctx context.Context, userID, roomID, dataType, sender string, ) (types.StreamPosition, error) { - pos, err := d.accountData.insertAccountData(ctx, userID, roomID, dataType) + pos, err := d.accountData.insertAccountData(ctx, userID, roomID, dataType, sender) return types.StreamPosition(pos), err } diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go index 2ca1e9cf7..4751e38dd 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go @@ -170,7 +170,7 @@ func (rp *RequestPool) appendAccountData( } // Sync is not initial, get all account data since the latest sync - dataTypes, err := rp.db.GetAccountDataInRange(req.ctx, userID, *req.since, currentPos) + dataTypes, err := rp.db.GetAccountDataInRange(req.ctx, userID, *req.since, currentPos, &req.filter.AccountData) if err != nil { return nil, err }