- Use OutputReceiptEvent instead of InputReceiptEvent as result

- Don't use the errors package for errors
- Defer CloseAndLogIfError to close rows
- Fix Copyright
This commit is contained in:
Till Faelligen 2020-10-16 16:16:44 +02:00
parent 27eac8f915
commit c428d52652
6 changed files with 33 additions and 40 deletions

View file

@ -1,4 +1,4 @@
// Copyright 2019 Alex Chen // Copyright 2020 The Matrix.org Foundation C.I.C.
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.

View file

@ -149,8 +149,8 @@ type Database interface {
PutFilter(ctx context.Context, localpart string, filter *gomatrixserverlib.Filter) (string, error) PutFilter(ctx context.Context, localpart string, filter *gomatrixserverlib.Filter) (string, error)
// RedactEvent wipes an event in the database and sets the unsigned.redacted_because key to the redaction event // RedactEvent wipes an event in the database and sets the unsigned.redacted_because key to the redaction event
RedactEvent(ctx context.Context, redactedEventID string, redactedBecause *gomatrixserverlib.HeaderedEvent) error RedactEvent(ctx context.Context, redactedEventID string, redactedBecause *gomatrixserverlib.HeaderedEvent) error
// StoreReceipt stores new receipt events ofr // StoreReceipt stores new receipt events
StoreReceipt(ctx context.Context, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) StoreReceipt(ctx context.Context, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error)
// GetRoomReceipts gets all receipts for a given roomID // GetRoomReceipts gets all receipts for a given roomID
GetRoomReceipts(ctx context.Context, roomId string, streamPos types.StreamPosition) ([]eduAPI.InputReceiptEvent, error) GetRoomReceipts(ctx context.Context, roomId string, streamPos types.StreamPosition) ([]eduAPI.OutputReceiptEvent, error)
} }

View file

@ -17,15 +17,14 @@ package postgres
import ( import (
"context" "context"
"database/sql" "database/sql"
"fmt"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/pkg/errors" "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
) )
const receiptsSchema = ` const receiptsSchema = `
@ -71,10 +70,10 @@ func NewPostgresReceiptsTable(db *sql.DB) (tables.Receipts, error) {
db: db, db: db,
} }
if r.upsertReceipt, err = db.Prepare(upsertReceipt); err != nil { if r.upsertReceipt, err = db.Prepare(upsertReceipt); err != nil {
return nil, errors.Wrap(err, "unable to prepare upsertReceipt statement") return nil, fmt.Errorf("unable to prepare upsertReceipt statement: %w", err)
} }
if r.selectRoomReceipts, err = db.Prepare(selectRoomReceipts); err != nil { if r.selectRoomReceipts, err = db.Prepare(selectRoomReceipts); err != nil {
return nil, errors.Wrap(err, "unable to prepare selectRoomReceipts statement") return nil, fmt.Errorf("unable to prepare selectRoomReceipts statement: %w", err)
} }
return r, nil return r, nil
} }
@ -85,22 +84,20 @@ func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, room
return return
} }
func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomId string, streamPos types.StreamPosition) ([]api.InputReceiptEvent, error) { func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomId string, streamPos types.StreamPosition) ([]api.OutputReceiptEvent, error) {
rows, err := r.selectRoomReceipts.QueryContext(ctx, roomId, streamPos) rows, err := r.selectRoomReceipts.QueryContext(ctx, roomId, streamPos)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "unable to query room receipts") return nil, fmt.Errorf("unable to query room receipts: %w", err)
} }
var res []api.InputReceiptEvent defer internal.CloseAndLogIfError(ctx, rows, "SelectRoomReceiptsAfter: rows.close() failed")
var res []api.OutputReceiptEvent
for rows.Next() { for rows.Next() {
r := api.InputReceiptEvent{} r := api.OutputReceiptEvent{}
err = rows.Scan(&r.RoomID, &r.Type, &r.UserID, &r.EventID, &r.Timestamp) err = rows.Scan(&r.RoomID, &r.Type, &r.UserID, &r.EventID, &r.Timestamp)
if err != nil { if err != nil {
return res, errors.Wrap(err, "unable to scan row to api.Receipts") return res, fmt.Errorf("unable to scan row to api.Receipts: %w", err)
} }
res = append(res, r) res = append(res, r)
} }
if rows.Err() != nil || rows.Close() != nil { return res, rows.Err()
return res, errors.Wrap(err, "error while scanning rows or error closing rows")
}
return res, nil
} }

View file

@ -1475,6 +1475,6 @@ func (d *Database) StoreReceipt(ctx context.Context, roomId, receiptType, userId
return return
} }
func (d *Database) GetRoomReceipts(ctx context.Context, roomId string, streamPos types.StreamPosition) ([]eduAPI.InputReceiptEvent, error) { func (d *Database) GetRoomReceipts(ctx context.Context, roomId string, streamPos types.StreamPosition) ([]eduAPI.OutputReceiptEvent, error) {
return d.Receipts.SelectRoomReceiptsAfter(ctx, roomId, streamPos) return d.Receipts.SelectRoomReceiptsAfter(ctx, roomId, streamPos)
} }

