diff --git a/.github/workflows/dendrite.yml b/.github/workflows/dendrite.yml index 5d60301c7..6ebef4e13 100644 --- a/.github/workflows/dendrite.yml +++ b/.github/workflows/dendrite.yml @@ -17,6 +17,7 @@ jobs: name: WASM build test timeout-minutes: 5 runs-on: ubuntu-latest + if: ${{ false }} # disable for now steps: - uses: actions/checkout@v2 diff --git a/build.sh b/build.sh index 700e6434f..f8b5001bf 100755 --- a/build.sh +++ b/build.sh @@ -21,4 +21,4 @@ mkdir -p bin CGO_ENABLED=1 go build -trimpath -ldflags "$FLAGS" -v -o "bin/" ./cmd/... -CGO_ENABLED=0 GOOS=js GOARCH=wasm go build -trimpath -ldflags "$FLAGS" -o bin/main.wasm ./cmd/dendritejs-pinecone +# CGO_ENABLED=0 GOOS=js GOARCH=wasm go build -trimpath -ldflags "$FLAGS" -o bin/main.wasm ./cmd/dendritejs-pinecone diff --git a/clientapi/routing/admin.go b/clientapi/routing/admin.go index 125b3847d..523b88c99 100644 --- a/clientapi/routing/admin.go +++ b/clientapi/routing/admin.go @@ -47,3 +47,40 @@ func AdminEvacuateRoom(req *http.Request, device *userapi.Device, rsAPI roomserv }, } } + +func AdminEvacuateUser(req *http.Request, device *userapi.Device, rsAPI roomserverAPI.ClientRoomserverAPI) util.JSONResponse { + if device.AccountType != userapi.AccountTypeAdmin { + return util.JSONResponse{ + Code: http.StatusForbidden, + JSON: jsonerror.Forbidden("This API can only be used by admin users."), + } + } + vars, err := httputil.URLDecodeMapValues(mux.Vars(req)) + if err != nil { + return util.ErrorResponse(err) + } + userID, ok := vars["userID"] + if !ok { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.MissingArgument("Expecting user ID."), + } + } + res := &roomserverAPI.PerformAdminEvacuateUserResponse{} + rsAPI.PerformAdminEvacuateUser( + req.Context(), + &roomserverAPI.PerformAdminEvacuateUserRequest{ + UserID: userID, + }, + res, + ) + if err := res.Error; err != nil { + return err.JSONResponse() + } + return util.JSONResponse{ + Code: 200, + JSON: map[string]interface{}{ + "affected": res.Affected, + }, + } +} diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go index aa4b5a235..0460850ef 100644 --- a/clientapi/routing/routing.go +++ b/clientapi/routing/routing.go @@ -129,6 +129,12 @@ func Setup( }), ).Methods(http.MethodGet, http.MethodOptions) + dendriteAdminRouter.Handle("/admin/evacuateUser/{userID}", + httputil.MakeAuthAPI("admin_evacuate_user", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { + return AdminEvacuateUser(req, device, rsAPI) + }), + ).Methods(http.MethodGet, http.MethodOptions) + // server notifications if cfg.Matrix.ServerNotices.Enabled { logrus.Info("Enabling server notices at /_synapse/admin/v1/send_server_notice") diff --git a/docs/administration/4_adminapi.md b/docs/administration/4_adminapi.md index e33482ec9..51f56374b 100644 --- a/docs/administration/4_adminapi.md +++ b/docs/administration/4_adminapi.md @@ -19,6 +19,12 @@ This endpoint will instruct Dendrite to part all local users from the given `roo in the URL. It may take some time to complete. A JSON body will be returned containing the user IDs of all affected users. +## `/_dendrite/admin/evacuateUser/{userID}` + +This endpoint will instruct Dendrite to part the given local `userID` in the URL from +all rooms which they are currently joined. A JSON body will be returned containing +the room IDs of all affected rooms. + ## `/_synapse/admin/v1/register` Shared secret registration — please see the [user creation page](createusers) for diff --git a/federationapi/consumers/presence.go b/federationapi/consumers/presence.go index bfce1b28b..a65d2aa04 100644 --- a/federationapi/consumers/presence.go +++ b/federationapi/consumers/presence.go @@ -133,7 +133,7 @@ func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msg *nats.Msg) b return true } - log.Debugf("sending presence EDU to %d servers", len(joined)) + log.Tracef("sending presence EDU to %d servers", len(joined)) if err = t.queues.SendEDU(edu, t.ServerName, joined); err != nil { log.WithError(err).Error("failed to send EDU") return false diff --git a/federationapi/producers/syncapi.go b/federationapi/producers/syncapi.go index 6453d026c..e371baaaa 100644 --- a/federationapi/producers/syncapi.go +++ b/federationapi/producers/syncapi.go @@ -159,7 +159,7 @@ func (p *SyncAPIProducer) SendPresence( lastActiveTS := gomatrixserverlib.AsTimestamp(time.Now().Add(-(time.Duration(lastActiveAgo) * time.Millisecond))) m.Header.Set("last_active_ts", strconv.Itoa(int(lastActiveTS))) - log.Debugf("Sending presence to syncAPI: %+v", m.Header) + log.Tracef("Sending presence to syncAPI: %+v", m.Header) _, err := p.JetStream.PublishMsg(m, nats.Context(ctx)) return err } diff --git a/go.mod b/go.mod index 0217a8d7f..12e0294f6 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,7 @@ require ( github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91 github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 - github.com/matrix-org/gomatrixserverlib v0.0.0-20220616095912-8306e9da1829 + github.com/matrix-org/gomatrixserverlib v0.0.0-20220701090733-da53994b0c7f github.com/matrix-org/pinecone v0.0.0-20220408153826-2999ea29ed48 github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 github.com/mattn/go-sqlite3 v1.14.13 diff --git a/go.sum b/go.sum index 073cc0c1b..0f02f59bb 100644 --- a/go.sum +++ b/go.sum @@ -341,8 +341,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= -github.com/matrix-org/gomatrixserverlib v0.0.0-20220616095912-8306e9da1829 h1:LkZSD1GXtmNHDwa7TfI8CrvoTNvFGhdGHSLDQ4Za/P8= -github.com/matrix-org/gomatrixserverlib v0.0.0-20220616095912-8306e9da1829/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk= +github.com/matrix-org/gomatrixserverlib v0.0.0-20220701090733-da53994b0c7f h1:XF2+J6sOq07yhK1I7ItwsgRwXorjj7gqiCvgZ4dn8W8= +github.com/matrix-org/gomatrixserverlib v0.0.0-20220701090733-da53994b0c7f/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk= github.com/matrix-org/pinecone v0.0.0-20220408153826-2999ea29ed48 h1:W0sjjC6yjskHX4mb0nk3p0fXAlbU5bAFUFeEtlrPASE= github.com/matrix-org/pinecone v0.0.0-20220408153826-2999ea29ed48/go.mod h1:ulJzsVOTssIVp1j/m5eI//4VpAGDkMt5NrRuAVX7wpc= github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U= @@ -392,8 +392,6 @@ github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJE github.com/neelance/sourcemap v0.0.0-20151028013722-8c68805598ab/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM= github.com/neilalexander/nats-server/v2 v2.8.3-0.20220513095553-73a9a246d34f h1:Fc+TjdV1mOy0oISSzfoxNWdTqjg7tN/Vdgf+B2cwvdo= github.com/neilalexander/nats-server/v2 v2.8.3-0.20220513095553-73a9a246d34f/go.mod h1:vIdpKz3OG+DCg4q/xVPdXHoztEyKDWRtykQ4N7hd7C4= -github.com/neilalexander/nats.go v1.13.1-0.20220419101051-b262d9f0be1e h1:kNIzIzj2OvnlreA+sTJ12nWJzTP3OSLNKDL/Iq9mF6Y= -github.com/neilalexander/nats.go v1.13.1-0.20220419101051-b262d9f0be1e/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/neilalexander/nats.go v1.13.1-0.20220621084451-ac518c356673 h1:TcKfa3Tf0qwUotv63PQVu2d1bBoLi2iEA4RHVMGDh5M= github.com/neilalexander/nats.go v1.13.1-0.20220621084451-ac518c356673/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/neilalexander/utp v0.1.1-0.20210727203401-54ae7b1cd5f9 h1:lrVQzBtkeQEGGYUHwSX1XPe1E5GL6U3KYCNe2G4bncQ= diff --git a/roomserver/api/api.go b/roomserver/api/api.go index f87ff2962..38baa617f 100644 --- a/roomserver/api/api.go +++ b/roomserver/api/api.go @@ -140,11 +140,8 @@ type ClientRoomserverAPI interface { // PerformRoomUpgrade upgrades a room to a newer version PerformRoomUpgrade(ctx context.Context, req *PerformRoomUpgradeRequest, resp *PerformRoomUpgradeResponse) - PerformAdminEvacuateRoom( - ctx context.Context, - req *PerformAdminEvacuateRoomRequest, - res *PerformAdminEvacuateRoomResponse, - ) + PerformAdminEvacuateRoom(ctx context.Context, req *PerformAdminEvacuateRoomRequest, res *PerformAdminEvacuateRoomResponse) + PerformAdminEvacuateUser(ctx context.Context, req *PerformAdminEvacuateUserRequest, res *PerformAdminEvacuateUserResponse) PerformPeek(ctx context.Context, req *PerformPeekRequest, res *PerformPeekResponse) PerformUnpeek(ctx context.Context, req *PerformUnpeekRequest, res *PerformUnpeekResponse) PerformInvite(ctx context.Context, req *PerformInviteRequest, res *PerformInviteResponse) error @@ -161,6 +158,7 @@ type UserRoomserverAPI interface { QueryLatestEventsAndStateAPI QueryCurrentState(ctx context.Context, req *QueryCurrentStateRequest, res *QueryCurrentStateResponse) error QueryMembershipsForRoom(ctx context.Context, req *QueryMembershipsForRoomRequest, res *QueryMembershipsForRoomResponse) error + PerformAdminEvacuateUser(ctx context.Context, req *PerformAdminEvacuateUserRequest, res *PerformAdminEvacuateUserResponse) } type FederationRoomserverAPI interface { diff --git a/roomserver/api/api_trace.go b/roomserver/api/api_trace.go index 92c5c1b1d..211f320ff 100644 --- a/roomserver/api/api_trace.go +++ b/roomserver/api/api_trace.go @@ -113,6 +113,15 @@ func (t *RoomserverInternalAPITrace) PerformAdminEvacuateRoom( util.GetLogger(ctx).Infof("PerformAdminEvacuateRoom req=%+v res=%+v", js(req), js(res)) } +func (t *RoomserverInternalAPITrace) PerformAdminEvacuateUser( + ctx context.Context, + req *PerformAdminEvacuateUserRequest, + res *PerformAdminEvacuateUserResponse, +) { + t.Impl.PerformAdminEvacuateUser(ctx, req, res) + util.GetLogger(ctx).Infof("PerformAdminEvacuateUser req=%+v res=%+v", js(req), js(res)) +} + func (t *RoomserverInternalAPITrace) PerformInboundPeek( ctx context.Context, req *PerformInboundPeekRequest, diff --git a/roomserver/api/perform.go b/roomserver/api/perform.go index 30aa2cf1b..d9ea9dd1c 100644 --- a/roomserver/api/perform.go +++ b/roomserver/api/perform.go @@ -223,3 +223,12 @@ type PerformAdminEvacuateRoomResponse struct { Affected []string `json:"affected"` Error *PerformError } + +type PerformAdminEvacuateUserRequest struct { + UserID string `json:"user_id"` +} + +type PerformAdminEvacuateUserResponse struct { + Affected []string `json:"affected"` + Error *PerformError +} diff --git a/roomserver/internal/alias.go b/roomserver/internal/alias.go index f47ae47fe..175bb9310 100644 --- a/roomserver/internal/alias.go +++ b/roomserver/internal/alias.go @@ -216,11 +216,10 @@ func (r *RoomserverInternalAPI) RemoveRoomAlias( return err } - err = api.SendEvents(ctx, r.RSAPI, api.KindNew, []*gomatrixserverlib.HeaderedEvent{newEvent}, r.ServerName, r.ServerName, nil, false) + err = api.SendEvents(ctx, r, api.KindNew, []*gomatrixserverlib.HeaderedEvent{newEvent}, r.ServerName, r.ServerName, nil, false) if err != nil { return err } - } } diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go index afef52da4..d59b8be7a 100644 --- a/roomserver/internal/api.go +++ b/roomserver/internal/api.go @@ -12,8 +12,10 @@ import ( "github.com/matrix-org/dendrite/roomserver/internal/input" "github.com/matrix-org/dendrite/roomserver/internal/perform" "github.com/matrix-org/dendrite/roomserver/internal/query" + "github.com/matrix-org/dendrite/roomserver/producers" "github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" @@ -49,17 +51,21 @@ type RoomserverInternalAPI struct { JetStream nats.JetStreamContext Durable string InputRoomEventTopic string // JetStream topic for new input room events - OutputRoomEventTopic string // JetStream topic for new output room events + OutputProducer *producers.RoomEventProducer PerspectiveServerNames []gomatrixserverlib.ServerName } func NewRoomserverAPI( processCtx *process.ProcessContext, cfg *config.RoomServer, roomserverDB storage.Database, - consumer nats.JetStreamContext, nc *nats.Conn, - inputRoomEventTopic, outputRoomEventTopic string, + js nats.JetStreamContext, nc *nats.Conn, inputRoomEventTopic string, caches caching.RoomServerCaches, perspectiveServerNames []gomatrixserverlib.ServerName, ) *RoomserverInternalAPI { serverACLs := acls.NewServerACLs(roomserverDB) + producer := &producers.RoomEventProducer{ + Topic: string(cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent)), + JetStream: js, + ACLs: serverACLs, + } a := &RoomserverInternalAPI{ ProcessContext: processCtx, DB: roomserverDB, @@ -68,8 +74,8 @@ func NewRoomserverAPI( ServerName: cfg.Matrix.ServerName, PerspectiveServerNames: perspectiveServerNames, InputRoomEventTopic: inputRoomEventTopic, - OutputRoomEventTopic: outputRoomEventTopic, - JetStream: consumer, + OutputProducer: producer, + JetStream: js, NATSClient: nc, Durable: cfg.Matrix.JetStream.Durable("RoomserverInputConsumer"), ServerACLs: serverACLs, @@ -92,19 +98,19 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.RoomserverFederatio r.KeyRing = keyRing r.Inputer = &input.Inputer{ - Cfg: r.Cfg, - ProcessContext: r.ProcessContext, - DB: r.DB, - InputRoomEventTopic: r.InputRoomEventTopic, - OutputRoomEventTopic: r.OutputRoomEventTopic, - JetStream: r.JetStream, - NATSClient: r.NATSClient, - Durable: nats.Durable(r.Durable), - ServerName: r.Cfg.Matrix.ServerName, - FSAPI: fsAPI, - KeyRing: keyRing, - ACLs: r.ServerACLs, - Queryer: r.Queryer, + Cfg: r.Cfg, + ProcessContext: r.ProcessContext, + DB: r.DB, + InputRoomEventTopic: r.InputRoomEventTopic, + OutputProducer: r.OutputProducer, + JetStream: r.JetStream, + NATSClient: r.NATSClient, + Durable: nats.Durable(r.Durable), + ServerName: r.Cfg.Matrix.ServerName, + FSAPI: fsAPI, + KeyRing: keyRing, + ACLs: r.ServerACLs, + Queryer: r.Queryer, } r.Inviter = &perform.Inviter{ DB: r.DB, @@ -170,6 +176,7 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.RoomserverFederatio Cfg: r.Cfg, Inputer: r.Inputer, Queryer: r.Queryer, + Leaver: r.Leaver, } if err := r.Inputer.Start(); err != nil { @@ -198,7 +205,7 @@ func (r *RoomserverInternalAPI) PerformInvite( if len(outputEvents) == 0 { return nil } - return r.WriteOutputEvents(req.Event.RoomID(), outputEvents) + return r.OutputProducer.ProduceRoomEvents(req.Event.RoomID(), outputEvents) } func (r *RoomserverInternalAPI) PerformLeave( @@ -214,7 +221,7 @@ func (r *RoomserverInternalAPI) PerformLeave( if len(outputEvents) == 0 { return nil } - return r.WriteOutputEvents(req.RoomID, outputEvents) + return r.OutputProducer.ProduceRoomEvents(req.RoomID, outputEvents) } func (r *RoomserverInternalAPI) PerformForget( diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 600994c5a..fa07c1d2b 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -29,6 +29,7 @@ import ( "github.com/matrix-org/dendrite/roomserver/acls" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/internal/query" + "github.com/matrix-org/dendrite/roomserver/producers" "github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/jetstream" @@ -37,16 +38,8 @@ import ( "github.com/nats-io/nats.go" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" - log "github.com/sirupsen/logrus" - "github.com/tidwall/gjson" ) -var keyContentFields = map[string]string{ - "m.room.join_rules": "join_rule", - "m.room.history_visibility": "history_visibility", - "m.room.member": "membership", -} - // Inputer is responsible for consuming from the roomserver input // streams and processing the events. All input events are queued // into a single NATS stream and the order is preserved strictly. @@ -75,19 +68,19 @@ var keyContentFields = map[string]string{ // up, so they will do nothing until a new event comes in for B // or C. type Inputer struct { - Cfg *config.RoomServer - ProcessContext *process.ProcessContext - DB storage.Database - NATSClient *nats.Conn - JetStream nats.JetStreamContext - Durable nats.SubOpt - ServerName gomatrixserverlib.ServerName - FSAPI fedapi.RoomserverFederationAPI - KeyRing gomatrixserverlib.JSONVerifier - ACLs *acls.ServerACLs - InputRoomEventTopic string - OutputRoomEventTopic string - workers sync.Map // room ID -> *worker + Cfg *config.RoomServer + ProcessContext *process.ProcessContext + DB storage.Database + NATSClient *nats.Conn + JetStream nats.JetStreamContext + Durable nats.SubOpt + ServerName gomatrixserverlib.ServerName + FSAPI fedapi.RoomserverFederationAPI + KeyRing gomatrixserverlib.JSONVerifier + ACLs *acls.ServerACLs + InputRoomEventTopic string + OutputProducer *producers.RoomEventProducer + workers sync.Map // room ID -> *worker Queryer *query.Queryer } @@ -370,58 +363,6 @@ func (r *Inputer) InputRoomEvents( } } -// WriteOutputEvents implements OutputRoomEventWriter -func (r *Inputer) WriteOutputEvents(roomID string, updates []api.OutputEvent) error { - var err error - for _, update := range updates { - msg := &nats.Msg{ - Subject: r.OutputRoomEventTopic, - Header: nats.Header{}, - } - msg.Header.Set(jetstream.RoomID, roomID) - msg.Data, err = json.Marshal(update) - if err != nil { - return err - } - logger := log.WithFields(log.Fields{ - "room_id": roomID, - "type": update.Type, - }) - if update.NewRoomEvent != nil { - eventType := update.NewRoomEvent.Event.Type() - logger = logger.WithFields(log.Fields{ - "event_type": eventType, - "event_id": update.NewRoomEvent.Event.EventID(), - "adds_state": len(update.NewRoomEvent.AddsStateEventIDs), - "removes_state": len(update.NewRoomEvent.RemovesStateEventIDs), - "send_as_server": update.NewRoomEvent.SendAsServer, - "sender": update.NewRoomEvent.Event.Sender(), - }) - if update.NewRoomEvent.Event.StateKey() != nil { - logger = logger.WithField("state_key", *update.NewRoomEvent.Event.StateKey()) - } - contentKey := keyContentFields[eventType] - if contentKey != "" { - value := gjson.GetBytes(update.NewRoomEvent.Event.Content(), contentKey) - if value.Exists() { - logger = logger.WithField("content_value", value.String()) - } - } - - if eventType == "m.room.server_acl" && update.NewRoomEvent.Event.StateKeyEquals("") { - ev := update.NewRoomEvent.Event.Unwrap() - defer r.ACLs.OnServerACLUpdate(ev) - } - } - logger.Tracef("Producing to topic '%s'", r.OutputRoomEventTopic) - if _, err := r.JetStream.PublishMsg(msg); err != nil { - logger.WithError(err).Errorf("Failed to produce to topic '%s': %s", r.OutputRoomEventTopic, err) - return err - } - } - return nil -} - var roomserverInputBackpressure = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "dendrite", diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index ff05f798c..743b1efe6 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -381,7 +381,7 @@ func (r *Inputer) processRoomEvent( return fmt.Errorf("r.updateLatestEvents: %w", err) } case api.KindOld: - err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{ + err = r.OutputProducer.ProduceRoomEvents(event.RoomID(), []api.OutputEvent{ { Type: api.OutputTypeOldRoomEvent, OldRoomEvent: &api.OutputOldRoomEvent{ @@ -400,7 +400,7 @@ func (r *Inputer) processRoomEvent( // so notify downstream components to redact this event - they should have it if they've // been tracking our output log. if redactedEventID != "" { - err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{ + err = r.OutputProducer.ProduceRoomEvents(event.RoomID(), []api.OutputEvent{ { Type: api.OutputTypeRedactedEvent, RedactedEvent: &api.OutputRedactedEvent{ diff --git a/roomserver/internal/input/input_latest_events.go b/roomserver/internal/input/input_latest_events.go index e76f4ba8d..f7d15fdb5 100644 --- a/roomserver/internal/input/input_latest_events.go +++ b/roomserver/internal/input/input_latest_events.go @@ -192,7 +192,7 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error { // send the event asynchronously but we would need to ensure that 1) the events are written to the log in // the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the // necessary bookkeeping we'll keep the event sending synchronous for now. - if err = u.api.WriteOutputEvents(u.event.RoomID(), updates); err != nil { + if err = u.api.OutputProducer.ProduceRoomEvents(u.event.RoomID(), updates); err != nil { return fmt.Errorf("u.api.WriteOutputEvents: %w", err) } diff --git a/roomserver/internal/perform/perform_admin.go b/roomserver/internal/perform/perform_admin.go index 2de6477cc..1cb52966a 100644 --- a/roomserver/internal/perform/perform_admin.go +++ b/roomserver/internal/perform/perform_admin.go @@ -16,6 +16,7 @@ package perform import ( "context" + "database/sql" "encoding/json" "fmt" "time" @@ -34,6 +35,7 @@ type Admin struct { Cfg *config.RoomServer Queryer *query.Queryer Inputer *input.Inputer + Leaver *Leaver } // PerformEvacuateRoom will remove all local users from the given room. @@ -160,3 +162,71 @@ func (r *Admin) PerformAdminEvacuateRoom( inputRes := &api.InputRoomEventsResponse{} r.Inputer.InputRoomEvents(ctx, inputReq, inputRes) } + +func (r *Admin) PerformAdminEvacuateUser( + ctx context.Context, + req *api.PerformAdminEvacuateUserRequest, + res *api.PerformAdminEvacuateUserResponse, +) { + _, domain, err := gomatrixserverlib.SplitID('@', req.UserID) + if err != nil { + res.Error = &api.PerformError{ + Code: api.PerformErrorBadRequest, + Msg: fmt.Sprintf("Malformed user ID: %s", err), + } + return + } + if domain != r.Cfg.Matrix.ServerName { + res.Error = &api.PerformError{ + Code: api.PerformErrorBadRequest, + Msg: "Can only evacuate local users using this endpoint", + } + return + } + + roomIDs, err := r.DB.GetRoomsByMembership(ctx, req.UserID, gomatrixserverlib.Join) + if err != nil && err != sql.ErrNoRows { + res.Error = &api.PerformError{ + Code: api.PerformErrorBadRequest, + Msg: fmt.Sprintf("r.DB.GetRoomsByMembership: %s", err), + } + return + } + + inviteRoomIDs, err := r.DB.GetRoomsByMembership(ctx, req.UserID, gomatrixserverlib.Invite) + if err != nil && err != sql.ErrNoRows { + res.Error = &api.PerformError{ + Code: api.PerformErrorBadRequest, + Msg: fmt.Sprintf("r.DB.GetRoomsByMembership: %s", err), + } + return + } + + for _, roomID := range append(roomIDs, inviteRoomIDs...) { + leaveReq := &api.PerformLeaveRequest{ + RoomID: roomID, + UserID: req.UserID, + } + leaveRes := &api.PerformLeaveResponse{} + outputEvents, err := r.Leaver.PerformLeave(ctx, leaveReq, leaveRes) + if err != nil { + res.Error = &api.PerformError{ + Code: api.PerformErrorBadRequest, + Msg: fmt.Sprintf("r.Leaver.PerformLeave: %s", err), + } + return + } + if len(outputEvents) == 0 { + continue + } + if err := r.Inputer.OutputProducer.ProduceRoomEvents(roomID, outputEvents); err != nil { + res.Error = &api.PerformError{ + Code: api.PerformErrorBadRequest, + Msg: fmt.Sprintf("r.Inputer.WriteOutputEvents: %s", err), + } + return + } + + res.Affected = append(res.Affected, roomID) + } +} diff --git a/roomserver/internal/perform/perform_backfill.go b/roomserver/internal/perform/perform_backfill.go index 1bc4c75ce..9eddca733 100644 --- a/roomserver/internal/perform/perform_backfill.go +++ b/roomserver/internal/perform/perform_backfill.go @@ -18,6 +18,7 @@ import ( "context" "fmt" + "github.com/getsentry/sentry-go" federationAPI "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/roomserver/api" @@ -206,8 +207,17 @@ func (r *Backfiller) fetchAndStoreMissingEvents(ctx context.Context, roomVer gom } logger.Infof("returned %d PDUs which made events %+v", len(res.PDUs), result) for _, res := range result { - if res.Error != nil { - logger.WithError(res.Error).Warn("event failed PDU checks") + switch err := res.Error.(type) { + case nil: + case gomatrixserverlib.SignatureErr: + // The signature of the event might not be valid anymore, for example if + // the key ID was reused with a different signature. + logger.WithError(err).Errorf("event failed PDU checks, storing anyway") + case gomatrixserverlib.AuthChainErr, gomatrixserverlib.AuthRulesErr: + logger.WithError(err).Warn("event failed PDU checks") + continue + default: + logger.WithError(err).Warn("event failed PDU checks") continue } missingMap[id] = res.Event @@ -306,6 +316,7 @@ FederationHit: b.eventIDToBeforeStateIDs[targetEvent.EventID()] = res return res, nil } + sentry.CaptureException(lastErr) // temporary to see if we might need to raise the server limit return nil, lastErr } @@ -366,19 +377,25 @@ func (b *backfillRequester) StateBeforeEvent(ctx context.Context, roomVer gomatr } } - c := gomatrixserverlib.FederatedStateProvider{ - FedClient: b.fsAPI, - RememberAuthEvents: false, - Server: b.servers[0], + var lastErr error + for _, srv := range b.servers { + c := gomatrixserverlib.FederatedStateProvider{ + FedClient: b.fsAPI, + RememberAuthEvents: false, + Server: srv, + } + result, err := c.StateBeforeEvent(ctx, roomVer, event, eventIDs) + if err != nil { + lastErr = err + continue + } + for eventID, ev := range result { + b.eventIDMap[eventID] = ev + } + return result, nil } - result, err := c.StateBeforeEvent(ctx, roomVer, event, eventIDs) - if err != nil { - return nil, err - } - for eventID, ev := range result { - b.eventIDMap[eventID] = ev - } - return result, nil + sentry.CaptureException(lastErr) // temporary to see if we might need to raise the server limit + return nil, lastErr } // ServersAtEvent is called when trying to determine which server to request from. diff --git a/roomserver/internal/perform/perform_inbound_peek.go b/roomserver/internal/perform/perform_inbound_peek.go index d19fc8386..32c81e849 100644 --- a/roomserver/internal/perform/perform_inbound_peek.go +++ b/roomserver/internal/perform/perform_inbound_peek.go @@ -113,7 +113,7 @@ func (r *InboundPeeker) PerformInboundPeek( response.AuthChainEvents = append(response.AuthChainEvents, event.Headered(info.RoomVersion)) } - err = r.Inputer.WriteOutputEvents(request.RoomID, []api.OutputEvent{ + err = r.Inputer.OutputProducer.ProduceRoomEvents(request.RoomID, []api.OutputEvent{ { Type: api.OutputTypeNewInboundPeek, NewInboundPeek: &api.OutputNewInboundPeek{ diff --git a/roomserver/internal/perform/perform_invite.go b/roomserver/internal/perform/perform_invite.go index b0148a314..644c954b6 100644 --- a/roomserver/internal/perform/perform_invite.go +++ b/roomserver/internal/perform/perform_invite.go @@ -56,7 +56,14 @@ func (r *Inviter) PerformInvite( return nil, fmt.Errorf("failed to load RoomInfo: %w", err) } - _, domain, _ := gomatrixserverlib.SplitID('@', targetUserID) + _, domain, err := gomatrixserverlib.SplitID('@', targetUserID) + if err != nil { + res.Error = &api.PerformError{ + Code: api.PerformErrorBadRequest, + Msg: fmt.Sprintf("The user ID %q is invalid!", targetUserID), + } + return nil, nil + } isTargetLocal := domain == r.Cfg.Matrix.ServerName isOriginLocal := event.Origin() == r.Cfg.Matrix.ServerName diff --git a/roomserver/internal/perform/perform_peek.go b/roomserver/internal/perform/perform_peek.go index 45e63888d..5560916b2 100644 --- a/roomserver/internal/perform/perform_peek.go +++ b/roomserver/internal/perform/perform_peek.go @@ -207,7 +207,7 @@ func (r *Peeker) performPeekRoomByID( // TODO: handle federated peeks - err = r.Inputer.WriteOutputEvents(roomID, []api.OutputEvent{ + err = r.Inputer.OutputProducer.ProduceRoomEvents(roomID, []api.OutputEvent{ { Type: api.OutputTypeNewPeek, NewPeek: &api.OutputNewPeek{ diff --git a/roomserver/internal/perform/perform_unpeek.go b/roomserver/internal/perform/perform_unpeek.go index 1057499cb..1fe8d5a0f 100644 --- a/roomserver/internal/perform/perform_unpeek.go +++ b/roomserver/internal/perform/perform_unpeek.go @@ -96,7 +96,7 @@ func (r *Unpeeker) performUnpeekRoomByID( // TODO: handle federated peeks - err = r.Inputer.WriteOutputEvents(req.RoomID, []api.OutputEvent{ + err = r.Inputer.OutputProducer.ProduceRoomEvents(req.RoomID, []api.OutputEvent{ { Type: api.OutputTypeRetirePeek, RetirePeek: &api.OutputRetirePeek{ diff --git a/roomserver/inthttp/client.go b/roomserver/inthttp/client.go index 7b10ae657..2fa8afc49 100644 --- a/roomserver/inthttp/client.go +++ b/roomserver/inthttp/client.go @@ -40,6 +40,7 @@ const ( RoomserverPerformInboundPeekPath = "/roomserver/performInboundPeek" RoomserverPerformForgetPath = "/roomserver/performForget" RoomserverPerformAdminEvacuateRoomPath = "/roomserver/performAdminEvacuateRoom" + RoomserverPerformAdminEvacuateUserPath = "/roomserver/performAdminEvacuateUser" // Query operations RoomserverQueryLatestEventsAndStatePath = "/roomserver/queryLatestEventsAndState" @@ -305,6 +306,23 @@ func (h *httpRoomserverInternalAPI) PerformAdminEvacuateRoom( } } +func (h *httpRoomserverInternalAPI) PerformAdminEvacuateUser( + ctx context.Context, + req *api.PerformAdminEvacuateUserRequest, + res *api.PerformAdminEvacuateUserResponse, +) { + span, ctx := opentracing.StartSpanFromContext(ctx, "PerformAdminEvacuateUser") + defer span.Finish() + + apiURL := h.roomserverURL + RoomserverPerformAdminEvacuateUserPath + err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res) + if err != nil { + res.Error = &api.PerformError{ + Msg: fmt.Sprintf("failed to communicate with roomserver: %s", err), + } + } +} + // QueryLatestEventsAndState implements RoomserverQueryAPI func (h *httpRoomserverInternalAPI) QueryLatestEventsAndState( ctx context.Context, diff --git a/roomserver/inthttp/server.go b/roomserver/inthttp/server.go index ad4fdc469..993381585 100644 --- a/roomserver/inthttp/server.go +++ b/roomserver/inthttp/server.go @@ -129,6 +129,17 @@ func AddRoutes(r api.RoomserverInternalAPI, internalAPIMux *mux.Router) { return util.JSONResponse{Code: http.StatusOK, JSON: &response} }), ) + internalAPIMux.Handle(RoomserverPerformAdminEvacuateUserPath, + httputil.MakeInternalAPI("performAdminEvacuateUser", func(req *http.Request) util.JSONResponse { + var request api.PerformAdminEvacuateUserRequest + var response api.PerformAdminEvacuateUserResponse + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + r.PerformAdminEvacuateUser(req.Context(), &request, &response) + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) internalAPIMux.Handle( RoomserverQueryPublishedRoomsPath, httputil.MakeInternalAPI("queryPublishedRooms", func(req *http.Request) util.JSONResponse { diff --git a/roomserver/producers/roomevent.go b/roomserver/producers/roomevent.go new file mode 100644 index 000000000..987e6c942 --- /dev/null +++ b/roomserver/producers/roomevent.go @@ -0,0 +1,89 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package producers + +import ( + "encoding/json" + + "github.com/matrix-org/dendrite/roomserver/acls" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/nats-io/nats.go" + log "github.com/sirupsen/logrus" + "github.com/tidwall/gjson" +) + +var keyContentFields = map[string]string{ + "m.room.join_rules": "join_rule", + "m.room.history_visibility": "history_visibility", + "m.room.member": "membership", +} + +type RoomEventProducer struct { + Topic string + ACLs *acls.ServerACLs + JetStream nats.JetStreamContext +} + +func (r *RoomEventProducer) ProduceRoomEvents(roomID string, updates []api.OutputEvent) error { + var err error + for _, update := range updates { + msg := &nats.Msg{ + Subject: r.Topic, + Header: nats.Header{}, + } + msg.Header.Set(jetstream.RoomID, roomID) + msg.Data, err = json.Marshal(update) + if err != nil { + return err + } + logger := log.WithFields(log.Fields{ + "room_id": roomID, + "type": update.Type, + }) + if update.NewRoomEvent != nil { + eventType := update.NewRoomEvent.Event.Type() + logger = logger.WithFields(log.Fields{ + "event_type": eventType, + "event_id": update.NewRoomEvent.Event.EventID(), + "adds_state": len(update.NewRoomEvent.AddsStateEventIDs), + "removes_state": len(update.NewRoomEvent.RemovesStateEventIDs), + "send_as_server": update.NewRoomEvent.SendAsServer, + "sender": update.NewRoomEvent.Event.Sender(), + }) + if update.NewRoomEvent.Event.StateKey() != nil { + logger = logger.WithField("state_key", *update.NewRoomEvent.Event.StateKey()) + } + contentKey := keyContentFields[eventType] + if contentKey != "" { + value := gjson.GetBytes(update.NewRoomEvent.Event.Content(), contentKey) + if value.Exists() { + logger = logger.WithField("content_value", value.String()) + } + } + + if eventType == "m.room.server_acl" && update.NewRoomEvent.Event.StateKeyEquals("") { + ev := update.NewRoomEvent.Event.Unwrap() + defer r.ACLs.OnServerACLUpdate(ev) + } + } + logger.Tracef("Producing to topic '%s'", r.Topic) + if _, err := r.JetStream.PublishMsg(msg); err != nil { + logger.WithError(err).Errorf("Failed to produce to topic '%s': %s", r.Topic, err) + return err + } + } + return nil +} diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go index 1480e8942..eb68100fe 100644 --- a/roomserver/roomserver.go +++ b/roomserver/roomserver.go @@ -55,7 +55,6 @@ func NewInternalAPI( return internal.NewRoomserverAPI( base.ProcessContext, cfg, roomserverDB, js, nc, cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent), - cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent), base.Caches, perspectiveServerNames, ) } diff --git a/syncapi/consumers/presence.go b/syncapi/consumers/presence.go index bfd72d604..0217e1956 100644 --- a/syncapi/consumers/presence.go +++ b/syncapi/consumers/presence.go @@ -138,7 +138,7 @@ func (s *PresenceConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { presence := msg.Header.Get("presence") timestamp := msg.Header.Get("last_active_ts") fromSync, _ := strconv.ParseBool(msg.Header.Get("from_sync")) - logrus.Debugf("syncAPI received presence event: %+v", msg.Header) + logrus.Tracef("syncAPI received presence event: %+v", msg.Header) if fromSync { // do not process local presence changes; we already did this synchronously. return true diff --git a/syncapi/routing/context.go b/syncapi/routing/context.go index d021d365d..f6b4d15e0 100644 --- a/syncapi/routing/context.go +++ b/syncapi/routing/context.go @@ -15,6 +15,7 @@ package routing import ( + "context" "database/sql" "encoding/json" "fmt" @@ -25,6 +26,7 @@ import ( "github.com/matrix-org/dendrite/internal/caching" roomserver "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" @@ -149,13 +151,30 @@ func Context( if len(response.State) > filter.Limit { response.State = response.State[len(response.State)-filter.Limit:] } - + start, end, err := getStartEnd(ctx, syncDB, eventsBefore, eventsAfter) + if err == nil { + response.End = end.String() + response.Start = start.String() + } return util.JSONResponse{ Code: http.StatusOK, JSON: response, } } +func getStartEnd(ctx context.Context, syncDB storage.Database, startEvents, endEvents []*gomatrixserverlib.HeaderedEvent) (start, end types.TopologyToken, err error) { + if len(startEvents) > 0 { + start, err = syncDB.EventPositionInTopology(ctx, startEvents[0].EventID()) + if err != nil { + return + } + } + if len(endEvents) > 0 { + end, err = syncDB.EventPositionInTopology(ctx, endEvents[0].EventID()) + } + return +} + func applyLazyLoadMembers( device *userapi.Device, filter *gomatrixserverlib.RoomEventFilter, diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index e55c661d6..24745cd55 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -50,7 +50,7 @@ type messagesReq struct { type messagesResp struct { Start string `json:"start"` StartStream string `json:"start_stream,omitempty"` // NOTSPEC: used by Cerulean, so clients can hit /messages then immediately /sync with a latest sync token - End string `json:"end"` + End string `json:"end,omitempty"` Chunk []gomatrixserverlib.ClientEvent `json:"chunk"` State []gomatrixserverlib.ClientEvent `json:"state"` } @@ -200,30 +200,6 @@ func OnIncomingMessagesRequest( return jsonerror.InternalServerError() } - // at least fetch the membership events for the users returned in chunk if LazyLoadMembers is set - state := []gomatrixserverlib.ClientEvent{} - if filter.LazyLoadMembers { - membershipToUser := make(map[string]*gomatrixserverlib.HeaderedEvent) - for _, evt := range clientEvents { - // Don't add membership events the client should already know about - if _, cached := lazyLoadCache.IsLazyLoadedUserCached(device, roomID, evt.Sender); cached { - continue - } - membership, err := db.GetStateEvent(req.Context(), roomID, gomatrixserverlib.MRoomMember, evt.Sender) - if err != nil { - util.GetLogger(req.Context()).WithError(err).Error("failed to get membership event for user") - continue - } - if membership != nil { - membershipToUser[evt.Sender] = membership - lazyLoadCache.StoreLazyLoadedUser(device, roomID, evt.Sender, membership.EventID()) - } - } - for _, evt := range membershipToUser { - state = append(state, gomatrixserverlib.HeaderedToClientEvent(evt, gomatrixserverlib.FormatSync)) - } - } - util.GetLogger(req.Context()).WithFields(logrus.Fields{ "from": from.String(), "to": to.String(), @@ -237,7 +213,13 @@ func OnIncomingMessagesRequest( Chunk: clientEvents, Start: start.String(), End: end.String(), - State: state, + } + res.applyLazyLoadMembers(req.Context(), db, roomID, device, filter.LazyLoadMembers, lazyLoadCache) + + // If we didn't return any events, set the end to an empty string, so it will be omitted + // in the response JSON. + if len(res.Chunk) == 0 { + res.End = "" } if fromStream != nil { res.StartStream = fromStream.String() @@ -250,6 +232,40 @@ func OnIncomingMessagesRequest( } } +// applyLazyLoadMembers loads membership events for users returned in Chunk, if the filter has +// LazyLoadMembers enabled. +func (m *messagesResp) applyLazyLoadMembers( + ctx context.Context, + db storage.Database, + roomID string, + device *userapi.Device, + lazyLoad bool, + lazyLoadCache caching.LazyLoadCache, +) { + if !lazyLoad { + return + } + membershipToUser := make(map[string]*gomatrixserverlib.HeaderedEvent) + for _, evt := range m.Chunk { + // Don't add membership events the client should already know about + if _, cached := lazyLoadCache.IsLazyLoadedUserCached(device, roomID, evt.Sender); cached { + continue + } + membership, err := db.GetStateEvent(ctx, roomID, gomatrixserverlib.MRoomMember, evt.Sender) + if err != nil { + util.GetLogger(ctx).WithError(err).Error("failed to get membership event for user") + continue + } + if membership != nil { + membershipToUser[evt.Sender] = membership + lazyLoadCache.StoreLazyLoadedUser(device, roomID, evt.Sender, membership.EventID()) + } + } + for _, evt := range membershipToUser { + m.State = append(m.State, gomatrixserverlib.HeaderedToClientEvent(evt, gomatrixserverlib.FormatSync)) + } +} + func checkIsRoomForgotten(ctx context.Context, roomID, userID string, rsAPI api.SyncRoomserverAPI) (forgotten bool, exists bool, err error) { req := api.QueryMembershipForUserRequest{ RoomID: roomID, diff --git a/syncapi/streams/stream_presence.go b/syncapi/streams/stream_presence.go index 35ce53cb6..877bcf141 100644 --- a/syncapi/streams/stream_presence.go +++ b/syncapi/streams/stream_presence.go @@ -111,7 +111,7 @@ func (p *PresenceStreamProvider) IncrementalSync( currentlyActive := prevPresence.CurrentlyActive() skip := prevPresence.Equals(presence) && currentlyActive && req.Device.UserID != presence.UserID if skip { - req.Log.Debugf("Skipping presence, no change (%s)", presence.UserID) + req.Log.Tracef("Skipping presence, no change (%s)", presence.UserID) continue } } diff --git a/sytest-blacklist b/sytest-blacklist index be0826eee..bcc345f6e 100644 --- a/sytest-blacklist +++ b/sytest-blacklist @@ -48,3 +48,4 @@ Notifications can be viewed with GET /notifications # More flakey If remote user leaves room we no longer receive device updates +Guest users can join guest_access rooms diff --git a/sytest-whitelist b/sytest-whitelist index 60a3b73f6..ea25c75d0 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -241,7 +241,6 @@ Inbound federation can receive v2 /send_join Message history can be paginated Backfill works correctly with history visibility set to joined Guest user cannot call /events globally -Guest users can join guest_access rooms Guest user can set display names Guest user cannot upgrade other users Guest non-joined user cannot call /events on shared room diff --git a/userapi/internal/api.go b/userapi/internal/api.go index 9d2f63c72..27ed15a01 100644 --- a/userapi/internal/api.go +++ b/userapi/internal/api.go @@ -33,6 +33,7 @@ import ( "github.com/matrix-org/dendrite/internal/pushrules" "github.com/matrix-org/dendrite/internal/sqlutil" keyapi "github.com/matrix-org/dendrite/keyserver/api" + rsapi "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/producers" @@ -49,6 +50,7 @@ type UserInternalAPI struct { // AppServices is the list of all registered AS AppServices []config.ApplicationService KeyAPI keyapi.UserKeyAPI + RSAPI rsapi.UserRoomserverAPI } func (a *UserInternalAPI) InputAccountData(ctx context.Context, req *api.InputAccountDataRequest, res *api.InputAccountDataResponse) error { @@ -452,6 +454,30 @@ func (a *UserInternalAPI) queryAppServiceToken(ctx context.Context, token, appSe // PerformAccountDeactivation deactivates the user's account, removing all ability for the user to login again. func (a *UserInternalAPI) PerformAccountDeactivation(ctx context.Context, req *api.PerformAccountDeactivationRequest, res *api.PerformAccountDeactivationResponse) error { + evacuateReq := &rsapi.PerformAdminEvacuateUserRequest{ + UserID: fmt.Sprintf("@%s:%s", req.Localpart, a.ServerName), + } + evacuateRes := &rsapi.PerformAdminEvacuateUserResponse{} + a.RSAPI.PerformAdminEvacuateUser(ctx, evacuateReq, evacuateRes) + if err := evacuateRes.Error; err != nil { + logrus.WithError(err).Errorf("Failed to evacuate user after account deactivation") + } + + deviceReq := &api.PerformDeviceDeletionRequest{ + UserID: fmt.Sprintf("@%s:%s", req.Localpart, a.ServerName), + } + deviceRes := &api.PerformDeviceDeletionResponse{} + if err := a.PerformDeviceDeletion(ctx, deviceReq, deviceRes); err != nil { + return err + } + + pusherReq := &api.PerformPusherDeletionRequest{ + Localpart: req.Localpart, + } + if err := a.PerformPusherDeletion(ctx, pusherReq, &struct{}{}); err != nil { + return err + } + err := a.DB.DeactivateAccount(ctx, req.Localpart) res.AccountDeactivated = err == nil return err diff --git a/userapi/userapi.go b/userapi/userapi.go index 603b416bf..2e86d6aa7 100644 --- a/userapi/userapi.go +++ b/userapi/userapi.go @@ -78,6 +78,7 @@ func NewInternalAPI( ServerName: cfg.Matrix.ServerName, AppServices: appServices, KeyAPI: keyAPI, + RSAPI: rsAPI, DisableTLSValidation: cfg.PushGatewayDisableTLSValidation, }