Use real receipt positions

This commit is contained in:
Neil Alexander 2020-12-18 09:23:06 +00:00
parent 25febe0ab7
commit 92eadf18b9
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
4 changed files with 28 additions and 18 deletions

View file

@ -55,7 +55,7 @@ const upsertReceipt = "" +
" RETURNING id" " RETURNING id"
const selectRoomReceipts = "" + const selectRoomReceipts = "" +
"SELECT room_id, receipt_type, user_id, event_id, receipt_ts" + "SELECT id, room_id, receipt_type, user_id, event_id, receipt_ts" +
" FROM syncapi_receipts" + " FROM syncapi_receipts" +
" WHERE room_id = ANY($1) AND id > $2" " WHERE room_id = ANY($1) AND id > $2"
@ -95,22 +95,27 @@ func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, room
return return
} }
func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]api.OutputReceiptEvent, error) { func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []api.OutputReceiptEvent, error) {
lastPos := types.StreamPosition(0)
rows, err := r.selectRoomReceipts.QueryContext(ctx, pq.Array(roomIDs), streamPos) rows, err := r.selectRoomReceipts.QueryContext(ctx, pq.Array(roomIDs), streamPos)
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to query room receipts: %w", err) return 0, nil, fmt.Errorf("unable to query room receipts: %w", err)
} }
defer internal.CloseAndLogIfError(ctx, rows, "SelectRoomReceiptsAfter: rows.close() failed") defer internal.CloseAndLogIfError(ctx, rows, "SelectRoomReceiptsAfter: rows.close() failed")
var res []api.OutputReceiptEvent var res []api.OutputReceiptEvent
for rows.Next() { for rows.Next() {
r := api.OutputReceiptEvent{} r := api.OutputReceiptEvent{}
err = rows.Scan(&r.RoomID, &r.Type, &r.UserID, &r.EventID, &r.Timestamp) var id types.StreamPosition
err = rows.Scan(&id, &r.RoomID, &r.Type, &r.UserID, &r.EventID, &r.Timestamp)
if err != nil { if err != nil {
return res, fmt.Errorf("unable to scan row to api.Receipts: %w", err) return 0, res, fmt.Errorf("unable to scan row to api.Receipts: %w", err)
} }
res = append(res, r) res = append(res, r)
if id > lastPos {
lastPos = id
} }
return res, rows.Err() }
return lastPos, res, rows.Err()
} }
func (s *receiptStatements) SelectMaxReceiptID( func (s *receiptStatements) SelectMaxReceiptID(

View file

@ -578,8 +578,8 @@ func (d *Database) addTypingDeltaToResponse(
jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev) jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev)
res.Rooms.Join[roomID] = jr res.Rooms.Join[roomID] = jr
} }
res.NextBatch.TypingPosition = types.StreamPosition(d.EDUCache.GetLatestSyncPosition())
} }
res.NextBatch.TypingPosition = types.StreamPosition(d.EDUCache.GetLatestSyncPosition())
return nil return nil
} }
@ -590,7 +590,7 @@ func (d *Database) addReceiptDeltaToResponse(
joinedRoomIDs []string, joinedRoomIDs []string,
res *types.Response, res *types.Response,
) error { ) error {
receipts, err := d.Receipts.SelectRoomReceiptsAfter(context.TODO(), joinedRoomIDs, since.ReceiptPosition) lastPos, receipts, err := d.Receipts.SelectRoomReceiptsAfter(context.TODO(), joinedRoomIDs, since.ReceiptPosition)
if err != nil { if err != nil {
return fmt.Errorf("unable to select receipts for rooms: %w", err) return fmt.Errorf("unable to select receipts for rooms: %w", err)
} }
@ -625,7 +625,6 @@ func (d *Database) addReceiptDeltaToResponse(
} }
read.User[receipt.UserID] = eduAPI.ReceiptTS{TS: receipt.Timestamp} read.User[receipt.UserID] = eduAPI.ReceiptTS{TS: receipt.Timestamp}
content[receipt.EventID] = read content[receipt.EventID] = read
res.NextBatch.ReceiptPosition++
} }
ev.Content, err = json.Marshal(content) ev.Content, err = json.Marshal(content)
if err != nil { if err != nil {
@ -636,6 +635,7 @@ func (d *Database) addReceiptDeltaToResponse(
res.Rooms.Join[roomID] = jr res.Rooms.Join[roomID] = jr
} }
res.NextBatch.ReceiptPosition = lastPos
return nil return nil
} }
@ -1527,5 +1527,6 @@ func (d *Database) StoreReceipt(ctx context.Context, roomId, receiptType, userId
} }
func (d *Database) GetRoomReceipts(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]eduAPI.OutputReceiptEvent, error) { func (d *Database) GetRoomReceipts(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]eduAPI.OutputReceiptEvent, error) {
return d.Receipts.SelectRoomReceiptsAfter(ctx, roomIDs, streamPos) _, receipts, err := d.Receipts.SelectRoomReceiptsAfter(ctx, roomIDs, streamPos)
return receipts, err
} }

