mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-27 08:43:10 -06:00
Call federated spaces at the right time
This commit is contained in:
parent
1177c6eb01
commit
e5e5d8db86
|
|
@ -71,7 +71,7 @@ func Enable(
|
||||||
})
|
})
|
||||||
|
|
||||||
base.PublicClientAPIMux.Handle("/unstable/rooms/{roomID}/spaces",
|
base.PublicClientAPIMux.Handle("/unstable/rooms/{roomID}/spaces",
|
||||||
httputil.MakeAuthAPI("spaces", userAPI, spacesHandler(db, rsAPI)),
|
httputil.MakeAuthAPI("spaces", userAPI, spacesHandler(db, rsAPI, fsAPI, base.Cfg.Global.ServerName)),
|
||||||
).Methods(http.MethodPost, http.MethodOptions)
|
).Methods(http.MethodPost, http.MethodOptions)
|
||||||
|
|
||||||
base.PublicFederationAPIMux.Handle("/unstable/spaces/{roomID}", httputil.MakeExternalAPI(
|
base.PublicFederationAPIMux.Handle("/unstable/spaces/{roomID}", httputil.MakeExternalAPI(
|
||||||
|
|
@ -88,7 +88,7 @@ func Enable(
|
||||||
return util.ErrorResponse(err)
|
return util.ErrorResponse(err)
|
||||||
}
|
}
|
||||||
roomID := params["roomID"]
|
roomID := params["roomID"]
|
||||||
return federatedSpacesHandler(req.Context(), fedReq, roomID, db, rsAPI, fsAPI)
|
return federatedSpacesHandler(req.Context(), fedReq, roomID, db, rsAPI, fsAPI, base.Cfg.Global.ServerName)
|
||||||
},
|
},
|
||||||
)).Methods(http.MethodPost, http.MethodOptions)
|
)).Methods(http.MethodPost, http.MethodOptions)
|
||||||
return nil
|
return nil
|
||||||
|
|
@ -97,6 +97,7 @@ func Enable(
|
||||||
func federatedSpacesHandler(
|
func federatedSpacesHandler(
|
||||||
ctx context.Context, fedReq *gomatrixserverlib.FederationRequest, roomID string, db Database,
|
ctx context.Context, fedReq *gomatrixserverlib.FederationRequest, roomID string, db Database,
|
||||||
rsAPI roomserver.RoomserverInternalAPI, fsAPI fs.FederationSenderInternalAPI,
|
rsAPI roomserver.RoomserverInternalAPI, fsAPI fs.FederationSenderInternalAPI,
|
||||||
|
thisServer gomatrixserverlib.ServerName,
|
||||||
) util.JSONResponse {
|
) util.JSONResponse {
|
||||||
inMemoryBatchCache := make(map[string]set)
|
inMemoryBatchCache := make(map[string]set)
|
||||||
var r gomatrixserverlib.MSC2946SpacesRequest
|
var r gomatrixserverlib.MSC2946SpacesRequest
|
||||||
|
|
@ -114,6 +115,7 @@ func federatedSpacesHandler(
|
||||||
req: &r,
|
req: &r,
|
||||||
rootRoomID: roomID,
|
rootRoomID: roomID,
|
||||||
serverName: fedReq.Origin(),
|
serverName: fedReq.Origin(),
|
||||||
|
thisServer: thisServer,
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
|
|
||||||
db: db,
|
db: db,
|
||||||
|
|
@ -128,7 +130,10 @@ func federatedSpacesHandler(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func spacesHandler(db Database, rsAPI roomserver.RoomserverInternalAPI) func(*http.Request, *userapi.Device) util.JSONResponse {
|
func spacesHandler(
|
||||||
|
db Database, rsAPI roomserver.RoomserverInternalAPI, fsAPI fs.FederationSenderInternalAPI,
|
||||||
|
thisServer gomatrixserverlib.ServerName,
|
||||||
|
) func(*http.Request, *userapi.Device) util.JSONResponse {
|
||||||
return func(req *http.Request, device *userapi.Device) util.JSONResponse {
|
return func(req *http.Request, device *userapi.Device) util.JSONResponse {
|
||||||
inMemoryBatchCache := make(map[string]set)
|
inMemoryBatchCache := make(map[string]set)
|
||||||
// Extract the room ID from the request. Sanity check request data.
|
// Extract the room ID from the request. Sanity check request data.
|
||||||
|
|
@ -149,10 +154,12 @@ func spacesHandler(db Database, rsAPI roomserver.RoomserverInternalAPI) func(*ht
|
||||||
req: &r,
|
req: &r,
|
||||||
rootRoomID: roomID,
|
rootRoomID: roomID,
|
||||||
caller: device,
|
caller: device,
|
||||||
|
thisServer: thisServer,
|
||||||
ctx: req.Context(),
|
ctx: req.Context(),
|
||||||
|
|
||||||
db: db,
|
db: db,
|
||||||
rsAPI: rsAPI,
|
rsAPI: rsAPI,
|
||||||
|
fsAPI: fsAPI,
|
||||||
inMemoryBatchCache: inMemoryBatchCache,
|
inMemoryBatchCache: inMemoryBatchCache,
|
||||||
}
|
}
|
||||||
res := w.walk()
|
res := w.walk()
|
||||||
|
|
@ -168,6 +175,7 @@ type walker struct {
|
||||||
rootRoomID string
|
rootRoomID string
|
||||||
caller *userapi.Device
|
caller *userapi.Device
|
||||||
serverName gomatrixserverlib.ServerName
|
serverName gomatrixserverlib.ServerName
|
||||||
|
thisServer gomatrixserverlib.ServerName
|
||||||
db Database
|
db Database
|
||||||
rsAPI roomserver.RoomserverInternalAPI
|
rsAPI roomserver.RoomserverInternalAPI
|
||||||
fsAPI fs.FederationSenderInternalAPI
|
fsAPI fs.FederationSenderInternalAPI
|
||||||
|
|
@ -230,9 +238,20 @@ func (w *walker) walk() *gomatrixserverlib.MSC2946SpacesResponse {
|
||||||
}
|
}
|
||||||
// Mark this room as processed.
|
// Mark this room as processed.
|
||||||
processed[roomID] = true
|
processed[roomID] = true
|
||||||
|
|
||||||
// Is the caller currently joined to the room or is the room `world_readable`
|
// Is the caller currently joined to the room or is the room `world_readable`
|
||||||
// If no, skip this room. If yes, continue.
|
// If no, skip this room. If yes, continue.
|
||||||
if !w.authorised(roomID) {
|
if !w.roomExists(roomID) || !w.authorised(roomID) {
|
||||||
|
// attempt to query this room over federation, as either we've never heard of it before
|
||||||
|
// or we've left it and hence are not authorised (but info may be exposed regardless)
|
||||||
|
fedRes, err := w.federatedRoomInfo(roomID)
|
||||||
|
if err != nil {
|
||||||
|
util.GetLogger(w.ctx).WithError(err).WithField("room_id", roomID).Errorf("failed to query federated spaces")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if fedRes != nil {
|
||||||
|
res = combineResponses(res, *fedRes)
|
||||||
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// Get all `m.space.child` and `m.space.parent` state events for the room. *In addition*, get
|
// Get all `m.space.child` and `m.space.parent` state events for the room. *In addition*, get
|
||||||
|
|
@ -351,6 +370,68 @@ func (w *walker) publicRoomsChunk(roomID string) *gomatrixserverlib.PublicRoom {
|
||||||
return &pubRooms[0]
|
return &pubRooms[0]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// federatedRoomInfo returns more of the spaces graph from another server. Returns nil if this was
|
||||||
|
// unsuccessful.
|
||||||
|
func (w *walker) federatedRoomInfo(roomID string) (*gomatrixserverlib.MSC2946SpacesResponse, error) {
|
||||||
|
// only do federated requests for client requests
|
||||||
|
if w.caller == nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
// extract events which point to this room ID and extract their vias
|
||||||
|
events, err := w.db.References(w.ctx, roomID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to get References events: %w", err)
|
||||||
|
}
|
||||||
|
vias := make(set)
|
||||||
|
for _, ev := range events {
|
||||||
|
if ev.StateKeyEquals(roomID) {
|
||||||
|
// event points at this room, extract vias
|
||||||
|
content := struct {
|
||||||
|
Vias []string `json:"via"`
|
||||||
|
}{}
|
||||||
|
if err = json.Unmarshal(ev.Content(), &content); err != nil {
|
||||||
|
continue // silently ignore corrupted state events
|
||||||
|
}
|
||||||
|
for _, v := range content.Vias {
|
||||||
|
vias[v] = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
util.GetLogger(w.ctx).Infof("Querying federatedRoomInfo via %+v", vias)
|
||||||
|
ctx := context.Background()
|
||||||
|
// query more of the spaces graph using these servers
|
||||||
|
for serverName := range vias {
|
||||||
|
if serverName == string(w.thisServer) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
res, err := w.fsAPI.MSC2946Spaces(ctx, gomatrixserverlib.ServerName(serverName), roomID, gomatrixserverlib.MSC2946SpacesRequest{
|
||||||
|
Limit: w.req.Limit,
|
||||||
|
MaxRoomsPerSpace: w.req.MaxRoomsPerSpace,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
util.GetLogger(w.ctx).WithError(err).Warnf("failed to call MSC2946Spaces on server %s", serverName)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return &res, nil
|
||||||
|
}
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *walker) roomExists(roomID string) bool {
|
||||||
|
var queryRes roomserver.QueryServerJoinedToRoomResponse
|
||||||
|
err := w.rsAPI.QueryServerJoinedToRoom(w.ctx, &roomserver.QueryServerJoinedToRoomRequest{
|
||||||
|
RoomID: roomID,
|
||||||
|
ServerName: w.thisServer,
|
||||||
|
}, &queryRes)
|
||||||
|
if err != nil {
|
||||||
|
util.GetLogger(w.ctx).WithError(err).Error("failed to QueryServerJoinedToRoom")
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
// if the room exists but we aren't in the room then we might have stale data so we want to fetch
|
||||||
|
// it fresh via federation
|
||||||
|
return queryRes.RoomExists && queryRes.IsInRoom
|
||||||
|
}
|
||||||
|
|
||||||
// authorised returns true iff the user is joined this room or the room is world_readable
|
// authorised returns true iff the user is joined this room or the room is world_readable
|
||||||
func (w *walker) authorised(roomID string) bool {
|
func (w *walker) authorised(roomID string) bool {
|
||||||
if w.caller != nil {
|
if w.caller != nil {
|
||||||
|
|
@ -499,3 +580,28 @@ func stripped(ev *gomatrixserverlib.Event) *gomatrixserverlib.MSC2946StrippedEve
|
||||||
RoomID: ev.RoomID(),
|
RoomID: ev.RoomID(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func combineResponses(local, remote gomatrixserverlib.MSC2946SpacesResponse) gomatrixserverlib.MSC2946SpacesResponse {
|
||||||
|
knownRooms := make(set)
|
||||||
|
for _, room := range local.Rooms {
|
||||||
|
knownRooms[room.RoomID] = true
|
||||||
|
}
|
||||||
|
knownEvents := make(set)
|
||||||
|
for _, event := range local.Events {
|
||||||
|
knownEvents[event.RoomID+event.Type+event.StateKey] = true
|
||||||
|
}
|
||||||
|
// mux in remote entries if and only if they aren't present already
|
||||||
|
for _, room := range remote.Rooms {
|
||||||
|
if knownRooms[room.RoomID] {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
local.Rooms = append(local.Rooms, room)
|
||||||
|
}
|
||||||
|
for _, event := range remote.Events {
|
||||||
|
if knownEvents[event.RoomID+event.Type+event.StateKey] {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
local.Events = append(local.Events, event)
|
||||||
|
}
|
||||||
|
return local
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -401,6 +401,12 @@ type testRoomserverAPI struct {
|
||||||
pubRoomState map[string]map[gomatrixserverlib.StateKeyTuple]string
|
pubRoomState map[string]map[gomatrixserverlib.StateKeyTuple]string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *testRoomserverAPI) QueryServerJoinedToRoom(ctx context.Context, req *roomserver.QueryServerJoinedToRoomRequest, res *roomserver.QueryServerJoinedToRoomResponse) error {
|
||||||
|
res.IsInRoom = true
|
||||||
|
res.RoomExists = true
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (r *testRoomserverAPI) QueryBulkStateContent(ctx context.Context, req *roomserver.QueryBulkStateContentRequest, res *roomserver.QueryBulkStateContentResponse) error {
|
func (r *testRoomserverAPI) QueryBulkStateContent(ctx context.Context, req *roomserver.QueryBulkStateContentRequest, res *roomserver.QueryBulkStateContentResponse) error {
|
||||||
res.Rooms = make(map[string]map[gomatrixserverlib.StateKeyTuple]string)
|
res.Rooms = make(map[string]map[gomatrixserverlib.StateKeyTuple]string)
|
||||||
for _, roomID := range req.RoomIDs {
|
for _, roomID := range req.RoomIDs {
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue