Merge branch 'main' into neilalexander/ristretto

This commit is contained in:
Neil Alexander 2022-07-01 10:55:21 +01:00
commit 86760e90db
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
35 changed files with 444 additions and 160 deletions

View file

@ -17,6 +17,7 @@ jobs:
name: WASM build test name: WASM build test
timeout-minutes: 5 timeout-minutes: 5
runs-on: ubuntu-latest runs-on: ubuntu-latest
if: ${{ false }} # disable for now
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v2

View file

@ -21,4 +21,4 @@ mkdir -p bin
CGO_ENABLED=1 go build -trimpath -ldflags "$FLAGS" -v -o "bin/" ./cmd/... CGO_ENABLED=1 go build -trimpath -ldflags "$FLAGS" -v -o "bin/" ./cmd/...
CGO_ENABLED=0 GOOS=js GOARCH=wasm go build -trimpath -ldflags "$FLAGS" -o bin/main.wasm ./cmd/dendritejs-pinecone # CGO_ENABLED=0 GOOS=js GOARCH=wasm go build -trimpath -ldflags "$FLAGS" -o bin/main.wasm ./cmd/dendritejs-pinecone

View file

@ -47,3 +47,40 @@ func AdminEvacuateRoom(req *http.Request, device *userapi.Device, rsAPI roomserv
}, },
} }
} }
func AdminEvacuateUser(req *http.Request, device *userapi.Device, rsAPI roomserverAPI.ClientRoomserverAPI) util.JSONResponse {
if device.AccountType != userapi.AccountTypeAdmin {
return util.JSONResponse{
Code: http.StatusForbidden,
JSON: jsonerror.Forbidden("This API can only be used by admin users."),
}
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
if err != nil {
return util.ErrorResponse(err)
}
userID, ok := vars["userID"]
if !ok {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.MissingArgument("Expecting user ID."),
}
}
res := &roomserverAPI.PerformAdminEvacuateUserResponse{}
rsAPI.PerformAdminEvacuateUser(
req.Context(),
&roomserverAPI.PerformAdminEvacuateUserRequest{
UserID: userID,
},
res,
)
if err := res.Error; err != nil {
return err.JSONResponse()
}
return util.JSONResponse{
Code: 200,
JSON: map[string]interface{}{
"affected": res.Affected,
},
}
}

View file

