From 92eadf18b91ced30f81bca047e044aed750ab987 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 18 Dec 2020 09:23:06 +0000 Subject: [PATCH] Use real receipt positions --- syncapi/storage/postgres/receipt_table.go | 17 +++++++++++------ syncapi/storage/shared/syncserver.go | 9 +++++---- syncapi/storage/sqlite3/receipt_table.go | 18 +++++++++++------- syncapi/storage/tables/interface.go | 2 +- 4 files changed, 28 insertions(+), 18 deletions(-) diff --git a/syncapi/storage/postgres/receipt_table.go b/syncapi/storage/postgres/receipt_table.go index 23c66910f..73bf4179e 100644 --- a/syncapi/storage/postgres/receipt_table.go +++ b/syncapi/storage/postgres/receipt_table.go @@ -55,7 +55,7 @@ const upsertReceipt = "" + " RETURNING id" const selectRoomReceipts = "" + - "SELECT room_id, receipt_type, user_id, event_id, receipt_ts" + + "SELECT id, room_id, receipt_type, user_id, event_id, receipt_ts" + " FROM syncapi_receipts" + " WHERE room_id = ANY($1) AND id > $2" @@ -95,22 +95,27 @@ func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, room return } -func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]api.OutputReceiptEvent, error) { +func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []api.OutputReceiptEvent, error) { + lastPos := types.StreamPosition(0) rows, err := r.selectRoomReceipts.QueryContext(ctx, pq.Array(roomIDs), streamPos) if err != nil { - return nil, fmt.Errorf("unable to query room receipts: %w", err) + return 0, nil, fmt.Errorf("unable to query room receipts: %w", err) } defer internal.CloseAndLogIfError(ctx, rows, "SelectRoomReceiptsAfter: rows.close() failed") var res []api.OutputReceiptEvent for rows.Next() { r := api.OutputReceiptEvent{} - err = rows.Scan(&r.RoomID, &r.Type, &r.UserID, &r.EventID, &r.Timestamp) + var id types.StreamPosition + err = rows.Scan(&id, &r.RoomID, &r.Type, &r.UserID, &r.EventID, &r.Timestamp) if err != nil { - return res, fmt.Errorf("unable to scan row to api.Receipts: %w", err) + return 0, res, fmt.Errorf("unable to scan row to api.Receipts: %w", err) } res = append(res, r) + if id > lastPos { + lastPos = id + } } - return res, rows.Err() + return lastPos, res, rows.Err() } func (s *receiptStatements) SelectMaxReceiptID( diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index e703c9e7d..003ace4dc 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -578,8 +578,8 @@ func (d *Database) addTypingDeltaToResponse( jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev) res.Rooms.Join[roomID] = jr } - res.NextBatch.TypingPosition = types.StreamPosition(d.EDUCache.GetLatestSyncPosition()) } + res.NextBatch.TypingPosition = types.StreamPosition(d.EDUCache.GetLatestSyncPosition()) return nil } @@ -590,7 +590,7 @@ func (d *Database) addReceiptDeltaToResponse( joinedRoomIDs []string, res *types.Response, ) error { - receipts, err := d.Receipts.SelectRoomReceiptsAfter(context.TODO(), joinedRoomIDs, since.ReceiptPosition) + lastPos, receipts, err := d.Receipts.SelectRoomReceiptsAfter(context.TODO(), joinedRoomIDs, since.ReceiptPosition) if err != nil { return fmt.Errorf("unable to select receipts for rooms: %w", err) } @@ -625,7 +625,6 @@ func (d *Database) addReceiptDeltaToResponse( } read.User[receipt.UserID] = eduAPI.ReceiptTS{TS: receipt.Timestamp} content[receipt.EventID] = read - res.NextBatch.ReceiptPosition++ } ev.Content, err = json.Marshal(content) if err != nil { @@ -636,6 +635,7 @@ func (d *Database) addReceiptDeltaToResponse( res.Rooms.Join[roomID] = jr } + res.NextBatch.ReceiptPosition = lastPos return nil } @@ -1527,5 +1527,6 @@ func (d *Database) StoreReceipt(ctx context.Context, roomId, receiptType, userId } func (d *Database) GetRoomReceipts(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]eduAPI.OutputReceiptEvent, error) { - return d.Receipts.SelectRoomReceiptsAfter(ctx, roomIDs, streamPos) + _, receipts, err := d.Receipts.SelectRoomReceiptsAfter(ctx, roomIDs, streamPos) + return receipts, err } diff --git a/syncapi/storage/sqlite3/receipt_table.go b/syncapi/storage/sqlite3/receipt_table.go index dfde1fd2d..69fc4e9d0 100644 --- a/syncapi/storage/sqlite3/receipt_table.go +++ b/syncapi/storage/sqlite3/receipt_table.go @@ -51,7 +51,7 @@ const upsertReceipt = "" + " DO UPDATE SET id = $7, event_id = $8, receipt_ts = $9" const selectRoomReceipts = "" + - "SELECT room_id, receipt_type, user_id, event_id, receipt_ts" + + "SELECT id, room_id, receipt_type, user_id, event_id, receipt_ts" + " FROM syncapi_receipts" + " WHERE id > $1 and room_id in ($2)" @@ -99,9 +99,9 @@ func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, room } // SelectRoomReceiptsAfter select all receipts for a given room after a specific timestamp -func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]api.OutputReceiptEvent, error) { +func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []api.OutputReceiptEvent, error) { selectSQL := strings.Replace(selectRoomReceipts, "($2)", sqlutil.QueryVariadicOffset(len(roomIDs), 1), 1) - + lastPos := types.StreamPosition(0) params := make([]interface{}, len(roomIDs)+1) params[0] = streamPos for k, v := range roomIDs { @@ -109,19 +109,23 @@ func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomIDs } rows, err := r.db.QueryContext(ctx, selectSQL, params...) if err != nil { - return nil, fmt.Errorf("unable to query room receipts: %w", err) + return 0, nil, fmt.Errorf("unable to query room receipts: %w", err) } defer internal.CloseAndLogIfError(ctx, rows, "SelectRoomReceiptsAfter: rows.close() failed") var res []api.OutputReceiptEvent for rows.Next() { r := api.OutputReceiptEvent{} - err = rows.Scan(&r.RoomID, &r.Type, &r.UserID, &r.EventID, &r.Timestamp) + var id types.StreamPosition + err = rows.Scan(&id, &r.RoomID, &r.Type, &r.UserID, &r.EventID, &r.Timestamp) if err != nil { - return res, fmt.Errorf("unable to scan row to api.Receipts: %w", err) + return 0, res, fmt.Errorf("unable to scan row to api.Receipts: %w", err) } res = append(res, r) + if id > lastPos { + lastPos = id + } } - return res, rows.Err() + return lastPos, res, rows.Err() } func (s *receiptStatements) SelectMaxReceiptID( diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index 7a166d439..8ece8553f 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -160,6 +160,6 @@ type Filter interface { type Receipts interface { UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) - SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]eduAPI.OutputReceiptEvent, error) + SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []eduAPI.OutputReceiptEvent, error) SelectMaxReceiptID(ctx context.Context, txn *sql.Tx) (id int64, err error) }