Add /_dendrite/admin/downloadState/{serverName}/{roomID} admin endpoint

This commit is contained in:
Neil Alexander 2022-10-31 09:13:28 +00:00
parent 69aff372f3
commit f10c6f26e5
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
8 changed files with 238 additions and 12 deletions

View file

@ -191,3 +191,43 @@ func AdminMarkAsStale(req *http.Request, cfg *config.ClientAPI, keyAPI api.Clien
JSON: struct{}{},
}
}
func AdminDownloadState(req *http.Request, cfg *config.ClientAPI, device *userapi.Device, rsAPI roomserverAPI.ClientRoomserverAPI) util.JSONResponse {
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
if err != nil {
return util.ErrorResponse(err)
}
roomID, ok := vars["roomID"]
if !ok {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.MissingArgument("Expecting room ID."),
}
}
serverName, ok := vars["serverName"]
if !ok {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.MissingArgument("Expecting remote server name."),
}
}
res := &roomserverAPI.PerformAdminDownloadStateResponse{}
if err := rsAPI.PerformAdminDownloadState(
req.Context(),
&roomserverAPI.PerformAdminDownloadStateRequest{
UserID: device.UserID,
RoomID: roomID,
ServerName: gomatrixserverlib.ServerName(serverName),
},
res,
); err != nil {
return jsonerror.InternalAPIError(req.Context(), err)
}
if err := res.Error; err != nil {
return err.JSONResponse()
}
return util.JSONResponse{
Code: 200,
JSON: map[string]interface{}{},
}
}

View file

