diff --git a/syncapi/consumers/eduserver_receipts.go b/syncapi/consumers/eduserver_receipts.go index 1b405c222..c5d17414a 100644 --- a/syncapi/consumers/eduserver_receipts.go +++ b/syncapi/consumers/eduserver_receipts.go @@ -1,4 +1,4 @@ -// Copyright 2019 Alex Chen +// Copyright 2020 The Matrix.org Foundation C.I.C. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index ca00e3a2e..caac076f5 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -149,8 +149,8 @@ type Database interface { PutFilter(ctx context.Context, localpart string, filter *gomatrixserverlib.Filter) (string, error) // RedactEvent wipes an event in the database and sets the unsigned.redacted_because key to the redaction event RedactEvent(ctx context.Context, redactedEventID string, redactedBecause *gomatrixserverlib.HeaderedEvent) error - // StoreReceipt stores new receipt events ofr + // StoreReceipt stores new receipt events StoreReceipt(ctx context.Context, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) // GetRoomReceipts gets all receipts for a given roomID - GetRoomReceipts(ctx context.Context, roomId string, streamPos types.StreamPosition) ([]eduAPI.InputReceiptEvent, error) + GetRoomReceipts(ctx context.Context, roomId string, streamPos types.StreamPosition) ([]eduAPI.OutputReceiptEvent, error) } diff --git a/syncapi/storage/postgres/receipt_table.go b/syncapi/storage/postgres/receipt_table.go index 8cde95c12..0ceafc818 100644 --- a/syncapi/storage/postgres/receipt_table.go +++ b/syncapi/storage/postgres/receipt_table.go @@ -17,15 +17,14 @@ package postgres import ( "context" "database/sql" - - "github.com/matrix-org/dendrite/syncapi/types" - - "github.com/matrix-org/gomatrixserverlib" + "fmt" "github.com/matrix-org/dendrite/eduserver/api" + "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/syncapi/storage/tables" - "github.com/pkg/errors" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" ) const receiptsSchema = ` @@ -71,10 +70,10 @@ func NewPostgresReceiptsTable(db *sql.DB) (tables.Receipts, error) { db: db, } if r.upsertReceipt, err = db.Prepare(upsertReceipt); err != nil { - return nil, errors.Wrap(err, "unable to prepare upsertReceipt statement") + return nil, fmt.Errorf("unable to prepare upsertReceipt statement: %w", err) } if r.selectRoomReceipts, err = db.Prepare(selectRoomReceipts); err != nil { - return nil, errors.Wrap(err, "unable to prepare selectRoomReceipts statement") + return nil, fmt.Errorf("unable to prepare selectRoomReceipts statement: %w", err) } return r, nil } @@ -85,22 +84,20 @@ func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, room return } -func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomId string, streamPos types.StreamPosition) ([]api.InputReceiptEvent, error) { +func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomId string, streamPos types.StreamPosition) ([]api.OutputReceiptEvent, error) { rows, err := r.selectRoomReceipts.QueryContext(ctx, roomId, streamPos) if err != nil { - return nil, errors.Wrap(err, "unable to query room receipts") + return nil, fmt.Errorf("unable to query room receipts: %w", err) } - var res []api.InputReceiptEvent + defer internal.CloseAndLogIfError(ctx, rows, "SelectRoomReceiptsAfter: rows.close() failed") + var res []api.OutputReceiptEvent for rows.Next() { - r := api.InputReceiptEvent{} + r := api.OutputReceiptEvent{} err = rows.Scan(&r.RoomID, &r.Type, &r.UserID, &r.EventID, &r.Timestamp) if err != nil { - return res, errors.Wrap(err, "unable to scan row to api.Receipts") + return res, fmt.Errorf("unable to scan row to api.Receipts: %w", err) } res = append(res, r) } - if rows.Err() != nil || rows.Close() != nil { - return res, errors.Wrap(err, "error while scanning rows or error closing rows") - } - return res, nil + return res, rows.Err() } diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index f2ea86c38..f2133dc89 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -1475,6 +1475,6 @@ func (d *Database) StoreReceipt(ctx context.Context, roomId, receiptType, userId return } -func (d *Database) GetRoomReceipts(ctx context.Context, roomId string, streamPos types.StreamPosition) ([]eduAPI.InputReceiptEvent, error) { +func (d *Database) GetRoomReceipts(ctx context.Context, roomId string, streamPos types.StreamPosition) ([]eduAPI.OutputReceiptEvent, error) { return d.Receipts.SelectRoomReceiptsAfter(ctx, roomId, streamPos) } diff --git a/syncapi/storage/sqlite3/receipt_table.go b/syncapi/storage/sqlite3/receipt_table.go index 8d62a6554..961af79fd 100644 --- a/syncapi/storage/sqlite3/receipt_table.go +++ b/syncapi/storage/sqlite3/receipt_table.go @@ -17,16 +17,14 @@ package sqlite3 import ( "context" "database/sql" - - "github.com/matrix-org/dendrite/syncapi/types" - - "github.com/matrix-org/gomatrixserverlib" + "fmt" "github.com/matrix-org/dendrite/eduserver/api" - + "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/syncapi/storage/tables" - "github.com/pkg/errors" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" ) const receiptsSchema = ` @@ -49,7 +47,7 @@ const upsertReceipt = "" + " (id, room_id, receipt_type, user_id, event_id, receipt_ts)" + " VALUES ($1, $2, $3, $4, $5, $6)" + " ON CONFLICT (room_id, receipt_type, user_id)" + - " DO UPDATE SET id = $1, event_id = $5, receipt_ts = $6" + " DO UPDATE SET id = $7, event_id = $8, receipt_ts = $9" const selectRoomReceipts = "" + "SELECT room_id, receipt_type, user_id, event_id, receipt_ts" + @@ -73,10 +71,10 @@ func NewSqliteReceiptsTable(db *sql.DB, streamID *streamIDStatements) (tables.Re streamIDStatements: streamID, } if r.upsertReceipt, err = db.Prepare(upsertReceipt); err != nil { - return nil, errors.Wrap(err, "unable to prepare upsertReceipt statement") + return nil, fmt.Errorf("unable to prepare upsertReceipt statement: %w", err) } if r.selectRoomReceipts, err = db.Prepare(selectRoomReceipts); err != nil { - return nil, errors.Wrap(err, "unable to prepare selectRoomReceipts statement") + return nil, fmt.Errorf("unable to prepare selectRoomReceipts statement: %w", err) } return r, nil } @@ -88,27 +86,25 @@ func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, room return } stmt := sqlutil.TxStmt(txn, r.upsertReceipt) - _, err = stmt.ExecContext(ctx, pos, roomId, receiptType, userId, eventId, timestamp) + _, err = stmt.ExecContext(ctx, pos, roomId, receiptType, userId, eventId, timestamp, pos, eventId, timestamp) return } // SelectRoomReceiptsAfter select all receipts for a given room after a specific timestamp -func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomId string, streamPos types.StreamPosition) ([]api.InputReceiptEvent, error) { +func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomId string, streamPos types.StreamPosition) ([]api.OutputReceiptEvent, error) { rows, err := r.selectRoomReceipts.QueryContext(ctx, roomId, streamPos) if err != nil { - return []api.InputReceiptEvent{}, errors.Wrap(err, "unable to query room receipts") + return nil, fmt.Errorf("unable to query room receipts: %w", err) } - var res []api.InputReceiptEvent + defer internal.CloseAndLogIfError(ctx, rows, "SelectRoomReceiptsAfter: rows.close() failed") + var res []api.OutputReceiptEvent for rows.Next() { - r := api.InputReceiptEvent{} + r := api.OutputReceiptEvent{} err = rows.Scan(&r.RoomID, &r.Type, &r.UserID, &r.EventID, &r.Timestamp) if err != nil { - return res, errors.Wrap(err, "unable to scan row to types.Receipts") + return res, fmt.Errorf("unable to scan row to api.Receipts: %w", err) } res = append(res, r) } - if rows.Err() != nil { - return res, errors.Wrap(err, "error while scanning rows") - } - return res, rows.Close() + return res, rows.Err() } diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index a543d8984..b57eef6df 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -160,5 +160,5 @@ type Filter interface { type Receipts interface { UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) - SelectRoomReceiptsAfter(ctx context.Context, roomId string, streamPos types.StreamPosition) ([]eduAPI.InputReceiptEvent, error) + SelectRoomReceiptsAfter(ctx context.Context, roomId string, streamPos types.StreamPosition) ([]eduAPI.OutputReceiptEvent, error) }