mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-07 06:03:09 -06:00
Merge branch 'main' into takwaiw/3782-login-publickey
This commit is contained in:
commit
50d219c237
|
|
@ -47,3 +47,40 @@ func AdminEvacuateRoom(req *http.Request, device *userapi.Device, rsAPI roomserv
|
|||
},
|
||||
}
|
||||
}
|
||||
|
||||
func AdminEvacuateUser(req *http.Request, device *userapi.Device, rsAPI roomserverAPI.ClientRoomserverAPI) util.JSONResponse {
|
||||
if device.AccountType != userapi.AccountTypeAdmin {
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusForbidden,
|
||||
JSON: jsonerror.Forbidden("This API can only be used by admin users."),
|
||||
}
|
||||
}
|
||||
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
|
||||
if err != nil {
|
||||
return util.ErrorResponse(err)
|
||||
}
|
||||
userID, ok := vars["userID"]
|
||||
if !ok {
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
JSON: jsonerror.MissingArgument("Expecting user ID."),
|
||||
}
|
||||
}
|
||||
res := &roomserverAPI.PerformAdminEvacuateUserResponse{}
|
||||
rsAPI.PerformAdminEvacuateUser(
|
||||
req.Context(),
|
||||
&roomserverAPI.PerformAdminEvacuateUserRequest{
|
||||
UserID: userID,
|
||||
},
|
||||
res,
|
||||
)
|
||||
if err := res.Error; err != nil {
|
||||
return err.JSONResponse()
|
||||
}
|
||||
return util.JSONResponse{
|
||||
Code: 200,
|
||||
JSON: map[string]interface{}{
|
||||
"affected": res.Affected,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -129,6 +129,12 @@ func Setup(
|
|||
}),
|
||||
).Methods(http.MethodGet, http.MethodOptions)
|
||||
|
||||
dendriteAdminRouter.Handle("/admin/evacuateUser/{userID}",
|
||||
httputil.MakeAuthAPI("admin_evacuate_user", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
|
||||
return AdminEvacuateUser(req, device, rsAPI)
|
||||
}),
|
||||
).Methods(http.MethodGet, http.MethodOptions)
|
||||
|
||||
// server notifications
|
||||
if cfg.Matrix.ServerNotices.Enabled {
|
||||
logrus.Info("Enabling server notices at /_synapse/admin/v1/send_server_notice")
|
||||
|
|
|
|||
|
|
@ -19,6 +19,12 @@ This endpoint will instruct Dendrite to part all local users from the given `roo
|
|||
in the URL. It may take some time to complete. A JSON body will be returned containing
|
||||
the user IDs of all affected users.
|
||||
|
||||
## `/_dendrite/admin/evacuateUser/{userID}`
|
||||
|
||||
This endpoint will instruct Dendrite to part the given local `userID` in the URL from
|
||||
all rooms which they are currently joined. A JSON body will be returned containing
|
||||
the room IDs of all affected rooms.
|
||||
|
||||
## `/_synapse/admin/v1/register`
|
||||
|
||||
Shared secret registration — please see the [user creation page](createusers) for
|
||||
|
|
|
|||
|
|
@ -133,7 +133,7 @@ func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msg *nats.Msg) b
|
|||
return true
|
||||
}
|
||||
|
||||
log.Debugf("sending presence EDU to %d servers", len(joined))
|
||||
log.Tracef("sending presence EDU to %d servers", len(joined))
|
||||
if err = t.queues.SendEDU(edu, t.ServerName, joined); err != nil {
|
||||
log.WithError(err).Error("failed to send EDU")
|
||||
return false
|
||||
|
|
|
|||
|
|
@ -63,6 +63,7 @@ func AddPublicRoutes(
|
|||
TopicSendToDeviceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
|
||||
TopicTypingEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
|
||||
TopicPresenceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent),
|
||||
TopicDeviceListUpdate: cfg.Matrix.JetStream.Prefixed(jetstream.InputDeviceListUpdate),
|
||||
ServerName: cfg.Matrix.ServerName,
|
||||
UserAPI: userAPI,
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ package producers
|
|||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
|
|
@ -34,6 +35,7 @@ type SyncAPIProducer struct {
|
|||
TopicSendToDeviceEvent string
|
||||
TopicTypingEvent string
|
||||
TopicPresenceEvent string
|
||||
TopicDeviceListUpdate string
|
||||
JetStream nats.JetStreamContext
|
||||
ServerName gomatrixserverlib.ServerName
|
||||
UserAPI userapi.UserInternalAPI
|
||||
|
|
@ -157,7 +159,22 @@ func (p *SyncAPIProducer) SendPresence(
|
|||
lastActiveTS := gomatrixserverlib.AsTimestamp(time.Now().Add(-(time.Duration(lastActiveAgo) * time.Millisecond)))
|
||||
|
||||
m.Header.Set("last_active_ts", strconv.Itoa(int(lastActiveTS)))
|
||||
log.Debugf("Sending presence to syncAPI: %+v", m.Header)
|
||||
log.Tracef("Sending presence to syncAPI: %+v", m.Header)
|
||||
_, err := p.JetStream.PublishMsg(m, nats.Context(ctx))
|
||||
return err
|
||||
}
|
||||
|
||||
func (p *SyncAPIProducer) SendDeviceListUpdate(
|
||||
ctx context.Context, deviceListUpdate *gomatrixserverlib.DeviceListUpdateEvent,
|
||||
) (err error) {
|
||||
m := nats.NewMsg(p.TopicDeviceListUpdate)
|
||||
m.Header.Set(jetstream.UserID, deviceListUpdate.UserID)
|
||||
m.Data, err = json.Marshal(deviceListUpdate)
|
||||
if err != nil {
|
||||
return fmt.Errorf("json.Marshal: %w", err)
|
||||
}
|
||||
|
||||
log.Debugf("Sending device list update: %+v", m.Header)
|
||||
_, err = p.JetStream.PublishMsg(m, nats.Context(ctx))
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -85,6 +85,9 @@ func GetUserDevices(
|
|||
if targetKey, ok := targetUser[gomatrixserverlib.KeyID(dev.DeviceID)]; ok {
|
||||
for sourceUserID, forSourceUser := range targetKey {
|
||||
for sourceKeyID, sourceKey := range forSourceUser {
|
||||
if device.Keys.Signatures == nil {
|
||||
device.Keys.Signatures = map[string]map[gomatrixserverlib.KeyID]gomatrixserverlib.Base64Bytes{}
|
||||
}
|
||||
if _, ok := device.Keys.Signatures[sourceUserID]; !ok {
|
||||
device.Keys.Signatures[sourceUserID] = map[gomatrixserverlib.KeyID]gomatrixserverlib.Base64Bytes{}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -501,11 +501,7 @@ func (t *txnReq) processDeviceListUpdate(ctx context.Context, e gomatrixserverli
|
|||
} else if serverName != t.Origin {
|
||||
return
|
||||
}
|
||||
var inputRes keyapi.InputDeviceListUpdateResponse
|
||||
t.keyAPI.InputDeviceListUpdate(context.Background(), &keyapi.InputDeviceListUpdateRequest{
|
||||
Event: payload,
|
||||
}, &inputRes)
|
||||
if inputRes.Error != nil {
|
||||
util.GetLogger(ctx).WithError(inputRes.Error).WithField("user_id", payload.UserID).Error("failed to InputDeviceListUpdate")
|
||||
if err := t.producer.SendDeviceListUpdate(ctx, &payload); err != nil {
|
||||
util.GetLogger(ctx).WithError(err).WithField("user_id", payload.UserID).Error("failed to InputDeviceListUpdate")
|
||||
}
|
||||
}
|
||||
|
|
|
|||
2
go.mod
2
go.mod
|
|
@ -2,7 +2,7 @@ module github.com/matrix-org/dendrite
|
|||
|
||||
replace github.com/nats-io/nats-server/v2 => github.com/neilalexander/nats-server/v2 v2.8.3-0.20220513095553-73a9a246d34f
|
||||
|
||||
replace github.com/nats-io/nats.go => github.com/neilalexander/nats.go v1.13.1-0.20220419101051-b262d9f0be1e
|
||||
replace github.com/nats-io/nats.go => github.com/neilalexander/nats.go v1.13.1-0.20220621084451-ac518c356673
|
||||
|
||||
require (
|
||||
github.com/Arceliar/ironwood v0.0.0-20220306165321-319147a02d98
|
||||
|
|
|
|||
2
go.sum
2
go.sum
|
|
@ -661,6 +661,8 @@ github.com/neilalexander/nats-server/v2 v2.8.3-0.20220513095553-73a9a246d34f h1:
|
|||
github.com/neilalexander/nats-server/v2 v2.8.3-0.20220513095553-73a9a246d34f/go.mod h1:vIdpKz3OG+DCg4q/xVPdXHoztEyKDWRtykQ4N7hd7C4=
|
||||
github.com/neilalexander/nats.go v1.13.1-0.20220419101051-b262d9f0be1e h1:kNIzIzj2OvnlreA+sTJ12nWJzTP3OSLNKDL/Iq9mF6Y=
|
||||
github.com/neilalexander/nats.go v1.13.1-0.20220419101051-b262d9f0be1e/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
|
||||
github.com/neilalexander/nats.go v1.13.1-0.20220621084451-ac518c356673 h1:TcKfa3Tf0qwUotv63PQVu2d1bBoLi2iEA4RHVMGDh5M=
|
||||
github.com/neilalexander/nats.go v1.13.1-0.20220621084451-ac518c356673/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
|
||||
github.com/neilalexander/utp v0.1.1-0.20210727203401-54ae7b1cd5f9 h1:lrVQzBtkeQEGGYUHwSX1XPe1E5GL6U3KYCNe2G4bncQ=
|
||||
github.com/neilalexander/utp v0.1.1-0.20210727203401-54ae7b1cd5f9/go.mod h1:NPHGhPc0/wudcaCqL/H5AOddkRf8GPRhzOujuUKGQu8=
|
||||
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 h1:zYyBkD/k9seD2A7fsi6Oo2LfFZAehjjQMERAvZLEDnQ=
|
||||
|
|
|
|||
|
|
@ -62,8 +62,6 @@ type FederationKeyAPI interface {
|
|||
QueryKeys(ctx context.Context, req *QueryKeysRequest, res *QueryKeysResponse)
|
||||
QuerySignatures(ctx context.Context, req *QuerySignaturesRequest, res *QuerySignaturesResponse)
|
||||
QueryDeviceMessages(ctx context.Context, req *QueryDeviceMessagesRequest, res *QueryDeviceMessagesResponse)
|
||||
// InputDeviceListUpdate from a federated server EDU
|
||||
InputDeviceListUpdate(ctx context.Context, req *InputDeviceListUpdateRequest, res *InputDeviceListUpdateResponse)
|
||||
PerformUploadDeviceKeys(ctx context.Context, req *PerformUploadDeviceKeysRequest, res *PerformUploadDeviceKeysResponse)
|
||||
PerformClaimKeys(ctx context.Context, req *PerformClaimKeysRequest, res *PerformClaimKeysResponse)
|
||||
}
|
||||
|
|
@ -337,11 +335,3 @@ type QuerySignaturesResponse struct {
|
|||
// The request error, if any
|
||||
Error *KeyError
|
||||
}
|
||||
|
||||
type InputDeviceListUpdateRequest struct {
|
||||
Event gomatrixserverlib.DeviceListUpdateEvent
|
||||
}
|
||||
|
||||
type InputDeviceListUpdateResponse struct {
|
||||
Error *KeyError
|
||||
}
|
||||
|
|
|
|||
82
keyserver/consumers/devicelistupdate.go
Normal file
82
keyserver/consumers/devicelistupdate.go
Normal file
|
|
@ -0,0 +1,82 @@
|
|||
// Copyright 2022 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package consumers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/matrix-org/dendrite/keyserver/internal"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||
"github.com/matrix-org/dendrite/setup/process"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// DeviceListUpdateConsumer consumes device list updates that came in over federation.
|
||||
type DeviceListUpdateConsumer struct {
|
||||
ctx context.Context
|
||||
jetstream nats.JetStreamContext
|
||||
durable string
|
||||
topic string
|
||||
updater *internal.DeviceListUpdater
|
||||
}
|
||||
|
||||
// NewDeviceListUpdateConsumer creates a new DeviceListConsumer. Call Start() to begin consuming from key servers.
|
||||
func NewDeviceListUpdateConsumer(
|
||||
process *process.ProcessContext,
|
||||
cfg *config.KeyServer,
|
||||
js nats.JetStreamContext,
|
||||
updater *internal.DeviceListUpdater,
|
||||
) *DeviceListUpdateConsumer {
|
||||
return &DeviceListUpdateConsumer{
|
||||
ctx: process.Context(),
|
||||
jetstream: js,
|
||||
durable: cfg.Matrix.JetStream.Prefixed("KeyServerInputDeviceListConsumer"),
|
||||
topic: cfg.Matrix.JetStream.Prefixed(jetstream.InputDeviceListUpdate),
|
||||
updater: updater,
|
||||
}
|
||||
}
|
||||
|
||||
// Start consuming from key servers
|
||||
func (t *DeviceListUpdateConsumer) Start() error {
|
||||
return jetstream.JetStreamConsumer(
|
||||
t.ctx, t.jetstream, t.topic, t.durable, t.onMessage,
|
||||
nats.DeliverAll(), nats.ManualAck(),
|
||||
)
|
||||
}
|
||||
|
||||
// onMessage is called in response to a message received on the
|
||||
// key change events topic from the key server.
|
||||
func (t *DeviceListUpdateConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
|
||||
var m gomatrixserverlib.DeviceListUpdateEvent
|
||||
if err := json.Unmarshal(msg.Data, &m); err != nil {
|
||||
logrus.WithError(err).Errorf("Failed to read from device list update input topic")
|
||||
return true
|
||||
}
|
||||
err := t.updater.Update(ctx, m)
|
||||
if err != nil {
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"user_id": m.UserID,
|
||||
"device_id": m.DeviceID,
|
||||
"stream_id": m.StreamID,
|
||||
"prev_id": m.PrevID,
|
||||
}).WithError(err).Errorf("Failed to update device list")
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
|
@ -47,17 +47,6 @@ func (a *KeyInternalAPI) SetUserAPI(i userapi.KeyserverUserAPI) {
|
|||
a.UserAPI = i
|
||||
}
|
||||
|
||||
func (a *KeyInternalAPI) InputDeviceListUpdate(
|
||||
ctx context.Context, req *api.InputDeviceListUpdateRequest, res *api.InputDeviceListUpdateResponse,
|
||||
) {
|
||||
err := a.Updater.Update(ctx, req.Event)
|
||||
if err != nil {
|
||||
res.Error = &api.KeyError{
|
||||
Err: fmt.Sprintf("failed to update device list: %s", err),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (a *KeyInternalAPI) QueryKeyChanges(ctx context.Context, req *api.QueryKeyChangesRequest, res *api.QueryKeyChangesResponse) {
|
||||
userIDs, latest, err := a.DB.KeyChanges(ctx, req.Offset, req.ToOffset)
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -63,20 +63,6 @@ type httpKeyInternalAPI struct {
|
|||
func (h *httpKeyInternalAPI) SetUserAPI(i userapi.KeyserverUserAPI) {
|
||||
// no-op: doesn't need it
|
||||
}
|
||||
func (h *httpKeyInternalAPI) InputDeviceListUpdate(
|
||||
ctx context.Context, req *api.InputDeviceListUpdateRequest, res *api.InputDeviceListUpdateResponse,
|
||||
) {
|
||||
span, ctx := opentracing.StartSpanFromContext(ctx, "InputDeviceListUpdate")
|
||||
defer span.Finish()
|
||||
|
||||
apiURL := h.apiURL + InputDeviceListUpdatePath
|
||||
err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res)
|
||||
if err != nil {
|
||||
res.Error = &api.KeyError{
|
||||
Err: err.Error(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *httpKeyInternalAPI) PerformClaimKeys(
|
||||
ctx context.Context,
|
||||
|
|
|
|||
|
|
@ -25,17 +25,6 @@ import (
|
|||
)
|
||||
|
||||
func AddRoutes(internalAPIMux *mux.Router, s api.KeyInternalAPI) {
|
||||
internalAPIMux.Handle(InputDeviceListUpdatePath,
|
||||
httputil.MakeInternalAPI("inputDeviceListUpdate", func(req *http.Request) util.JSONResponse {
|
||||
request := api.InputDeviceListUpdateRequest{}
|
||||
response := api.InputDeviceListUpdateResponse{}
|
||||
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
|
||||
return util.MessageResponse(http.StatusBadRequest, err.Error())
|
||||
}
|
||||
s.InputDeviceListUpdate(req.Context(), &request, &response)
|
||||
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||
}),
|
||||
)
|
||||
internalAPIMux.Handle(PerformClaimKeysPath,
|
||||
httputil.MakeInternalAPI("performClaimKeys", func(req *http.Request) util.JSONResponse {
|
||||
request := api.PerformClaimKeysRequest{}
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ import (
|
|||
"github.com/gorilla/mux"
|
||||
fedsenderapi "github.com/matrix-org/dendrite/federationapi/api"
|
||||
"github.com/matrix-org/dendrite/keyserver/api"
|
||||
"github.com/matrix-org/dendrite/keyserver/consumers"
|
||||
"github.com/matrix-org/dendrite/keyserver/internal"
|
||||
"github.com/matrix-org/dendrite/keyserver/inthttp"
|
||||
"github.com/matrix-org/dendrite/keyserver/producers"
|
||||
|
|
@ -59,10 +60,17 @@ func NewInternalAPI(
|
|||
updater := internal.NewDeviceListUpdater(db, ap, keyChangeProducer, fedClient, 8) // 8 workers TODO: configurable
|
||||
ap.Updater = updater
|
||||
go func() {
|
||||
if err := updater.Start(); err != nil {
|
||||
if err = updater.Start(); err != nil {
|
||||
logrus.WithError(err).Panicf("failed to start device list updater")
|
||||
}
|
||||
}()
|
||||
|
||||
dlConsumer := consumers.NewDeviceListUpdateConsumer(
|
||||
base.ProcessContext, cfg, js, updater,
|
||||
)
|
||||
if err = dlConsumer.Start(); err != nil {
|
||||
logrus.WithError(err).Panic("failed to start device list consumer")
|
||||
}
|
||||
|
||||
return ap
|
||||
}
|
||||
|
|
|
|||
|
|
@ -140,11 +140,8 @@ type ClientRoomserverAPI interface {
|
|||
|
||||
// PerformRoomUpgrade upgrades a room to a newer version
|
||||
PerformRoomUpgrade(ctx context.Context, req *PerformRoomUpgradeRequest, resp *PerformRoomUpgradeResponse)
|
||||
PerformAdminEvacuateRoom(
|
||||
ctx context.Context,
|
||||
req *PerformAdminEvacuateRoomRequest,
|
||||
res *PerformAdminEvacuateRoomResponse,
|
||||
)
|
||||
PerformAdminEvacuateRoom(ctx context.Context, req *PerformAdminEvacuateRoomRequest, res *PerformAdminEvacuateRoomResponse)
|
||||
PerformAdminEvacuateUser(ctx context.Context, req *PerformAdminEvacuateUserRequest, res *PerformAdminEvacuateUserResponse)
|
||||
PerformPeek(ctx context.Context, req *PerformPeekRequest, res *PerformPeekResponse)
|
||||
PerformUnpeek(ctx context.Context, req *PerformUnpeekRequest, res *PerformUnpeekResponse)
|
||||
PerformInvite(ctx context.Context, req *PerformInviteRequest, res *PerformInviteResponse) error
|
||||
|
|
@ -161,6 +158,7 @@ type UserRoomserverAPI interface {
|
|||
QueryLatestEventsAndStateAPI
|
||||
QueryCurrentState(ctx context.Context, req *QueryCurrentStateRequest, res *QueryCurrentStateResponse) error
|
||||
QueryMembershipsForRoom(ctx context.Context, req *QueryMembershipsForRoomRequest, res *QueryMembershipsForRoomResponse) error
|
||||
PerformAdminEvacuateUser(ctx context.Context, req *PerformAdminEvacuateUserRequest, res *PerformAdminEvacuateUserResponse)
|
||||
}
|
||||
|
||||
type FederationRoomserverAPI interface {
|
||||
|
|
|
|||
|
|
@ -113,6 +113,15 @@ func (t *RoomserverInternalAPITrace) PerformAdminEvacuateRoom(
|
|||
util.GetLogger(ctx).Infof("PerformAdminEvacuateRoom req=%+v res=%+v", js(req), js(res))
|
||||
}
|
||||
|
||||
func (t *RoomserverInternalAPITrace) PerformAdminEvacuateUser(
|
||||
ctx context.Context,
|
||||
req *PerformAdminEvacuateUserRequest,
|
||||
res *PerformAdminEvacuateUserResponse,
|
||||
) {
|
||||
t.Impl.PerformAdminEvacuateUser(ctx, req, res)
|
||||
util.GetLogger(ctx).Infof("PerformAdminEvacuateUser req=%+v res=%+v", js(req), js(res))
|
||||
}
|
||||
|
||||
func (t *RoomserverInternalAPITrace) PerformInboundPeek(
|
||||
ctx context.Context,
|
||||
req *PerformInboundPeekRequest,
|
||||
|
|
|
|||
|
|
@ -223,3 +223,12 @@ type PerformAdminEvacuateRoomResponse struct {
|
|||
Affected []string `json:"affected"`
|
||||
Error *PerformError
|
||||
}
|
||||
|
||||
type PerformAdminEvacuateUserRequest struct {
|
||||
UserID string `json:"user_id"`
|
||||
}
|
||||
|
||||
type PerformAdminEvacuateUserResponse struct {
|
||||
Affected []string `json:"affected"`
|
||||
Error *PerformError
|
||||
}
|
||||
|
|
|
|||
|
|
@ -170,6 +170,7 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.RoomserverFederatio
|
|||
Cfg: r.Cfg,
|
||||
Inputer: r.Inputer,
|
||||
Queryer: r.Queryer,
|
||||
Leaver: r.Leaver,
|
||||
}
|
||||
|
||||
if err := r.Inputer.Start(); err != nil {
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ package perform
|
|||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
|
@ -34,6 +35,7 @@ type Admin struct {
|
|||
Cfg *config.RoomServer
|
||||
Queryer *query.Queryer
|
||||
Inputer *input.Inputer
|
||||
Leaver *Leaver
|
||||
}
|
||||
|
||||
// PerformEvacuateRoom will remove all local users from the given room.
|
||||
|
|
@ -160,3 +162,71 @@ func (r *Admin) PerformAdminEvacuateRoom(
|
|||
inputRes := &api.InputRoomEventsResponse{}
|
||||
r.Inputer.InputRoomEvents(ctx, inputReq, inputRes)
|
||||
}
|
||||
|
||||
func (r *Admin) PerformAdminEvacuateUser(
|
||||
ctx context.Context,
|
||||
req *api.PerformAdminEvacuateUserRequest,
|
||||
res *api.PerformAdminEvacuateUserResponse,
|
||||
) {
|
||||
_, domain, err := gomatrixserverlib.SplitID('@', req.UserID)
|
||||
if err != nil {
|
||||
res.Error = &api.PerformError{
|
||||
Code: api.PerformErrorBadRequest,
|
||||
Msg: fmt.Sprintf("Malformed user ID: %s", err),
|
||||
}
|
||||
return
|
||||
}
|
||||
if domain != r.Cfg.Matrix.ServerName {
|
||||
res.Error = &api.PerformError{
|
||||
Code: api.PerformErrorBadRequest,
|
||||
Msg: "Can only evacuate local users using this endpoint",
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
roomIDs, err := r.DB.GetRoomsByMembership(ctx, req.UserID, gomatrixserverlib.Join)
|
||||
if err != nil && err != sql.ErrNoRows {
|
||||
res.Error = &api.PerformError{
|
||||
Code: api.PerformErrorBadRequest,
|
||||
Msg: fmt.Sprintf("r.DB.GetRoomsByMembership: %s", err),
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
inviteRoomIDs, err := r.DB.GetRoomsByMembership(ctx, req.UserID, gomatrixserverlib.Invite)
|
||||
if err != nil && err != sql.ErrNoRows {
|
||||
res.Error = &api.PerformError{
|
||||
Code: api.PerformErrorBadRequest,
|
||||
Msg: fmt.Sprintf("r.DB.GetRoomsByMembership: %s", err),
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
for _, roomID := range append(roomIDs, inviteRoomIDs...) {
|
||||
leaveReq := &api.PerformLeaveRequest{
|
||||
RoomID: roomID,
|
||||
UserID: req.UserID,
|
||||
}
|
||||
leaveRes := &api.PerformLeaveResponse{}
|
||||
outputEvents, err := r.Leaver.PerformLeave(ctx, leaveReq, leaveRes)
|
||||
if err != nil {
|
||||
res.Error = &api.PerformError{
|
||||
Code: api.PerformErrorBadRequest,
|
||||
Msg: fmt.Sprintf("r.Leaver.PerformLeave: %s", err),
|
||||
}
|
||||
return
|
||||
}
|
||||
if len(outputEvents) == 0 {
|
||||
continue
|
||||
}
|
||||
if err := r.Inputer.WriteOutputEvents(roomID, outputEvents); err != nil {
|
||||
res.Error = &api.PerformError{
|
||||
Code: api.PerformErrorBadRequest,
|
||||
Msg: fmt.Sprintf("r.Inputer.WriteOutputEvents: %s", err),
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
res.Affected = append(res.Affected, roomID)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -56,7 +56,14 @@ func (r *Inviter) PerformInvite(
|
|||
return nil, fmt.Errorf("failed to load RoomInfo: %w", err)
|
||||
}
|
||||
|
||||
_, domain, _ := gomatrixserverlib.SplitID('@', targetUserID)
|
||||
_, domain, err := gomatrixserverlib.SplitID('@', targetUserID)
|
||||
if err != nil {
|
||||
res.Error = &api.PerformError{
|
||||
Code: api.PerformErrorBadRequest,
|
||||
Msg: fmt.Sprintf("The user ID %q is invalid!", targetUserID),
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
isTargetLocal := domain == r.Cfg.Matrix.ServerName
|
||||
isOriginLocal := event.Origin() == r.Cfg.Matrix.ServerName
|
||||
|
||||
|
|
|
|||
|
|
@ -40,6 +40,7 @@ const (
|
|||
RoomserverPerformInboundPeekPath = "/roomserver/performInboundPeek"
|
||||
RoomserverPerformForgetPath = "/roomserver/performForget"
|
||||
RoomserverPerformAdminEvacuateRoomPath = "/roomserver/performAdminEvacuateRoom"
|
||||
RoomserverPerformAdminEvacuateUserPath = "/roomserver/performAdminEvacuateUser"
|
||||
|
||||
// Query operations
|
||||
RoomserverQueryLatestEventsAndStatePath = "/roomserver/queryLatestEventsAndState"
|
||||
|
|
@ -305,6 +306,23 @@ func (h *httpRoomserverInternalAPI) PerformAdminEvacuateRoom(
|
|||
}
|
||||
}
|
||||
|
||||
func (h *httpRoomserverInternalAPI) PerformAdminEvacuateUser(
|
||||
ctx context.Context,
|
||||
req *api.PerformAdminEvacuateUserRequest,
|
||||
res *api.PerformAdminEvacuateUserResponse,
|
||||
) {
|
||||
span, ctx := opentracing.StartSpanFromContext(ctx, "PerformAdminEvacuateUser")
|
||||
defer span.Finish()
|
||||
|
||||
apiURL := h.roomserverURL + RoomserverPerformAdminEvacuateUserPath
|
||||
err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res)
|
||||
if err != nil {
|
||||
res.Error = &api.PerformError{
|
||||
Msg: fmt.Sprintf("failed to communicate with roomserver: %s", err),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// QueryLatestEventsAndState implements RoomserverQueryAPI
|
||||
func (h *httpRoomserverInternalAPI) QueryLatestEventsAndState(
|
||||
ctx context.Context,
|
||||
|
|
|
|||
|
|
@ -129,6 +129,17 @@ func AddRoutes(r api.RoomserverInternalAPI, internalAPIMux *mux.Router) {
|
|||
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||
}),
|
||||
)
|
||||
internalAPIMux.Handle(RoomserverPerformAdminEvacuateUserPath,
|
||||
httputil.MakeInternalAPI("performAdminEvacuateUser", func(req *http.Request) util.JSONResponse {
|
||||
var request api.PerformAdminEvacuateUserRequest
|
||||
var response api.PerformAdminEvacuateUserResponse
|
||||
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
|
||||
return util.MessageResponse(http.StatusBadRequest, err.Error())
|
||||
}
|
||||
r.PerformAdminEvacuateUser(req.Context(), &request, &response)
|
||||
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||
}),
|
||||
)
|
||||
internalAPIMux.Handle(
|
||||
RoomserverQueryPublishedRoomsPath,
|
||||
httputil.MakeInternalAPI("queryPublishedRooms", func(req *http.Request) util.JSONResponse {
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ const (
|
|||
|
||||
var (
|
||||
InputRoomEvent = "InputRoomEvent"
|
||||
InputDeviceListUpdate = "InputDeviceListUpdate"
|
||||
OutputRoomEvent = "OutputRoomEvent"
|
||||
OutputSendToDeviceEvent = "OutputSendToDeviceEvent"
|
||||
OutputKeyChangeEvent = "OutputKeyChangeEvent"
|
||||
|
|
@ -45,6 +46,11 @@ var streams = []*nats.StreamConfig{
|
|||
Retention: nats.InterestPolicy,
|
||||
Storage: nats.FileStorage,
|
||||
},
|
||||
{
|
||||
Name: InputDeviceListUpdate,
|
||||
Retention: nats.InterestPolicy,
|
||||
Storage: nats.FileStorage,
|
||||
},
|
||||
{
|
||||
Name: OutputRoomEvent,
|
||||
Retention: nats.InterestPolicy,
|
||||
|
|
|
|||
|
|
@ -138,7 +138,7 @@ func (s *PresenceConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
|
|||
presence := msg.Header.Get("presence")
|
||||
timestamp := msg.Header.Get("last_active_ts")
|
||||
fromSync, _ := strconv.ParseBool(msg.Header.Get("from_sync"))
|
||||
logrus.Debugf("syncAPI received presence event: %+v", msg.Header)
|
||||
logrus.Tracef("syncAPI received presence event: %+v", msg.Header)
|
||||
|
||||
if fromSync { // do not process local presence changes; we already did this synchronously.
|
||||
return true
|
||||
|
|
|
|||
|
|
@ -43,9 +43,6 @@ func (k *mockKeyAPI) QueryOneTimeKeys(ctx context.Context, req *keyapi.QueryOneT
|
|||
}
|
||||
func (k *mockKeyAPI) QueryDeviceMessages(ctx context.Context, req *keyapi.QueryDeviceMessagesRequest, res *keyapi.QueryDeviceMessagesResponse) {
|
||||
|
||||
}
|
||||
func (k *mockKeyAPI) InputDeviceListUpdate(ctx context.Context, req *keyapi.InputDeviceListUpdateRequest, res *keyapi.InputDeviceListUpdateResponse) {
|
||||
|
||||
}
|
||||
func (k *mockKeyAPI) QuerySignatures(ctx context.Context, req *keyapi.QuerySignaturesRequest, res *keyapi.QuerySignaturesResponse) {
|
||||
}
|
||||
|
|
|
|||
|
|
@ -50,7 +50,7 @@ type messagesReq struct {
|
|||
type messagesResp struct {
|
||||
Start string `json:"start"`
|
||||
StartStream string `json:"start_stream,omitempty"` // NOTSPEC: used by Cerulean, so clients can hit /messages then immediately /sync with a latest sync token
|
||||
End string `json:"end"`
|
||||
End string `json:"end,omitempty"`
|
||||
Chunk []gomatrixserverlib.ClientEvent `json:"chunk"`
|
||||
State []gomatrixserverlib.ClientEvent `json:"state"`
|
||||
}
|
||||
|
|
@ -200,30 +200,6 @@ func OnIncomingMessagesRequest(
|
|||
return jsonerror.InternalServerError()
|
||||
}
|
||||
|
||||
// at least fetch the membership events for the users returned in chunk if LazyLoadMembers is set
|
||||
state := []gomatrixserverlib.ClientEvent{}
|
||||
if filter.LazyLoadMembers {
|
||||
membershipToUser := make(map[string]*gomatrixserverlib.HeaderedEvent)
|
||||
for _, evt := range clientEvents {
|
||||
// Don't add membership events the client should already know about
|
||||
if _, cached := lazyLoadCache.IsLazyLoadedUserCached(device, roomID, evt.Sender); cached {
|
||||
continue
|
||||
}
|
||||
membership, err := db.GetStateEvent(req.Context(), roomID, gomatrixserverlib.MRoomMember, evt.Sender)
|
||||
if err != nil {
|
||||
util.GetLogger(req.Context()).WithError(err).Error("failed to get membership event for user")
|
||||
continue
|
||||
}
|
||||
if membership != nil {
|
||||
membershipToUser[evt.Sender] = membership
|
||||
lazyLoadCache.StoreLazyLoadedUser(device, roomID, evt.Sender, membership.EventID())
|
||||
}
|
||||
}
|
||||
for _, evt := range membershipToUser {
|
||||
state = append(state, gomatrixserverlib.HeaderedToClientEvent(evt, gomatrixserverlib.FormatSync))
|
||||
}
|
||||
}
|
||||
|
||||
util.GetLogger(req.Context()).WithFields(logrus.Fields{
|
||||
"from": from.String(),
|
||||
"to": to.String(),
|
||||
|
|
@ -237,7 +213,13 @@ func OnIncomingMessagesRequest(
|
|||
Chunk: clientEvents,
|
||||
Start: start.String(),
|
||||
End: end.String(),
|
||||
State: state,
|
||||
}
|
||||
res.applyLazyLoadMembers(req.Context(), db, roomID, device, filter.LazyLoadMembers, lazyLoadCache)
|
||||
|
||||
// If we didn't return any events, set the end to an empty string, so it will be omitted
|
||||
// in the response JSON.
|
||||
if len(res.Chunk) == 0 {
|
||||
res.End = ""
|
||||
}
|
||||
if fromStream != nil {
|
||||
res.StartStream = fromStream.String()
|
||||
|
|
@ -250,6 +232,40 @@ func OnIncomingMessagesRequest(
|
|||
}
|
||||
}
|
||||
|
||||
// applyLazyLoadMembers loads membership events for users returned in Chunk, if the filter has
|
||||
// LazyLoadMembers enabled.
|
||||
func (m *messagesResp) applyLazyLoadMembers(
|
||||
ctx context.Context,
|
||||
db storage.Database,
|
||||
roomID string,
|
||||
device *userapi.Device,
|
||||
lazyLoad bool,
|
||||
lazyLoadCache caching.LazyLoadCache,
|
||||
) {
|
||||
if !lazyLoad {
|
||||
return
|
||||
}
|
||||
membershipToUser := make(map[string]*gomatrixserverlib.HeaderedEvent)
|
||||
for _, evt := range m.Chunk {
|
||||
// Don't add membership events the client should already know about
|
||||
if _, cached := lazyLoadCache.IsLazyLoadedUserCached(device, roomID, evt.Sender); cached {
|
||||
continue
|
||||
}
|
||||
membership, err := db.GetStateEvent(ctx, roomID, gomatrixserverlib.MRoomMember, evt.Sender)
|
||||
if err != nil {
|
||||
util.GetLogger(ctx).WithError(err).Error("failed to get membership event for user")
|
||||
continue
|
||||
}
|
||||
if membership != nil {
|
||||
membershipToUser[evt.Sender] = membership
|
||||
lazyLoadCache.StoreLazyLoadedUser(device, roomID, evt.Sender, membership.EventID())
|
||||
}
|
||||
}
|
||||
for _, evt := range membershipToUser {
|
||||
m.State = append(m.State, gomatrixserverlib.HeaderedToClientEvent(evt, gomatrixserverlib.FormatSync))
|
||||
}
|
||||
}
|
||||
|
||||
func checkIsRoomForgotten(ctx context.Context, roomID, userID string, rsAPI api.SyncRoomserverAPI) (forgotten bool, exists bool, err error) {
|
||||
req := api.QueryMembershipForUserRequest{
|
||||
RoomID: roomID,
|
||||
|
|
|
|||
|
|
@ -111,7 +111,7 @@ func (p *PresenceStreamProvider) IncrementalSync(
|
|||
currentlyActive := prevPresence.CurrentlyActive()
|
||||
skip := prevPresence.Equals(presence) && currentlyActive && req.Device.UserID != presence.UserID
|
||||
if skip {
|
||||
req.Log.Debugf("Skipping presence, no change (%s)", presence.UserID)
|
||||
req.Log.Tracef("Skipping presence, no change (%s)", presence.UserID)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -33,6 +33,7 @@ import (
|
|||
"github.com/matrix-org/dendrite/internal/pushrules"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
keyapi "github.com/matrix-org/dendrite/keyserver/api"
|
||||
rsapi "github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/userapi/api"
|
||||
"github.com/matrix-org/dendrite/userapi/producers"
|
||||
|
|
@ -49,6 +50,7 @@ type UserInternalAPI struct {
|
|||
// AppServices is the list of all registered AS
|
||||
AppServices []config.ApplicationService
|
||||
KeyAPI keyapi.UserKeyAPI
|
||||
RSAPI rsapi.UserRoomserverAPI
|
||||
}
|
||||
|
||||
func (a *UserInternalAPI) InputAccountData(ctx context.Context, req *api.InputAccountDataRequest, res *api.InputAccountDataResponse) error {
|
||||
|
|
@ -452,6 +454,30 @@ func (a *UserInternalAPI) queryAppServiceToken(ctx context.Context, token, appSe
|
|||
|
||||
// PerformAccountDeactivation deactivates the user's account, removing all ability for the user to login again.
|
||||
func (a *UserInternalAPI) PerformAccountDeactivation(ctx context.Context, req *api.PerformAccountDeactivationRequest, res *api.PerformAccountDeactivationResponse) error {
|
||||
evacuateReq := &rsapi.PerformAdminEvacuateUserRequest{
|
||||
UserID: fmt.Sprintf("@%s:%s", req.Localpart, a.ServerName),
|
||||
}
|
||||
evacuateRes := &rsapi.PerformAdminEvacuateUserResponse{}
|
||||
a.RSAPI.PerformAdminEvacuateUser(ctx, evacuateReq, evacuateRes)
|
||||
if err := evacuateRes.Error; err != nil {
|
||||
logrus.WithError(err).Errorf("Failed to evacuate user after account deactivation")
|
||||
}
|
||||
|
||||
deviceReq := &api.PerformDeviceDeletionRequest{
|
||||
UserID: fmt.Sprintf("@%s:%s", req.Localpart, a.ServerName),
|
||||
}
|
||||
deviceRes := &api.PerformDeviceDeletionResponse{}
|
||||
if err := a.PerformDeviceDeletion(ctx, deviceReq, deviceRes); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
pusherReq := &api.PerformPusherDeletionRequest{
|
||||
Localpart: req.Localpart,
|
||||
}
|
||||
if err := a.PerformPusherDeletion(ctx, pusherReq, &struct{}{}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err := a.DB.DeactivateAccount(ctx, req.Localpart)
|
||||
res.AccountDeactivated = err == nil
|
||||
return err
|
||||
|
|
|
|||
|
|
@ -78,6 +78,7 @@ func NewInternalAPI(
|
|||
ServerName: cfg.Matrix.ServerName,
|
||||
AppServices: appServices,
|
||||
KeyAPI: keyAPI,
|
||||
RSAPI: rsAPI,
|
||||
DisableTLSValidation: cfg.PushGatewayDisableTLSValidation,
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue