dendrite/syncapi/streams/stream_receipt.go
Till 8c5b166784
Use the stream positions of the notifier (#2768)
Use the stream positions of the notifier, which might have advanced
since setting it at the beginning of the loop. This possibly helps in
reducing roundtrips to the SyncAPI, just because we didn't fetch the
latest data.
Also fixes a minor oversight in the receipts stream.
2022-10-06 11:57:13 +01:00

120 lines
2.8 KiB
Go

package streams
import (
"context"
"encoding/json"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
)
type ReceiptStreamProvider struct {
DefaultStreamProvider
}
func (p *ReceiptStreamProvider) Setup(
ctx context.Context, snapshot storage.DatabaseTransaction,
) {
p.DefaultStreamProvider.Setup(ctx, snapshot)
p.latestMutex.Lock()
defer p.latestMutex.Unlock()
id, err := snapshot.MaxStreamPositionForReceipts(ctx)
if err != nil {
panic(err)
}
p.latest = id
}
func (p *ReceiptStreamProvider) CompleteSync(
ctx context.Context,
snapshot storage.DatabaseTransaction,
req *types.SyncRequest,
) types.StreamPosition {
return p.IncrementalSync(ctx, snapshot, req, 0, p.LatestPosition(ctx))
}
func (p *ReceiptStreamProvider) IncrementalSync(
ctx context.Context,
snapshot storage.DatabaseTransaction,
req *types.SyncRequest,
from, to types.StreamPosition,
) types.StreamPosition {
var joinedRooms []string
for roomID, membership := range req.Rooms {
if membership == gomatrixserverlib.Join {
joinedRooms = append(joinedRooms, roomID)
}
}
lastPos, receipts, err := snapshot.RoomReceiptsAfter(ctx, joinedRooms, from)
if err != nil {
req.Log.WithError(err).Error("p.DB.RoomReceiptsAfter failed")
return from
}
if len(receipts) == 0 || lastPos == 0 {
return to
}
// Group receipts by room, so we can create one ClientEvent for every room
receiptsByRoom := make(map[string][]types.OutputReceiptEvent)
for _, receipt := range receipts {
// skip ignored user events
if _, ok := req.IgnoredUsers.List[receipt.UserID]; ok {
continue
}
receiptsByRoom[receipt.RoomID] = append(receiptsByRoom[receipt.RoomID], receipt)
}
for roomID, receipts := range receiptsByRoom {
// For a complete sync, make sure we're only including this room if
// that room was present in the joined rooms.
if from == 0 && !req.IsRoomPresent(roomID) {
continue
}
jr, ok := req.Response.Rooms.Join[roomID]
if !ok {
jr = types.NewJoinResponse()
}
ev := gomatrixserverlib.ClientEvent{
Type: gomatrixserverlib.MReceipt,
RoomID: roomID,
}
content := make(map[string]ReceiptMRead)
for _, receipt := range receipts {
read, ok := content[receipt.EventID]
if !ok {
read = ReceiptMRead{
User: make(map[string]ReceiptTS),
}
}
read.User[receipt.UserID] = ReceiptTS{TS: receipt.Timestamp}
content[receipt.EventID] = read
}
ev.Content, err = json.Marshal(content)
if err != nil {
req.Log.WithError(err).Error("json.Marshal failed")
return from
}
jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev)
req.Response.Rooms.Join[roomID] = jr
}
return lastPos
}
type ReceiptMRead struct {
User map[string]ReceiptTS `json:"m.read"`
}
type ReceiptTS struct {
TS gomatrixserverlib.Timestamp `json:"ts"`
}