From 75010468a5469bb78982880679f9b6cfb7d6b95a Mon Sep 17 00:00:00 2001 From: Till Faelligen Date: Sun, 11 Oct 2020 12:37:16 +0200 Subject: [PATCH] Initial draft of implementing receipts --- clientapi/routing/receipt.go | 48 ++++++++++++ clientapi/routing/routing.go | 14 ++++ roomserver/api/api.go | 2 + roomserver/api/api_trace.go | 10 +++ roomserver/api/receipt.go | 10 +++ roomserver/internal/api.go | 12 +++ .../internal/perform/perform_receipt.go | 16 ++++ roomserver/inthttp/client.go | 25 ++++-- roomserver/storage/interface.go | 1 + roomserver/storage/shared/storage.go | 8 ++ roomserver/storage/sqlite3/receipt_table.go | 77 +++++++++++++++++++ roomserver/storage/sqlite3/storage.go | 5 ++ roomserver/storage/tables/interface.go | 4 + sytest-whitelist | 1 + 14 files changed, 227 insertions(+), 6 deletions(-) create mode 100644 clientapi/routing/receipt.go create mode 100644 roomserver/api/receipt.go create mode 100644 roomserver/internal/perform/perform_receipt.go create mode 100644 roomserver/storage/sqlite3/receipt_table.go diff --git a/clientapi/routing/receipt.go b/clientapi/routing/receipt.go new file mode 100644 index 000000000..d705034e7 --- /dev/null +++ b/clientapi/routing/receipt.go @@ -0,0 +1,48 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package routing + +import ( + "net/http" + + roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" + userapi "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/util" + "github.com/sirupsen/logrus" +) + +func SetReceipt(req *http.Request, rsAPI roomserverAPI.RoomserverInternalAPI, device *userapi.Device, roomId, receiptType, eventId string) util.JSONResponse { + logrus.WithFields(logrus.Fields{ + "roomId": roomId, + "receiptType": receiptType, + "eventId": eventId, + "userId": device.UserID, + }).Debug("Setting receipt") + userReq := &roomserverAPI.PerformUserReceiptUpdate{ + RoomID: roomId, + ReceiptType: receiptType, + EventID: eventId, + UserID: device.UserID, + } + userResp := &roomserverAPI.PerformUserReceiptUpdateResponse{} + + if err := rsAPI.PerformUserReceiptUpdate(req.Context(), userReq, userResp); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{ + Code: http.StatusOK, + JSON: struct{}{}, + } +} diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go index 4f99237f5..640543429 100644 --- a/clientapi/routing/routing.go +++ b/clientapi/routing/routing.go @@ -830,4 +830,18 @@ func Setup( return ClaimKeys(req, keyAPI) }), ).Methods(http.MethodPost, http.MethodOptions) + + r0mux.Handle("/rooms/{roomId}/receipt/{receiptType}/{eventId}", + httputil.MakeAuthAPI(gomatrixserverlib.Join, userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { + if r := rateLimits.rateLimit(req); r != nil { + return *r + } + vars, err := httputil.URLDecodeMapValues(mux.Vars(req)) + if err != nil { + return util.ErrorResponse(err) + } + + return SetReceipt(req, rsAPI, device, vars["roomId"], vars["receiptType"], vars["eventId"]) + }), + ).Methods(http.MethodPost, http.MethodOptions) } diff --git a/roomserver/api/api.go b/roomserver/api/api.go index 043f72221..6608adc34 100644 --- a/roomserver/api/api.go +++ b/roomserver/api/api.go @@ -195,4 +195,6 @@ type RoomserverInternalAPI interface { req *RemoveRoomAliasRequest, response *RemoveRoomAliasResponse, ) error + + PerformUserReceiptUpdate(ctx context.Context, req *PerformUserReceiptUpdate, res *PerformUserReceiptUpdateResponse) error } diff --git a/roomserver/api/api_trace.go b/roomserver/api/api_trace.go index f4eaddc1e..a486c602f 100644 --- a/roomserver/api/api_trace.go +++ b/roomserver/api/api_trace.go @@ -312,3 +312,13 @@ func js(thing interface{}) string { } return string(b) } + +func (t *RoomserverInternalAPITrace) PerformUserReceiptUpdate( + ctx context.Context, + req *PerformUserReceiptUpdate, + res *PerformUserReceiptUpdateResponse, +) error { + err := t.Impl.PerformUserReceiptUpdate(ctx, req, res) + util.GetLogger(ctx).WithError(err).Infof("PerformUserReceiptUpdate req=%+v res=%+v", js(req), js(res)) + return err +} diff --git a/roomserver/api/receipt.go b/roomserver/api/receipt.go new file mode 100644 index 000000000..5f07f0917 --- /dev/null +++ b/roomserver/api/receipt.go @@ -0,0 +1,10 @@ +package api + +type PerformUserReceiptUpdate struct { + RoomID string + ReceiptType string + EventID string + UserID string +} + +type PerformUserReceiptUpdateResponse struct{} diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go index ee4e4ec96..22d41e38b 100644 --- a/roomserver/internal/api.go +++ b/roomserver/internal/api.go @@ -26,6 +26,7 @@ type RoomserverInternalAPI struct { *perform.Leaver *perform.Publisher *perform.Backfiller + *perform.Receipter DB storage.Database Cfg *config.RoomServer Producer sarama.SyncProducer @@ -112,6 +113,9 @@ func (r *RoomserverInternalAPI) SetFederationSenderAPI(fsAPI fsAPI.FederationSen // than trying random servers PreferServers: r.PerspectiveServerNames, } + r.Receipter = &perform.Receipter{ + DB: r.DB, + } } func (r *RoomserverInternalAPI) PerformInvite( @@ -143,3 +147,11 @@ func (r *RoomserverInternalAPI) PerformLeave( } return r.WriteOutputEvents(req.RoomID, outputEvents) } + +func (r *RoomserverInternalAPI) PerformUserReceiptUpdate( + ctx context.Context, + req *api.PerformUserReceiptUpdate, + res *api.PerformUserReceiptUpdateResponse, +) error { + return r.Receipter.PerformUserReceiptUpdate(ctx, req, res) +} diff --git a/roomserver/internal/perform/perform_receipt.go b/roomserver/internal/perform/perform_receipt.go new file mode 100644 index 000000000..e8f975866 --- /dev/null +++ b/roomserver/internal/perform/perform_receipt.go @@ -0,0 +1,16 @@ +package perform + +import ( + "context" + + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/roomserver/storage" +) + +type Receipter struct { + DB storage.Database +} + +func (r *Receipter) PerformUserReceiptUpdate(ctx context.Context, req *api.PerformUserReceiptUpdate, res *api.PerformUserReceiptUpdateResponse) error { + return r.DB.StoreReceipt(ctx, req.RoomID, req.ReceiptType, req.UserID, req.EventID) +} diff --git a/roomserver/inthttp/client.go b/roomserver/inthttp/client.go index 24a82adf8..735c64eda 100644 --- a/roomserver/inthttp/client.go +++ b/roomserver/inthttp/client.go @@ -25,12 +25,13 @@ const ( RoomserverInputRoomEventsPath = "/roomserver/inputRoomEvents" // Perform operations - RoomserverPerformInvitePath = "/roomserver/performInvite" - RoomserverPerformPeekPath = "/roomserver/performPeek" - RoomserverPerformJoinPath = "/roomserver/performJoin" - RoomserverPerformLeavePath = "/roomserver/performLeave" - RoomserverPerformBackfillPath = "/roomserver/performBackfill" - RoomserverPerformPublishPath = "/roomserver/performPublish" + RoomserverPerformInvitePath = "/roomserver/performInvite" + RoomserverPerformPeekPath = "/roomserver/performPeek" + RoomserverPerformJoinPath = "/roomserver/performJoin" + RoomserverPerformLeavePath = "/roomserver/performLeave" + RoomserverPerformBackfillPath = "/roomserver/performBackfill" + RoomserverPerformPublishPath = "/roomserver/performPublish" + RoomserverPerformReceiptUpdatePath = "/roomserver/performReceiptUpdate" // Query operations RoomserverQueryLatestEventsAndStatePath = "/roomserver/queryLatestEventsAndState" @@ -492,3 +493,15 @@ func (h *httpRoomserverInternalAPI) QueryServerBannedFromRoom( apiURL := h.roomserverURL + RoomserverQueryServerBannedFromRoomPath return httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res) } + +func (h *httpRoomserverInternalAPI) PerformUserReceiptUpdate( + ctx context.Context, + req *api.PerformUserReceiptUpdate, + res *api.PerformUserReceiptUpdateResponse, +) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "PerformUserReceiptUpdate") + defer span.Finish() + + apiURL := h.roomserverURL + RoomserverPerformReceiptUpdatePath + return httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res) +} diff --git a/roomserver/storage/interface.go b/roomserver/storage/interface.go index 10a380e85..c65c89b0a 100644 --- a/roomserver/storage/interface.go +++ b/roomserver/storage/interface.go @@ -158,4 +158,5 @@ type Database interface { GetKnownUsers(ctx context.Context, userID, searchString string, limit int) ([]string, error) // GetKnownRooms returns a list of all rooms we know about. GetKnownRooms(ctx context.Context) ([]string, error) + StoreReceipt(ctx context.Context, roomId, receiptType, userId, eventId string) error } diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go index e96eab71b..80f1811c6 100644 --- a/roomserver/storage/shared/storage.go +++ b/roomserver/storage/shared/storage.go @@ -44,6 +44,7 @@ type Database struct { MembershipTable tables.Membership PublishedTable tables.Published RedactionsTable tables.Redactions + ReceiptsTable tables.Receipts } func (d *Database) SupportsConcurrentRoomInputs() bool { @@ -1023,3 +1024,10 @@ func (s stateEntryByStateKeySorter) Less(i, j int) bool { return s[i].StateKeyTuple.LessThan(s[j].StateKeyTuple) } func (s stateEntryByStateKeySorter) Swap(i, j int) { s[i], s[j] = s[j], s[i] } + +// StoreReceipt stores user receipts +func (d *Database) StoreReceipt(ctx context.Context, roomId, receiptType, userId, eventId string) error { + return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + return d.ReceiptsTable.UpsertReceipt(ctx, txn, roomId, receiptType, userId, eventId) + }) +} diff --git a/roomserver/storage/sqlite3/receipt_table.go b/roomserver/storage/sqlite3/receipt_table.go new file mode 100644 index 000000000..8a088f29e --- /dev/null +++ b/roomserver/storage/sqlite3/receipt_table.go @@ -0,0 +1,77 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "context" + "database/sql" + "time" + + "github.com/pkg/errors" + + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/roomserver/storage/tables" +) + +const receiptsSchema = ` +-- Stores data about receipts +CREATE TABLE IF NOT EXISTS roomserver_receipts ( + -- The ID + id INTEGER PRIMARY KEY AUTOINCREMENT, + room_id TEXT NOT NULL, + receipt_type TEXT NOT NULL, + user_id TEXT NOT NULL, + event_id TEXT NOT NULL, + receipt_ts BIGINT NOT NULL, + + CONSTRAINT roomserver_receipts_unique UNIQUE (room_id, receipt_type, user_id, event_id) +); + +CREATE INDEX IF NOT EXISTS roomserver_receipts_user_id ON roomserver_receipts(user_id); +` + +const upsertReceipt = "" + + "INSERT INTO roomserver_receipts" + + " (room_id, receipt_type, user_id, event_id, receipt_ts)" + + " VALUES ($1, $2, $3, $4, $5)" + + " ON CONFLICT (room_id, receipt_type, user_id, event_id)" + + " DO UPDATE SET receipt_ts = $5" + +type receiptStatements struct { + db *sql.DB + upsertReceipt *sql.Stmt +} + +func NewSqliteReceiptsTable(db *sql.DB) (tables.Receipts, error) { + _, err := db.Exec(receiptsSchema) + if err != nil { + return nil, err + } + r := &receiptStatements{ + db: db, + } + if r.upsertReceipt, err = db.Prepare(upsertReceipt); err != nil { + return nil, errors.Wrap(err, "unable to prepare upsertReceipt statement") + } + + return r, nil +} + +func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string) error { + receiptTs := time.Now().UnixNano() / int64(time.Millisecond) + stmt := sqlutil.TxStmt(txn, r.upsertReceipt) + _, err := stmt.ExecContext(ctx, roomId, receiptType, userId, eventId, receiptTs) + return err +} diff --git a/roomserver/storage/sqlite3/storage.go b/roomserver/storage/sqlite3/storage.go index 4a74bf736..1401be946 100644 --- a/roomserver/storage/sqlite3/storage.go +++ b/roomserver/storage/sqlite3/storage.go @@ -119,6 +119,10 @@ func Open(dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches) if err != nil { return nil, err } + receipts, err := NewSqliteReceiptsTable(d.db) + if err != nil { + return nil, err + } d.Database = shared.Database{ DB: d.db, Cache: cache, @@ -137,6 +141,7 @@ func Open(dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches) MembershipTable: d.membership, PublishedTable: published, RedactionsTable: redactions, + ReceiptsTable: receipts, } return &d, nil } diff --git a/roomserver/storage/tables/interface.go b/roomserver/storage/tables/interface.go index eba878ba5..d7b81c3b3 100644 --- a/roomserver/storage/tables/interface.go +++ b/roomserver/storage/tables/interface.go @@ -201,3 +201,7 @@ func ExtractContentValue(ev *gomatrixserverlib.HeaderedEvent) string { // this returns the empty string if this is not a string type return result.Str } + +type Receipts interface { + UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string) error +} diff --git a/sytest-whitelist b/sytest-whitelist index 805f0e4dd..c2016d389 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -482,3 +482,4 @@ m.room.history_visibility == "joined" allows/forbids appropriately for Real user POST rejects invalid utf-8 in JSON Users cannot kick users who have already left a room A prev_batch token from incremental sync can be used in the v1 messages API +POST /rooms/:room_id/receipt can create receipts