mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-12 09:23:09 -06:00
parent
a8daf97a13
commit
cc56e0b88e
|
|
@ -19,6 +19,7 @@ package common
|
||||||
type AccountData struct {
|
type AccountData struct {
|
||||||
RoomID string `json:"room_id"`
|
RoomID string `json:"room_id"`
|
||||||
Type string `json:"type"`
|
Type string `json:"type"`
|
||||||
|
Sender string `json:"sender"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// ProfileResponse is a struct containing all known user profile data
|
// ProfileResponse is a struct containing all known user profile data
|
||||||
|
|
|
||||||
|
|
@ -79,7 +79,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
|
||||||
}).Info("received data from client API server")
|
}).Info("received data from client API server")
|
||||||
|
|
||||||
syncStreamPos, err := s.db.UpsertAccountData(
|
syncStreamPos, err := s.db.UpsertAccountData(
|
||||||
context.TODO(), string(msg.Key), output.RoomID, output.Type,
|
context.TODO(), string(msg.Key), output.RoomID, output.Type, output.Sender,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
|
|
|
||||||
|
|
@ -18,7 +18,9 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
|
||||||
|
"github.com/lib/pq"
|
||||||
"github.com/matrix-org/dendrite/common"
|
"github.com/matrix-org/dendrite/common"
|
||||||
|
"github.com/matrix-org/gomatrix"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
)
|
)
|
||||||
|
|
@ -38,16 +40,17 @@ CREATE TABLE IF NOT EXISTS syncapi_account_data_type (
|
||||||
room_id TEXT NOT NULL,
|
room_id TEXT NOT NULL,
|
||||||
-- Type of the data
|
-- Type of the data
|
||||||
type TEXT NOT NULL,
|
type TEXT NOT NULL,
|
||||||
|
sender TEXT NOT NULL,
|
||||||
|
|
||||||
-- We don't want two entries of the same type for the same user
|
-- We don't want two entries of the same type for the same user
|
||||||
CONSTRAINT syncapi_account_data_unique UNIQUE (user_id, room_id, type)
|
CONSTRAINT syncapi_account_data_unique UNIQUE (user_id, room_id, type)
|
||||||
);
|
);
|
||||||
|
|
||||||
CREATE UNIQUE INDEX IF NOT EXISTS syncapi_account_data_id_idx ON syncapi_account_data_type(id);
|
CREATE UNIQUE INDEX IF NOT EXISTS syncapi_account_data_id_idx ON syncapi_account_data_type(id, type, sender);
|
||||||
`
|
`
|
||||||
|
|
||||||
const insertAccountDataSQL = "" +
|
const insertAccountDataSQL = "" +
|
||||||
"INSERT INTO syncapi_account_data_type (user_id, room_id, type) VALUES ($1, $2, $3)" +
|
"INSERT INTO syncapi_account_data_type (user_id, room_id, type, sender) VALUES ($1, $2, $3, $4)" +
|
||||||
" ON CONFLICT ON CONSTRAINT syncapi_account_data_unique" +
|
" ON CONFLICT ON CONSTRAINT syncapi_account_data_unique" +
|
||||||
" DO UPDATE SET id = EXCLUDED.id" +
|
" DO UPDATE SET id = EXCLUDED.id" +
|
||||||
" RETURNING id"
|
" RETURNING id"
|
||||||
|
|
@ -55,7 +58,11 @@ const insertAccountDataSQL = "" +
|
||||||
const selectAccountDataInRangeSQL = "" +
|
const selectAccountDataInRangeSQL = "" +
|
||||||
"SELECT room_id, type FROM syncapi_account_data_type" +
|
"SELECT room_id, type FROM syncapi_account_data_type" +
|
||||||
" WHERE user_id = $1 AND id > $2 AND id <= $3" +
|
" WHERE user_id = $1 AND id > $2 AND id <= $3" +
|
||||||
" ORDER BY id ASC"
|
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
|
||||||
|
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
|
||||||
|
" AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
|
||||||
|
" AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
|
||||||
|
" ORDER BY id ASC LIMIT $8"
|
||||||
|
|
||||||
const selectMaxAccountDataIDSQL = "" +
|
const selectMaxAccountDataIDSQL = "" +
|
||||||
"SELECT MAX(id) FROM syncapi_account_data_type"
|
"SELECT MAX(id) FROM syncapi_account_data_type"
|
||||||
|
|
@ -85,16 +92,16 @@ func (s *accountDataStatements) prepare(db *sql.DB) (err error) {
|
||||||
|
|
||||||
func (s *accountDataStatements) insertAccountData(
|
func (s *accountDataStatements) insertAccountData(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
userID, roomID, dataType string,
|
userID, roomID, dataType, sender string,
|
||||||
) (pos int64, err error) {
|
) (pos int64, err error) {
|
||||||
err = s.insertAccountDataStmt.QueryRowContext(ctx, userID, roomID, dataType).Scan(&pos)
|
err = s.insertAccountDataStmt.QueryRowContext(ctx, userID, roomID, dataType, sender).Scan(&pos)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *accountDataStatements) selectAccountDataInRange(
|
func (s *accountDataStatements) selectAccountDataInRange(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
userID string,
|
userID string,
|
||||||
oldPos, newPos types.StreamPosition,
|
oldPos, newPos types.StreamPosition, accountDataFilterPart *gomatrix.FilterPart,
|
||||||
) (data map[string][]string, err error) {
|
) (data map[string][]string, err error) {
|
||||||
data = make(map[string][]string)
|
data = make(map[string][]string)
|
||||||
|
|
||||||
|
|
@ -105,7 +112,13 @@ func (s *accountDataStatements) selectAccountDataInRange(
|
||||||
oldPos--
|
oldPos--
|
||||||
}
|
}
|
||||||
|
|
||||||
rows, err := s.selectAccountDataInRangeStmt.QueryContext(ctx, userID, oldPos, newPos)
|
rows, err := s.selectAccountDataInRangeStmt.QueryContext(ctx, userID, oldPos, newPos,
|
||||||
|
pq.StringArray(filterConvertWildcardToSQL(accountDataFilterPart.Types)),
|
||||||
|
pq.StringArray(filterConvertWildcardToSQL(accountDataFilterPart.NotTypes)),
|
||||||
|
pq.StringArray(accountDataFilterPart.Senders),
|
||||||
|
pq.StringArray(accountDataFilterPart.NotSenders),
|
||||||
|
accountDataFilterPart.Limit,
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -364,9 +364,9 @@ var txReadOnlySnapshot = sql.TxOptions{
|
||||||
// If no data is retrieved, returns an empty map
|
// If no data is retrieved, returns an empty map
|
||||||
// If there was an issue with the retrieval, returns an error
|
// If there was an issue with the retrieval, returns an error
|
||||||
func (d *SyncServerDatabase) GetAccountDataInRange(
|
func (d *SyncServerDatabase) GetAccountDataInRange(
|
||||||
ctx context.Context, userID string, oldPos, newPos types.StreamPosition,
|
ctx context.Context, userID string, oldPos, newPos types.StreamPosition, accountDataFilterPart *gomatrix.FilterPart,
|
||||||
) (map[string][]string, error) {
|
) (map[string][]string, error) {
|
||||||
return d.accountData.selectAccountDataInRange(ctx, userID, oldPos, newPos)
|
return d.accountData.selectAccountDataInRange(ctx, userID, oldPos, newPos, accountDataFilterPart)
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpsertAccountData keeps track of new or updated account data, by saving the type
|
// UpsertAccountData keeps track of new or updated account data, by saving the type
|
||||||
|
|
@ -376,9 +376,9 @@ func (d *SyncServerDatabase) GetAccountDataInRange(
|
||||||
// creates a new row, else update the existing one
|
// creates a new row, else update the existing one
|
||||||
// Returns an error if there was an issue with the upsert
|
// Returns an error if there was an issue with the upsert
|
||||||
func (d *SyncServerDatabase) UpsertAccountData(
|
func (d *SyncServerDatabase) UpsertAccountData(
|
||||||
ctx context.Context, userID, roomID, dataType string,
|
ctx context.Context, userID, roomID, dataType, sender string,
|
||||||
) (types.StreamPosition, error) {
|
) (types.StreamPosition, error) {
|
||||||
pos, err := d.accountData.insertAccountData(ctx, userID, roomID, dataType)
|
pos, err := d.accountData.insertAccountData(ctx, userID, roomID, dataType, sender)
|
||||||
return types.StreamPosition(pos), err
|
return types.StreamPosition(pos), err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -170,7 +170,7 @@ func (rp *RequestPool) appendAccountData(
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sync is not initial, get all account data since the latest sync
|
// Sync is not initial, get all account data since the latest sync
|
||||||
dataTypes, err := rp.db.GetAccountDataInRange(req.ctx, userID, *req.since, currentPos)
|
dataTypes, err := rp.db.GetAccountDataInRange(req.ctx, userID, *req.since, currentPos, &req.filter.AccountData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue