Initial draft of implementing receipts

This commit is contained in:
Till Faelligen 2020-10-11 12:37:16 +02:00
parent 9096bfcee8
commit 75010468a5
14 changed files with 227 additions and 6 deletions

View file

@ -0,0 +1,48 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package routing
import (
"net/http"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
)
func SetReceipt(req *http.Request, rsAPI roomserverAPI.RoomserverInternalAPI, device *userapi.Device, roomId, receiptType, eventId string) util.JSONResponse {
logrus.WithFields(logrus.Fields{
"roomId": roomId,
"receiptType": receiptType,
"eventId": eventId,
"userId": device.UserID,
}).Debug("Setting receipt")
userReq := &roomserverAPI.PerformUserReceiptUpdate{
RoomID: roomId,
ReceiptType: receiptType,
EventID: eventId,
UserID: device.UserID,
}
userResp := &roomserverAPI.PerformUserReceiptUpdateResponse{}
if err := rsAPI.PerformUserReceiptUpdate(req.Context(), userReq, userResp); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{
Code: http.StatusOK,
JSON: struct{}{},
}
}

View file

@ -830,4 +830,18 @@ func Setup(
return ClaimKeys(req, keyAPI)
}),
).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/rooms/{roomId}/receipt/{receiptType}/{eventId}",
httputil.MakeAuthAPI(gomatrixserverlib.Join, userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.rateLimit(req); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
if err != nil {
return util.ErrorResponse(err)
}
return SetReceipt(req, rsAPI, device, vars["roomId"], vars["receiptType"], vars["eventId"])
}),
).Methods(http.MethodPost, http.MethodOptions)
}

View file

@ -195,4 +195,6 @@ type RoomserverInternalAPI interface {
req *RemoveRoomAliasRequest,
response *RemoveRoomAliasResponse,
) error
PerformUserReceiptUpdate(ctx context.Context, req *PerformUserReceiptUpdate, res *PerformUserReceiptUpdateResponse) error
}

View file

@ -312,3 +312,13 @@ func js(thing interface{}) string {
}
return string(b)
}
func (t *RoomserverInternalAPITrace) PerformUserReceiptUpdate(
ctx context.Context,
req *PerformUserReceiptUpdate,
res *PerformUserReceiptUpdateResponse,
) error {
err := t.Impl.PerformUserReceiptUpdate(ctx, req, res)
util.GetLogger(ctx).WithError(err).Infof("PerformUserReceiptUpdate req=%+v res=%+v", js(req), js(res))
return err
}

10
roomserver/api/receipt.go Normal file
View file

@ -0,0 +1,10 @@
package api
type PerformUserReceiptUpdate struct {
RoomID string
ReceiptType string
EventID string
UserID string
}
type PerformUserReceiptUpdateResponse struct{}

View file

@ -26,6 +26,7 @@ type RoomserverInternalAPI struct {
*perform.Leaver
*perform.Publisher
*perform.Backfiller
*perform.Receipter
DB storage.Database
Cfg *config.RoomServer
Producer sarama.SyncProducer
@ -112,6 +113,9 @@ func (r *RoomserverInternalAPI) SetFederationSenderAPI(fsAPI fsAPI.FederationSen
// than trying random servers
PreferServers: r.PerspectiveServerNames,
}
r.Receipter = &perform.Receipter{
DB: r.DB,
}
}
func (r *RoomserverInternalAPI) PerformInvite(
@ -143,3 +147,11 @@ func (r *RoomserverInternalAPI) PerformLeave(
}
return r.WriteOutputEvents(req.RoomID, outputEvents)
}
func (r *RoomserverInternalAPI) PerformUserReceiptUpdate(
ctx context.Context,
req *api.PerformUserReceiptUpdate,
res *api.PerformUserReceiptUpdateResponse,
) error {
return r.Receipter.PerformUserReceiptUpdate(ctx, req, res)
}

View file

@ -0,0 +1,16 @@
package perform
import (
"context"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/storage"
)
type Receipter struct {
DB storage.Database
}
func (r *Receipter) PerformUserReceiptUpdate(ctx context.Context, req *api.PerformUserReceiptUpdate, res *api.PerformUserReceiptUpdateResponse) error {
return r.DB.StoreReceipt(ctx, req.RoomID, req.ReceiptType, req.UserID, req.EventID)
}

View file