View file

@ -17,16 +17,14 @@ package sqlite3
import ( import (
"context" "context"
"database/sql" "database/sql"
"fmt"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/pkg/errors" "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
) )
const receiptsSchema = ` const receiptsSchema = `
@ -49,7 +47,7 @@ const upsertReceipt = "" +
" (id, room_id, receipt_type, user_id, event_id, receipt_ts)" + " (id, room_id, receipt_type, user_id, event_id, receipt_ts)" +
" VALUES ($1, $2, $3, $4, $5, $6)" + " VALUES ($1, $2, $3, $4, $5, $6)" +
" ON CONFLICT (room_id, receipt_type, user_id)" + " ON CONFLICT (room_id, receipt_type, user_id)" +
" DO UPDATE SET id = $1, event_id = $5, receipt_ts = $6" " DO UPDATE SET id = $7, event_id = $8, receipt_ts = $9"
const selectRoomReceipts = "" + const selectRoomReceipts = "" +
"SELECT room_id, receipt_type, user_id, event_id, receipt_ts" + "SELECT room_id, receipt_type, user_id, event_id, receipt_ts" +
@ -73,10 +71,10 @@ func NewSqliteReceiptsTable(db *sql.DB, streamID *streamIDStatements) (tables.Re
streamIDStatements: streamID, streamIDStatements: streamID,
} }
if r.upsertReceipt, err = db.Prepare(upsertReceipt); err != nil { if r.upsertReceipt, err = db.Prepare(upsertReceipt); err != nil {
return nil, errors.Wrap(err, "unable to prepare upsertReceipt statement") return nil, fmt.Errorf("unable to prepare upsertReceipt statement: %w", err)
} }
if r.selectRoomReceipts, err = db.Prepare(selectRoomReceipts); err != nil { if r.selectRoomReceipts, err = db.Prepare(selectRoomReceipts); err != nil {
return nil, errors.Wrap(err, "unable to prepare selectRoomReceipts statement") return nil, fmt.Errorf("unable to prepare selectRoomReceipts statement: %w", err)
} }
return r, nil return r, nil
} }
@ -88,27 +86,25 @@ func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, room
return return
} }
stmt := sqlutil.TxStmt(txn, r.upsertReceipt) stmt := sqlutil.TxStmt(txn, r.upsertReceipt)
_, err = stmt.ExecContext(ctx, pos, roomId, receiptType, userId, eventId, timestamp) _, err = stmt.ExecContext(ctx, pos, roomId, receiptType, userId, eventId, timestamp, pos, eventId, timestamp)
return return
} }
// SelectRoomReceiptsAfter select all receipts for a given room after a specific timestamp // SelectRoomReceiptsAfter select all receipts for a given room after a specific timestamp
func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomId string, streamPos types.StreamPosition) ([]api.InputReceiptEvent, error) { func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomId string, streamPos types.StreamPosition) ([]api.OutputReceiptEvent, error) {
rows, err := r.selectRoomReceipts.QueryContext(ctx, roomId, streamPos) rows, err := r.selectRoomReceipts.QueryContext(ctx, roomId, streamPos)
if err != nil { if err != nil {
return []api.InputReceiptEvent{}, errors.Wrap(err, "unable to query room receipts") return nil, fmt.Errorf("unable to query room receipts: %w", err)
} }
var res []api.InputReceiptEvent defer internal.CloseAndLogIfError(ctx, rows, "SelectRoomReceiptsAfter: rows.close() failed")
var res []api.OutputReceiptEvent
for rows.Next() { for rows.Next() {
r := api.InputReceiptEvent{} r := api.OutputReceiptEvent{}
err = rows.Scan(&r.RoomID, &r.Type, &r.UserID, &r.EventID, &r.Timestamp) err = rows.Scan(&r.RoomID, &r.Type, &r.UserID, &r.EventID, &r.Timestamp)
if err != nil { if err != nil {
return res, errors.Wrap(err, "unable to scan row to types.Receipts") return res, fmt.Errorf("unable to scan row to api.Receipts: %w", err)
} }
res = append(res, r) res = append(res, r)
} }
if rows.Err() != nil { return res, rows.Err()
return res, errors.Wrap(err, "error while scanning rows")
}
return res, rows.Close()
} }

View file

@ -160,5 +160,5 @@ type Filter interface {
type Receipts interface { type Receipts interface {
UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error)
SelectRoomReceiptsAfter(ctx context.Context, roomId string, streamPos types.StreamPosition) ([]eduAPI.InputReceiptEvent, error) SelectRoomReceiptsAfter(ctx context.Context, roomId string, streamPos types.StreamPosition) ([]eduAPI.OutputReceiptEvent, error)
} }