View file

@ -51,7 +51,7 @@ const upsertReceipt = "" +
" DO UPDATE SET id = $7, event_id = $8, receipt_ts = $9" " DO UPDATE SET id = $7, event_id = $8, receipt_ts = $9"
const selectRoomReceipts = "" + const selectRoomReceipts = "" +
"SELECT room_id, receipt_type, user_id, event_id, receipt_ts" + "SELECT id, room_id, receipt_type, user_id, event_id, receipt_ts" +
" FROM syncapi_receipts" + " FROM syncapi_receipts" +
" WHERE id > $1 and room_id in ($2)" " WHERE id > $1 and room_id in ($2)"
@ -99,9 +99,9 @@ func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, room
} }
// SelectRoomReceiptsAfter select all receipts for a given room after a specific timestamp // SelectRoomReceiptsAfter select all receipts for a given room after a specific timestamp
func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]api.OutputReceiptEvent, error) { func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []api.OutputReceiptEvent, error) {
selectSQL := strings.Replace(selectRoomReceipts, "($2)", sqlutil.QueryVariadicOffset(len(roomIDs), 1), 1) selectSQL := strings.Replace(selectRoomReceipts, "($2)", sqlutil.QueryVariadicOffset(len(roomIDs), 1), 1)
lastPos := types.StreamPosition(0)
params := make([]interface{}, len(roomIDs)+1) params := make([]interface{}, len(roomIDs)+1)
params[0] = streamPos params[0] = streamPos
for k, v := range roomIDs { for k, v := range roomIDs {
@ -109,19 +109,23 @@ func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomIDs
} }
rows, err := r.db.QueryContext(ctx, selectSQL, params...) rows, err := r.db.QueryContext(ctx, selectSQL, params...)
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to query room receipts: %w", err) return 0, nil, fmt.Errorf("unable to query room receipts: %w", err)
} }
defer internal.CloseAndLogIfError(ctx, rows, "SelectRoomReceiptsAfter: rows.close() failed") defer internal.CloseAndLogIfError(ctx, rows, "SelectRoomReceiptsAfter: rows.close() failed")
var res []api.OutputReceiptEvent var res []api.OutputReceiptEvent
for rows.Next() { for rows.Next() {
r := api.OutputReceiptEvent{} r := api.OutputReceiptEvent{}
err = rows.Scan(&r.RoomID, &r.Type, &r.UserID, &r.EventID, &r.Timestamp) var id types.StreamPosition
err = rows.Scan(&id, &r.RoomID, &r.Type, &r.UserID, &r.EventID, &r.Timestamp)
if err != nil { if err != nil {
return res, fmt.Errorf("unable to scan row to api.Receipts: %w", err) return 0, res, fmt.Errorf("unable to scan row to api.Receipts: %w", err)
} }
res = append(res, r) res = append(res, r)
if id > lastPos {
lastPos = id
} }
return res, rows.Err() }
return lastPos, res, rows.Err()
} }
func (s *receiptStatements) SelectMaxReceiptID( func (s *receiptStatements) SelectMaxReceiptID(

View file

@ -160,6 +160,6 @@ type Filter interface {
type Receipts interface { type Receipts interface {
UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error)
SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]eduAPI.OutputReceiptEvent, error) SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []eduAPI.OutputReceiptEvent, error)
SelectMaxReceiptID(ctx context.Context, txn *sql.Tx) (id int64, err error) SelectMaxReceiptID(ctx context.Context, txn *sql.Tx) (id int64, err error)
} }