@ -25,12 +25,13 @@ const (
RoomserverInputRoomEventsPath = "/roomserver/inputRoomEvents"
// Perform operations
RoomserverPerformInvitePath = "/roomserver/performInvite"
RoomserverPerformPeekPath = "/roomserver/performPeek"
RoomserverPerformJoinPath = "/roomserver/performJoin"
RoomserverPerformLeavePath = "/roomserver/performLeave"
RoomserverPerformBackfillPath = "/roomserver/performBackfill"
RoomserverPerformPublishPath = "/roomserver/performPublish"
RoomserverPerformInvitePath = "/roomserver/performInvite"
RoomserverPerformPeekPath = "/roomserver/performPeek"
RoomserverPerformJoinPath = "/roomserver/performJoin"
RoomserverPerformLeavePath = "/roomserver/performLeave"
RoomserverPerformBackfillPath = "/roomserver/performBackfill"
RoomserverPerformPublishPath = "/roomserver/performPublish"
RoomserverPerformReceiptUpdatePath = "/roomserver/performReceiptUpdate"
// Query operations
RoomserverQueryLatestEventsAndStatePath = "/roomserver/queryLatestEventsAndState"
@ -492,3 +493,15 @@ func (h *httpRoomserverInternalAPI) QueryServerBannedFromRoom(
apiURL := h.roomserverURL + RoomserverQueryServerBannedFromRoomPath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res)
}
func (h *httpRoomserverInternalAPI) PerformUserReceiptUpdate(
ctx context.Context,
req *api.PerformUserReceiptUpdate,
res *api.PerformUserReceiptUpdateResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "PerformUserReceiptUpdate")
defer span.Finish()
apiURL := h.roomserverURL + RoomserverPerformReceiptUpdatePath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res)
}

View file

@ -158,4 +158,5 @@ type Database interface {
GetKnownUsers(ctx context.Context, userID, searchString string, limit int) ([]string, error)
// GetKnownRooms returns a list of all rooms we know about.
GetKnownRooms(ctx context.Context) ([]string, error)
StoreReceipt(ctx context.Context, roomId, receiptType, userId, eventId string) error
}

View file

@ -44,6 +44,7 @@ type Database struct {
MembershipTable tables.Membership
PublishedTable tables.Published
RedactionsTable tables.Redactions
ReceiptsTable tables.Receipts
}
func (d *Database) SupportsConcurrentRoomInputs() bool {
@ -1023,3 +1024,10 @@ func (s stateEntryByStateKeySorter) Less(i, j int) bool {
return s[i].StateKeyTuple.LessThan(s[j].StateKeyTuple)
}
func (s stateEntryByStateKeySorter) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
// StoreReceipt stores user receipts
func (d *Database) StoreReceipt(ctx context.Context, roomId, receiptType, userId, eventId string) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
return d.ReceiptsTable.UpsertReceipt(ctx, txn, roomId, receiptType, userId, eventId)
})
}

View file

@ -0,0 +1,77 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sqlite3
import (
"context"
"database/sql"
"time"
"github.com/pkg/errors"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/storage/tables"
)
const receiptsSchema = `
-- Stores data about receipts
CREATE TABLE IF NOT EXISTS roomserver_receipts (
-- The ID
id INTEGER PRIMARY KEY AUTOINCREMENT,
room_id TEXT NOT NULL,
receipt_type TEXT NOT NULL,
user_id TEXT NOT NULL,
event_id TEXT NOT NULL,
receipt_ts BIGINT NOT NULL,
CONSTRAINT roomserver_receipts_unique UNIQUE (room_id, receipt_type, user_id, event_id)
);
CREATE INDEX IF NOT EXISTS roomserver_receipts_user_id ON roomserver_receipts(user_id);
`
const upsertReceipt = "" +
"INSERT INTO roomserver_receipts" +
" (room_id, receipt_type, user_id, event_id, receipt_ts)" +
" VALUES ($1, $2, $3, $4, $5)" +
" ON CONFLICT (room_id, receipt_type, user_id, event_id)" +
" DO UPDATE SET receipt_ts = $5"
type receiptStatements struct {
db *sql.DB
upsertReceipt *sql.Stmt
}
func NewSqliteReceiptsTable(db *sql.DB) (tables.Receipts, error) {
_, err := db.Exec(receiptsSchema)
if err != nil {
return nil, err
}
r := &receiptStatements{
db: db,
}
if r.upsertReceipt, err = db.Prepare(upsertReceipt); err != nil {
return nil, errors.Wrap(err, "unable to prepare upsertReceipt statement")
}
return r, nil
}
func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string) error {
receiptTs := time.Now().UnixNano() / int64(time.Millisecond)
stmt := sqlutil.TxStmt(txn, r.upsertReceipt)
_, err := stmt.ExecContext(ctx, roomId, receiptType, userId, eventId, receiptTs)
return err
}

View file

@ -119,6 +119,10 @@ func Open(dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches)
if err != nil {
return nil, err
}
receipts, err := NewSqliteReceiptsTable(d.db)
if err != nil {
return nil, err
}
d.Database = shared.Database{
DB: d.db,
Cache: cache,
@ -137,6 +141,7 @@ func Open(dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches)
MembershipTable: d.membership,
PublishedTable: published,
RedactionsTable: redactions,
ReceiptsTable: receipts,
}
return &d, nil
}

View file

@ -201,3 +201,7 @@ func ExtractContentValue(ev *gomatrixserverlib.HeaderedEvent) string {
// this returns the empty string if this is not a string type
return result.Str
}
type Receipts interface {
UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string) error
}

View file

@ -482,3 +482,4 @@ m.room.history_visibility == "joined" allows/forbids appropriately for Real user
POST rejects invalid utf-8 in JSON
Users cannot kick users who have already left a room
A prev_batch token from incremental sync can be used in the v1 messages API
POST /rooms/:room_id/receipt can create receipts