From ceb7ca7c7d6a2bf1bdeda3fe26e3a2f0bf762232 Mon Sep 17 00:00:00 2001 From: Till Faelligen Date: Wed, 14 Oct 2020 21:49:41 +0200 Subject: [PATCH] Add receipts table to syncapi --- syncapi/storage/interface.go | 6 ++ syncapi/storage/postgres/receipt_table.go | 106 ++++++++++++++++++++ syncapi/storage/postgres/syncserver.go | 5 + syncapi/storage/shared/syncserver.go | 64 +++++++++++- syncapi/storage/sqlite3/receipt_table.go | 114 ++++++++++++++++++++++ syncapi/storage/sqlite3/syncserver.go | 5 + syncapi/storage/tables/interface.go | 6 ++ 7 files changed, 302 insertions(+), 4 deletions(-) create mode 100644 syncapi/storage/postgres/receipt_table.go create mode 100644 syncapi/storage/sqlite3/receipt_table.go diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index ce7f1c152..57ba2c2a7 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -18,6 +18,8 @@ import ( "context" "time" + eduAPI "github.com/matrix-org/dendrite/eduserver/api" + "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/roomserver/api" @@ -147,4 +149,8 @@ type Database interface { PutFilter(ctx context.Context, localpart string, filter *gomatrixserverlib.Filter) (string, error) // RedactEvent wipes an event in the database and sets the unsigned.redacted_because key to the redaction event RedactEvent(ctx context.Context, redactedEventID string, redactedBecause *gomatrixserverlib.HeaderedEvent) error + // StoreReceipt stores new receipt events ofr + StoreReceipt(ctx context.Context, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) error + // GetRoomReceipts gets all receipts for a given roomID + GetRoomReceipts(ctx context.Context, roomId string, streamPos types.StreamPosition) ([]eduAPI.InputReceiptEvent, error) } diff --git a/syncapi/storage/postgres/receipt_table.go b/syncapi/storage/postgres/receipt_table.go new file mode 100644 index 000000000..e21b2e7a3 --- /dev/null +++ b/syncapi/storage/postgres/receipt_table.go @@ -0,0 +1,106 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgres + +import ( + "context" + "database/sql" + + "github.com/matrix-org/dendrite/syncapi/types" + + "github.com/matrix-org/gomatrixserverlib" + + "github.com/matrix-org/dendrite/eduserver/api" + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/syncapi/storage/tables" + "github.com/pkg/errors" +) + +const receiptsSchema = ` +-- Stores data about receipts +CREATE TABLE IF NOT EXISTS syncapi_receipts ( + -- The ID + id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'), + room_id TEXT NOT NULL, + receipt_type TEXT NOT NULL, + user_id TEXT NOT NULL, + event_id TEXT NOT NULL, + receipt_ts BIGINT NOT NULL, + CONSTRAINT syncapi_receipts_unique UNIQUE (room_id, receipt_type, user_id) +); +CREATE INDEX IF NOT EXISTS syncapi_receipts_room_id ON syncapi_receipts(room_id); +` + +const upsertReceipt = "" + + "INSERT INTO syncapi_receipts" + + " (room_id, receipt_type, user_id, event_id, receipt_ts)" + + " VALUES ($1, $2, $3, $4, $5)" + + " ON CONFLICT (room_id, receipt_type, user_id)" + + " DO UPDATE SET event_id = $4, receipt_ts = $5" + + " RETURNING id" + +const selectRoomReceipts = "" + + "SELECT id, room_id, receipt_type, user_id, event_id, receipt_ts" + + " FROM syncapi_receipts" + + " WHERE room_id = $1 AND id > $2" + +type receiptStatements struct { + db *sql.DB + upsertReceipt *sql.Stmt + selectRoomReceipts *sql.Stmt +} + +func NewPostgresReceiptsTable(db *sql.DB) (tables.Receipts, error) { + _, err := db.Exec(receiptsSchema) + if err != nil { + return nil, err + } + r := &receiptStatements{ + db: db, + } + if r.upsertReceipt, err = db.Prepare(upsertReceipt); err != nil { + return nil, errors.Wrap(err, "unable to prepare upsertReceipt statement") + } + if r.selectRoomReceipts, err = db.Prepare(selectRoomReceipts); err != nil { + return nil, errors.Wrap(err, "unable to prepare selectRoomReceipts statement") + } + return r, nil +} + +func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) { + stmt := sqlutil.TxStmt(txn, r.upsertReceipt) + err = stmt.QueryRowContext(ctx, roomId, receiptType, userId, eventId, timestamp).Scan(&pos) + return +} + +func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomId string, streamPos types.StreamPosition) ([]api.InputReceiptEvent, error) { + rows, err := r.selectRoomReceipts.QueryContext(ctx, roomId, streamPos) + if err != nil { + return nil, errors.Wrap(err, "unable to query room receipts") + } + var res []api.InputReceiptEvent + for rows.Next() { + r := api.InputReceiptEvent{} + err = rows.Scan(&r.RoomID, &r.Type, &r.UserID, &r.EventID, &r.Timestamp) + if err != nil { + return res, errors.Wrap(err, "unable to scan row to api.Receipts") + } + res = append(res, r) + } + if rows.Err() != nil || rows.Close() != nil { + return res, errors.Wrap(err, "error while scanning rows or error closing rows") + } + return res, nil +} diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index 7f19722ae..979e19a0b 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -82,6 +82,10 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e if err != nil { return nil, err } + receipts, err := NewPostgresReceiptsTable(d.db) + if err != nil { + return nil, err + } d.Database = shared.Database{ DB: d.db, Writer: d.writer, @@ -94,6 +98,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e BackwardExtremities: backwardExtremities, Filter: filter, SendToDevice: sendToDevice, + Receipts: receipts, EDUCache: cache.New(), } return &d, nil diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index b9f21913e..74a640aab 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -21,6 +21,7 @@ import ( "fmt" "time" + eduAPI "github.com/matrix-org/dendrite/eduserver/api" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/eduserver/cache" @@ -47,6 +48,7 @@ type Database struct { BackwardExtremities tables.BackwardsExtremities SendToDevice tables.SendToDevice Filter tables.Filter + Receipts tables.Receipts EDUCache *cache.EDUCache } @@ -563,6 +565,44 @@ func (d *Database) addTypingDeltaToResponse( return nil } +func (d *Database) addReceiptDeltaToResponse(since types.StreamPosition, joinedRoomIDs []string, res *types.Response) error { + var jr types.JoinResponse + // check all joinedRooms for receipts + for _, roomID := range joinedRoomIDs { + receipts, err := d.Receipts.SelectRoomReceiptsAfter(context.TODO(), roomID, since) + if err != nil { + return err + } + + for _, receipt := range receipts { + data := map[string]interface{}{ + receipt.EventID: map[string]interface{}{ + "m.read": map[string]interface{}{ + receipt.UserID: struct { + gomatrixserverlib.Timestamp `json:"ts"` + }{ + receipt.Timestamp, + }, + }, + }, + } + + ev := gomatrixserverlib.ClientEvent{ + Type: "m.receipt", + RoomID: roomID, + } + ev.Content, err = json.Marshal(data) + if err != nil { + return err + } + jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev) + res.Rooms.Join[roomID] = jr + } + } + + return nil +} + // addEDUDeltaToResponse adds updates for EDUs of each type since fromPos if // the positions of that type are not equal in fromPos and toPos. func (d *Database) addEDUDeltaToResponse( @@ -570,11 +610,15 @@ func (d *Database) addEDUDeltaToResponse( joinedRoomIDs []string, res *types.Response, ) (err error) { - if fromPos.EDUPosition() != toPos.EDUPosition() { - err = d.addTypingDeltaToResponse( - fromPos, joinedRoomIDs, res, - ) + // add typing deltas + if err := d.addTypingDeltaToResponse(fromPos, joinedRoomIDs, res); err != nil { + return + } + // add receipt deltas + if err := d.addReceiptDeltaToResponse(fromPos.EDUPosition(), joinedRoomIDs, res); err != nil { + return + } } return @@ -1413,3 +1457,15 @@ type stateDelta struct { // Can be 0 if there is no membership event in this delta. membershipPos types.StreamPosition } + +// StoreReceipt stores user receipts +func (d *Database) StoreReceipt(ctx context.Context, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) error { + return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + _, err := d.Receipts.UpsertReceipt(ctx, txn, roomId, receiptType, userId, eventId, timestamp) + return err + }) +} + +func (d *Database) GetRoomReceipts(ctx context.Context, roomId string, streamPos types.StreamPosition) ([]eduAPI.InputReceiptEvent, error) { + return d.Receipts.SelectRoomReceiptsAfter(ctx, roomId, streamPos) +} diff --git a/syncapi/storage/sqlite3/receipt_table.go b/syncapi/storage/sqlite3/receipt_table.go new file mode 100644 index 000000000..802570ea2 --- /dev/null +++ b/syncapi/storage/sqlite3/receipt_table.go @@ -0,0 +1,114 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "context" + "database/sql" + + "github.com/matrix-org/dendrite/syncapi/types" + + "github.com/matrix-org/gomatrixserverlib" + + "github.com/matrix-org/dendrite/eduserver/api" + + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/syncapi/storage/tables" + "github.com/pkg/errors" +) + +const receiptsSchema = ` +-- Stores data about receipts +CREATE TABLE IF NOT EXISTS syncapi_receipts ( + -- The ID + id BIGINT, + room_id TEXT NOT NULL, + receipt_type TEXT NOT NULL, + user_id TEXT NOT NULL, + event_id TEXT NOT NULL, + receipt_ts BIGINT NOT NULL, + CONSTRAINT syncapi_receipts_unique UNIQUE (room_id, receipt_type, user_id) +); +CREATE INDEX IF NOT EXISTS syncapi_receipts_room_id_idx ON syncapi_receipts(room_id); +` + +const upsertReceipt = "" + + "INSERT INTO syncapi_receipts" + + " (id, room_id, receipt_type, user_id, event_id, receipt_ts)" + + " VALUES ($1, $2, $3, $4, $5, $6)" + + " ON CONFLICT (room_id, receipt_type, user_id)" + + " DO UPDATE SET id = $1, event_id = $4, receipt_ts = $5" + +const selectRoomReceipts = "" + + "SELECT room_id, receipt_type, user_id, event_id, receipt_ts" + + " FROM syncapi_receipts" + + " WHERE room_id = $1 AND id > $2" + +type receiptStatements struct { + db *sql.DB + streamIDStatements *streamIDStatements + upsertReceipt *sql.Stmt + selectRoomReceipts *sql.Stmt +} + +func NewSqliteReceiptsTable(db *sql.DB, streamID *streamIDStatements) (tables.Receipts, error) { + _, err := db.Exec(receiptsSchema) + if err != nil { + return nil, err + } + r := &receiptStatements{ + db: db, + streamIDStatements: streamID, + } + if r.upsertReceipt, err = db.Prepare(upsertReceipt); err != nil { + return nil, errors.Wrap(err, "unable to prepare upsertReceipt statement") + } + if r.selectRoomReceipts, err = db.Prepare(selectRoomReceipts); err != nil { + return nil, errors.Wrap(err, "unable to prepare selectRoomReceipts statement") + } + return r, nil +} + +// UpsertReceipt creates new user receipts +func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) { + pos, err = r.streamIDStatements.nextStreamID(ctx, txn) + if err != nil { + return + } + stmt := sqlutil.TxStmt(txn, r.upsertReceipt) + _, err = stmt.ExecContext(ctx, pos, roomId, receiptType, userId, eventId, timestamp) + return +} + +// SelectRoomReceiptsAfter select all receipts for a given room after a specific timestamp +func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomId string, streamPos types.StreamPosition) ([]api.InputReceiptEvent, error) { + rows, err := r.selectRoomReceipts.QueryContext(ctx, roomId, streamPos) + if err != nil { + return []api.InputReceiptEvent{}, errors.Wrap(err, "unable to query room receipts") + } + var res []api.InputReceiptEvent + for rows.Next() { + r := api.InputReceiptEvent{} + err = rows.Scan(&r.RoomID, &r.Type, &r.UserID, &r.EventID, &r.Timestamp) + if err != nil { + return res, errors.Wrap(err, "unable to scan row to types.Receipts") + } + res = append(res, r) + } + if rows.Err() != nil { + return res, errors.Wrap(err, "error while scanning rows") + } + return res, rows.Close() +} diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index 86d83ec98..036e2b2e5 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -95,6 +95,10 @@ func (d *SyncServerDatasource) prepare() (err error) { if err != nil { return err } + receipts, err := NewSqliteReceiptsTable(d.db, &d.streamID) + if err != nil { + return err + } d.Database = shared.Database{ DB: d.db, Writer: d.writer, @@ -107,6 +111,7 @@ func (d *SyncServerDatasource) prepare() (err error) { Topology: topology, Filter: filter, SendToDevice: sendToDevice, + Receipts: receipts, EDUCache: cache.New(), } return nil diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index da095be53..a543d8984 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -18,6 +18,7 @@ import ( "context" "database/sql" + eduAPI "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" @@ -156,3 +157,8 @@ type Filter interface { SelectFilter(ctx context.Context, localpart string, filterID string) (*gomatrixserverlib.Filter, error) InsertFilter(ctx context.Context, filter *gomatrixserverlib.Filter, localpart string) (filterID string, err error) } + +type Receipts interface { + UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) + SelectRoomReceiptsAfter(ctx context.Context, roomId string, streamPos types.StreamPosition) ([]eduAPI.InputReceiptEvent, error) +}