diff --git a/clientapi/routing/admin.go b/clientapi/routing/admin.go index 622b8e1d1..59032bc05 100644 --- a/clientapi/routing/admin.go +++ b/clientapi/routing/admin.go @@ -170,3 +170,43 @@ func AdminResetPassword(req *http.Request, cfg *config.ClientAPI, device *userap }, } } + +func AdminDownloadState(req *http.Request, cfg *config.ClientAPI, device *userapi.Device, rsAPI roomserverAPI.ClientRoomserverAPI) util.JSONResponse { + vars, err := httputil.URLDecodeMapValues(mux.Vars(req)) + if err != nil { + return util.ErrorResponse(err) + } + roomID, ok := vars["roomID"] + if !ok { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.MissingArgument("Expecting room ID."), + } + } + serverName, ok := vars["serverName"] + if !ok { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.MissingArgument("Expecting remote server name."), + } + } + res := &roomserverAPI.PerformAdminDownloadStateResponse{} + if err := rsAPI.PerformAdminDownloadState( + req.Context(), + &roomserverAPI.PerformAdminDownloadStateRequest{ + UserID: device.UserID, + RoomID: roomID, + ServerName: gomatrixserverlib.ServerName(serverName), + }, + res, + ); err != nil { + return jsonerror.InternalAPIError(req.Context(), err) + } + if err := res.Error; err != nil { + return err.JSONResponse() + } + return util.JSONResponse{ + Code: 200, + JSON: map[string]interface{}{}, + } +} diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go index b411df2c8..0be53fce3 100644 --- a/clientapi/routing/routing.go +++ b/clientapi/routing/routing.go @@ -167,6 +167,12 @@ func Setup( }), ).Methods(http.MethodPost, http.MethodOptions) + dendriteAdminRouter.Handle("/admin/downloadState/{serverName}/{roomID}", + httputil.MakeAdminAPI("admin_download_state", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { + return AdminDownloadState(req, cfg, device, rsAPI) + }), + ).Methods(http.MethodGet, http.MethodOptions) + // server notifications if cfg.Matrix.ServerNotices.Enabled { logrus.Info("Enabling server notices at /_synapse/admin/v1/send_server_notice") diff --git a/roomserver/api/api.go b/roomserver/api/api.go index e50916ca6..d7a44b108 100644 --- a/roomserver/api/api.go +++ b/roomserver/api/api.go @@ -151,6 +151,7 @@ type ClientRoomserverAPI interface { PerformAdminEvacuateRoom(ctx context.Context, req *PerformAdminEvacuateRoomRequest, res *PerformAdminEvacuateRoomResponse) error PerformAdminEvacuateUser(ctx context.Context, req *PerformAdminEvacuateUserRequest, res *PerformAdminEvacuateUserResponse) error PerformAdminPurgeRoom(ctx context.Context, req *PerformAdminPurgeRoomRequest, res *PerformAdminPurgeRoomResponse) error + PerformAdminDownloadState(ctx context.Context, req *PerformAdminDownloadStateRequest, res *PerformAdminDownloadStateResponse) error PerformPeek(ctx context.Context, req *PerformPeekRequest, res *PerformPeekResponse) error PerformUnpeek(ctx context.Context, req *PerformUnpeekRequest, res *PerformUnpeekResponse) error PerformInvite(ctx context.Context, req *PerformInviteRequest, res *PerformInviteResponse) error diff --git a/roomserver/api/api_trace.go b/roomserver/api/api_trace.go index 2f5f847f3..c97a5ef67 100644 --- a/roomserver/api/api_trace.go +++ b/roomserver/api/api_trace.go @@ -141,6 +141,16 @@ func (t *RoomserverInternalAPITrace) PerformAdminPurgeRoom( return err } +func (t *RoomserverInternalAPITrace) PerformAdminDownloadState( + ctx context.Context, + req *PerformAdminDownloadStateRequest, + res *PerformAdminDownloadStateResponse, +) error { + err := t.Impl.PerformAdminDownloadState(ctx, req, res) + util.GetLogger(ctx).WithError(err).Infof("PerformAdminDownloadState req=%+v res=%+v", js(req), js(res)) + return err +} + func (t *RoomserverInternalAPITrace) PerformInboundPeek( ctx context.Context, req *PerformInboundPeekRequest, diff --git a/roomserver/api/perform.go b/roomserver/api/perform.go index ac4b14fdd..84b2b11b7 100644 --- a/roomserver/api/perform.go +++ b/roomserver/api/perform.go @@ -242,3 +242,13 @@ type PerformAdminPurgeRoomRequest struct { type PerformAdminPurgeRoomResponse struct { Error *PerformError `json:"error,omitempty"` } + +type PerformAdminDownloadStateRequest struct { + RoomID string `json:"room_id"` + UserID string `json:"user_id"` + ServerName gomatrixserverlib.ServerName `json:"server_name"` +} + +type PerformAdminDownloadStateResponse struct { + Error *PerformError `json:"error,omitempty"` +} diff --git a/roomserver/api/query.go b/roomserver/api/query.go index 32d63bb51..aa7dc4735 100644 --- a/roomserver/api/query.go +++ b/roomserver/api/query.go @@ -439,6 +439,7 @@ type QueryMembershipAtEventRequest struct { // QueryMembershipAtEventResponse is the response to QueryMembershipAtEventRequest. type QueryMembershipAtEventResponse struct { - // Memberships is a map from eventID to a list of events (if any). + // Memberships is a map from eventID to a list of events (if any). Events that + // do not have known state will return an empty array here. Memberships map[string][]*gomatrixserverlib.HeaderedEvent `json:"memberships"` } diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index a8a3e0248..c47793f0a 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -176,7 +176,8 @@ func (r *Inputer) Start() error { }, nats.HeadersOnly(), nats.DeliverAll(), - nats.AckAll(), + nats.AckExplicit(), + nats.ReplayInstant(), nats.BindStream(r.InputRoomEventTopic), ) return err diff --git a/roomserver/internal/perform/perform_admin.go b/roomserver/internal/perform/perform_admin.go index 0f55b4c89..e3e6524b9 100644 --- a/roomserver/internal/perform/perform_admin.go +++ b/roomserver/internal/perform/perform_admin.go @@ -267,3 +267,146 @@ func (r *Admin) PerformAdminPurgeRoom( }, }) } + +func (r *Admin) PerformAdminDownloadState( + ctx context.Context, + req *api.PerformAdminDownloadStateRequest, + res *api.PerformAdminDownloadStateResponse, +) error { + roomInfo, err := r.DB.RoomInfo(ctx, req.RoomID) + if err != nil { + res.Error = &api.PerformError{ + Code: api.PerformErrorBadRequest, + Msg: fmt.Sprintf("r.DB.RoomInfo: %s", err), + } + return nil + } + + if roomInfo == nil || roomInfo.IsStub() { + res.Error = &api.PerformError{ + Code: api.PerformErrorBadRequest, + Msg: fmt.Sprintf("room %q not found", req.RoomID), + } + return nil + } + + fwdExtremities, _, depth, err := r.DB.LatestEventIDs(ctx, roomInfo.RoomNID) + if err != nil { + res.Error = &api.PerformError{ + Code: api.PerformErrorBadRequest, + Msg: fmt.Sprintf("r.DB.LatestEventIDs: %s", err), + } + return nil + } + + authEventMap := map[string]*gomatrixserverlib.Event{} + stateEventMap := map[string]*gomatrixserverlib.Event{} + + for _, fwdExtremity := range fwdExtremities { + var state gomatrixserverlib.RespState + state, err = r.Inputer.FSAPI.LookupState(ctx, req.ServerName, req.RoomID, fwdExtremity.EventID, roomInfo.RoomVersion) + if err != nil { + res.Error = &api.PerformError{ + Code: api.PerformErrorBadRequest, + Msg: fmt.Sprintf("r.Inputer.FSAPI.LookupState (%q): %s", fwdExtremity.EventID, err), + } + return nil + } + for _, authEvent := range state.AuthEvents.UntrustedEvents(roomInfo.RoomVersion) { + if err = authEvent.VerifyEventSignatures(ctx, r.Inputer.KeyRing); err != nil { + continue + } + authEventMap[authEvent.EventID()] = authEvent + } + for _, stateEvent := range state.StateEvents.UntrustedEvents(roomInfo.RoomVersion) { + if err = stateEvent.VerifyEventSignatures(ctx, r.Inputer.KeyRing); err != nil { + continue + } + stateEventMap[stateEvent.EventID()] = stateEvent + } + } + + authEvents := make([]*gomatrixserverlib.HeaderedEvent, 0, len(authEventMap)) + stateEvents := make([]*gomatrixserverlib.HeaderedEvent, 0, len(stateEventMap)) + stateIDs := make([]string, 0, len(stateEventMap)) + + for _, authEvent := range authEventMap { + authEvents = append(authEvents, authEvent.Headered(roomInfo.RoomVersion)) + } + for _, stateEvent := range stateEventMap { + stateEvents = append(stateEvents, stateEvent.Headered(roomInfo.RoomVersion)) + stateIDs = append(stateIDs, stateEvent.EventID()) + } + + builder := &gomatrixserverlib.EventBuilder{ + Type: "org.matrix.dendrite.state_download", + Sender: req.UserID, + RoomID: req.RoomID, + Content: gomatrixserverlib.RawJSON("{}"), + } + + eventsNeeded, err := gomatrixserverlib.StateNeededForEventBuilder(builder) + if err != nil { + res.Error = &api.PerformError{ + Code: api.PerformErrorBadRequest, + Msg: fmt.Sprintf("gomatrixserverlib.StateNeededForEventBuilder: %s", err), + } + return nil + } + + queryRes := &api.QueryLatestEventsAndStateResponse{ + RoomExists: true, + RoomVersion: roomInfo.RoomVersion, + LatestEvents: fwdExtremities, + StateEvents: stateEvents, + Depth: depth, + } + + ev, err := eventutil.BuildEvent(ctx, builder, r.Cfg.Matrix, time.Now(), &eventsNeeded, queryRes) + if err != nil { + res.Error = &api.PerformError{ + Code: api.PerformErrorBadRequest, + Msg: fmt.Sprintf("eventutil.BuildEvent: %s", err), + } + return nil + } + + inputReq := &api.InputRoomEventsRequest{ + Asynchronous: false, + } + inputRes := &api.InputRoomEventsResponse{} + + for _, authEvent := range append(authEvents, stateEvents...) { + inputReq.InputRoomEvents = append(inputReq.InputRoomEvents, api.InputRoomEvent{ + Kind: api.KindOutlier, + Event: authEvent, + Origin: authEvent.Origin(), + }) + } + + inputReq.InputRoomEvents = append(inputReq.InputRoomEvents, api.InputRoomEvent{ + Kind: api.KindNew, + Event: ev, + Origin: r.Cfg.Matrix.ServerName, + HasState: true, + StateEventIDs: stateIDs, + SendAsServer: string(r.Cfg.Matrix.ServerName), + }) + + if err := r.Inputer.InputRoomEvents(ctx, inputReq, inputRes); err != nil { + res.Error = &api.PerformError{ + Code: api.PerformErrorBadRequest, + Msg: fmt.Sprintf("r.Inputer.InputRoomEvents: %s", err), + } + return nil + } + + if inputRes.ErrMsg != "" { + res.Error = &api.PerformError{ + Code: api.PerformErrorBadRequest, + Msg: inputRes.ErrMsg, + } + } + + return nil +} diff --git a/roomserver/internal/query/query.go b/roomserver/internal/query/query.go index a34ac35e7..b41a92e94 100644 --- a/roomserver/internal/query/query.go +++ b/roomserver/internal/query/query.go @@ -208,6 +208,9 @@ func (r *Queryer) QueryMembershipForUser( return err } +// QueryMembershipAtEvent returns the known memberships at a given event. +// If the state before an event is not known, an empty list will be returned +// for that event instead. func (r *Queryer) QueryMembershipAtEvent( ctx context.Context, request *api.QueryMembershipAtEventRequest, diff --git a/roomserver/inthttp/client.go b/roomserver/inthttp/client.go index 87746f6d4..61ea9ec8a 100644 --- a/roomserver/inthttp/client.go +++ b/roomserver/inthttp/client.go @@ -27,19 +27,20 @@ const ( RoomserverInputRoomEventsPath = "/roomserver/inputRoomEvents" // Perform operations - RoomserverPerformInvitePath = "/roomserver/performInvite" - RoomserverPerformPeekPath = "/roomserver/performPeek" - RoomserverPerformUnpeekPath = "/roomserver/performUnpeek" - RoomserverPerformRoomUpgradePath = "/roomserver/performRoomUpgrade" - RoomserverPerformJoinPath = "/roomserver/performJoin" - RoomserverPerformLeavePath = "/roomserver/performLeave" - RoomserverPerformBackfillPath = "/roomserver/performBackfill" - RoomserverPerformPublishPath = "/roomserver/performPublish" - RoomserverPerformInboundPeekPath = "/roomserver/performInboundPeek" - RoomserverPerformForgetPath = "/roomserver/performForget" - RoomserverPerformAdminEvacuateRoomPath = "/roomserver/performAdminEvacuateRoom" - RoomserverPerformAdminEvacuateUserPath = "/roomserver/performAdminEvacuateUser" - RoomserverPerformAdminPurgeRoomPath = "/roomserver/performAdminPurgeRoom" + RoomserverPerformInvitePath = "/roomserver/performInvite" + RoomserverPerformPeekPath = "/roomserver/performPeek" + RoomserverPerformUnpeekPath = "/roomserver/performUnpeek" + RoomserverPerformRoomUpgradePath = "/roomserver/performRoomUpgrade" + RoomserverPerformJoinPath = "/roomserver/performJoin" + RoomserverPerformLeavePath = "/roomserver/performLeave" + RoomserverPerformBackfillPath = "/roomserver/performBackfill" + RoomserverPerformPublishPath = "/roomserver/performPublish" + RoomserverPerformInboundPeekPath = "/roomserver/performInboundPeek" + RoomserverPerformForgetPath = "/roomserver/performForget" + RoomserverPerformAdminEvacuateRoomPath = "/roomserver/performAdminEvacuateRoom" + RoomserverPerformAdminEvacuateUserPath = "/roomserver/performAdminEvacuateUser" + RoomserverPerformAdminPurgeRoomPath = "/roomserver/performAdminPurgeRoom" + RoomserverPerformAdminDownloadStatePath = "/roomserver/performAdminDownloadState" // Query operations RoomserverQueryLatestEventsAndStatePath = "/roomserver/queryLatestEventsAndState" @@ -262,6 +263,17 @@ func (h *httpRoomserverInternalAPI) PerformAdminEvacuateRoom( ) } +func (h *httpRoomserverInternalAPI) PerformAdminDownloadState( + ctx context.Context, + request *api.PerformAdminDownloadStateRequest, + response *api.PerformAdminDownloadStateResponse, +) error { + return httputil.CallInternalRPCAPI( + "PerformAdminDownloadState", h.roomserverURL+RoomserverPerformAdminDownloadStatePath, + h.httpClient, ctx, request, response, + ) +} + func (h *httpRoomserverInternalAPI) PerformAdminEvacuateUser( ctx context.Context, request *api.PerformAdminEvacuateUserRequest, diff --git a/roomserver/inthttp/server.go b/roomserver/inthttp/server.go index b936cb99e..f95a3105f 100644 --- a/roomserver/inthttp/server.go +++ b/roomserver/inthttp/server.go @@ -70,6 +70,11 @@ func AddRoutes(r api.RoomserverInternalAPI, internalAPIMux *mux.Router) { httputil.MakeInternalRPCAPI("RoomserverPerformAdminPurgeRoom", r.PerformAdminPurgeRoom), ) + internalAPIMux.Handle( + RoomserverPerformAdminDownloadStatePath, + httputil.MakeInternalRPCAPI("RoomserverPerformAdminDownloadState", r.PerformAdminDownloadState), + ) + internalAPIMux.Handle( RoomserverQueryPublishedRoomsPath, httputil.MakeInternalRPCAPI("RoomserverQueryPublishedRooms", r.QueryPublishedRooms),