diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 783fdc3b8..42c45d0e0 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -30,6 +30,7 @@ import ( "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" + "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -322,12 +323,83 @@ func (t *txnReq) processEDUs(ctx context.Context) { } case gomatrixserverlib.MDeviceListUpdate: t.processDeviceListUpdate(ctx, e) + case "m.receipt": // to be replaced by gomatrixserverlib.MReceipt + // https://matrix.org/docs/spec/server_server/r0.1.4#receipts + payload := receiptPayload{} + + if err := json.Unmarshal(e.Content, &payload.Data); err != nil { + util.GetLogger(ctx).WithError(err).Error("Failed to unmarshal receipt event") + continue + } + + for roomID, receipt := range payload.Data { + for userID := range receipt.MRead { + _, domain, err := gomatrixserverlib.SplitID('@', userID) + if err != nil { + util.GetLogger(ctx).WithError(err).Error("Failed to split domain from receipt event sender") + continue + } + if t.Origin != domain { + util.GetLogger(ctx).Warnf("Dropping receipt event where sender domain (%q) doesn't match origin (%q)", domain, t.Origin) + continue + } + ts := receipt.MRead[userID].Data.TS + events := receipt.MRead[userID].EventIDs + if err := t.processReceiptEvent(ctx, userID, roomID, "m.read", ts, events); err != nil { + util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{ + "sender": t.Origin, + "user_id": userID, + "room_id": roomID, + "events": receipt.MRead[userID].EventIDs, + }).Error("Failed to send receipt event to edu server") + continue + } + } + } default: util.GetLogger(ctx).WithField("type", e.Type).Debug("Unhandled EDU") } } } +// ReceiptPayload is the receipt payload passed between servers +type receiptPayload struct { + Data map[string]struct { + MRead map[string]struct { + Data struct { + TS gomatrixserverlib.Timestamp `json:"ts"` + } `json:"data"` + EventIDs []string `json:"event_ids"` + } `json:"m.read"` + } `json:"data"` +} + +// processReceiptEvent sends receipt events to the edu server +func (t *txnReq) processReceiptEvent(ctx context.Context, + userID, roomID, receiptType string, + timestamp gomatrixserverlib.Timestamp, + eventIDs []string, +) error { + // store every event + for _, eventID := range eventIDs { + req := eduserverAPI.InputReceiptEventRequest{ + InputReceiptEvent: eduserverAPI.InputReceiptEvent{ + UserID: userID, + RoomID: roomID, + EventID: eventID, + Type: receiptType, + Timestamp: timestamp, + }, + } + resp := eduserverAPI.InputReceiptEventResponse{} + if err := t.eduAPI.InputReceiptEvent(ctx, &req, &resp); err != nil { + return errors.Wrap(err, "unable to set receipt event") + } + } + + return nil +} + func (t *txnReq) processDeviceListUpdate(ctx context.Context, e gomatrixserverlib.EDU) { var payload gomatrixserverlib.DeviceListUpdateEvent if err := json.Unmarshal(e.Content, &payload); err != nil { diff --git a/sytest-whitelist b/sytest-whitelist index f31b5f0b7..f5ba5fce1 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -488,4 +488,5 @@ POST /rooms/:room_id/receipt can create receipts Receipts must be m.read Read receipts appear in initial v2 /sync New read receipts appear in incremental v2 /sync -Outbound federation sends receipts \ No newline at end of file +Outbound federation sends receipts +Inbound federation rejects receipts from wrong remote \ No newline at end of file