diff --git a/roomserver/storage/interface.go b/roomserver/storage/interface.go index c65c89b0a..e1300d3f5 100644 --- a/roomserver/storage/interface.go +++ b/roomserver/storage/interface.go @@ -158,5 +158,6 @@ type Database interface { GetKnownUsers(ctx context.Context, userID, searchString string, limit int) ([]string, error) // GetKnownRooms returns a list of all rooms we know about. GetKnownRooms(ctx context.Context) ([]string, error) - StoreReceipt(ctx context.Context, roomId, receiptType, userId, eventId string) error + StoreReceipt(ctx context.Context, roomID, receiptType, userID, eventID string) error + GetRoomReceipts(ctx context.Context, roomID string, timestamp int) ([]types.Receipt, error) } diff --git a/roomserver/storage/postgres/receipt_table.go b/roomserver/storage/postgres/receipt_table.go index c06fe49c1..d9b98737b 100644 --- a/roomserver/storage/postgres/receipt_table.go +++ b/roomserver/storage/postgres/receipt_table.go @@ -19,10 +19,10 @@ import ( "database/sql" "time" - "github.com/pkg/errors" - "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/roomserver/storage/tables" + "github.com/matrix-org/dendrite/roomserver/types" + "github.com/pkg/errors" ) const receiptsSchema = ` @@ -49,9 +49,15 @@ const upsertReceipt = "" + " ON CONFLICT (room_id, receipt_type, user_id)" + " DO UPDATE SET event_id = $4, receipt_ts = $5" +const selectRoomReceipts = "" + + "SELECT id, room_id, receipt_type, user_id, event_id, receipt_ts" + + " FROM roomserver_receipts" + + " WHERE room_id = $1 AND receipt_ts > $2" + type receiptStatements struct { - db *sql.DB - upsertReceipt *sql.Stmt + db *sql.DB + upsertReceipt *sql.Stmt + selectRoomReceipts *sql.Stmt } func NewPostgresReceiptsTable(db *sql.DB) (tables.Receipts, error) { @@ -65,7 +71,9 @@ func NewPostgresReceiptsTable(db *sql.DB) (tables.Receipts, error) { if r.upsertReceipt, err = db.Prepare(upsertReceipt); err != nil { return nil, errors.Wrap(err, "unable to prepare upsertReceipt statement") } - + if r.selectRoomReceipts, err = db.Prepare(selectRoomReceipts); err != nil { + return nil, errors.Wrap(err, "unable to prepare selectRoomReceipts statement") + } return r, nil } @@ -75,3 +83,23 @@ func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, room _, err := stmt.ExecContext(ctx, roomId, receiptType, userId, eventId, receiptTs) return err } + +func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomId string, timestamp int) ([]types.Receipt, error) { + rows, err := r.selectRoomReceipts.QueryContext(ctx, roomId, timestamp) + if err != nil { + return nil, errors.Wrap(err, "unable to query room receipts") + } + var res []types.Receipt + for rows.Next() { + r := types.Receipt{} + err = rows.Scan(&r.ID, &r.RoomID, &r.Type, &r.UserID, &r.EventID, &r.TS) + if err != nil { + return res, errors.Wrap(err, "unable to scan row to api.Receipts") + } + res = append(res, r) + } + if rows.Err() != nil || rows.Close() != nil { + return res, errors.Wrap(err, "error while scanning rows or error closing rows") + } + return res, nil +} diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go index 80f1811c6..623f14b17 100644 --- a/roomserver/storage/shared/storage.go +++ b/roomserver/storage/shared/storage.go @@ -1031,3 +1031,7 @@ func (d *Database) StoreReceipt(ctx context.Context, roomId, receiptType, userId return d.ReceiptsTable.UpsertReceipt(ctx, txn, roomId, receiptType, userId, eventId) }) } + +func (d *Database) GetRoomReceipts(ctx context.Context, roomId string, timestamp int) ([]types.Receipt, error) { + return d.ReceiptsTable.SelectRoomReceiptsAfter(ctx, roomId, timestamp) +} diff --git a/roomserver/storage/sqlite3/receipt_table.go b/roomserver/storage/sqlite3/receipt_table.go index bae699678..db1a7bb09 100644 --- a/roomserver/storage/sqlite3/receipt_table.go +++ b/roomserver/storage/sqlite3/receipt_table.go @@ -19,10 +19,10 @@ import ( "database/sql" "time" - "github.com/pkg/errors" - "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/roomserver/storage/tables" + "github.com/matrix-org/dendrite/roomserver/types" + "github.com/pkg/errors" ) const receiptsSchema = ` @@ -49,9 +49,15 @@ const upsertReceipt = "" + " ON CONFLICT (room_id, receipt_type, user_id)" + " DO UPDATE SET event_id = $4, receipt_ts = $5" +const selectRoomReceipts = "" + + "SELECT id, room_id, receipt_type, user_id, event_id, receipt_ts" + + " FROM roomserver_receipts" + + " WHERE room_id = $1 AND receipt_ts > $2" + type receiptStatements struct { - db *sql.DB - upsertReceipt *sql.Stmt + db *sql.DB + upsertReceipt *sql.Stmt + selectRoomReceipts *sql.Stmt } func NewSqliteReceiptsTable(db *sql.DB) (tables.Receipts, error) { @@ -65,13 +71,37 @@ func NewSqliteReceiptsTable(db *sql.DB) (tables.Receipts, error) { if r.upsertReceipt, err = db.Prepare(upsertReceipt); err != nil { return nil, errors.Wrap(err, "unable to prepare upsertReceipt statement") } - + if r.selectRoomReceipts, err = db.Prepare(selectRoomReceipts); err != nil { + return nil, errors.Wrap(err, "unable to prepare selectRoomReceipts statement") + } return r, nil } +// UpsertReceipt creates new user receipts func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string) error { receiptTs := time.Now().UnixNano() / int64(time.Millisecond) stmt := sqlutil.TxStmt(txn, r.upsertReceipt) _, err := stmt.ExecContext(ctx, roomId, receiptType, userId, eventId, receiptTs) return err } + +// SelectRoomReceiptsAfter select all receipts for a given room after a specific timestamp +func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomId string, timestamp int) ([]types.Receipt, error) { + rows, err := r.selectRoomReceipts.QueryContext(ctx, roomId, timestamp) + if err != nil { + return []types.Receipt{}, errors.Wrap(err, "unable to query room receipts") + } + var res []types.Receipt + for rows.Next() { + r := types.Receipt{} + err = rows.Scan(&r.ID, &r.RoomID, &r.Type, &r.UserID, &r.EventID, &r.TS) + if err != nil { + return res, errors.Wrap(err, "unable to scan row to types.Receipts") + } + res = append(res, r) + } + if rows.Err() != nil { + return res, errors.Wrap(err, "error while scanning rows") + } + return res, rows.Close() +} diff --git a/roomserver/storage/tables/interface.go b/roomserver/storage/tables/interface.go index d7b81c3b3..a1ac16957 100644 --- a/roomserver/storage/tables/interface.go +++ b/roomserver/storage/tables/interface.go @@ -204,4 +204,5 @@ func ExtractContentValue(ev *gomatrixserverlib.HeaderedEvent) string { type Receipts interface { UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string) error + SelectRoomReceiptsAfter(ctx context.Context, roomId string, timestamp int) ([]types.Receipt, error) }