Add max receipt queries, missing stream_id table entry for SQLite

This commit is contained in:
Neil Alexander 2020-12-11 13:13:28 +00:00
parent 88685be9d2
commit 7eb4c506d9
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
6 changed files with 49 additions and 2 deletions

View file

@ -59,10 +59,14 @@ const selectRoomReceipts = "" +
" FROM syncapi_receipts" + " FROM syncapi_receipts" +
" WHERE room_id = ANY($1) AND id > $2" " WHERE room_id = ANY($1) AND id > $2"
const selectMaxReceiptIDSQL = "" +
"SELECT MAX(id) FROM syncapi_receipts"
type receiptStatements struct { type receiptStatements struct {
db *sql.DB db *sql.DB
upsertReceipt *sql.Stmt upsertReceipt *sql.Stmt
selectRoomReceipts *sql.Stmt selectRoomReceipts *sql.Stmt
selectMaxReceiptID *sql.Stmt
} }
func NewPostgresReceiptsTable(db *sql.DB) (tables.Receipts, error) { func NewPostgresReceiptsTable(db *sql.DB) (tables.Receipts, error) {
@ -79,6 +83,9 @@ func NewPostgresReceiptsTable(db *sql.DB) (tables.Receipts, error) {
if r.selectRoomReceipts, err = db.Prepare(selectRoomReceipts); err != nil { if r.selectRoomReceipts, err = db.Prepare(selectRoomReceipts); err != nil {
return nil, fmt.Errorf("unable to prepare selectRoomReceipts statement: %w", err) return nil, fmt.Errorf("unable to prepare selectRoomReceipts statement: %w", err)
} }
if r.selectMaxReceiptID, err = db.Prepare(selectMaxReceiptIDSQL); err != nil {
return nil, fmt.Errorf("unable to prepare selectRoomReceipts statement: %w", err)
}
return r, nil return r, nil
} }
@ -105,3 +112,15 @@ func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomIDs
} }
return res, rows.Err() return res, rows.Err()
} }
func (s *receiptStatements) SelectMaxReceiptID(
ctx context.Context, txn *sql.Tx,
) (id int64, err error) {
var nullableID sql.NullInt64
stmt := sqlutil.TxStmt(txn, s.selectMaxReceiptID)
err = stmt.QueryRowContext(ctx).Scan(&nullableID)
if nullableID.Valid {
id = nullableID.Int64
}
return
}

View file

@ -483,10 +483,15 @@ func (d *Database) syncPositionTx(
if maxPeekID > maxEventID { if maxPeekID > maxEventID {
maxEventID = maxPeekID maxEventID = maxPeekID
} }
maxReceiptID, err := d.Receipts.SelectMaxReceiptID(ctx, txn)
if err != nil {
return sp, err
}
// TODO: complete these positions // TODO: complete these positions
sp = types.StreamingToken{ sp = types.StreamingToken{
PDUPosition: types.StreamPosition(maxEventID), PDUPosition: types.StreamPosition(maxEventID),
TypingPosition: types.StreamPosition(d.EDUCache.GetLatestSyncPosition()), TypingPosition: types.StreamPosition(d.EDUCache.GetLatestSyncPosition()),
ReceiptPosition: types.StreamPosition(maxReceiptID),
} }
return return
} }

View file

@ -55,11 +55,15 @@ const selectRoomReceipts = "" +
" FROM syncapi_receipts" + " FROM syncapi_receipts" +
" WHERE id > $1 and room_id in ($2)" " WHERE id > $1 and room_id in ($2)"
const selectMaxReceiptIDSQL = "" +
"SELECT MAX(id) FROM syncapi_receipts"
type receiptStatements struct { type receiptStatements struct {
db *sql.DB db *sql.DB
streamIDStatements *streamIDStatements streamIDStatements *streamIDStatements
upsertReceipt *sql.Stmt upsertReceipt *sql.Stmt
selectRoomReceipts *sql.Stmt selectRoomReceipts *sql.Stmt
selectMaxReceiptID *sql.Stmt
} }
func NewSqliteReceiptsTable(db *sql.DB, streamID *streamIDStatements) (tables.Receipts, error) { func NewSqliteReceiptsTable(db *sql.DB, streamID *streamIDStatements) (tables.Receipts, error) {
@ -77,6 +81,9 @@ func NewSqliteReceiptsTable(db *sql.DB, streamID *streamIDStatements) (tables.Re
if r.selectRoomReceipts, err = db.Prepare(selectRoomReceipts); err != nil { if r.selectRoomReceipts, err = db.Prepare(selectRoomReceipts); err != nil {
return nil, fmt.Errorf("unable to prepare selectRoomReceipts statement: %w", err) return nil, fmt.Errorf("unable to prepare selectRoomReceipts statement: %w", err)
} }
if r.selectMaxReceiptID, err = db.Prepare(selectMaxReceiptIDSQL); err != nil {
return nil, fmt.Errorf("unable to prepare selectRoomReceipts statement: %w", err)
}
return r, nil return r, nil
} }
@ -116,3 +123,15 @@ func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomIDs
} }
return res, rows.Err() return res, rows.Err()
} }
func (s *receiptStatements) SelectMaxReceiptID(
ctx context.Context, txn *sql.Tx,
) (id int64, err error) {
var nullableID sql.NullInt64
stmt := sqlutil.TxStmt(txn, s.selectMaxReceiptID)
err = stmt.QueryRowContext(ctx).Scan(&nullableID)
if nullableID.Valid {
id = nullableID.Int64
}
return
}

View file

@ -18,6 +18,8 @@ CREATE TABLE IF NOT EXISTS syncapi_stream_id (
); );
INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("global", 0) INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("global", 0)
ON CONFLICT DO NOTHING; ON CONFLICT DO NOTHING;
INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("receipt", 0)
ON CONFLICT DO NOTHING;
` `
const increaseStreamIDStmt = "" + const increaseStreamIDStmt = "" +

View file

@ -53,6 +53,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
return &d, nil return &d, nil
} }
// nolint:gocyclo
func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (err error) { func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (err error) {
if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "syncapi"); err != nil { if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "syncapi"); err != nil {
return err return err

View file

@ -161,4 +161,5 @@ type Filter interface {
type Receipts interface { type Receipts interface {
UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error)
SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]eduAPI.OutputReceiptEvent, error) SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]eduAPI.OutputReceiptEvent, error)
SelectMaxReceiptID(ctx context.Context, txn *sql.Tx) (id int64, err error)
} }