Merge branch 'main' of github.com:matrix-org/dendrite into s7evink/fedeventvis

This commit is contained in:
Till Faelligen 2022-10-11 14:07:08 +02:00
commit f76c9fb2cc
No known key found for this signature in database
GPG key ID: 3DF82D8AB9211D4E
9 changed files with 109 additions and 55 deletions

View file

@ -154,33 +154,31 @@ func SaveReadMarker(
return *resErr return *resErr
} }
if r.FullyRead == "" { if r.FullyRead != "" {
return util.JSONResponse{ data, err := json.Marshal(fullyReadEvent{EventID: r.FullyRead})
Code: http.StatusBadRequest, if err != nil {
JSON: jsonerror.BadJSON("Missing m.fully_read mandatory field"), return jsonerror.InternalServerError()
}
dataReq := api.InputAccountDataRequest{
UserID: device.UserID,
DataType: "m.fully_read",
RoomID: roomID,
AccountData: data,
}
dataRes := api.InputAccountDataResponse{}
if err := userAPI.InputAccountData(req.Context(), &dataReq, &dataRes); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("userAPI.InputAccountData failed")
return util.ErrorResponse(err)
} }
} }
data, err := json.Marshal(fullyReadEvent{EventID: r.FullyRead}) // Handle the read receipts that may be included in the read marker.
if err != nil {
return jsonerror.InternalServerError()
}
dataReq := api.InputAccountDataRequest{
UserID: device.UserID,
DataType: "m.fully_read",
RoomID: roomID,
AccountData: data,
}
dataRes := api.InputAccountDataResponse{}
if err := userAPI.InputAccountData(req.Context(), &dataReq, &dataRes); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("userAPI.InputAccountData failed")
return util.ErrorResponse(err)
}
// Handle the read receipt that may be included in the read marker
if r.Read != "" { if r.Read != "" {
return SetReceipt(req, syncProducer, device, roomID, "m.read", r.Read) return SetReceipt(req, userAPI, syncProducer, device, roomID, "m.read", r.Read)
}
if r.ReadPrivate != "" {
return SetReceipt(req, userAPI, syncProducer, device, roomID, "m.read.private", r.ReadPrivate)
} }
return util.JSONResponse{ return util.JSONResponse{

View file

@ -15,19 +15,22 @@
package routing package routing
import ( import (
"encoding/json"
"fmt" "fmt"
"net/http" "net/http"
"time" "time"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/userapi/api"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/util" "github.com/matrix-org/util"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
func SetReceipt(req *http.Request, syncProducer *producers.SyncAPIProducer, device *userapi.Device, roomID, receiptType, eventID string) util.JSONResponse { func SetReceipt(req *http.Request, userAPI api.ClientUserAPI, syncProducer *producers.SyncAPIProducer, device *userapi.Device, roomID, receiptType, eventID string) util.JSONResponse {
timestamp := gomatrixserverlib.AsTimestamp(time.Now()) timestamp := gomatrixserverlib.AsTimestamp(time.Now())
logrus.WithFields(logrus.Fields{ logrus.WithFields(logrus.Fields{
"roomID": roomID, "roomID": roomID,
@ -37,13 +40,32 @@ func SetReceipt(req *http.Request, syncProducer *producers.SyncAPIProducer, devi
"timestamp": timestamp, "timestamp": timestamp,
}).Debug("Setting receipt") }).Debug("Setting receipt")
// currently only m.read is accepted switch receiptType {
if receiptType != "m.read" { case "m.read", "m.read.private":
return util.MessageResponse(400, fmt.Sprintf("receipt type must be m.read not '%s'", receiptType)) if err := syncProducer.SendReceipt(req.Context(), device.UserID, roomID, eventID, receiptType, timestamp); err != nil {
} return util.ErrorResponse(err)
}
if err := syncProducer.SendReceipt(req.Context(), device.UserID, roomID, eventID, receiptType, timestamp); err != nil { case "m.fully_read":
return util.ErrorResponse(err) data, err := json.Marshal(fullyReadEvent{EventID: eventID})
if err != nil {
return jsonerror.InternalServerError()
}
dataReq := api.InputAccountDataRequest{
UserID: device.UserID,
DataType: "m.fully_read",
RoomID: roomID,
AccountData: data,
}
dataRes := api.InputAccountDataResponse{}
if err := userAPI.InputAccountData(req.Context(), &dataReq, &dataRes); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("userAPI.InputAccountData failed")
return util.ErrorResponse(err)
}
default:
return util.MessageResponse(400, fmt.Sprintf("Receipt type '%s' not known", receiptType))
} }
return util.JSONResponse{ return util.JSONResponse{

View file

@ -1343,7 +1343,7 @@ func Setup(
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
return SetReceipt(req, syncProducer, device, vars["roomId"], vars["receiptType"], vars["eventId"]) return SetReceipt(req, userAPI, syncProducer, device, vars["roomId"], vars["receiptType"], vars["eventId"])
}), }),
).Methods(http.MethodPost, http.MethodOptions) ).Methods(http.MethodPost, http.MethodOptions)
v3mux.Handle("/presence/{userId}/status", v3mux.Handle("/presence/{userId}/status",

View file

@ -81,6 +81,14 @@ func (t *OutputReceiptConsumer) onMessage(ctx context.Context, msgs []*nats.Msg)
Type: msg.Header.Get("type"), Type: msg.Header.Get("type"),
} }
switch receipt.Type {
case "m.read":
// These are allowed to be sent over federation
case "m.read.private", "m.fully_read":
// These must not be sent over federation
return true
}
// only send receipt events which originated from us // only send receipt events which originated from us
_, receiptServerName, err := gomatrixserverlib.SplitID('@', receipt.UserID) _, receiptServerName, err := gomatrixserverlib.SplitID('@', receipt.UserID)
if err != nil { if err != nil {

View file

@ -20,6 +20,7 @@ import (
"net/http" "net/http"
"time" "time"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
@ -95,7 +96,10 @@ func fetchEvent(ctx context.Context, rsAPI api.FederationRoomserverAPI, eventID
} }
if len(eventsResponse.Events) == 0 { if len(eventsResponse.Events) == 0 {
return nil, &util.JSONResponse{Code: http.StatusNotFound, JSON: nil} return nil, &util.JSONResponse{
Code: http.StatusNotFound,
JSON: jsonerror.NotFound("Event not found"),
}
} }
return eventsResponse.Events[0].Event, nil return eventsResponse.Events[0].Event, nil

View file

@ -135,23 +135,24 @@ func getState(
return nil, nil, &resErr return nil, nil, &resErr
} }
if !response.StateKnown { switch {
case !response.RoomExists:
return nil, nil, &util.JSONResponse{
Code: http.StatusNotFound,
JSON: jsonerror.NotFound("Room not found"),
}
case !response.StateKnown:
return nil, nil, &util.JSONResponse{ return nil, nil, &util.JSONResponse{
Code: http.StatusNotFound, Code: http.StatusNotFound,
JSON: jsonerror.NotFound("State not known"), JSON: jsonerror.NotFound("State not known"),
} }
} case response.IsRejected:
if response.IsRejected {
return nil, nil, &util.JSONResponse{ return nil, nil, &util.JSONResponse{
Code: http.StatusNotFound, Code: http.StatusNotFound,
JSON: jsonerror.NotFound("Event not found"), JSON: jsonerror.NotFound("Event not found"),
} }
} }
if !response.RoomExists {
return nil, nil, &util.JSONResponse{Code: http.StatusNotFound, JSON: nil}
}
return response.StateEvents, response.AuthChainEvents, nil return response.StateEvents, response.AuthChainEvents, nil
} }

View file

@ -35,8 +35,9 @@ type AccountData struct {
} }
type ReadMarkerJSON struct { type ReadMarkerJSON struct {
FullyRead string `json:"m.fully_read"` FullyRead string `json:"m.fully_read"`
Read string `json:"m.read"` Read string `json:"m.read"`
ReadPrivate string `json:"m.read.private"`
} }
// NotificationData contains statistics about notifications, sent from // NotificationData contains statistics about notifications, sent from