@ -129,6 +129,12 @@ func Setup(
}), }),
).Methods(http.MethodGet, http.MethodOptions) ).Methods(http.MethodGet, http.MethodOptions)
dendriteAdminRouter.Handle("/admin/evacuateUser/{userID}",
httputil.MakeAuthAPI("admin_evacuate_user", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
return AdminEvacuateUser(req, device, rsAPI)
}),
).Methods(http.MethodGet, http.MethodOptions)
// server notifications // server notifications
if cfg.Matrix.ServerNotices.Enabled { if cfg.Matrix.ServerNotices.Enabled {
logrus.Info("Enabling server notices at /_synapse/admin/v1/send_server_notice") logrus.Info("Enabling server notices at /_synapse/admin/v1/send_server_notice")

View file

@ -19,6 +19,12 @@ This endpoint will instruct Dendrite to part all local users from the given `roo
in the URL. It may take some time to complete. A JSON body will be returned containing in the URL. It may take some time to complete. A JSON body will be returned containing
the user IDs of all affected users. the user IDs of all affected users.
## `/_dendrite/admin/evacuateUser/{userID}`
This endpoint will instruct Dendrite to part the given local `userID` in the URL from
all rooms which they are currently joined. A JSON body will be returned containing
the room IDs of all affected rooms.
## `/_synapse/admin/v1/register` ## `/_synapse/admin/v1/register`
Shared secret registration — please see the [user creation page](createusers) for Shared secret registration — please see the [user creation page](createusers) for

View file

@ -133,7 +133,7 @@ func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msg *nats.Msg) b
return true return true
} }
log.Debugf("sending presence EDU to %d servers", len(joined)) log.Tracef("sending presence EDU to %d servers", len(joined))
if err = t.queues.SendEDU(edu, t.ServerName, joined); err != nil { if err = t.queues.SendEDU(edu, t.ServerName, joined); err != nil {
log.WithError(err).Error("failed to send EDU") log.WithError(err).Error("failed to send EDU")
return false return false

View file

@ -159,7 +159,7 @@ func (p *SyncAPIProducer) SendPresence(
lastActiveTS := gomatrixserverlib.AsTimestamp(time.Now().Add(-(time.Duration(lastActiveAgo) * time.Millisecond))) lastActiveTS := gomatrixserverlib.AsTimestamp(time.Now().Add(-(time.Duration(lastActiveAgo) * time.Millisecond)))
m.Header.Set("last_active_ts", strconv.Itoa(int(lastActiveTS))) m.Header.Set("last_active_ts", strconv.Itoa(int(lastActiveTS)))
log.Debugf("Sending presence to syncAPI: %+v", m.Header) log.Tracef("Sending presence to syncAPI: %+v", m.Header)
_, err := p.JetStream.PublishMsg(m, nats.Context(ctx)) _, err := p.JetStream.PublishMsg(m, nats.Context(ctx))
return err return err
} }

2
go.mod
View file

@ -25,7 +25,7 @@ require (
github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e
github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91 github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16
github.com/matrix-org/gomatrixserverlib v0.0.0-20220616095912-8306e9da1829 github.com/matrix-org/gomatrixserverlib v0.0.0-20220701090733-da53994b0c7f
github.com/matrix-org/pinecone v0.0.0-20220408153826-2999ea29ed48 github.com/matrix-org/pinecone v0.0.0-20220408153826-2999ea29ed48
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
github.com/mattn/go-sqlite3 v1.14.13 github.com/mattn/go-sqlite3 v1.14.13

6
go.sum
View file

@ -341,8 +341,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4=
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
github.com/matrix-org/gomatrixserverlib v0.0.0-20220616095912-8306e9da1829 h1:LkZSD1GXtmNHDwa7TfI8CrvoTNvFGhdGHSLDQ4Za/P8= github.com/matrix-org/gomatrixserverlib v0.0.0-20220701090733-da53994b0c7f h1:XF2+J6sOq07yhK1I7ItwsgRwXorjj7gqiCvgZ4dn8W8=
github.com/matrix-org/gomatrixserverlib v0.0.0-20220616095912-8306e9da1829/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk= github.com/matrix-org/gomatrixserverlib v0.0.0-20220701090733-da53994b0c7f/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk=
github.com/matrix-org/pinecone v0.0.0-20220408153826-2999ea29ed48 h1:W0sjjC6yjskHX4mb0nk3p0fXAlbU5bAFUFeEtlrPASE= github.com/matrix-org/pinecone v0.0.0-20220408153826-2999ea29ed48 h1:W0sjjC6yjskHX4mb0nk3p0fXAlbU5bAFUFeEtlrPASE=
github.com/matrix-org/pinecone v0.0.0-20220408153826-2999ea29ed48/go.mod h1:ulJzsVOTssIVp1j/m5eI//4VpAGDkMt5NrRuAVX7wpc= github.com/matrix-org/pinecone v0.0.0-20220408153826-2999ea29ed48/go.mod h1:ulJzsVOTssIVp1j/m5eI//4VpAGDkMt5NrRuAVX7wpc=
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U= github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
@ -392,8 +392,6 @@ github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJE
github.com/neelance/sourcemap v0.0.0-20151028013722-8c68805598ab/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM= github.com/neelance/sourcemap v0.0.0-20151028013722-8c68805598ab/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM=
github.com/neilalexander/nats-server/v2 v2.8.3-0.20220513095553-73a9a246d34f h1:Fc+TjdV1mOy0oISSzfoxNWdTqjg7tN/Vdgf+B2cwvdo= github.com/neilalexander/nats-server/v2 v2.8.3-0.20220513095553-73a9a246d34f h1:Fc+TjdV1mOy0oISSzfoxNWdTqjg7tN/Vdgf+B2cwvdo=
github.com/neilalexander/nats-server/v2 v2.8.3-0.20220513095553-73a9a246d34f/go.mod h1:vIdpKz3OG+DCg4q/xVPdXHoztEyKDWRtykQ4N7hd7C4= github.com/neilalexander/nats-server/v2 v2.8.3-0.20220513095553-73a9a246d34f/go.mod h1:vIdpKz3OG+DCg4q/xVPdXHoztEyKDWRtykQ4N7hd7C4=
github.com/neilalexander/nats.go v1.13.1-0.20220419101051-b262d9f0be1e h1:kNIzIzj2OvnlreA+sTJ12nWJzTP3OSLNKDL/Iq9mF6Y=
github.com/neilalexander/nats.go v1.13.1-0.20220419101051-b262d9f0be1e/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/neilalexander/nats.go v1.13.1-0.20220621084451-ac518c356673 h1:TcKfa3Tf0qwUotv63PQVu2d1bBoLi2iEA4RHVMGDh5M= github.com/neilalexander/nats.go v1.13.1-0.20220621084451-ac518c356673 h1:TcKfa3Tf0qwUotv63PQVu2d1bBoLi2iEA4RHVMGDh5M=
github.com/neilalexander/nats.go v1.13.1-0.20220621084451-ac518c356673/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/neilalexander/nats.go v1.13.1-0.20220621084451-ac518c356673/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/neilalexander/utp v0.1.1-0.20210727203401-54ae7b1cd5f9 h1:lrVQzBtkeQEGGYUHwSX1XPe1E5GL6U3KYCNe2G4bncQ= github.com/neilalexander/utp v0.1.1-0.20210727203401-54ae7b1cd5f9 h1:lrVQzBtkeQEGGYUHwSX1XPe1E5GL6U3KYCNe2G4bncQ=

View file

@ -140,11 +140,8 @@ type ClientRoomserverAPI interface {
// PerformRoomUpgrade upgrades a room to a newer version // PerformRoomUpgrade upgrades a room to a newer version
PerformRoomUpgrade(ctx context.Context, req *PerformRoomUpgradeRequest, resp *PerformRoomUpgradeResponse) PerformRoomUpgrade(ctx context.Context, req *PerformRoomUpgradeRequest, resp *PerformRoomUpgradeResponse)
PerformAdminEvacuateRoom( PerformAdminEvacuateRoom(ctx context.Context, req *PerformAdminEvacuateRoomRequest, res *PerformAdminEvacuateRoomResponse)
ctx context.Context, PerformAdminEvacuateUser(ctx context.Context, req *PerformAdminEvacuateUserRequest, res *PerformAdminEvacuateUserResponse)
req *PerformAdminEvacuateRoomRequest,
res *PerformAdminEvacuateRoomResponse,
)
PerformPeek(ctx context.Context, req *PerformPeekRequest, res *PerformPeekResponse) PerformPeek(ctx context.Context, req *PerformPeekRequest, res *PerformPeekResponse)
PerformUnpeek(ctx context.Context, req *PerformUnpeekRequest, res *PerformUnpeekResponse) PerformUnpeek(ctx context.Context, req *PerformUnpeekRequest, res *PerformUnpeekResponse)
PerformInvite(ctx context.Context, req *PerformInviteRequest, res *PerformInviteResponse) error PerformInvite(ctx context.Context, req *PerformInviteRequest, res *PerformInviteResponse) error
@ -161,6 +158,7 @@ type UserRoomserverAPI interface {
QueryLatestEventsAndStateAPI QueryLatestEventsAndStateAPI
QueryCurrentState(ctx context.Context, req *QueryCurrentStateRequest, res *QueryCurrentStateResponse) error QueryCurrentState(ctx context.Context, req *QueryCurrentStateRequest, res *QueryCurrentStateResponse) error
QueryMembershipsForRoom(ctx context.Context, req *QueryMembershipsForRoomRequest, res *QueryMembershipsForRoomResponse) error QueryMembershipsForRoom(ctx context.Context, req *QueryMembershipsForRoomRequest, res *QueryMembershipsForRoomResponse) error
PerformAdminEvacuateUser(ctx context.Context, req *PerformAdminEvacuateUserRequest, res *PerformAdminEvacuateUserResponse)
} }
type FederationRoomserverAPI interface { type FederationRoomserverAPI interface {

View file

@ -113,6 +113,15 @@ func (t *RoomserverInternalAPITrace) PerformAdminEvacuateRoom(
util.GetLogger(ctx).Infof("PerformAdminEvacuateRoom req=%+v res=%+v", js(req), js(res)) util.GetLogger(ctx).Infof("PerformAdminEvacuateRoom req=%+v res=%+v", js(req), js(res))
} }
func (t *RoomserverInternalAPITrace) PerformAdminEvacuateUser(
ctx context.Context,
req *PerformAdminEvacuateUserRequest,
res *PerformAdminEvacuateUserResponse,
) {
t.Impl.PerformAdminEvacuateUser(ctx, req, res)
util.GetLogger(ctx).Infof("PerformAdminEvacuateUser req=%+v res=%+v", js(req), js(res))
}
func (t *RoomserverInternalAPITrace) PerformInboundPeek( func (t *RoomserverInternalAPITrace) PerformInboundPeek(
ctx context.Context, ctx context.Context,
req *PerformInboundPeekRequest, req *PerformInboundPeekRequest,

View file

@ -223,3 +223,12 @@ type PerformAdminEvacuateRoomResponse struct {
Affected []string `json:"affected"` Affected []string `json:"affected"`
Error *PerformError Error *PerformError
} }
type PerformAdminEvacuateUserRequest struct {
UserID string `json:"user_id"`
}
type PerformAdminEvacuateUserResponse struct {
Affected []string `json:"affected"`
Error *PerformError
}

View file

@ -216,11 +216,10 @@ func (r *RoomserverInternalAPI) RemoveRoomAlias(
return err return err
} }
err = api.SendEvents(ctx, r.RSAPI, api.KindNew, []*gomatrixserverlib.HeaderedEvent{newEvent}, r.ServerName, r.ServerName, nil, false) err = api.SendEvents(ctx, r, api.KindNew, []*gomatrixserverlib.HeaderedEvent{newEvent}, r.ServerName, r.ServerName, nil, false)
if err != nil { if err != nil {
return err return err
} }
} }
} }

View file

@ -12,8 +12,10 @@ import (
"github.com/matrix-org/dendrite/roomserver/internal/input" "github.com/matrix-org/dendrite/roomserver/internal/input"
"github.com/matrix-org/dendrite/roomserver/internal/perform" "github.com/matrix-org/dendrite/roomserver/internal/perform"
"github.com/matrix-org/dendrite/roomserver/internal/query" "github.com/matrix-org/dendrite/roomserver/internal/query"
"github.com/matrix-org/dendrite/roomserver/producers"
"github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/setup/process"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -49,17 +51,21 @@ type RoomserverInternalAPI struct {
JetStream nats.JetStreamContext JetStream nats.JetStreamContext
Durable string Durable string
InputRoomEventTopic string // JetStream topic for new input room events InputRoomEventTopic string // JetStream topic for new input room events
OutputRoomEventTopic string // JetStream topic for new output room events OutputProducer *producers.RoomEventProducer
PerspectiveServerNames []gomatrixserverlib.ServerName PerspectiveServerNames []gomatrixserverlib.ServerName
} }
func NewRoomserverAPI( func NewRoomserverAPI(
processCtx *process.ProcessContext, cfg *config.RoomServer, roomserverDB storage.Database, processCtx *process.ProcessContext, cfg *config.RoomServer, roomserverDB storage.Database,
consumer nats.JetStreamContext, nc *nats.Conn, js nats.JetStreamContext, nc *nats.Conn, inputRoomEventTopic string,
inputRoomEventTopic, outputRoomEventTopic string,
caches caching.RoomServerCaches, perspectiveServerNames []gomatrixserverlib.ServerName, caches caching.RoomServerCaches, perspectiveServerNames []gomatrixserverlib.ServerName,
) *RoomserverInternalAPI { ) *RoomserverInternalAPI {
serverACLs := acls.NewServerACLs(roomserverDB) serverACLs := acls.NewServerACLs(roomserverDB)
producer := &producers.RoomEventProducer{
Topic: string(cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent)),
JetStream: js,
ACLs: serverACLs,
}
a := &RoomserverInternalAPI{ a := &RoomserverInternalAPI{
ProcessContext: processCtx, ProcessContext: processCtx,
DB: roomserverDB, DB: roomserverDB,
@ -68,8 +74,8 @@ func NewRoomserverAPI(
ServerName: cfg.Matrix.ServerName, ServerName: cfg.Matrix.ServerName,
PerspectiveServerNames: perspectiveServerNames, PerspectiveServerNames: perspectiveServerNames,
InputRoomEventTopic: inputRoomEventTopic, InputRoomEventTopic: inputRoomEventTopic,
OutputRoomEventTopic: outputRoomEventTopic, OutputProducer: producer,
JetStream: consumer, JetStream: js,
NATSClient: nc, NATSClient: nc,
Durable: cfg.Matrix.JetStream.Durable("RoomserverInputConsumer"), Durable: cfg.Matrix.JetStream.Durable("RoomserverInputConsumer"),
ServerACLs: serverACLs, ServerACLs: serverACLs,
@ -92,19 +98,19 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.RoomserverFederatio
r.KeyRing = keyRing r.KeyRing = keyRing
r.Inputer = &input.Inputer{ r.Inputer = &input.Inputer{
Cfg: r.Cfg, Cfg: r.Cfg,
ProcessContext: r.ProcessContext, ProcessContext: r.ProcessContext,
DB: r.DB, DB: r.DB,
InputRoomEventTopic: r.InputRoomEventTopic, InputRoomEventTopic: r.InputRoomEventTopic,
OutputRoomEventTopic: r.OutputRoomEventTopic, OutputProducer: r.OutputProducer,
JetStream: r.JetStream, JetStream: r.JetStream,
NATSClient: r.NATSClient, NATSClient: r.NATSClient,
Durable: nats.Durable(r.Durable), Durable: nats.Durable(r.Durable),
ServerName: r.Cfg.Matrix.ServerName, ServerName: r.Cfg.Matrix.ServerName,
FSAPI: fsAPI, FSAPI: fsAPI,
KeyRing: keyRing, KeyRing: keyRing,
ACLs: r.ServerACLs, ACLs: r.ServerACLs,
Queryer: r.Queryer, Queryer: r.Queryer,
} }
r.Inviter = &perform.Inviter{ r.Inviter = &perform.Inviter{
DB: r.DB, DB: r.DB,
@ -170,6 +176,7 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.RoomserverFederatio
Cfg: r.Cfg, Cfg: r.Cfg,
Inputer: r.Inputer, Inputer: r.Inputer,
Queryer: r.Queryer, Queryer: r.Queryer,
Leaver: r.Leaver,
} }
if err := r.Inputer.Start(); err != nil { if err := r.Inputer.Start(); err != nil {
@ -198,7 +205,7 @@ func (r *RoomserverInternalAPI) PerformInvite(
if len(outputEvents) == 0 { if len(outputEvents) == 0 {
return nil return nil
} }
return r.WriteOutputEvents(req.Event.RoomID(), outputEvents) return r.OutputProducer.ProduceRoomEvents(req.Event.RoomID(), outputEvents)
} }
func (r *RoomserverInternalAPI) PerformLeave( func (r *RoomserverInternalAPI) PerformLeave(
@ -214,7 +221,7 @@ func (r *RoomserverInternalAPI) PerformLeave(
if len(outputEvents) == 0 { if len(outputEvents) == 0 {
return nil return nil
} }
return r.WriteOutputEvents(req.RoomID, outputEvents) return r.OutputProducer.ProduceRoomEvents(req.RoomID, outputEvents)
} }
func (r *RoomserverInternalAPI) PerformForget( func (r *RoomserverInternalAPI) PerformForget(

View file

@ -29,6 +29,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/acls" "github.com/matrix-org/dendrite/roomserver/acls"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/internal/query" "github.com/matrix-org/dendrite/roomserver/internal/query"
"github.com/matrix-org/dendrite/roomserver/producers"
"github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/jetstream"
@ -37,16 +38,8 @@ import (
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
) )
var keyContentFields = map[string]string{
"m.room.join_rules": "join_rule",
"m.room.history_visibility": "history_visibility",
"m.room.member": "membership",
}
// Inputer is responsible for consuming from the roomserver input // Inputer is responsible for consuming from the roomserver input
// streams and processing the events. All input events are queued // streams and processing the events. All input events are queued
// into a single NATS stream and the order is preserved strictly. // into a single NATS stream and the order is preserved strictly.
@ -75,19 +68,19 @@ var keyContentFields = map[string]string{
// up, so they will do nothing until a new event comes in for B // up, so they will do nothing until a new event comes in for B
// or C. // or C.
type Inputer struct { type Inputer struct {
Cfg *config.RoomServer Cfg *config.RoomServer
ProcessContext *process.ProcessContext ProcessContext *process.ProcessContext
DB storage.Database DB storage.Database
NATSClient *nats.Conn NATSClient *nats.Conn
JetStream nats.JetStreamContext JetStream nats.JetStreamContext
Durable nats.SubOpt Durable nats.SubOpt
ServerName gomatrixserverlib.ServerName ServerName gomatrixserverlib.ServerName
FSAPI fedapi.RoomserverFederationAPI FSAPI fedapi.RoomserverFederationAPI
KeyRing gomatrixserverlib.JSONVerifier KeyRing gomatrixserverlib.JSONVerifier
ACLs *acls.ServerACLs ACLs *acls.ServerACLs
InputRoomEventTopic string InputRoomEventTopic string
OutputRoomEventTopic string OutputProducer *producers.RoomEventProducer
workers sync.Map // room ID -> *worker workers sync.Map // room ID -> *worker
Queryer *query.Queryer Queryer *query.Queryer
} }
@ -370,58 +363,6 @@ func (r *Inputer) InputRoomEvents(
} }
} }
// WriteOutputEvents implements OutputRoomEventWriter
func (r *Inputer) WriteOutputEvents(roomID string, updates []api.OutputEvent) error {
var err error
for _, update := range updates {
msg := &nats.Msg{
Subject: r.OutputRoomEventTopic,
Header: nats.Header{},
}
msg.Header.Set(jetstream.RoomID, roomID)
msg.Data, err = json.Marshal(update)
if err != nil {
return err
}
logger := log.WithFields(log.Fields{
"room_id": roomID,
"type": update.Type,
})
if update.NewRoomEvent != nil {
eventType := update.NewRoomEvent.Event.Type()
logger = logger.WithFields(log.Fields{
"event_type": eventType,
"event_id": update.NewRoomEvent.Event.EventID(),
"adds_state": len(update.NewRoomEvent.AddsStateEventIDs),
"removes_state": len(update.NewRoomEvent.RemovesStateEventIDs),
"send_as_server": update.NewRoomEvent.SendAsServer,
"sender": update.NewRoomEvent.Event.Sender(),
})
if update.NewRoomEvent.Event.StateKey() != nil {
logger = logger.WithField("state_key", *update.NewRoomEvent.Event.StateKey())
}
contentKey := keyContentFields[eventType]
if contentKey != "" {
value := gjson.GetBytes(update.NewRoomEvent.Event.Content(), contentKey)
if value.Exists() {
logger = logger.WithField("content_value", value.String())
}
}
if eventType == "m.room.server_acl" && update.NewRoomEvent.Event.StateKeyEquals("") {
ev := update.NewRoomEvent.Event.Unwrap()
defer r.ACLs.OnServerACLUpdate(ev)
}
}
logger.Tracef("Producing to topic '%s'", r.OutputRoomEventTopic)
if _, err := r.JetStream.PublishMsg(msg); err != nil {
logger.WithError(err).Errorf("Failed to produce to topic '%s': %s", r.OutputRoomEventTopic, err)
return err
}
}
return nil
}
var roomserverInputBackpressure = prometheus.NewGaugeVec( var roomserverInputBackpressure = prometheus.NewGaugeVec(
prometheus.GaugeOpts{ prometheus.GaugeOpts{
Namespace: "dendrite", Namespace: "dendrite",

View file

@ -381,7 +381,7 @@ func (r *Inputer) processRoomEvent(
return fmt.Errorf("r.updateLatestEvents: %w", err) return fmt.Errorf("r.updateLatestEvents: %w", err)
} }
case api.KindOld: case api.KindOld:
err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{ err = r.OutputProducer.ProduceRoomEvents(event.RoomID(), []api.OutputEvent{
{ {
Type: api.OutputTypeOldRoomEvent, Type: api.OutputTypeOldRoomEvent,
OldRoomEvent: &api.OutputOldRoomEvent{ OldRoomEvent: &api.OutputOldRoomEvent{
@ -400,7 +400,7 @@ func (r *Inputer) processRoomEvent(
// so notify downstream components to redact this event - they should have it if they've // so notify downstream components to redact this event - they should have it if they've
// been tracking our output log. // been tracking our output log.
if redactedEventID != "" { if redactedEventID != "" {
err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{ err = r.OutputProducer.ProduceRoomEvents(event.RoomID(), []api.OutputEvent{
{ {
Type: api.OutputTypeRedactedEvent, Type: api.OutputTypeRedactedEvent,
RedactedEvent: &api.OutputRedactedEvent{ RedactedEvent: &api.OutputRedactedEvent{

View file

@ -192,7 +192,7 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
// send the event asynchronously but we would need to ensure that 1) the events are written to the log in // send the event asynchronously but we would need to ensure that 1) the events are written to the log in
// the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the // the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the
// necessary bookkeeping we'll keep the event sending synchronous for now. // necessary bookkeeping we'll keep the event sending synchronous for now.
if err = u.api.WriteOutputEvents(u.event.RoomID(), updates); err != nil { if err = u.api.OutputProducer.ProduceRoomEvents(u.event.RoomID(), updates); err != nil {
return fmt.Errorf("u.api.WriteOutputEvents: %w", err) return fmt.Errorf("u.api.WriteOutputEvents: %w", err)
} }

View file

@ -16,6 +16,7 @@ package perform
import ( import (
"context" "context"
"database/sql"
"encoding/json" "encoding/json"
"fmt" "fmt"
"time" "time"
@ -34,6 +35,7 @@ type Admin struct {
Cfg *config.RoomServer Cfg *config.RoomServer
Queryer *query.Queryer Queryer *query.Queryer
Inputer *input.Inputer Inputer *input.Inputer
Leaver *Leaver
} }
// PerformEvacuateRoom will remove all local users from the given room. // PerformEvacuateRoom will remove all local users from the given room.
@ -160,3 +162,71 @@ func (r *Admin) PerformAdminEvacuateRoom(
inputRes := &api.InputRoomEventsResponse{} inputRes := &api.InputRoomEventsResponse{}
r.Inputer.InputRoomEvents(ctx, inputReq, inputRes) r.Inputer.InputRoomEvents(ctx, inputReq, inputRes)
} }
func (r *Admin) PerformAdminEvacuateUser(
ctx context.Context,
req *api.PerformAdminEvacuateUserRequest,
res *api.PerformAdminEvacuateUserResponse,
) {
_, domain, err := gomatrixserverlib.SplitID('@', req.UserID)
if err != nil {
res.Error = &api.PerformError{
Code: api.PerformErrorBadRequest,
Msg: fmt.Sprintf("Malformed user ID: %s", err),
}
return
}
if domain != r.Cfg.Matrix.ServerName {
res.Error = &api.PerformError{
Code: api.PerformErrorBadRequest,
Msg: "Can only evacuate local users using this endpoint",
}
return
}
roomIDs, err := r.DB.GetRoomsByMembership(ctx, req.UserID, gomatrixserverlib.Join)
if err != nil && err != sql.ErrNoRows {
res.Error = &api.PerformError{
Code: api.PerformErrorBadRequest,
Msg: fmt.Sprintf("r.DB.GetRoomsByMembership: %s", err),
}
return
}
inviteRoomIDs, err := r.DB.GetRoomsByMembership(ctx, req.UserID, gomatrixserverlib.Invite)
if err != nil && err != sql.ErrNoRows {
res.Error = &api.PerformError{
Code: api.PerformErrorBadRequest,
Msg: fmt.Sprintf("r.DB.GetRoomsByMembership: %s", err),
}
return
}
for _, roomID := range append(roomIDs, inviteRoomIDs...) {
leaveReq := &api.PerformLeaveRequest{
RoomID: roomID,
UserID: req.UserID,
}
leaveRes := &api.PerformLeaveResponse{}
outputEvents, err := r.Leaver.PerformLeave(ctx, leaveReq, leaveRes)
if err != nil {
res.Error = &api.PerformError{
Code: api.PerformErrorBadRequest,
Msg: fmt.Sprintf("r.Leaver.PerformLeave: %s", err),
}
return
}
if len(outputEvents) == 0 {
continue
}
if err := r.Inputer.OutputProducer.ProduceRoomEvents(roomID, outputEvents); err != nil {
res.Error = &api.PerformError{
Code: api.PerformErrorBadRequest,
Msg: fmt.Sprintf("r.Inputer.WriteOutputEvents: %s", err),
}
return
}
res.Affected = append(res.Affected, roomID)
}
}

View file

@ -18,6 +18,7 @@ import (
"context" "context"
"fmt" "fmt"
"github.com/getsentry/sentry-go"
federationAPI "github.com/matrix-org/dendrite/federationapi/api" federationAPI "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
@ -206,8 +207,17 @@ func (r *Backfiller) fetchAndStoreMissingEvents(ctx context.Context, roomVer gom
} }
logger.Infof("returned %d PDUs which made events %+v", len(res.PDUs), result) logger.Infof("returned %d PDUs which made events %+v", len(res.PDUs), result)
for _, res := range result { for _, res := range result {
if res.Error != nil { switch err := res.Error.(type) {
logger.WithError(res.Error).Warn("event failed PDU checks") case nil:
case gomatrixserverlib.SignatureErr:
// The signature of the event might not be valid anymore, for example if
// the key ID was reused with a different signature.
logger.WithError(err).Errorf("event failed PDU checks, storing anyway")
case gomatrixserverlib.AuthChainErr, gomatrixserverlib.AuthRulesErr:
logger.WithError(err).Warn("event failed PDU checks")
continue
default:
logger.WithError(err).Warn("event failed PDU checks")
continue continue
} }
missingMap[id] = res.Event missingMap[id] = res.Event
@ -306,6 +316,7 @@ FederationHit:
b.eventIDToBeforeStateIDs[targetEvent.EventID()] = res b.eventIDToBeforeStateIDs[targetEvent.EventID()] = res
return res, nil return res, nil
} }
sentry.CaptureException(lastErr) // temporary to see if we might need to raise the server limit
return nil, lastErr return nil, lastErr
} }
@ -366,19 +377,25 @@ func (b *backfillRequester) StateBeforeEvent(ctx context.Context, roomVer gomatr
} }
} }
c := gomatrixserverlib.FederatedStateProvider{ var lastErr error
FedClient: b.fsAPI, for _, srv := range b.servers {
RememberAuthEvents: false, c := gomatrixserverlib.FederatedStateProvider{
Server: b.servers[0], FedClient: b.fsAPI,
RememberAuthEvents: false,
Server: srv,
}
result, err := c.StateBeforeEvent(ctx, roomVer, event, eventIDs)
if err != nil {
lastErr = err
continue
}
for eventID, ev := range result {
b.eventIDMap[eventID] = ev
}
return result, nil
} }
result, err := c.StateBeforeEvent(ctx, roomVer, event, eventIDs) sentry.CaptureException(lastErr) // temporary to see if we might need to raise the server limit
if err != nil { return nil, lastErr
return nil, err
}
for eventID, ev := range result {
b.eventIDMap[eventID] = ev
}
return result, nil
} }
// ServersAtEvent is called when trying to determine which server to request from. // ServersAtEvent is called when trying to determine which server to request from.

View file

@ -113,7 +113,7 @@ func (r *InboundPeeker) PerformInboundPeek(
response.AuthChainEvents = append(response.AuthChainEvents, event.Headered(info.RoomVersion)) response.AuthChainEvents = append(response.AuthChainEvents, event.Headered(info.RoomVersion))
} }
err = r.Inputer.WriteOutputEvents(request.RoomID, []api.OutputEvent{ err = r.Inputer.OutputProducer.ProduceRoomEvents(request.RoomID, []api.OutputEvent{
{ {
Type: api.OutputTypeNewInboundPeek, Type: api.OutputTypeNewInboundPeek,
NewInboundPeek: &api.OutputNewInboundPeek{ NewInboundPeek: &api.OutputNewInboundPeek{

View file

@ -56,7 +56,14 @@ func (r *Inviter) PerformInvite(
return nil, fmt.Errorf("failed to load RoomInfo: %w", err) return nil, fmt.Errorf("failed to load RoomInfo: %w", err)
} }
_, domain, _ := gomatrixserverlib.SplitID('@', targetUserID) _, domain, err := gomatrixserverlib.SplitID('@', targetUserID)
if err != nil {
res.Error = &api.PerformError{
Code: api.PerformErrorBadRequest,
Msg: fmt.Sprintf("The user ID %q is invalid!", targetUserID),
}
return nil, nil
}
isTargetLocal := domain == r.Cfg.Matrix.ServerName isTargetLocal := domain == r.Cfg.Matrix.ServerName
isOriginLocal := event.Origin() == r.Cfg.Matrix.ServerName isOriginLocal := event.Origin() == r.Cfg.Matrix.ServerName

View file

@ -207,7 +207,7 @@ func (r *Peeker) performPeekRoomByID(
// TODO: handle federated peeks // TODO: handle federated peeks
err = r.Inputer.WriteOutputEvents(roomID, []api.OutputEvent{ err = r.Inputer.OutputProducer.ProduceRoomEvents(roomID, []api.OutputEvent{
{ {
Type: api.OutputTypeNewPeek, Type: api.OutputTypeNewPeek,
NewPeek: &api.OutputNewPeek{ NewPeek: &api.OutputNewPeek{

View file

@ -96,7 +96,7 @@ func (r *Unpeeker) performUnpeekRoomByID(
// TODO: handle federated peeks // TODO: handle federated peeks
err = r.Inputer.WriteOutputEvents(req.RoomID, []api.OutputEvent{ err = r.Inputer.OutputProducer.ProduceRoomEvents(req.RoomID, []api.OutputEvent{
{ {
Type: api.OutputTypeRetirePeek, Type: api.OutputTypeRetirePeek,
RetirePeek: &api.OutputRetirePeek{ RetirePeek: &api.OutputRetirePeek{

View file

@ -40,6 +40,7 @@ const (
RoomserverPerformInboundPeekPath = "/roomserver/performInboundPeek" RoomserverPerformInboundPeekPath = "/roomserver/performInboundPeek"
RoomserverPerformForgetPath = "/roomserver/performForget" RoomserverPerformForgetPath = "/roomserver/performForget"
RoomserverPerformAdminEvacuateRoomPath = "/roomserver/performAdminEvacuateRoom" RoomserverPerformAdminEvacuateRoomPath = "/roomserver/performAdminEvacuateRoom"
RoomserverPerformAdminEvacuateUserPath = "/roomserver/performAdminEvacuateUser"
// Query operations // Query operations
RoomserverQueryLatestEventsAndStatePath = "/roomserver/queryLatestEventsAndState" RoomserverQueryLatestEventsAndStatePath = "/roomserver/queryLatestEventsAndState"
@ -305,6 +306,23 @@ func (h *httpRoomserverInternalAPI) PerformAdminEvacuateRoom(
} }
} }
func (h *httpRoomserverInternalAPI) PerformAdminEvacuateUser(
ctx context.Context,
req *api.PerformAdminEvacuateUserRequest,
res *api.PerformAdminEvacuateUserResponse,
) {
span, ctx := opentracing.StartSpanFromContext(ctx, "PerformAdminEvacuateUser")
defer span.Finish()
apiURL := h.roomserverURL + RoomserverPerformAdminEvacuateUserPath
err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res)
if err != nil {
res.Error = &api.PerformError{
Msg: fmt.Sprintf("failed to communicate with roomserver: %s", err),
}
}
}
// QueryLatestEventsAndState implements RoomserverQueryAPI // QueryLatestEventsAndState implements RoomserverQueryAPI
func (h *httpRoomserverInternalAPI) QueryLatestEventsAndState( func (h *httpRoomserverInternalAPI) QueryLatestEventsAndState(
ctx context.Context, ctx context.Context,

View file

@ -129,6 +129,17 @@ func AddRoutes(r api.RoomserverInternalAPI, internalAPIMux *mux.Router) {
return util.JSONResponse{Code: http.StatusOK, JSON: &response} return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}), }),
) )
internalAPIMux.Handle(RoomserverPerformAdminEvacuateUserPath,
httputil.MakeInternalAPI("performAdminEvacuateUser", func(req *http.Request) util.JSONResponse {
var request api.PerformAdminEvacuateUserRequest
var response api.PerformAdminEvacuateUserResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
r.PerformAdminEvacuateUser(req.Context(), &request, &response)
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle( internalAPIMux.Handle(
RoomserverQueryPublishedRoomsPath, RoomserverQueryPublishedRoomsPath,
httputil.MakeInternalAPI("queryPublishedRooms", func(req *http.Request) util.JSONResponse { httputil.MakeInternalAPI("queryPublishedRooms", func(req *http.Request) util.JSONResponse {

View file

@ -0,0 +1,89 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package producers
import (
"encoding/json"
"github.com/matrix-org/dendrite/roomserver/acls"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
)
var keyContentFields = map[string]string{
"m.room.join_rules": "join_rule",
"m.room.history_visibility": "history_visibility",
"m.room.member": "membership",
}
type RoomEventProducer struct {
Topic string
ACLs *acls.ServerACLs
JetStream nats.JetStreamContext
}
func (r *RoomEventProducer) ProduceRoomEvents(roomID string, updates []api.OutputEvent) error {
var err error
for _, update := range updates {
msg := &nats.Msg{
Subject: r.Topic,
Header: nats.Header{},
}
msg.Header.Set(jetstream.RoomID, roomID)
msg.Data, err = json.Marshal(update)
if err != nil {
return err
}
logger := log.WithFields(log.Fields{
"room_id": roomID,
"type": update.Type,
})
if update.NewRoomEvent != nil {
eventType := update.NewRoomEvent.Event.Type()
logger = logger.WithFields(log.Fields{
"event_type": eventType,
"event_id": update.NewRoomEvent.Event.EventID(),
"adds_state": len(update.NewRoomEvent.AddsStateEventIDs),
"removes_state": len(update.NewRoomEvent.RemovesStateEventIDs),
"send_as_server": update.NewRoomEvent.SendAsServer,
"sender": update.NewRoomEvent.Event.Sender(),
})
if update.NewRoomEvent.Event.StateKey() != nil {
logger = logger.WithField("state_key", *update.NewRoomEvent.Event.StateKey())
}
contentKey := keyContentFields[eventType]
if contentKey != "" {
value := gjson.GetBytes(update.NewRoomEvent.Event.Content(), contentKey)
if value.Exists() {
logger = logger.WithField("content_value", value.String())
}
}
if eventType == "m.room.server_acl" && update.NewRoomEvent.Event.StateKeyEquals("") {
ev := update.NewRoomEvent.Event.Unwrap()
defer r.ACLs.OnServerACLUpdate(ev)
}
}
logger.Tracef("Producing to topic '%s'", r.Topic)
if _, err := r.JetStream.PublishMsg(msg); err != nil {
logger.WithError(err).Errorf("Failed to produce to topic '%s': %s", r.Topic, err)
return err
}
}
return nil
}

View file

@ -55,7 +55,6 @@ func NewInternalAPI(
return internal.NewRoomserverAPI( return internal.NewRoomserverAPI(
base.ProcessContext, cfg, roomserverDB, js, nc, base.ProcessContext, cfg, roomserverDB, js, nc,
cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent), cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent),
cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent),
base.Caches, perspectiveServerNames, base.Caches, perspectiveServerNames,
) )
} }

View file

@ -138,7 +138,7 @@ func (s *PresenceConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
presence := msg.Header.Get("presence") presence := msg.Header.Get("presence")
timestamp := msg.Header.Get("last_active_ts") timestamp := msg.Header.Get("last_active_ts")
fromSync, _ := strconv.ParseBool(msg.Header.Get("from_sync")) fromSync, _ := strconv.ParseBool(msg.Header.Get("from_sync"))
logrus.Debugf("syncAPI received presence event: %+v", msg.Header) logrus.Tracef("syncAPI received presence event: %+v", msg.Header)
if fromSync { // do not process local presence changes; we already did this synchronously. if fromSync { // do not process local presence changes; we already did this synchronously.
return true return true

View file

@ -15,6 +15,7 @@
package routing package routing
import ( import (
"context"
"database/sql" "database/sql"
"encoding/json" "encoding/json"
"fmt" "fmt"
@ -25,6 +26,7 @@ import (
"github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/caching"
roomserver "github.com/matrix-org/dendrite/roomserver/api" roomserver "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
@ -149,13 +151,30 @@ func Context(
if len(response.State) > filter.Limit { if len(response.State) > filter.Limit {
response.State = response.State[len(response.State)-filter.Limit:] response.State = response.State[len(response.State)-filter.Limit:]
} }
start, end, err := getStartEnd(ctx, syncDB, eventsBefore, eventsAfter)
if err == nil {
response.End = end.String()
response.Start = start.String()
}
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusOK, Code: http.StatusOK,
JSON: response, JSON: response,
} }
} }
func getStartEnd(ctx context.Context, syncDB storage.Database, startEvents, endEvents []*gomatrixserverlib.HeaderedEvent) (start, end types.TopologyToken, err error) {
if len(startEvents) > 0 {
start, err = syncDB.EventPositionInTopology(ctx, startEvents[0].EventID())
if err != nil {
return
}
}
if len(endEvents) > 0 {
end, err = syncDB.EventPositionInTopology(ctx, endEvents[0].EventID())
}
return
}
func applyLazyLoadMembers( func applyLazyLoadMembers(
device *userapi.Device, device *userapi.Device,
filter *gomatrixserverlib.RoomEventFilter, filter *gomatrixserverlib.RoomEventFilter,

View file

@ -50,7 +50,7 @@ type messagesReq struct {
type messagesResp struct { type messagesResp struct {
Start string `json:"start"` Start string `json:"start"`
StartStream string `json:"start_stream,omitempty"` // NOTSPEC: used by Cerulean, so clients can hit /messages then immediately /sync with a latest sync token StartStream string `json:"start_stream,omitempty"` // NOTSPEC: used by Cerulean, so clients can hit /messages then immediately /sync with a latest sync token
End string `json:"end"` End string `json:"end,omitempty"`
Chunk []gomatrixserverlib.ClientEvent `json:"chunk"` Chunk []gomatrixserverlib.ClientEvent `json:"chunk"`
State []gomatrixserverlib.ClientEvent `json:"state"` State []gomatrixserverlib.ClientEvent `json:"state"`
} }
@ -200,30 +200,6 @@ func OnIncomingMessagesRequest(
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }
// at least fetch the membership events for the users returned in chunk if LazyLoadMembers is set
state := []gomatrixserverlib.ClientEvent{}
if filter.LazyLoadMembers {
membershipToUser := make(map[string]*gomatrixserverlib.HeaderedEvent)
for _, evt := range clientEvents {
// Don't add membership events the client should already know about
if _, cached := lazyLoadCache.IsLazyLoadedUserCached(device, roomID, evt.Sender); cached {
continue
}
membership, err := db.GetStateEvent(req.Context(), roomID, gomatrixserverlib.MRoomMember, evt.Sender)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("failed to get membership event for user")
continue
}
if membership != nil {
membershipToUser[evt.Sender] = membership
lazyLoadCache.StoreLazyLoadedUser(device, roomID, evt.Sender, membership.EventID())
}
}
for _, evt := range membershipToUser {
state = append(state, gomatrixserverlib.HeaderedToClientEvent(evt, gomatrixserverlib.FormatSync))
}
}
util.GetLogger(req.Context()).WithFields(logrus.Fields{ util.GetLogger(req.Context()).WithFields(logrus.Fields{
"from": from.String(), "from": from.String(),
"to": to.String(), "to": to.String(),
@ -237,7 +213,13 @@ func OnIncomingMessagesRequest(
Chunk: clientEvents, Chunk: clientEvents,
Start: start.String(), Start: start.String(),
End: end.String(), End: end.String(),
State: state, }
res.applyLazyLoadMembers(req.Context(), db, roomID, device, filter.LazyLoadMembers, lazyLoadCache)
// If we didn't return any events, set the end to an empty string, so it will be omitted
// in the response JSON.
if len(res.Chunk) == 0 {
res.End = ""
} }
if fromStream != nil { if fromStream != nil {
res.StartStream = fromStream.String() res.StartStream = fromStream.String()
@ -250,6 +232,40 @@ func OnIncomingMessagesRequest(
} }
} }
// applyLazyLoadMembers loads membership events for users returned in Chunk, if the filter has
// LazyLoadMembers enabled.
func (m *messagesResp) applyLazyLoadMembers(
ctx context.Context,
db storage.Database,
roomID string,
device *userapi.Device,
lazyLoad bool,
lazyLoadCache caching.LazyLoadCache,
) {
if !lazyLoad {
return
}
membershipToUser := make(map[string]*gomatrixserverlib.HeaderedEvent)
for _, evt := range m.Chunk {
// Don't add membership events the client should already know about
if _, cached := lazyLoadCache.IsLazyLoadedUserCached(device, roomID, evt.Sender); cached {
continue
}
membership, err := db.GetStateEvent(ctx, roomID, gomatrixserverlib.MRoomMember, evt.Sender)
if err != nil {
util.GetLogger(ctx).WithError(err).Error("failed to get membership event for user")
continue
}
if membership != nil {
membershipToUser[evt.Sender] = membership
lazyLoadCache.StoreLazyLoadedUser(device, roomID, evt.Sender, membership.EventID())
}
}
for _, evt := range membershipToUser {
m.State = append(m.State, gomatrixserverlib.HeaderedToClientEvent(evt, gomatrixserverlib.FormatSync))
}
}
func checkIsRoomForgotten(ctx context.Context, roomID, userID string, rsAPI api.SyncRoomserverAPI) (forgotten bool, exists bool, err error) { func checkIsRoomForgotten(ctx context.Context, roomID, userID string, rsAPI api.SyncRoomserverAPI) (forgotten bool, exists bool, err error) {
req := api.QueryMembershipForUserRequest{ req := api.QueryMembershipForUserRequest{
RoomID: roomID, RoomID: roomID,

View file

@ -111,7 +111,7 @@ func (p *PresenceStreamProvider) IncrementalSync(
currentlyActive := prevPresence.CurrentlyActive() currentlyActive := prevPresence.CurrentlyActive()
skip := prevPresence.Equals(presence) && currentlyActive && req.Device.UserID != presence.UserID skip := prevPresence.Equals(presence) && currentlyActive && req.Device.UserID != presence.UserID
if skip { if skip {
req.Log.Debugf("Skipping presence, no change (%s)", presence.UserID) req.Log.Tracef("Skipping presence, no change (%s)", presence.UserID)
continue continue
} }
} }

View file

@ -48,3 +48,4 @@ Notifications can be viewed with GET /notifications
# More flakey # More flakey
If remote user leaves room we no longer receive device updates If remote user leaves room we no longer receive device updates
Guest users can join guest_access rooms

View file

@ -241,7 +241,6 @@ Inbound federation can receive v2 /send_join
Message history can be paginated Message history can be paginated
Backfill works correctly with history visibility set to joined Backfill works correctly with history visibility set to joined
Guest user cannot call /events globally Guest user cannot call /events globally
Guest users can join guest_access rooms
Guest user can set display names Guest user can set display names
Guest user cannot upgrade other users Guest user cannot upgrade other users
Guest non-joined user cannot call /events on shared room Guest non-joined user cannot call /events on shared room

View file

@ -33,6 +33,7 @@ import (
"github.com/matrix-org/dendrite/internal/pushrules" "github.com/matrix-org/dendrite/internal/pushrules"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
keyapi "github.com/matrix-org/dendrite/keyserver/api" keyapi "github.com/matrix-org/dendrite/keyserver/api"
rsapi "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/producers" "github.com/matrix-org/dendrite/userapi/producers"
@ -49,6 +50,7 @@ type UserInternalAPI struct {
// AppServices is the list of all registered AS // AppServices is the list of all registered AS
AppServices []config.ApplicationService AppServices []config.ApplicationService
KeyAPI keyapi.UserKeyAPI KeyAPI keyapi.UserKeyAPI
RSAPI rsapi.UserRoomserverAPI
} }
func (a *UserInternalAPI) InputAccountData(ctx context.Context, req *api.InputAccountDataRequest, res *api.InputAccountDataResponse) error { func (a *UserInternalAPI) InputAccountData(ctx context.Context, req *api.InputAccountDataRequest, res *api.InputAccountDataResponse) error {
@ -452,6 +454,30 @@ func (a *UserInternalAPI) queryAppServiceToken(ctx context.Context, token, appSe
// PerformAccountDeactivation deactivates the user's account, removing all ability for the user to login again. // PerformAccountDeactivation deactivates the user's account, removing all ability for the user to login again.
func (a *UserInternalAPI) PerformAccountDeactivation(ctx context.Context, req *api.PerformAccountDeactivationRequest, res *api.PerformAccountDeactivationResponse) error { func (a *UserInternalAPI) PerformAccountDeactivation(ctx context.Context, req *api.PerformAccountDeactivationRequest, res *api.PerformAccountDeactivationResponse) error {
evacuateReq := &rsapi.PerformAdminEvacuateUserRequest{
UserID: fmt.Sprintf("@%s:%s", req.Localpart, a.ServerName),
}
evacuateRes := &rsapi.PerformAdminEvacuateUserResponse{}
a.RSAPI.PerformAdminEvacuateUser(ctx, evacuateReq, evacuateRes)
if err := evacuateRes.Error; err != nil {
logrus.WithError(err).Errorf("Failed to evacuate user after account deactivation")
}
deviceReq := &api.PerformDeviceDeletionRequest{
UserID: fmt.Sprintf("@%s:%s", req.Localpart, a.ServerName),
}
deviceRes := &api.PerformDeviceDeletionResponse{}
if err := a.PerformDeviceDeletion(ctx, deviceReq, deviceRes); err != nil {
return err
}
pusherReq := &api.PerformPusherDeletionRequest{
Localpart: req.Localpart,
}
if err := a.PerformPusherDeletion(ctx, pusherReq, &struct{}{}); err != nil {
return err
}
err := a.DB.DeactivateAccount(ctx, req.Localpart) err := a.DB.DeactivateAccount(ctx, req.Localpart)
res.AccountDeactivated = err == nil res.AccountDeactivated = err == nil
return err return err

View file

@ -78,6 +78,7 @@ func NewInternalAPI(
ServerName: cfg.Matrix.ServerName, ServerName: cfg.Matrix.ServerName,
AppServices: appServices, AppServices: appServices,
KeyAPI: keyAPI, KeyAPI: keyAPI,
RSAPI: rsAPI,
DisableTLSValidation: cfg.PushGatewayDisableTLSValidation, DisableTLSValidation: cfg.PushGatewayDisableTLSValidation,
} }