Use StreamingToken as the since value

This commit is contained in:
Till Faelligen 2020-10-15 09:52:10 +02:00
parent ceb7ca7c7d
commit fe418f2fc8
3 changed files with 24 additions and 14 deletions

View file

@ -150,7 +150,7 @@ type Database interface {
// RedactEvent wipes an event in the database and sets the unsigned.redacted_because key to the redaction event // RedactEvent wipes an event in the database and sets the unsigned.redacted_because key to the redaction event
RedactEvent(ctx context.Context, redactedEventID string, redactedBecause *gomatrixserverlib.HeaderedEvent) error RedactEvent(ctx context.Context, redactedEventID string, redactedBecause *gomatrixserverlib.HeaderedEvent) error
// StoreReceipt stores new receipt events ofr // StoreReceipt stores new receipt events ofr
StoreReceipt(ctx context.Context, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) error StoreReceipt(ctx context.Context, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error)
// GetRoomReceipts gets all receipts for a given roomID // GetRoomReceipts gets all receipts for a given roomID
GetRoomReceipts(ctx context.Context, roomId string, streamPos types.StreamPosition) ([]eduAPI.InputReceiptEvent, error) GetRoomReceipts(ctx context.Context, roomId string, streamPos types.StreamPosition) ([]eduAPI.InputReceiptEvent, error)
} }

View file

@ -21,6 +21,8 @@ import (
"fmt" "fmt"
"time" "time"
"github.com/pkg/errors"
eduAPI "github.com/matrix-org/dendrite/eduserver/api" eduAPI "github.com/matrix-org/dendrite/eduserver/api"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
@ -565,11 +567,17 @@ func (d *Database) addTypingDeltaToResponse(
return nil return nil
} }
func (d *Database) addReceiptDeltaToResponse(since types.StreamPosition, joinedRoomIDs []string, res *types.Response) error { // addReceiptDeltaToResponse adds all receipt information to a sync response
// since the specified position
func (d *Database) addReceiptDeltaToResponse(
since types.StreamingToken,
joinedRoomIDs []string,
res *types.Response,
) error {
var jr types.JoinResponse var jr types.JoinResponse
// check all joinedRooms for receipts // check all joinedRooms for receipts
for _, roomID := range joinedRoomIDs { for _, roomID := range joinedRoomIDs {
receipts, err := d.Receipts.SelectRoomReceiptsAfter(context.TODO(), roomID, since) receipts, err := d.Receipts.SelectRoomReceiptsAfter(context.TODO(), roomID, since.EDUPosition())
if err != nil { if err != nil {
return err return err
} }
@ -609,19 +617,20 @@ func (d *Database) addEDUDeltaToResponse(
fromPos, toPos types.StreamingToken, fromPos, toPos types.StreamingToken,
joinedRoomIDs []string, joinedRoomIDs []string,
res *types.Response, res *types.Response,
) (err error) { ) error {
if fromPos.EDUPosition() != toPos.EDUPosition() { if fromPos.EDUPosition() != toPos.EDUPosition() {
// add typing deltas // add typing deltas
if err := d.addTypingDeltaToResponse(fromPos, joinedRoomIDs, res); err != nil { if err := d.addTypingDeltaToResponse(fromPos, joinedRoomIDs, res); err != nil {
return return errors.Wrap(err, "unable to apply typing delta to response")
}
// add receipt deltas
if err := d.addReceiptDeltaToResponse(fromPos.EDUPosition(), joinedRoomIDs, res); err != nil {
return
} }
} }
return // always check for receipt deltas; otherwise an initial sync won't receive receipts
if err := d.addReceiptDeltaToResponse(fromPos, joinedRoomIDs, res); err != nil {
return errors.Wrap(err, "unable to apply receipts to response")
}
return nil
} }
func (d *Database) GetFilter( func (d *Database) GetFilter(
@ -1459,11 +1468,12 @@ type stateDelta struct {
} }
// StoreReceipt stores user receipts // StoreReceipt stores user receipts
func (d *Database) StoreReceipt(ctx context.Context, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) error { func (d *Database) StoreReceipt(ctx context.Context, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
_, err := d.Receipts.UpsertReceipt(ctx, txn, roomId, receiptType, userId, eventId, timestamp) pos, err = d.Receipts.UpsertReceipt(ctx, txn, roomId, receiptType, userId, eventId, timestamp)
return err return err
}) })
return
} }
func (d *Database) GetRoomReceipts(ctx context.Context, roomId string, streamPos types.StreamPosition) ([]eduAPI.InputReceiptEvent, error) { func (d *Database) GetRoomReceipts(ctx context.Context, roomId string, streamPos types.StreamPosition) ([]eduAPI.InputReceiptEvent, error) {

View file

@ -49,7 +49,7 @@ const upsertReceipt = "" +
" (id, room_id, receipt_type, user_id, event_id, receipt_ts)" + " (id, room_id, receipt_type, user_id, event_id, receipt_ts)" +
" VALUES ($1, $2, $3, $4, $5, $6)" + " VALUES ($1, $2, $3, $4, $5, $6)" +
" ON CONFLICT (room_id, receipt_type, user_id)" + " ON CONFLICT (room_id, receipt_type, user_id)" +
" DO UPDATE SET id = $1, event_id = $4, receipt_ts = $5" " DO UPDATE SET id = $1, event_id = $5, receipt_ts = $6"
const selectRoomReceipts = "" + const selectRoomReceipts = "" +
"SELECT room_id, receipt_type, user_id, event_id, receipt_ts" + "SELECT room_id, receipt_type, user_id, event_id, receipt_ts" +