@ -163,6 +163,12 @@ func Setup(
}),
).Methods(http.MethodPost, http.MethodOptions)
dendriteAdminRouter.Handle("/admin/downloadState/{serverName}/{roomID}",
httputil.MakeAdminAPI("admin_download_state", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
return AdminDownloadState(req, cfg, device, rsAPI)
}),
).Methods(http.MethodGet, http.MethodOptions)
dendriteAdminRouter.Handle("/admin/fulltext/reindex",
httputil.MakeAdminAPI("admin_fultext_reindex", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
return AdminReindex(req, cfg, device, natsClient)

View file

@ -150,6 +150,7 @@ type ClientRoomserverAPI interface {
PerformRoomUpgrade(ctx context.Context, req *PerformRoomUpgradeRequest, resp *PerformRoomUpgradeResponse) error
PerformAdminEvacuateRoom(ctx context.Context, req *PerformAdminEvacuateRoomRequest, res *PerformAdminEvacuateRoomResponse) error
PerformAdminEvacuateUser(ctx context.Context, req *PerformAdminEvacuateUserRequest, res *PerformAdminEvacuateUserResponse) error
PerformAdminDownloadState(ctx context.Context, req *PerformAdminDownloadStateRequest, res *PerformAdminDownloadStateResponse) error
PerformPeek(ctx context.Context, req *PerformPeekRequest, res *PerformPeekResponse) error
PerformUnpeek(ctx context.Context, req *PerformUnpeekRequest, res *PerformUnpeekResponse) error
PerformInvite(ctx context.Context, req *PerformInviteRequest, res *PerformInviteResponse) error

View file

@ -131,6 +131,16 @@ func (t *RoomserverInternalAPITrace) PerformAdminEvacuateUser(
return err
}
func (t *RoomserverInternalAPITrace) PerformAdminDownloadState(
ctx context.Context,
req *PerformAdminDownloadStateRequest,
res *PerformAdminDownloadStateResponse,
) error {
err := t.Impl.PerformAdminDownloadState(ctx, req, res)
util.GetLogger(ctx).WithError(err).Infof("PerformAdminDownloadState req=%+v res=%+v", js(req), js(res))
return err
}
func (t *RoomserverInternalAPITrace) PerformInboundPeek(
ctx context.Context,
req *PerformInboundPeekRequest,

View file

@ -237,3 +237,13 @@ type PerformAdminEvacuateUserResponse struct {
Affected []string `json:"affected"`
Error *PerformError
}
type PerformAdminDownloadStateRequest struct {
RoomID string `json:"room_id"`
UserID string `json:"user_id"`
ServerName gomatrixserverlib.ServerName `json:"server_name"`
}
type PerformAdminDownloadStateResponse struct {
Error *PerformError `json:"error,omitempty"`
}

View file

@ -236,3 +236,145 @@ func (r *Admin) PerformAdminEvacuateUser(
}
return nil
}
func (r *Admin) PerformAdminDownloadState(
ctx context.Context,
req *api.PerformAdminDownloadStateRequest,
res *api.PerformAdminDownloadStateResponse,
) error {
roomInfo, err := r.DB.RoomInfo(ctx, req.RoomID)
if err != nil {
res.Error = &api.PerformError{
Code: api.PerformErrorBadRequest,
Msg: fmt.Sprintf("r.DB.RoomInfo: %s", err),
}
return nil
}
if roomInfo == nil || roomInfo.IsStub() {
res.Error = &api.PerformError{
Code: api.PerformErrorBadRequest,
Msg: fmt.Sprintf("room %q not found", req.RoomID),
}
return nil
}
fwdExtremities, _, depth, err := r.DB.LatestEventIDs(ctx, roomInfo.RoomNID)
if err != nil {
res.Error = &api.PerformError{
Code: api.PerformErrorBadRequest,
Msg: fmt.Sprintf("r.DB.LatestEventIDs: %s", err),
}
return nil
}
authEventMap := map[string]*gomatrixserverlib.Event{}
stateEventMap := map[string]*gomatrixserverlib.Event{}
for _, fwdExtremity := range fwdExtremities {
var state gomatrixserverlib.RespState
state, err = r.Inputer.FSAPI.LookupState(ctx, req.ServerName, req.RoomID, fwdExtremity.EventID, roomInfo.RoomVersion)
if err != nil {
res.Error = &api.PerformError{
Code: api.PerformErrorBadRequest,
Msg: fmt.Sprintf("r.Inputer.FSAPI.LookupState (%q): %s", fwdExtremity.EventID, err),
}
return nil
}
for _, authEvent := range state.AuthEvents.UntrustedEvents(roomInfo.RoomVersion) {
if err = authEvent.VerifyEventSignatures(ctx, r.Inputer.KeyRing); err != nil {
continue
}
authEventMap[authEvent.EventID()] = authEvent
}
for _, stateEvent := range state.StateEvents.UntrustedEvents(roomInfo.RoomVersion) {
if err = stateEvent.VerifyEventSignatures(ctx, r.Inputer.KeyRing); err != nil {
continue
}
stateEventMap[stateEvent.EventID()] = stateEvent
}
}
authEvents := make([]*gomatrixserverlib.HeaderedEvent, 0, len(authEventMap))
stateEvents := make([]*gomatrixserverlib.HeaderedEvent, 0, len(stateEventMap))
stateIDs := make([]string, 0, len(stateEventMap))
for _, authEvent := range authEventMap {
authEvents = append(authEvents, authEvent.Headered(roomInfo.RoomVersion))
}
for _, stateEvent := range stateEventMap {
stateEvents = append(stateEvents, stateEvent.Headered(roomInfo.RoomVersion))
stateIDs = append(stateIDs, stateEvent.EventID())
}
builder := &gomatrixserverlib.EventBuilder{
Type: "org.matrix.dendrite.state_download",
Sender: req.UserID,
RoomID: req.RoomID,
Content: gomatrixserverlib.RawJSON("{}"),
}
eventsNeeded, err := gomatrixserverlib.StateNeededForEventBuilder(builder)
if err != nil {
res.Error = &api.PerformError{
Code: api.PerformErrorBadRequest,
Msg: fmt.Sprintf("gomatrixserverlib.StateNeededForEventBuilder: %s", err),
}
return nil
}
queryRes := &api.QueryLatestEventsAndStateResponse{
RoomExists: true,
RoomVersion: roomInfo.RoomVersion,
LatestEvents: fwdExtremities,
StateEvents: stateEvents,
Depth: depth,
}
ev, err := eventutil.BuildEvent(ctx, builder, r.Cfg.Matrix, time.Now(), &eventsNeeded, queryRes)
if err != nil {
res.Error = &api.PerformError{
Code: api.PerformErrorBadRequest,
Msg: fmt.Sprintf("eventutil.BuildEvent: %s", err),
}
return nil
}
inputReq := &api.InputRoomEventsRequest{
Asynchronous: false,
}
inputRes := &api.InputRoomEventsResponse{}
for _, authEvent := range append(authEvents, stateEvents...) {
inputReq.InputRoomEvents = append(inputReq.InputRoomEvents, api.InputRoomEvent{
Kind: api.KindOutlier,
Event: authEvent,
})
}
inputReq.InputRoomEvents = append(inputReq.InputRoomEvents, api.InputRoomEvent{
Kind: api.KindNew,
Event: ev,
Origin: r.Cfg.Matrix.ServerName,
HasState: true,
StateEventIDs: stateIDs,
SendAsServer: string(r.Cfg.Matrix.ServerName),
})
if err := r.Inputer.InputRoomEvents(ctx, inputReq, inputRes); err != nil {
res.Error = &api.PerformError{
Code: api.PerformErrorBadRequest,
Msg: fmt.Sprintf("r.Inputer.InputRoomEvents: %s", err),
}
return nil
}
if inputRes.ErrMsg != "" {
res.Error = &api.PerformError{
Code: api.PerformErrorBadRequest,
Msg: inputRes.ErrMsg,
}
}
return nil
}

View file

@ -39,6 +39,7 @@ const (
RoomserverPerformForgetPath = "/roomserver/performForget"
RoomserverPerformAdminEvacuateRoomPath = "/roomserver/performAdminEvacuateRoom"
RoomserverPerformAdminEvacuateUserPath = "/roomserver/performAdminEvacuateUser"
RoomserverPerformAdminDownloadStatePath = "/roomserver/performAdminDownloadState"
// Query operations
RoomserverQueryLatestEventsAndStatePath = "/roomserver/queryLatestEventsAndState"
@ -261,6 +262,17 @@ func (h *httpRoomserverInternalAPI) PerformAdminEvacuateRoom(
)
}
func (h *httpRoomserverInternalAPI) PerformAdminDownloadState(
ctx context.Context,
request *api.PerformAdminDownloadStateRequest,
response *api.PerformAdminDownloadStateResponse,
) error {
return httputil.CallInternalRPCAPI(
"PerformAdminDownloadState", h.roomserverURL+RoomserverPerformAdminDownloadStatePath,
h.httpClient, ctx, request, response,
)
}
func (h *httpRoomserverInternalAPI) PerformAdminEvacuateUser(
ctx context.Context,
request *api.PerformAdminEvacuateUserRequest,

View file

@ -65,6 +65,11 @@ func AddRoutes(r api.RoomserverInternalAPI, internalAPIMux *mux.Router) {
httputil.MakeInternalRPCAPI("RoomserverPerformAdminEvacuateUser", r.PerformAdminEvacuateUser),
)
internalAPIMux.Handle(
RoomserverPerformAdminDownloadStatePath,
httputil.MakeInternalRPCAPI("RoomserverPerformAdminDownloadState", r.PerformAdminDownloadState),
)
internalAPIMux.Handle(
RoomserverQueryPublishedRoomsPath,
httputil.MakeInternalRPCAPI("RoomserverQueryPublishedRooms", r.QueryPublishedRooms),