Implement postgres and sqlite3 storage for receipts

This commit is contained in:
Till Faelligen 2020-10-13 17:34:40 +02:00
parent 0815809634
commit 5b53b7c80b
5 changed files with 75 additions and 11 deletions

View file

@ -158,5 +158,6 @@ type Database interface {
GetKnownUsers(ctx context.Context, userID, searchString string, limit int) ([]string, error)
// GetKnownRooms returns a list of all rooms we know about.
GetKnownRooms(ctx context.Context) ([]string, error)
StoreReceipt(ctx context.Context, roomId, receiptType, userId, eventId string) error
StoreReceipt(ctx context.Context, roomID, receiptType, userID, eventID string) error
GetRoomReceipts(ctx context.Context, roomID string, timestamp int) ([]types.Receipt, error)
}

View file

@ -19,10 +19,10 @@ import (
"database/sql"
"time"
"github.com/pkg/errors"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/storage/tables"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/pkg/errors"
)
const receiptsSchema = `
@ -49,9 +49,15 @@ const upsertReceipt = "" +
" ON CONFLICT (room_id, receipt_type, user_id)" +
" DO UPDATE SET event_id = $4, receipt_ts = $5"
const selectRoomReceipts = "" +
"SELECT id, room_id, receipt_type, user_id, event_id, receipt_ts" +
" FROM roomserver_receipts" +
" WHERE room_id = $1 AND receipt_ts > $2"
type receiptStatements struct {
db *sql.DB
upsertReceipt *sql.Stmt
db *sql.DB
upsertReceipt *sql.Stmt
selectRoomReceipts *sql.Stmt
}
func NewPostgresReceiptsTable(db *sql.DB) (tables.Receipts, error) {
@ -65,7 +71,9 @@ func NewPostgresReceiptsTable(db *sql.DB) (tables.Receipts, error) {
if r.upsertReceipt, err = db.Prepare(upsertReceipt); err != nil {
return nil, errors.Wrap(err, "unable to prepare upsertReceipt statement")
}
if r.selectRoomReceipts, err = db.Prepare(selectRoomReceipts); err != nil {
return nil, errors.Wrap(err, "unable to prepare selectRoomReceipts statement")
}
return r, nil
}
@ -75,3 +83,23 @@ func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, room
_, err := stmt.ExecContext(ctx, roomId, receiptType, userId, eventId, receiptTs)
return err
}
func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomId string, timestamp int) ([]types.Receipt, error) {
rows, err := r.selectRoomReceipts.QueryContext(ctx, roomId, timestamp)
if err != nil {
return nil, errors.Wrap(err, "unable to query room receipts")
}
var res []types.Receipt
for rows.Next() {
r := types.Receipt{}
err = rows.Scan(&r.ID, &r.RoomID, &r.Type, &r.UserID, &r.EventID, &r.TS)
if err != nil {
return res, errors.Wrap(err, "unable to scan row to api.Receipts")
}
res = append(res, r)
}
if rows.Err() != nil || rows.Close() != nil {
return res, errors.Wrap(err, "error while scanning rows or error closing rows")
}
return res, nil
}

View file

@ -1031,3 +1031,7 @@ func (d *Database) StoreReceipt(ctx context.Context, roomId, receiptType, userId
return d.ReceiptsTable.UpsertReceipt(ctx, txn, roomId, receiptType, userId, eventId)
})
}
func (d *Database) GetRoomReceipts(ctx context.Context, roomId string, timestamp int) ([]types.Receipt, error) {
return d.ReceiptsTable.SelectRoomReceiptsAfter(ctx, roomId, timestamp)
}

View file

@ -19,10 +19,10 @@ import (
"database/sql"
"time"
"github.com/pkg/errors"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/storage/tables"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/pkg/errors"
)
const receiptsSchema = `
@ -49,9 +49,15 @@ const upsertReceipt = "" +
" ON CONFLICT (room_id, receipt_type, user_id)" +
" DO UPDATE SET event_id = $4, receipt_ts = $5"
const selectRoomReceipts = "" +
"SELECT id, room_id, receipt_type, user_id, event_id, receipt_ts" +
" FROM roomserver_receipts" +
" WHERE room_id = $1 AND receipt_ts > $2"
type receiptStatements struct {
db *sql.DB
upsertReceipt *sql.Stmt
db *sql.DB
upsertReceipt *sql.Stmt
selectRoomReceipts *sql.Stmt
}
func NewSqliteReceiptsTable(db *sql.DB) (tables.Receipts, error) {
@ -65,13 +71,37 @@ func NewSqliteReceiptsTable(db *sql.DB) (tables.Receipts, error) {
if r.upsertReceipt, err = db.Prepare(upsertReceipt); err != nil {
return nil, errors.Wrap(err, "unable to prepare upsertReceipt statement")
}
if r.selectRoomReceipts, err = db.Prepare(selectRoomReceipts); err != nil {
return nil, errors.Wrap(err, "unable to prepare selectRoomReceipts statement")
}
return r, nil
}
// UpsertReceipt creates new user receipts
func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string) error {
receiptTs := time.Now().UnixNano() / int64(time.Millisecond)
stmt := sqlutil.TxStmt(txn, r.upsertReceipt)
_, err := stmt.ExecContext(ctx, roomId, receiptType, userId, eventId, receiptTs)
return err
}
// SelectRoomReceiptsAfter select all receipts for a given room after a specific timestamp
func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomId string, timestamp int) ([]types.Receipt, error) {
rows, err := r.selectRoomReceipts.QueryContext(ctx, roomId, timestamp)
if err != nil {
return []types.Receipt{}, errors.Wrap(err, "unable to query room receipts")
}
var res []types.Receipt
for rows.Next() {
r := types.Receipt{}
err = rows.Scan(&r.ID, &r.RoomID, &r.Type, &r.UserID, &r.EventID, &r.TS)
if err != nil {
return res, errors.Wrap(err, "unable to scan row to types.Receipts")
}
res = append(res, r)
}
if rows.Err() != nil {
return res, errors.Wrap(err, "error while scanning rows")
}
return res, rows.Close()
}

View file

@ -204,4 +204,5 @@ func ExtractContentValue(ev *gomatrixserverlib.HeaderedEvent) string {
type Receipts interface {
UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string) error
SelectRoomReceiptsAfter(ctx context.Context, roomId string, timestamp int) ([]types.Receipt, error)
}