View file

@ -360,34 +360,50 @@ func (d *DatabaseTransaction) GetStateDeltas(
newlyJoinedRooms := make(map[string]bool, len(state)) newlyJoinedRooms := make(map[string]bool, len(state))
for roomID, stateStreamEvents := range state { for roomID, stateStreamEvents := range state {
for _, ev := range stateStreamEvents { for _, ev := range stateStreamEvents {
if membership, prevMembership := getMembershipFromEvent(ev.Event, userID); membership != "" { // Look for our membership in the state events and skip over any
if membership == gomatrixserverlib.Join && prevMembership != membership { // membership events that are not related to us.
// send full room state down instead of a delta membership, prevMembership := getMembershipFromEvent(ev.Event, userID)
if membership == "" {
continue
}
if membership == gomatrixserverlib.Join {
// If our membership is now join but the previous membership wasn't
// then this is a "join transition", so we'll insert this room.
if prevMembership != membership {
// Get the full room state, as we'll send that down for a newly
// joined room instead of a delta.
var s []types.StreamEvent var s []types.StreamEvent
s, err = d.currentStateStreamEventsForRoom(ctx, roomID, stateFilter) if s, err = d.currentStateStreamEventsForRoom(ctx, roomID, stateFilter); err != nil {
if err != nil {
if err == sql.ErrNoRows { if err == sql.ErrNoRows {
continue continue
} }
return nil, nil, err return nil, nil, err
} }
// Add the information for this room into the state so that
// it will get added with all of the rest of the joined rooms.
state[roomID] = s state[roomID] = s
newlyJoinedRooms[roomID] = true newlyJoinedRooms[roomID] = true
continue // we'll add this room in when we do joined rooms
} }
deltas = append(deltas, types.StateDelta{ // We won't add joined rooms into the delta at this point as they
Membership: membership, // are added later on.
MembershipPos: ev.StreamPosition, continue
StateEvents: d.StreamEventsToEvents(device, stateStreamEvents),
RoomID: roomID,
})
break
} }
deltas = append(deltas, types.StateDelta{
Membership: membership,
MembershipPos: ev.StreamPosition,
StateEvents: d.StreamEventsToEvents(device, stateStreamEvents),
RoomID: roomID,
})
break
} }
} }
// Add in currently joined rooms // Finally, add in currently joined rooms, including those from the
// join transitions above.
for _, joinedRoomID := range joinedRoomIDs { for _, joinedRoomID := range joinedRoomIDs {
deltas = append(deltas, types.StateDelta{ deltas = append(deltas, types.StateDelta{
Membership: gomatrixserverlib.Join, Membership: gomatrixserverlib.Join,

View file

@ -67,6 +67,10 @@ func (p *ReceiptStreamProvider) IncrementalSync(
if _, ok := req.IgnoredUsers.List[receipt.UserID]; ok { if _, ok := req.IgnoredUsers.List[receipt.UserID]; ok {
continue continue
} }
// Don't send private read receipts to other users
if receipt.Type == "m.read.private" && req.Device.UserID != receipt.UserID {
continue
}
receiptsByRoom[receipt.RoomID] = append(receiptsByRoom[receipt.RoomID], receipt) receiptsByRoom[receipt.RoomID] = append(receiptsByRoom[receipt.RoomID], receipt)
} }