Merge branch 'main' of github.com:matrix-org/dendrite into s7evink/migrate
This commit is contained in: commit a3216c78c3
.github/workflows/dendrite.yml (vendored): 1 change

@@ -17,6 +17,7 @@ jobs:
     name: WASM build test
     timeout-minutes: 5
     runs-on: ubuntu-latest
+    if: ${{ false }} # disable for now
     steps:
       - uses: actions/checkout@v2
CHANGES.md: 17 changes

@@ -1,5 +1,22 @@
 # Changelog
 
+## Dendrite 0.8.9 (2022-07-01)
+
+### Features
+
+* Incoming device list updates over federation are now queued in JetStream for processing so that they will no longer block incoming federation transactions and should never end up dropped, which will hopefully help E2EE reliability
+* The `/context` endpoint now returns `"start"` and `"end"` parameters to allow pagination from a context call
+* The `/messages` endpoint will no longer return `"end"` when there are no more messages remaining
+* Deactivated user accounts will now leave all rooms automatically
+* New admin endpoint `/_dendrite/admin/evacuateUser/{userID}` has been added for forcing a local user to leave all joined rooms
+* Dendrite will now automatically attempt to raise the file descriptor limit at startup if it is too low
+
+### Fixes
+
+* A rare crash when retrieving remote device lists has been fixed
+* Fixes a bug where events were not redacted properly over federation
+* The `/invite` endpoints will now return an error instead of silently proceeding if the user ID is obviously malformed
+
 ## Dendrite 0.8.8 (2022-06-09)
 
 ### Features
build.sh: 2 changes

@@ -21,4 +21,4 @@ mkdir -p bin
 
 CGO_ENABLED=1 go build -trimpath -ldflags "$FLAGS" -v -o "bin/" ./cmd/...
 
-CGO_ENABLED=0 GOOS=js GOARCH=wasm go build -trimpath -ldflags "$FLAGS" -o bin/main.wasm ./cmd/dendritejs-pinecone
+# CGO_ENABLED=0 GOOS=js GOARCH=wasm go build -trimpath -ldflags "$FLAGS" -o bin/main.wasm ./cmd/dendritejs-pinecone
@@ -17,6 +17,7 @@ package producers
 import (
 	"context"
 	"encoding/json"
+	"fmt"
 	"strconv"
 	"time"
 
@@ -83,7 +84,7 @@ func (p *SyncAPIProducer) SendReceipt(
 	m.Header.Set(jetstream.RoomID, roomID)
 	m.Header.Set(jetstream.EventID, eventID)
 	m.Header.Set("type", receiptType)
-	m.Header.Set("timestamp", strconv.Itoa(int(timestamp)))
+	m.Header.Set("timestamp", fmt.Sprintf("%d", timestamp))
 
 	log.WithFields(log.Fields{}).Tracef("Producing to topic '%s'", p.TopicReceiptEvent)
 	_, err := p.JetStream.PublishMsg(m, nats.Context(ctx))
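A note on why this hunk swaps `strconv.Itoa(int(timestamp))` for `fmt.Sprintf("%d", timestamp)`: `int` is only 32 bits wide on 32-bit platforms, so the conversion can silently truncate a 64-bit millisecond timestamp before it is formatted. A minimal sketch of the failure mode (the `uint64` here stands in for the timestamp type used in Dendrite):

```go
package main

import (
	"fmt"
	"strconv"
)

func main() {
	// A millisecond Matrix timestamp does not fit in 32 bits.
	var timestamp uint64 = 1656633600000

	// On a 32-bit platform, int(timestamp) silently truncates before
	// strconv.Itoa ever sees the value; int32 simulates that width here.
	fmt.Println(strconv.Itoa(int(int32(timestamp)))) // the old, lossy path

	// %d formats the operand at its declared width on every platform.
	fmt.Printf("%d\n", timestamp) // 1656633600000
}
```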
@@ -47,3 +47,40 @@ func AdminEvacuateRoom(req *http.Request, device *userapi.Device, rsAPI roomserverAPI.ClientRoomserverAPI) util.JSONResponse {
 		},
 	}
 }
+
+func AdminEvacuateUser(req *http.Request, device *userapi.Device, rsAPI roomserverAPI.ClientRoomserverAPI) util.JSONResponse {
+	if device.AccountType != userapi.AccountTypeAdmin {
+		return util.JSONResponse{
+			Code: http.StatusForbidden,
+			JSON: jsonerror.Forbidden("This API can only be used by admin users."),
+		}
+	}
+	vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
+	if err != nil {
+		return util.ErrorResponse(err)
+	}
+	userID, ok := vars["userID"]
+	if !ok {
+		return util.JSONResponse{
+			Code: http.StatusBadRequest,
+			JSON: jsonerror.MissingArgument("Expecting user ID."),
+		}
+	}
+	res := &roomserverAPI.PerformAdminEvacuateUserResponse{}
+	rsAPI.PerformAdminEvacuateUser(
+		req.Context(),
+		&roomserverAPI.PerformAdminEvacuateUserRequest{
+			UserID: userID,
+		},
+		res,
+	)
+	if err := res.Error; err != nil {
+		return err.JSONResponse()
+	}
+	return util.JSONResponse{
+		Code: 200,
+		JSON: map[string]interface{}{
+			"affected": res.Affected,
+		},
+	}
+}
@@ -129,6 +129,12 @@ func Setup(
 		}),
 	).Methods(http.MethodGet, http.MethodOptions)
 
+	dendriteAdminRouter.Handle("/admin/evacuateUser/{userID}",
+		httputil.MakeAuthAPI("admin_evacuate_user", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
+			return AdminEvacuateUser(req, device, rsAPI)
+		}),
+	).Methods(http.MethodGet, http.MethodOptions)
+
 	// server notifications
 	if cfg.Matrix.ServerNotices.Enabled {
 		logrus.Info("Enabling server notices at /_synapse/admin/v1/send_server_notice")
@@ -19,6 +19,12 @@ This endpoint will instruct Dendrite to part all local users from the given `roomID`
 in the URL. It may take some time to complete. A JSON body will be returned containing
 the user IDs of all affected users.
 
+## `/_dendrite/admin/evacuateUser/{userID}`
+
+This endpoint will instruct Dendrite to part the given local `userID` in the URL from
+all rooms to which they are currently joined. A JSON body will be returned containing
+the room IDs of all affected rooms.
+
 ## `/_synapse/admin/v1/register`
 
 Shared secret registration — please see the [user creation page](createusers) for
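For illustration, a hedged sketch of exercising the new endpoint from Go's standard HTTP client. The homeserver URL and access token are placeholders; the route and GET method come from the handler registration above:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
)

func main() {
	// Placeholders: substitute your homeserver and an admin access token.
	homeserver := "https://localhost:8448"
	userID := url.PathEscape("@alice:localhost")

	req, err := http.NewRequest(http.MethodGet,
		homeserver+"/_dendrite/admin/evacuateUser/"+userID, nil)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Authorization", "Bearer ADMIN_ACCESS_TOKEN")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// Expected shape per the handler above: {"affected": ["!room:...", ...]}
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(body))
}
```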
@@ -133,7 +133,7 @@ func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
 		return true
 	}
 
-	log.Debugf("sending presence EDU to %d servers", len(joined))
+	log.Tracef("sending presence EDU to %d servers", len(joined))
 	if err = t.queues.SendEDU(edu, t.ServerName, joined); err != nil {
 		log.WithError(err).Error("failed to send EDU")
 		return false
@@ -90,7 +90,7 @@ func (t *OutputReceiptConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
 		return true
 	}
 
-	timestamp, err := strconv.Atoi(msg.Header.Get("timestamp"))
+	timestamp, err := strconv.ParseUint(msg.Header.Get("timestamp"), 10, 64)
 	if err != nil {
 		// If the message was invalid, log it and move on to the next message in the stream
 		log.WithError(err).Errorf("EDU output log: message parse failure")
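On the consuming side, `strconv.ParseUint(..., 10, 64)` keeps the value 64-bit end to end, where the old `strconv.Atoi` returned a platform-sized `int`. A small sketch of the intended round trip between the producers above and this consumer (the value is illustrative):

```go
package main

import (
	"fmt"
	"strconv"
)

func main() {
	header := fmt.Sprintf("%d", uint64(1656633600000)) // written by the producer
	ts, err := strconv.ParseUint(header, 10, 64)       // read back by the consumer
	if err != nil {
		panic(err)
	}
	fmt.Println(ts) // 1656633600000, with no 32-bit intermediate
}
```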
@@ -53,7 +53,7 @@ func (p *SyncAPIProducer) SendReceipt(
 	m.Header.Set(jetstream.RoomID, roomID)
 	m.Header.Set(jetstream.EventID, eventID)
 	m.Header.Set("type", receiptType)
-	m.Header.Set("timestamp", strconv.Itoa(int(timestamp)))
+	m.Header.Set("timestamp", fmt.Sprintf("%d", timestamp))
 
 	log.WithFields(log.Fields{}).Tracef("Producing to topic '%s'", p.TopicReceiptEvent)
 	_, err := p.JetStream.PublishMsg(m, nats.Context(ctx))

@@ -159,7 +159,7 @@ func (p *SyncAPIProducer) SendPresence(
 	lastActiveTS := gomatrixserverlib.AsTimestamp(time.Now().Add(-(time.Duration(lastActiveAgo) * time.Millisecond)))
 
 	m.Header.Set("last_active_ts", strconv.Itoa(int(lastActiveTS)))
-	log.Debugf("Sending presence to syncAPI: %+v", m.Header)
+	log.Tracef("Sending presence to syncAPI: %+v", m.Header)
 	_, err := p.JetStream.PublishMsg(m, nats.Context(ctx))
 	return err
 }
go.mod: 4 changes

@@ -2,7 +2,7 @@ module github.com/matrix-org/dendrite
 
 replace github.com/nats-io/nats-server/v2 => github.com/neilalexander/nats-server/v2 v2.8.3-0.20220513095553-73a9a246d34f
 
-replace github.com/nats-io/nats.go => github.com/neilalexander/nats.go v1.13.1-0.20220419101051-b262d9f0be1e
+replace github.com/nats-io/nats.go => github.com/neilalexander/nats.go v1.13.1-0.20220621084451-ac518c356673
 
 require (
 	github.com/Arceliar/ironwood v0.0.0-20220306165321-319147a02d98

@@ -34,7 +34,7 @@ require (
 	github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e
 	github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91
 	github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16
-	github.com/matrix-org/gomatrixserverlib v0.0.0-20220613132209-aedb3fbb511a
+	github.com/matrix-org/gomatrixserverlib v0.0.0-20220701090733-da53994b0c7f
 	github.com/matrix-org/pinecone v0.0.0-20220408153826-2999ea29ed48
 	github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
 	github.com/mattn/go-sqlite3 v1.14.13
go.sum: 8 changes

@@ -418,8 +418,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1
 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
 github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4=
 github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
-github.com/matrix-org/gomatrixserverlib v0.0.0-20220613132209-aedb3fbb511a h1:jOkrb6twViAGTHHadA51sQwdloHT0Vx1MCptk9InTHo=
-github.com/matrix-org/gomatrixserverlib v0.0.0-20220613132209-aedb3fbb511a/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk=
+github.com/matrix-org/gomatrixserverlib v0.0.0-20220701090733-da53994b0c7f h1:XF2+J6sOq07yhK1I7ItwsgRwXorjj7gqiCvgZ4dn8W8=
+github.com/matrix-org/gomatrixserverlib v0.0.0-20220701090733-da53994b0c7f/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk=
 github.com/matrix-org/pinecone v0.0.0-20220408153826-2999ea29ed48 h1:W0sjjC6yjskHX4mb0nk3p0fXAlbU5bAFUFeEtlrPASE=
 github.com/matrix-org/pinecone v0.0.0-20220408153826-2999ea29ed48/go.mod h1:ulJzsVOTssIVp1j/m5eI//4VpAGDkMt5NrRuAVX7wpc=
 github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=

@@ -482,8 +482,8 @@ github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJE
 github.com/neelance/sourcemap v0.0.0-20151028013722-8c68805598ab/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM=
 github.com/neilalexander/nats-server/v2 v2.8.3-0.20220513095553-73a9a246d34f h1:Fc+TjdV1mOy0oISSzfoxNWdTqjg7tN/Vdgf+B2cwvdo=
 github.com/neilalexander/nats-server/v2 v2.8.3-0.20220513095553-73a9a246d34f/go.mod h1:vIdpKz3OG+DCg4q/xVPdXHoztEyKDWRtykQ4N7hd7C4=
-github.com/neilalexander/nats.go v1.13.1-0.20220419101051-b262d9f0be1e h1:kNIzIzj2OvnlreA+sTJ12nWJzTP3OSLNKDL/Iq9mF6Y=
-github.com/neilalexander/nats.go v1.13.1-0.20220419101051-b262d9f0be1e/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
+github.com/neilalexander/nats.go v1.13.1-0.20220621084451-ac518c356673 h1:TcKfa3Tf0qwUotv63PQVu2d1bBoLi2iEA4RHVMGDh5M=
+github.com/neilalexander/nats.go v1.13.1-0.20220621084451-ac518c356673/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
 github.com/neilalexander/utp v0.1.1-0.20210727203401-54ae7b1cd5f9 h1:lrVQzBtkeQEGGYUHwSX1XPe1E5GL6U3KYCNe2G4bncQ=
 github.com/neilalexander/utp v0.1.1-0.20210727203401-54ae7b1cd5f9/go.mod h1:NPHGhPc0/wudcaCqL/H5AOddkRf8GPRhzOujuUKGQu8=
 github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 h1:zYyBkD/k9seD2A7fsi6Oo2LfFZAehjjQMERAvZLEDnQ=
@@ -17,7 +17,7 @@ var build string
 const (
 	VersionMajor = 0
 	VersionMinor = 8
-	VersionPatch = 8
+	VersionPatch = 9
 	VersionTag   = "" // example: "rc1"
 )
@@ -1,36 +1,26 @@
-package storage
+package storage_test
 
 import (
 	"context"
-	"fmt"
-	"io/ioutil"
-	"log"
-	"os"
 	"reflect"
 	"testing"
 
 	"github.com/matrix-org/dendrite/keyserver/api"
+	"github.com/matrix-org/dendrite/keyserver/storage"
 	"github.com/matrix-org/dendrite/keyserver/types"
-	"github.com/matrix-org/dendrite/setup/config"
+	"github.com/matrix-org/dendrite/test"
+	"github.com/matrix-org/dendrite/test/testrig"
 )
 
 var ctx = context.Background()
 
-func MustCreateDatabase(t *testing.T) (Database, func()) {
-	tmpfile, err := ioutil.TempFile("", "keyserver_storage_test")
-	if err != nil {
-		log.Fatal(err)
-	}
-	t.Logf("Database %s", tmpfile.Name())
-	db, err := NewDatabase(nil, &config.DatabaseOptions{
-		ConnectionString: config.DataSource(fmt.Sprintf("file://%s", tmpfile.Name())),
-	})
+func MustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, func()) {
+	base, close := testrig.CreateBaseDendrite(t, dbType)
+	db, err := storage.NewDatabase(base, &base.Cfg.KeyServer.Database)
 	if err != nil {
-		t.Fatalf("Failed to NewDatabase: %s", err)
+		t.Fatalf("failed to create new database: %v", err)
 	}
-	return db, func() {
-		os.Remove(tmpfile.Name())
-	}
+	return db, close
 }
 
 func MustNotError(t *testing.T, err error) {

@@ -42,151 +32,159 @@ func MustNotError(t *testing.T, err error) {
 }
 
 func TestKeyChanges(t *testing.T) {
-	db, clean := MustCreateDatabase(t)
-	defer clean()
-	_, err := db.StoreKeyChange(ctx, "@alice:localhost")
-	MustNotError(t, err)
-	deviceChangeIDB, err := db.StoreKeyChange(ctx, "@bob:localhost")
-	MustNotError(t, err)
-	deviceChangeIDC, err := db.StoreKeyChange(ctx, "@charlie:localhost")
-	MustNotError(t, err)
-	userIDs, latest, err := db.KeyChanges(ctx, deviceChangeIDB, types.OffsetNewest)
-	if err != nil {
-		t.Fatalf("Failed to KeyChanges: %s", err)
-	}
-	if latest != deviceChangeIDC {
-		t.Fatalf("KeyChanges: got latest=%d want %d", latest, deviceChangeIDC)
-	}
-	if !reflect.DeepEqual(userIDs, []string{"@charlie:localhost"}) {
-		t.Fatalf("KeyChanges: wrong user_ids: %v", userIDs)
-	}
+	test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
+		db, clean := MustCreateDatabase(t, dbType)
+		defer clean()
+		_, err := db.StoreKeyChange(ctx, "@alice:localhost")
+		MustNotError(t, err)
+		deviceChangeIDB, err := db.StoreKeyChange(ctx, "@bob:localhost")
+		MustNotError(t, err)
+		deviceChangeIDC, err := db.StoreKeyChange(ctx, "@charlie:localhost")
+		MustNotError(t, err)
+		userIDs, latest, err := db.KeyChanges(ctx, deviceChangeIDB, types.OffsetNewest)
+		if err != nil {
+			t.Fatalf("Failed to KeyChanges: %s", err)
+		}
+		if latest != deviceChangeIDC {
+			t.Fatalf("KeyChanges: got latest=%d want %d", latest, deviceChangeIDC)
+		}
+		if !reflect.DeepEqual(userIDs, []string{"@charlie:localhost"}) {
+			t.Fatalf("KeyChanges: wrong user_ids: %v", userIDs)
+		}
+	})
 }
 
 func TestKeyChangesNoDupes(t *testing.T) {
-	db, clean := MustCreateDatabase(t)
-	defer clean()
-	deviceChangeIDA, err := db.StoreKeyChange(ctx, "@alice:localhost")
-	MustNotError(t, err)
-	deviceChangeIDB, err := db.StoreKeyChange(ctx, "@alice:localhost")
-	MustNotError(t, err)
-	if deviceChangeIDA == deviceChangeIDB {
-		t.Fatalf("Expected change ID to be different even when inserting key change for the same user, got %d for both changes", deviceChangeIDA)
-	}
-	deviceChangeID, err := db.StoreKeyChange(ctx, "@alice:localhost")
-	MustNotError(t, err)
-	userIDs, latest, err := db.KeyChanges(ctx, 0, types.OffsetNewest)
-	if err != nil {
-		t.Fatalf("Failed to KeyChanges: %s", err)
-	}
-	if latest != deviceChangeID {
-		t.Fatalf("KeyChanges: got latest=%d want %d", latest, deviceChangeID)
-	}
-	if !reflect.DeepEqual(userIDs, []string{"@alice:localhost"}) {
-		t.Fatalf("KeyChanges: wrong user_ids: %v", userIDs)
-	}
+	test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
+		db, clean := MustCreateDatabase(t, dbType)
+		defer clean()
+		deviceChangeIDA, err := db.StoreKeyChange(ctx, "@alice:localhost")
+		MustNotError(t, err)
+		deviceChangeIDB, err := db.StoreKeyChange(ctx, "@alice:localhost")
+		MustNotError(t, err)
+		if deviceChangeIDA == deviceChangeIDB {
+			t.Fatalf("Expected change ID to be different even when inserting key change for the same user, got %d for both changes", deviceChangeIDA)
+		}
+		deviceChangeID, err := db.StoreKeyChange(ctx, "@alice:localhost")
+		MustNotError(t, err)
+		userIDs, latest, err := db.KeyChanges(ctx, 0, types.OffsetNewest)
+		if err != nil {
+			t.Fatalf("Failed to KeyChanges: %s", err)
+		}
+		if latest != deviceChangeID {
+			t.Fatalf("KeyChanges: got latest=%d want %d", latest, deviceChangeID)
+		}
+		if !reflect.DeepEqual(userIDs, []string{"@alice:localhost"}) {
+			t.Fatalf("KeyChanges: wrong user_ids: %v", userIDs)
+		}
+	})
 }
 
 func TestKeyChangesUpperLimit(t *testing.T) {
-	db, clean := MustCreateDatabase(t)
-	defer clean()
-	deviceChangeIDA, err := db.StoreKeyChange(ctx, "@alice:localhost")
-	MustNotError(t, err)
-	deviceChangeIDB, err := db.StoreKeyChange(ctx, "@bob:localhost")
-	MustNotError(t, err)
-	_, err = db.StoreKeyChange(ctx, "@charlie:localhost")
-	MustNotError(t, err)
-	userIDs, latest, err := db.KeyChanges(ctx, deviceChangeIDA, deviceChangeIDB)
-	if err != nil {
-		t.Fatalf("Failed to KeyChanges: %s", err)
-	}
-	if latest != deviceChangeIDB {
-		t.Fatalf("KeyChanges: got latest=%d want %d", latest, deviceChangeIDB)
-	}
-	if !reflect.DeepEqual(userIDs, []string{"@bob:localhost"}) {
-		t.Fatalf("KeyChanges: wrong user_ids: %v", userIDs)
-	}
+	test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
+		db, clean := MustCreateDatabase(t, dbType)
+		defer clean()
+		deviceChangeIDA, err := db.StoreKeyChange(ctx, "@alice:localhost")
+		MustNotError(t, err)
+		deviceChangeIDB, err := db.StoreKeyChange(ctx, "@bob:localhost")
+		MustNotError(t, err)
+		_, err = db.StoreKeyChange(ctx, "@charlie:localhost")
+		MustNotError(t, err)
+		userIDs, latest, err := db.KeyChanges(ctx, deviceChangeIDA, deviceChangeIDB)
+		if err != nil {
+			t.Fatalf("Failed to KeyChanges: %s", err)
+		}
+		if latest != deviceChangeIDB {
+			t.Fatalf("KeyChanges: got latest=%d want %d", latest, deviceChangeIDB)
+		}
+		if !reflect.DeepEqual(userIDs, []string{"@bob:localhost"}) {
+			t.Fatalf("KeyChanges: wrong user_ids: %v", userIDs)
+		}
+	})
 }
 
 // The purpose of this test is to make sure that the storage layer is generating sequential stream IDs per user,
 // and that they are returned correctly when querying for device keys.
 func TestDeviceKeysStreamIDGeneration(t *testing.T) {
 	var err error
-	db, clean := MustCreateDatabase(t)
-	defer clean()
-	alice := "@alice:TestDeviceKeysStreamIDGeneration"
-	bob := "@bob:TestDeviceKeysStreamIDGeneration"
-	msgs := []api.DeviceMessage{
-		{
-			Type: api.TypeDeviceKeyUpdate,
-			DeviceKeys: &api.DeviceKeys{
-				DeviceID: "AAA",
-				UserID:   alice,
-				KeyJSON:  []byte(`{"key":"v1"}`),
-			},
-			// StreamID: 1
-		},
-		{
-			Type: api.TypeDeviceKeyUpdate,
-			DeviceKeys: &api.DeviceKeys{
-				DeviceID: "AAA",
-				UserID:   bob,
-				KeyJSON:  []byte(`{"key":"v1"}`),
-			},
-			// StreamID: 1 as this is a different user
-		},
-		{
-			Type: api.TypeDeviceKeyUpdate,
-			DeviceKeys: &api.DeviceKeys{
-				DeviceID: "another_device",
-				UserID:   alice,
-				KeyJSON:  []byte(`{"key":"v1"}`),
-			},
-			// StreamID: 2 as this is a 2nd device key
-		},
-	}
-	MustNotError(t, db.StoreLocalDeviceKeys(ctx, msgs))
-	if msgs[0].StreamID != 1 {
-		t.Fatalf("Expected StoreLocalDeviceKeys to set StreamID=1 but got %d", msgs[0].StreamID)
-	}
-	if msgs[1].StreamID != 1 {
-		t.Fatalf("Expected StoreLocalDeviceKeys to set StreamID=1 (different user) but got %d", msgs[1].StreamID)
-	}
-	if msgs[2].StreamID != 2 {
-		t.Fatalf("Expected StoreLocalDeviceKeys to set StreamID=2 (another device) but got %d", msgs[2].StreamID)
-	}
-
-	// updating a device sets the next stream ID for that user
-	msgs = []api.DeviceMessage{
-		{
-			Type: api.TypeDeviceKeyUpdate,
-			DeviceKeys: &api.DeviceKeys{
-				DeviceID: "AAA",
-				UserID:   alice,
-				KeyJSON:  []byte(`{"key":"v2"}`),
-			},
-			// StreamID: 3
-		},
-	}
-	MustNotError(t, db.StoreLocalDeviceKeys(ctx, msgs))
-	if msgs[0].StreamID != 3 {
-		t.Fatalf("Expected StoreLocalDeviceKeys to set StreamID=3 (new key same device) but got %d", msgs[0].StreamID)
-	}
-
-	// Querying for device keys returns the latest stream IDs
-	msgs, err = db.DeviceKeysForUser(ctx, alice, []string{"AAA", "another_device"}, false)
-	if err != nil {
-		t.Fatalf("DeviceKeysForUser returned error: %s", err)
-	}
-	wantStreamIDs := map[string]int64{
-		"AAA":            3,
-		"another_device": 2,
-	}
-	if len(msgs) != len(wantStreamIDs) {
-		t.Fatalf("DeviceKeysForUser: wrong number of devices, got %d want %d", len(msgs), len(wantStreamIDs))
-	}
-	for _, m := range msgs {
-		if m.StreamID != wantStreamIDs[m.DeviceID] {
-			t.Errorf("DeviceKeysForUser: wrong returned stream ID for key, got %d want %d", m.StreamID, wantStreamIDs[m.DeviceID])
-		}
-	}
+	test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
+		db, clean := MustCreateDatabase(t, dbType)
+		defer clean()
+		alice := "@alice:TestDeviceKeysStreamIDGeneration"
+		bob := "@bob:TestDeviceKeysStreamIDGeneration"
+		msgs := []api.DeviceMessage{
+			{
+				Type: api.TypeDeviceKeyUpdate,
+				DeviceKeys: &api.DeviceKeys{
+					DeviceID: "AAA",
+					UserID:   alice,
+					KeyJSON:  []byte(`{"key":"v1"}`),
+				},
+				// StreamID: 1
+			},
+			{
+				Type: api.TypeDeviceKeyUpdate,
+				DeviceKeys: &api.DeviceKeys{
+					DeviceID: "AAA",
+					UserID:   bob,
+					KeyJSON:  []byte(`{"key":"v1"}`),
+				},
+				// StreamID: 1 as this is a different user
+			},
+			{
+				Type: api.TypeDeviceKeyUpdate,
+				DeviceKeys: &api.DeviceKeys{
+					DeviceID: "another_device",
+					UserID:   alice,
+					KeyJSON:  []byte(`{"key":"v1"}`),
+				},
+				// StreamID: 2 as this is a 2nd device key
+			},
+		}
+		MustNotError(t, db.StoreLocalDeviceKeys(ctx, msgs))
+		if msgs[0].StreamID != 1 {
+			t.Fatalf("Expected StoreLocalDeviceKeys to set StreamID=1 but got %d", msgs[0].StreamID)
+		}
+		if msgs[1].StreamID != 1 {
+			t.Fatalf("Expected StoreLocalDeviceKeys to set StreamID=1 (different user) but got %d", msgs[1].StreamID)
+		}
+		if msgs[2].StreamID != 2 {
+			t.Fatalf("Expected StoreLocalDeviceKeys to set StreamID=2 (another device) but got %d", msgs[2].StreamID)
+		}
+
+		// updating a device sets the next stream ID for that user
+		msgs = []api.DeviceMessage{
+			{
+				Type: api.TypeDeviceKeyUpdate,
+				DeviceKeys: &api.DeviceKeys{
+					DeviceID: "AAA",
+					UserID:   alice,
+					KeyJSON:  []byte(`{"key":"v2"}`),
+				},
+				// StreamID: 3
+			},
+		}
+		MustNotError(t, db.StoreLocalDeviceKeys(ctx, msgs))
+		if msgs[0].StreamID != 3 {
+			t.Fatalf("Expected StoreLocalDeviceKeys to set StreamID=3 (new key same device) but got %d", msgs[0].StreamID)
+		}
+
+		// Querying for device keys returns the latest stream IDs
+		msgs, err = db.DeviceKeysForUser(ctx, alice, []string{"AAA", "another_device"}, false)
+		if err != nil {
+			t.Fatalf("DeviceKeysForUser returned error: %s", err)
+		}
+		wantStreamIDs := map[string]int64{
+			"AAA":            3,
+			"another_device": 2,
+		}
+		if len(msgs) != len(wantStreamIDs) {
+			t.Fatalf("DeviceKeysForUser: wrong number of devices, got %d want %d", len(msgs), len(wantStreamIDs))
+		}
+		for _, m := range msgs {
+			if m.StreamID != wantStreamIDs[m.DeviceID] {
+				t.Errorf("DeviceKeysForUser: wrong returned stream ID for key, got %d want %d", m.StreamID, wantStreamIDs[m.DeviceID])
+			}
+		}
+	})
 }
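The tests above now run once per database engine via `test.WithAllDatabases`. As a rough sketch of the shape of such a helper (this is an assumption about Dendrite's `test` package, not its exact implementation):

```go
package test

import "testing"

type DBType int

const (
	DBTypeSQLite DBType = iota
	DBTypePostgres
)

// WithAllDatabases runs the given test body once per supported database
// engine, as a named subtest, so failures identify the engine involved.
func WithAllDatabases(t *testing.T, testFn func(t *testing.T, dbType DBType)) {
	dbs := map[string]DBType{
		"SQLite":   DBTypeSQLite,
		"Postgres": DBTypePostgres,
	}
	for name, dbType := range dbs {
		t.Run(name, func(t *testing.T) {
			testFn(t, dbType)
		})
	}
}
```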
@@ -140,11 +140,8 @@ type ClientRoomserverAPI interface {
 
 	// PerformRoomUpgrade upgrades a room to a newer version
 	PerformRoomUpgrade(ctx context.Context, req *PerformRoomUpgradeRequest, resp *PerformRoomUpgradeResponse)
-	PerformAdminEvacuateRoom(
-		ctx context.Context,
-		req *PerformAdminEvacuateRoomRequest,
-		res *PerformAdminEvacuateRoomResponse,
-	)
+	PerformAdminEvacuateRoom(ctx context.Context, req *PerformAdminEvacuateRoomRequest, res *PerformAdminEvacuateRoomResponse)
+	PerformAdminEvacuateUser(ctx context.Context, req *PerformAdminEvacuateUserRequest, res *PerformAdminEvacuateUserResponse)
 	PerformPeek(ctx context.Context, req *PerformPeekRequest, res *PerformPeekResponse)
 	PerformUnpeek(ctx context.Context, req *PerformUnpeekRequest, res *PerformUnpeekResponse)
 	PerformInvite(ctx context.Context, req *PerformInviteRequest, res *PerformInviteResponse) error

@@ -161,6 +158,7 @@ type UserRoomserverAPI interface {
 	QueryLatestEventsAndStateAPI
 	QueryCurrentState(ctx context.Context, req *QueryCurrentStateRequest, res *QueryCurrentStateResponse) error
 	QueryMembershipsForRoom(ctx context.Context, req *QueryMembershipsForRoomRequest, res *QueryMembershipsForRoomResponse) error
+	PerformAdminEvacuateUser(ctx context.Context, req *PerformAdminEvacuateUserRequest, res *PerformAdminEvacuateUserResponse)
 }
 
 type FederationRoomserverAPI interface {
@@ -113,6 +113,15 @@ func (t *RoomserverInternalAPITrace) PerformAdminEvacuateRoom(
 	util.GetLogger(ctx).Infof("PerformAdminEvacuateRoom req=%+v res=%+v", js(req), js(res))
 }
 
+func (t *RoomserverInternalAPITrace) PerformAdminEvacuateUser(
+	ctx context.Context,
+	req *PerformAdminEvacuateUserRequest,
+	res *PerformAdminEvacuateUserResponse,
+) {
+	t.Impl.PerformAdminEvacuateUser(ctx, req, res)
+	util.GetLogger(ctx).Infof("PerformAdminEvacuateUser req=%+v res=%+v", js(req), js(res))
+}
+
 func (t *RoomserverInternalAPITrace) PerformInboundPeek(
 	ctx context.Context,
 	req *PerformInboundPeekRequest,
@@ -223,3 +223,12 @@ type PerformAdminEvacuateRoomResponse struct {
 	Affected []string `json:"affected"`
 	Error    *PerformError
 }
+
+type PerformAdminEvacuateUserRequest struct {
+	UserID string `json:"user_id"`
+}
+
+type PerformAdminEvacuateUserResponse struct {
+	Affected []string `json:"affected"`
+	Error    *PerformError
+}
@@ -216,11 +216,10 @@ func (r *RoomserverInternalAPI) RemoveRoomAlias(
 			return err
 		}
 
-		err = api.SendEvents(ctx, r.RSAPI, api.KindNew, []*gomatrixserverlib.HeaderedEvent{newEvent}, r.ServerName, r.ServerName, nil, false)
+		err = api.SendEvents(ctx, r, api.KindNew, []*gomatrixserverlib.HeaderedEvent{newEvent}, r.ServerName, r.ServerName, nil, false)
 		if err != nil {
 			return err
 		}
 	}
 }
@@ -12,8 +12,11 @@ import (
 	"github.com/matrix-org/dendrite/roomserver/internal/input"
 	"github.com/matrix-org/dendrite/roomserver/internal/perform"
 	"github.com/matrix-org/dendrite/roomserver/internal/query"
+	"github.com/matrix-org/dendrite/roomserver/producers"
 	"github.com/matrix-org/dendrite/roomserver/storage"
+	"github.com/matrix-org/dendrite/setup/base"
 	"github.com/matrix-org/dendrite/setup/config"
+	"github.com/matrix-org/dendrite/setup/jetstream"
 	"github.com/matrix-org/dendrite/setup/process"
 	userapi "github.com/matrix-org/dendrite/userapi/api"
 	"github.com/matrix-org/gomatrixserverlib"

@@ -37,6 +40,7 @@ type RoomserverInternalAPI struct {
 	*perform.Upgrader
 	*perform.Admin
 	ProcessContext         *process.ProcessContext
+	Base                   *base.BaseDendrite
 	DB                     storage.Database
 	Cfg                    *config.RoomServer
 	Cache                  caching.RoomServerCaches

@@ -49,34 +53,43 @@ type RoomserverInternalAPI struct {
 	JetStream              nats.JetStreamContext
 	Durable                string
 	InputRoomEventTopic    string // JetStream topic for new input room events
-	OutputRoomEventTopic   string // JetStream topic for new output room events
+	OutputProducer         *producers.RoomEventProducer
 	PerspectiveServerNames []gomatrixserverlib.ServerName
 }
 
 func NewRoomserverAPI(
-	processCtx *process.ProcessContext, cfg *config.RoomServer, roomserverDB storage.Database,
-	consumer nats.JetStreamContext, nc *nats.Conn,
-	inputRoomEventTopic, outputRoomEventTopic string,
-	caches caching.RoomServerCaches, perspectiveServerNames []gomatrixserverlib.ServerName,
+	base *base.BaseDendrite, roomserverDB storage.Database,
+	js nats.JetStreamContext, nc *nats.Conn,
 ) *RoomserverInternalAPI {
+	var perspectiveServerNames []gomatrixserverlib.ServerName
+	for _, kp := range base.Cfg.FederationAPI.KeyPerspectives {
+		perspectiveServerNames = append(perspectiveServerNames, kp.ServerName)
+	}
+
 	serverACLs := acls.NewServerACLs(roomserverDB)
+	producer := &producers.RoomEventProducer{
+		Topic:     string(base.Cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent)),
+		JetStream: js,
+		ACLs:      serverACLs,
+	}
 	a := &RoomserverInternalAPI{
-		ProcessContext:         processCtx,
+		ProcessContext:         base.ProcessContext,
 		DB:                     roomserverDB,
-		Cfg:                    cfg,
-		Cache:                  caches,
-		ServerName:             cfg.Matrix.ServerName,
+		Base:                   base,
+		Cfg:                    &base.Cfg.RoomServer,
+		Cache:                  base.Caches,
+		ServerName:             base.Cfg.Global.ServerName,
 		PerspectiveServerNames: perspectiveServerNames,
-		InputRoomEventTopic:    inputRoomEventTopic,
-		OutputRoomEventTopic:   outputRoomEventTopic,
-		JetStream:              consumer,
+		InputRoomEventTopic:    base.Cfg.Global.JetStream.Prefixed(jetstream.InputRoomEvent),
+		OutputProducer:         producer,
+		JetStream:              js,
 		NATSClient:             nc,
-		Durable:                cfg.Matrix.JetStream.Durable("RoomserverInputConsumer"),
+		Durable:                base.Cfg.Global.JetStream.Durable("RoomserverInputConsumer"),
 		ServerACLs:             serverACLs,
 		Queryer: &query.Queryer{
 			DB:         roomserverDB,
-			Cache:      caches,
-			ServerName: cfg.Matrix.ServerName,
+			Cache:      base.Caches,
+			ServerName: base.Cfg.Global.ServerName,
 			ServerACLs: serverACLs,
 		},
 		// perform-er structs get initialised when we have a federation sender to use

@@ -92,19 +105,20 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.RoomserverFederationAPI, keyRing gomatrixserverlib.JSONVerifier) {
 	r.KeyRing = keyRing
 
 	r.Inputer = &input.Inputer{
-		Cfg:                  r.Cfg,
-		ProcessContext:       r.ProcessContext,
-		DB:                   r.DB,
-		InputRoomEventTopic:  r.InputRoomEventTopic,
-		OutputRoomEventTopic: r.OutputRoomEventTopic,
-		JetStream:            r.JetStream,
-		NATSClient:           r.NATSClient,
-		Durable:              nats.Durable(r.Durable),
-		ServerName:           r.Cfg.Matrix.ServerName,
-		FSAPI:                fsAPI,
-		KeyRing:              keyRing,
-		ACLs:                 r.ServerACLs,
-		Queryer:              r.Queryer,
+		Cfg:                 &r.Base.Cfg.RoomServer,
+		Base:                r.Base,
+		ProcessContext:      r.Base.ProcessContext,
+		DB:                  r.DB,
+		InputRoomEventTopic: r.InputRoomEventTopic,
+		OutputProducer:      r.OutputProducer,
+		JetStream:           r.JetStream,
+		NATSClient:          r.NATSClient,
+		Durable:             nats.Durable(r.Durable),
+		ServerName:          r.Cfg.Matrix.ServerName,
+		FSAPI:               fsAPI,
+		KeyRing:             keyRing,
+		ACLs:                r.ServerACLs,
+		Queryer:             r.Queryer,
 	}
 	r.Inviter = &perform.Inviter{
 		DB: r.DB,

@@ -170,6 +184,7 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.RoomserverFederationAPI, keyRing gomatrixserverlib.JSONVerifier) {
 		Cfg:     r.Cfg,
 		Inputer: r.Inputer,
 		Queryer: r.Queryer,
+		Leaver:  r.Leaver,
 	}
 
 	if err := r.Inputer.Start(); err != nil {

@@ -198,7 +213,7 @@ func (r *RoomserverInternalAPI) PerformInvite(
 	if len(outputEvents) == 0 {
 		return nil
 	}
-	return r.WriteOutputEvents(req.Event.RoomID(), outputEvents)
+	return r.OutputProducer.ProduceRoomEvents(req.Event.RoomID(), outputEvents)
 }
 
 func (r *RoomserverInternalAPI) PerformLeave(

@@ -214,7 +229,7 @@ func (r *RoomserverInternalAPI) PerformLeave(
 	if len(outputEvents) == 0 {
 		return nil
 	}
-	return r.WriteOutputEvents(req.RoomID, outputEvents)
+	return r.OutputProducer.ProduceRoomEvents(req.RoomID, outputEvents)
 }
 
 func (r *RoomserverInternalAPI) PerformForget(
@@ -29,7 +29,9 @@ import (
 	"github.com/matrix-org/dendrite/roomserver/acls"
 	"github.com/matrix-org/dendrite/roomserver/api"
 	"github.com/matrix-org/dendrite/roomserver/internal/query"
+	"github.com/matrix-org/dendrite/roomserver/producers"
 	"github.com/matrix-org/dendrite/roomserver/storage"
+	"github.com/matrix-org/dendrite/setup/base"
 	"github.com/matrix-org/dendrite/setup/config"
 	"github.com/matrix-org/dendrite/setup/jetstream"
 	"github.com/matrix-org/dendrite/setup/process"

@@ -37,16 +39,8 @@ import (
 	"github.com/nats-io/nats.go"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/sirupsen/logrus"
-	log "github.com/sirupsen/logrus"
-	"github.com/tidwall/gjson"
 )
 
-var keyContentFields = map[string]string{
-	"m.room.join_rules":         "join_rule",
-	"m.room.history_visibility": "history_visibility",
-	"m.room.member":             "membership",
-}
-
 // Inputer is responsible for consuming from the roomserver input
 // streams and processing the events. All input events are queued
 // into a single NATS stream and the order is preserved strictly.

@@ -75,19 +69,20 @@ var keyContentFields = map[string]string{
 // up, so they will do nothing until a new event comes in for B
 // or C.
 type Inputer struct {
-	Cfg                  *config.RoomServer
-	ProcessContext       *process.ProcessContext
-	DB                   storage.Database
-	NATSClient           *nats.Conn
-	JetStream            nats.JetStreamContext
-	Durable              nats.SubOpt
-	ServerName           gomatrixserverlib.ServerName
-	FSAPI                fedapi.RoomserverFederationAPI
-	KeyRing              gomatrixserverlib.JSONVerifier
-	ACLs                 *acls.ServerACLs
-	InputRoomEventTopic  string
-	OutputRoomEventTopic string
-	workers              sync.Map // room ID -> *worker
+	Cfg                 *config.RoomServer
+	Base                *base.BaseDendrite
+	ProcessContext      *process.ProcessContext
+	DB                  storage.Database
+	NATSClient          *nats.Conn
+	JetStream           nats.JetStreamContext
+	Durable             nats.SubOpt
+	ServerName          gomatrixserverlib.ServerName
+	FSAPI               fedapi.RoomserverFederationAPI
+	KeyRing             gomatrixserverlib.JSONVerifier
+	ACLs                *acls.ServerACLs
+	InputRoomEventTopic string
+	OutputProducer      *producers.RoomEventProducer
+	workers             sync.Map // room ID -> *worker
 
 	Queryer *query.Queryer
 }

@@ -167,7 +162,9 @@ func (r *Inputer) startWorkerForRoom(roomID string) {
 // will look to see if we have a worker for that room which has its
 // own consumer. If we don't, we'll start one.
 func (r *Inputer) Start() error {
-	prometheus.MustRegister(roomserverInputBackpressure, processRoomEventDuration)
+	if r.Base.EnableMetrics {
+		prometheus.MustRegister(roomserverInputBackpressure, processRoomEventDuration)
+	}
 	_, err := r.JetStream.Subscribe(
 		"", // This is blank because we specified it in BindStream.
 		func(m *nats.Msg) {

@@ -370,58 +367,6 @@ func (r *Inputer) InputRoomEvents(
 	}
 }
 
-// WriteOutputEvents implements OutputRoomEventWriter
-func (r *Inputer) WriteOutputEvents(roomID string, updates []api.OutputEvent) error {
-	var err error
-	for _, update := range updates {
-		msg := &nats.Msg{
-			Subject: r.OutputRoomEventTopic,
-			Header:  nats.Header{},
-		}
-		msg.Header.Set(jetstream.RoomID, roomID)
-		msg.Data, err = json.Marshal(update)
-		if err != nil {
-			return err
-		}
-		logger := log.WithFields(log.Fields{
-			"room_id": roomID,
-			"type":    update.Type,
-		})
-		if update.NewRoomEvent != nil {
-			eventType := update.NewRoomEvent.Event.Type()
-			logger = logger.WithFields(log.Fields{
-				"event_type":     eventType,
-				"event_id":       update.NewRoomEvent.Event.EventID(),
-				"adds_state":     len(update.NewRoomEvent.AddsStateEventIDs),
-				"removes_state":  len(update.NewRoomEvent.RemovesStateEventIDs),
-				"send_as_server": update.NewRoomEvent.SendAsServer,
-				"sender":         update.NewRoomEvent.Event.Sender(),
-			})
-			if update.NewRoomEvent.Event.StateKey() != nil {
-				logger = logger.WithField("state_key", *update.NewRoomEvent.Event.StateKey())
-			}
-			contentKey := keyContentFields[eventType]
-			if contentKey != "" {
-				value := gjson.GetBytes(update.NewRoomEvent.Event.Content(), contentKey)
-				if value.Exists() {
-					logger = logger.WithField("content_value", value.String())
-				}
-			}
-
-			if eventType == "m.room.server_acl" && update.NewRoomEvent.Event.StateKeyEquals("") {
-				ev := update.NewRoomEvent.Event.Unwrap()
-				defer r.ACLs.OnServerACLUpdate(ev)
-			}
-		}
-		logger.Tracef("Producing to topic '%s'", r.OutputRoomEventTopic)
-		if _, err := r.JetStream.PublishMsg(msg); err != nil {
-			logger.WithError(err).Errorf("Failed to produce to topic '%s': %s", r.OutputRoomEventTopic, err)
-			return err
-		}
-	}
-	return nil
-}
-
 var roomserverInputBackpressure = prometheus.NewGaugeVec(
 	prometheus.GaugeOpts{
 		Namespace: "dendrite",
@@ -381,7 +381,7 @@ func (r *Inputer) processRoomEvent(
 			return fmt.Errorf("r.updateLatestEvents: %w", err)
 		}
 	case api.KindOld:
-		err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{
+		err = r.OutputProducer.ProduceRoomEvents(event.RoomID(), []api.OutputEvent{
 			{
 				Type: api.OutputTypeOldRoomEvent,
 				OldRoomEvent: &api.OutputOldRoomEvent{

@@ -400,7 +400,7 @@ func (r *Inputer) processRoomEvent(
 	// so notify downstream components to redact this event - they should have it if they've
 	// been tracking our output log.
 	if redactedEventID != "" {
-		err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{
+		err = r.OutputProducer.ProduceRoomEvents(event.RoomID(), []api.OutputEvent{
 			{
 				Type: api.OutputTypeRedactedEvent,
 				RedactedEvent: &api.OutputRedactedEvent{
@@ -192,7 +192,7 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
 	// send the event asynchronously but we would need to ensure that 1) the events are written to the log in
 	// the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the
 	// necessary bookkeeping we'll keep the event sending synchronous for now.
-	if err = u.api.WriteOutputEvents(u.event.RoomID(), updates); err != nil {
+	if err = u.api.OutputProducer.ProduceRoomEvents(u.event.RoomID(), updates); err != nil {
 		return fmt.Errorf("u.api.WriteOutputEvents: %w", err)
 	}
@@ -16,6 +16,7 @@ package perform
 
 import (
 	"context"
+	"database/sql"
 	"encoding/json"
 	"fmt"
 	"time"

@@ -34,6 +35,7 @@ type Admin struct {
 	Cfg     *config.RoomServer
 	Queryer *query.Queryer
 	Inputer *input.Inputer
+	Leaver  *Leaver
 }
 
 // PerformEvacuateRoom will remove all local users from the given room.

@@ -160,3 +162,71 @@ func (r *Admin) PerformAdminEvacuateRoom(
 	inputRes := &api.InputRoomEventsResponse{}
 	r.Inputer.InputRoomEvents(ctx, inputReq, inputRes)
 }
+
+func (r *Admin) PerformAdminEvacuateUser(
+	ctx context.Context,
+	req *api.PerformAdminEvacuateUserRequest,
+	res *api.PerformAdminEvacuateUserResponse,
+) {
+	_, domain, err := gomatrixserverlib.SplitID('@', req.UserID)
+	if err != nil {
+		res.Error = &api.PerformError{
+			Code: api.PerformErrorBadRequest,
+			Msg:  fmt.Sprintf("Malformed user ID: %s", err),
+		}
+		return
+	}
+	if domain != r.Cfg.Matrix.ServerName {
+		res.Error = &api.PerformError{
+			Code: api.PerformErrorBadRequest,
+			Msg:  "Can only evacuate local users using this endpoint",
+		}
+		return
+	}
+
+	roomIDs, err := r.DB.GetRoomsByMembership(ctx, req.UserID, gomatrixserverlib.Join)
+	if err != nil && err != sql.ErrNoRows {
+		res.Error = &api.PerformError{
+			Code: api.PerformErrorBadRequest,
+			Msg:  fmt.Sprintf("r.DB.GetRoomsByMembership: %s", err),
+		}
+		return
+	}
+
+	inviteRoomIDs, err := r.DB.GetRoomsByMembership(ctx, req.UserID, gomatrixserverlib.Invite)
+	if err != nil && err != sql.ErrNoRows {
+		res.Error = &api.PerformError{
+			Code: api.PerformErrorBadRequest,
+			Msg:  fmt.Sprintf("r.DB.GetRoomsByMembership: %s", err),
+		}
+		return
+	}
+
+	for _, roomID := range append(roomIDs, inviteRoomIDs...) {
+		leaveReq := &api.PerformLeaveRequest{
+			RoomID: roomID,
+			UserID: req.UserID,
+		}
+		leaveRes := &api.PerformLeaveResponse{}
+		outputEvents, err := r.Leaver.PerformLeave(ctx, leaveReq, leaveRes)
+		if err != nil {
+			res.Error = &api.PerformError{
+				Code: api.PerformErrorBadRequest,
+				Msg:  fmt.Sprintf("r.Leaver.PerformLeave: %s", err),
+			}
+			return
+		}
+		if len(outputEvents) == 0 {
+			continue
+		}
+		if err := r.Inputer.OutputProducer.ProduceRoomEvents(roomID, outputEvents); err != nil {
+			res.Error = &api.PerformError{
+				Code: api.PerformErrorBadRequest,
+				Msg:  fmt.Sprintf("r.Inputer.WriteOutputEvents: %s", err),
+			}
+			return
+		}
+
+		res.Affected = append(res.Affected, roomID)
+	}
+}
@@ -18,6 +18,7 @@ import (
 	"context"
 	"fmt"
 
+	"github.com/getsentry/sentry-go"
 	federationAPI "github.com/matrix-org/dendrite/federationapi/api"
 	"github.com/matrix-org/dendrite/internal/eventutil"
 	"github.com/matrix-org/dendrite/roomserver/api"

@@ -206,8 +207,17 @@ func (r *Backfiller) fetchAndStoreMissingEvents(ctx context.Context, roomVer gomatrixserverlib.RoomVersion,
 	}
 	logger.Infof("returned %d PDUs which made events %+v", len(res.PDUs), result)
 	for _, res := range result {
-		if res.Error != nil {
-			logger.WithError(res.Error).Warn("event failed PDU checks")
-			continue
-		}
+		switch err := res.Error.(type) {
+		case nil:
+		case gomatrixserverlib.SignatureErr:
+			// The signature of the event might not be valid anymore, for example if
+			// the key ID was reused with a different signature.
+			logger.WithError(err).Errorf("event failed PDU checks, storing anyway")
+		case gomatrixserverlib.AuthChainErr, gomatrixserverlib.AuthRulesErr:
+			logger.WithError(err).Warn("event failed PDU checks")
+			continue
+		default:
+			logger.WithError(err).Warn("event failed PDU checks")
+			continue
+		}
 		missingMap[id] = res.Event

@@ -306,6 +316,7 @@ FederationHit:
 		b.eventIDToBeforeStateIDs[targetEvent.EventID()] = res
 		return res, nil
 	}
+	sentry.CaptureException(lastErr) // temporary to see if we might need to raise the server limit
 	return nil, lastErr
 }

@@ -366,19 +377,25 @@ func (b *backfillRequester) StateBeforeEvent(ctx context.Context, roomVer gomatrixserverlib.RoomVersion,
 		}
 	}
 
-	c := gomatrixserverlib.FederatedStateProvider{
-		FedClient:          b.fsAPI,
-		RememberAuthEvents: false,
-		Server:             b.servers[0],
-	}
-	result, err := c.StateBeforeEvent(ctx, roomVer, event, eventIDs)
-	if err != nil {
-		return nil, err
-	}
-	for eventID, ev := range result {
-		b.eventIDMap[eventID] = ev
-	}
-	return result, nil
+	var lastErr error
+	for _, srv := range b.servers {
+		c := gomatrixserverlib.FederatedStateProvider{
+			FedClient:          b.fsAPI,
+			RememberAuthEvents: false,
+			Server:             srv,
+		}
+		result, err := c.StateBeforeEvent(ctx, roomVer, event, eventIDs)
+		if err != nil {
+			lastErr = err
+			continue
+		}
+		for eventID, ev := range result {
+			b.eventIDMap[eventID] = ev
+		}
+		return result, nil
+	}
+	sentry.CaptureException(lastErr) // temporary to see if we might need to raise the server limit
+	return nil, lastErr
 }
 
 // ServersAtEvent is called when trying to determine which server to request from.
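The `StateBeforeEvent` change replaces a single hard-coded `b.servers[0]` with a failover loop over all candidate servers, keeping only the last error for reporting. A generic, self-contained sketch of that pattern (names here are illustrative, not Dendrite APIs):

```go
package main

import (
	"errors"
	"fmt"
)

// fetchFromAny asks each candidate server in turn and returns the first
// successful result; only the last error is kept, mirroring the lastErr
// bookkeeping in the hunk above.
func fetchFromAny(servers []string, fetch func(server string) (string, error)) (string, error) {
	var lastErr error
	for _, srv := range servers {
		result, err := fetch(srv)
		if err != nil {
			lastErr = err // remember the failure, try the next server
			continue
		}
		return result, nil
	}
	return "", lastErr
}

func main() {
	result, err := fetchFromAny([]string{"a.example", "b.example"}, func(srv string) (string, error) {
		if srv == "a.example" {
			return "", errors.New("unreachable")
		}
		return "state from " + srv, nil
	})
	fmt.Println(result, err) // "state from b.example" <nil>
}
```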
@@ -113,7 +113,7 @@ func (r *InboundPeeker) PerformInboundPeek(
 		response.AuthChainEvents = append(response.AuthChainEvents, event.Headered(info.RoomVersion))
 	}
 
-	err = r.Inputer.WriteOutputEvents(request.RoomID, []api.OutputEvent{
+	err = r.Inputer.OutputProducer.ProduceRoomEvents(request.RoomID, []api.OutputEvent{
 		{
 			Type: api.OutputTypeNewInboundPeek,
 			NewInboundPeek: &api.OutputNewInboundPeek{
@@ -56,7 +56,14 @@ func (r *Inviter) PerformInvite(
 		return nil, fmt.Errorf("failed to load RoomInfo: %w", err)
 	}
 
-	_, domain, _ := gomatrixserverlib.SplitID('@', targetUserID)
+	_, domain, err := gomatrixserverlib.SplitID('@', targetUserID)
+	if err != nil {
+		res.Error = &api.PerformError{
+			Code: api.PerformErrorBadRequest,
+			Msg:  fmt.Sprintf("The user ID %q is invalid!", targetUserID),
+		}
+		return nil, nil
+	}
 	isTargetLocal := domain == r.Cfg.Matrix.ServerName
 	isOriginLocal := event.Origin() == r.Cfg.Matrix.ServerName
@@ -207,7 +207,7 @@ func (r *Peeker) performPeekRoomByID(
 
 	// TODO: handle federated peeks
 
-	err = r.Inputer.WriteOutputEvents(roomID, []api.OutputEvent{
+	err = r.Inputer.OutputProducer.ProduceRoomEvents(roomID, []api.OutputEvent{
 		{
 			Type: api.OutputTypeNewPeek,
 			NewPeek: &api.OutputNewPeek{
@@ -96,7 +96,7 @@ func (r *Unpeeker) performUnpeekRoomByID(
 
 	// TODO: handle federated peeks
 
-	err = r.Inputer.WriteOutputEvents(req.RoomID, []api.OutputEvent{
+	err = r.Inputer.OutputProducer.ProduceRoomEvents(req.RoomID, []api.OutputEvent{
 		{
 			Type: api.OutputTypeRetirePeek,
 			RetirePeek: &api.OutputRetirePeek{
@@ -40,6 +40,7 @@ const (
 	RoomserverPerformInboundPeekPath       = "/roomserver/performInboundPeek"
 	RoomserverPerformForgetPath            = "/roomserver/performForget"
 	RoomserverPerformAdminEvacuateRoomPath = "/roomserver/performAdminEvacuateRoom"
+	RoomserverPerformAdminEvacuateUserPath = "/roomserver/performAdminEvacuateUser"
 
 	// Query operations
 	RoomserverQueryLatestEventsAndStatePath = "/roomserver/queryLatestEventsAndState"

@@ -305,6 +306,23 @@ func (h *httpRoomserverInternalAPI) PerformAdminEvacuateRoom(
 	}
 }
 
+func (h *httpRoomserverInternalAPI) PerformAdminEvacuateUser(
+	ctx context.Context,
+	req *api.PerformAdminEvacuateUserRequest,
+	res *api.PerformAdminEvacuateUserResponse,
+) {
+	span, ctx := opentracing.StartSpanFromContext(ctx, "PerformAdminEvacuateUser")
+	defer span.Finish()
+
+	apiURL := h.roomserverURL + RoomserverPerformAdminEvacuateUserPath
+	err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res)
+	if err != nil {
+		res.Error = &api.PerformError{
+			Msg: fmt.Sprintf("failed to communicate with roomserver: %s", err),
+		}
+	}
+}
+
 // QueryLatestEventsAndState implements RoomserverQueryAPI
 func (h *httpRoomserverInternalAPI) QueryLatestEventsAndState(
 	ctx context.Context,
@@ -129,6 +129,17 @@ func AddRoutes(r api.RoomserverInternalAPI, internalAPIMux *mux.Router) {
 			return util.JSONResponse{Code: http.StatusOK, JSON: &response}
 		}),
 	)
+	internalAPIMux.Handle(RoomserverPerformAdminEvacuateUserPath,
+		httputil.MakeInternalAPI("performAdminEvacuateUser", func(req *http.Request) util.JSONResponse {
+			var request api.PerformAdminEvacuateUserRequest
+			var response api.PerformAdminEvacuateUserResponse
+			if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
+				return util.MessageResponse(http.StatusBadRequest, err.Error())
+			}
+			r.PerformAdminEvacuateUser(req.Context(), &request, &response)
+			return util.JSONResponse{Code: http.StatusOK, JSON: &response}
+		}),
+	)
 	internalAPIMux.Handle(
 		RoomserverQueryPublishedRoomsPath,
 		httputil.MakeInternalAPI("queryPublishedRooms", func(req *http.Request) util.JSONResponse {
roomserver/producers/roomevent.go: 89 additions (new file)

@@ -0,0 +1,89 @@
+// Copyright 2022 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package producers
+
+import (
+	"encoding/json"
+
+	"github.com/matrix-org/dendrite/roomserver/acls"
+	"github.com/matrix-org/dendrite/roomserver/api"
+	"github.com/matrix-org/dendrite/setup/jetstream"
+	"github.com/nats-io/nats.go"
+	log "github.com/sirupsen/logrus"
+	"github.com/tidwall/gjson"
+)
+
+var keyContentFields = map[string]string{
+	"m.room.join_rules":         "join_rule",
+	"m.room.history_visibility": "history_visibility",
+	"m.room.member":             "membership",
+}
+
+type RoomEventProducer struct {
+	Topic     string
+	ACLs      *acls.ServerACLs
+	JetStream nats.JetStreamContext
+}
+
+func (r *RoomEventProducer) ProduceRoomEvents(roomID string, updates []api.OutputEvent) error {
+	var err error
+	for _, update := range updates {
+		msg := &nats.Msg{
+			Subject: r.Topic,
+			Header:  nats.Header{},
+		}
+		msg.Header.Set(jetstream.RoomID, roomID)
+		msg.Data, err = json.Marshal(update)
+		if err != nil {
+			return err
+		}
+		logger := log.WithFields(log.Fields{
+			"room_id": roomID,
+			"type":    update.Type,
+		})
+		if update.NewRoomEvent != nil {
+			eventType := update.NewRoomEvent.Event.Type()
+			logger = logger.WithFields(log.Fields{
+				"event_type":     eventType,
+				"event_id":       update.NewRoomEvent.Event.EventID(),
+				"adds_state":     len(update.NewRoomEvent.AddsStateEventIDs),
+				"removes_state":  len(update.NewRoomEvent.RemovesStateEventIDs),
+				"send_as_server": update.NewRoomEvent.SendAsServer,
+				"sender":         update.NewRoomEvent.Event.Sender(),
+			})
+			if update.NewRoomEvent.Event.StateKey() != nil {
+				logger = logger.WithField("state_key", *update.NewRoomEvent.Event.StateKey())
+			}
+			contentKey := keyContentFields[eventType]
+			if contentKey != "" {
+				value := gjson.GetBytes(update.NewRoomEvent.Event.Content(), contentKey)
+				if value.Exists() {
+					logger = logger.WithField("content_value", value.String())
+				}
+			}
+
+			if eventType == "m.room.server_acl" && update.NewRoomEvent.Event.StateKeyEquals("") {
+				ev := update.NewRoomEvent.Event.Unwrap()
+				defer r.ACLs.OnServerACLUpdate(ev)
+			}
+		}
+		logger.Tracef("Producing to topic '%s'", r.Topic)
+		if _, err := r.JetStream.PublishMsg(msg); err != nil {
+			logger.WithError(err).Errorf("Failed to produce to topic '%s': %s", r.Topic, err)
+			return err
+		}
+	}
+	return nil
+}
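This new producer centralises what was previously `Inputer.WriteOutputEvents`, so the inputer, the peek/unpeek performers, and the admin code all publish through one code path. A self-contained stand-in showing the design choice (a producer that owns the topic name and the serialization, with a toy publisher in place of JetStream; none of these names are Dendrite APIs):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// publisher abstracts the transport, as nats.JetStreamContext does above.
type publisher interface {
	PublishMsg(subject string, data []byte) error
}

type stdoutPublisher struct{}

func (stdoutPublisher) PublishMsg(subject string, data []byte) error {
	fmt.Printf("[%s] %s\n", subject, data)
	return nil
}

// eventProducer owns the topic and the marshalling, so callers only hand
// it a room ID and a batch of updates.
type eventProducer struct {
	Topic string
	Pub   publisher
}

func (p *eventProducer) Produce(roomID string, updates []map[string]any) error {
	for _, update := range updates {
		data, err := json.Marshal(update)
		if err != nil {
			return err
		}
		if err := p.Pub.PublishMsg(p.Topic, data); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	p := &eventProducer{Topic: "OutputRoomEvent", Pub: stdoutPublisher{}}
	_ = p.Produce("!room:localhost", []map[string]any{{"type": "new_room_event"}})
}
```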
@@ -17,13 +17,10 @@ package roomserver
 import (
 	"github.com/gorilla/mux"
 	"github.com/matrix-org/dendrite/roomserver/api"
-	"github.com/matrix-org/dendrite/roomserver/inthttp"
-	"github.com/matrix-org/gomatrixserverlib"
-
 	"github.com/matrix-org/dendrite/roomserver/internal"
+	"github.com/matrix-org/dendrite/roomserver/inthttp"
 	"github.com/matrix-org/dendrite/roomserver/storage"
 	"github.com/matrix-org/dendrite/setup/base"
-	"github.com/matrix-org/dendrite/setup/jetstream"
 	"github.com/sirupsen/logrus"
 )

@@ -40,11 +37,6 @@ func NewInternalAPI(
 ) api.RoomserverInternalAPI {
 	cfg := &base.Cfg.RoomServer
 
-	var perspectiveServerNames []gomatrixserverlib.ServerName
-	for _, kp := range base.Cfg.FederationAPI.KeyPerspectives {
-		perspectiveServerNames = append(perspectiveServerNames, kp.ServerName)
-	}
-
 	roomserverDB, err := storage.Open(base, &cfg.Database, base.Caches)
 	if err != nil {
 		logrus.WithError(err).Panicf("failed to connect to room server db")

@@ -53,9 +45,6 @@ func NewInternalAPI(
 	js, nc := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
 
 	return internal.NewRoomserverAPI(
-		base.ProcessContext, cfg, roomserverDB, js, nc,
-		cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent),
-		cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent),
-		base.Caches, perspectiveServerNames,
+		base, roomserverDB, js, nc,
 	)
 }
roomserver/roomserver_test.go: 69 additions (new file)

@@ -0,0 +1,69 @@
+package roomserver_test
+
+import (
+	"context"
+	"testing"
+
+	"github.com/matrix-org/dendrite/roomserver"
+	"github.com/matrix-org/dendrite/roomserver/api"
+	"github.com/matrix-org/dendrite/roomserver/storage"
+	"github.com/matrix-org/dendrite/setup/base"
+	"github.com/matrix-org/dendrite/test"
+	"github.com/matrix-org/dendrite/test/testrig"
+	"github.com/matrix-org/gomatrixserverlib"
+)
+
+func mustCreateDatabase(t *testing.T, dbType test.DBType) (*base.BaseDendrite, storage.Database, func()) {
+	base, close := testrig.CreateBaseDendrite(t, dbType)
+	db, err := storage.Open(base, &base.Cfg.KeyServer.Database, base.Caches)
+	if err != nil {
+		t.Fatalf("failed to create Database: %v", err)
+	}
+	return base, db, close
+}
+
+func Test_SharedUsers(t *testing.T) {
+	alice := test.NewUser(t)
+	bob := test.NewUser(t)
+	room := test.NewRoom(t, alice, test.RoomPreset(test.PresetTrustedPrivateChat))
+
+	// Invite and join Bob
+	room.CreateAndInsert(t, alice, gomatrixserverlib.MRoomMember, map[string]interface{}{
+		"membership": "invite",
+	}, test.WithStateKey(bob.ID))
+	room.CreateAndInsert(t, bob, gomatrixserverlib.MRoomMember, map[string]interface{}{
+		"membership": "join",
+	}, test.WithStateKey(bob.ID))
+
+	ctx := context.Background()
+	test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
+		base, _, close := mustCreateDatabase(t, dbType)
+		defer close()
+
+		rsAPI := roomserver.NewInternalAPI(base)
+		// SetFederationAPI starts the room event input consumer
+		rsAPI.SetFederationAPI(nil, nil)
+		// Create the room
+		if err := api.SendEvents(ctx, rsAPI, api.KindNew, room.Events(), "test", "test", nil, false); err != nil {
+			t.Fatalf("failed to send events: %v", err)
+		}
+
+		// Query the shared users for Alice, there should only be Bob.
+		// This is used by the SyncAPI keychange consumer.
+		res := &api.QuerySharedUsersResponse{}
+		if err := rsAPI.QuerySharedUsers(ctx, &api.QuerySharedUsersRequest{UserID: alice.ID}, res); err != nil {
+			t.Fatalf("unable to query known users: %v", err)
+		}
+		if _, ok := res.UserIDsToCount[bob.ID]; !ok {
+			t.Fatalf("expected to find %s in shared users, but didn't: %+v", bob.ID, res.UserIDsToCount)
+		}
+		// Also verify that we get the expected result when specifying OtherUserIDs.
+		// This is used by the SyncAPI when getting device list changes.
+		if err := rsAPI.QuerySharedUsers(ctx, &api.QuerySharedUsersRequest{UserID: alice.ID, OtherUserIDs: []string{bob.ID}}, res); err != nil {
+			t.Fatalf("unable to query known users: %v", err)
+		}
+		if _, ok := res.UserIDsToCount[bob.ID]; !ok {
+			t.Fatalf("expected to find %s in shared users, but didn't: %+v", bob.ID, res.UserIDsToCount)
+		}
+	})
+}
@ -110,7 +110,7 @@ func (v *StateResolution) LoadStateAtEvent(
	snapshotNID, err := v.db.SnapshotNIDFromEventID(ctx, eventID)
	if err != nil {
		return nil, fmt.Errorf("LoadStateAtEvent.SnapshotNIDFromEventID failed for event %s : %s", eventID, err)
		return nil, fmt.Errorf("LoadStateAtEvent.SnapshotNIDFromEventID failed for event %s : %w", eventID, err)
	}
	if snapshotNID == 0 {
		return nil, fmt.Errorf("LoadStateAtEvent.SnapshotNIDFromEventID(%s) returned 0 NID, was this event stored?", eventID)
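The change above swaps the %s verb for %w, so the underlying error is wrapped rather than flattened into a string, and callers can still match it with errors.Is or errors.As. A minimal sketch of the difference (sql.ErrNoRows is used as the sentinel purely for illustration):

package main

import (
	"database/sql"
	"errors"
	"fmt"
)

func lookup(eventID string) error {
	err := sql.ErrNoRows // pretend the storage layer found nothing
	// %w keeps the cause in the error chain; %s would discard it.
	return fmt.Errorf("SnapshotNIDFromEventID failed for event %s: %w", eventID, err)
}

func main() {
	err := lookup("$event:example.com")
	fmt.Println(errors.Is(err, sql.ErrNoRows)) // true, thanks to %w
}
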
@ -66,12 +66,18 @@ CREATE TABLE IF NOT EXISTS roomserver_membership (
);
`

var selectJoinedUsersSetForRoomsSQL = "" +
var selectJoinedUsersSetForRoomsAndUserSQL = "" +
	"SELECT target_nid, COUNT(room_nid) FROM roomserver_membership" +
	" WHERE room_nid = ANY($1) AND target_nid = ANY($2) AND" +
	" membership_nid = " + fmt.Sprintf("%d", tables.MembershipStateJoin) + " and forgotten = false" +
	" GROUP BY target_nid"

var selectJoinedUsersSetForRoomsSQL = "" +
	"SELECT target_nid, COUNT(room_nid) FROM roomserver_membership" +
	" WHERE room_nid = ANY($1) AND" +
	" membership_nid = " + fmt.Sprintf("%d", tables.MembershipStateJoin) + " and forgotten = false" +
	" GROUP BY target_nid"

// Insert a row in to membership table so that it can be locked by the
// SELECT FOR UPDATE
const insertMembershipSQL = "" +
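The Postgres queries above pass whole slices as array parameters and match them with = ANY($n). A hedged sketch of that pattern, assuming an open *sql.DB on the lib/pq driver (the table mirrors roomserver_membership, but the NID types are simplified to int64 for illustration):

package sketch

import (
	"context"
	"database/sql"

	"github.com/lib/pq"
)

// joinedCounts shows the array-parameter pattern: pq.Array adapts a
// Go slice to a Postgres array so the SQL can use "= ANY($1)" instead
// of splicing a variadic IN (...) clause into the query text.
func joinedCounts(ctx context.Context, db *sql.DB, roomNIDs []int64) (map[int64]int, error) {
	rows, err := db.QueryContext(ctx,
		`SELECT target_nid, COUNT(room_nid) FROM roomserver_membership
		 WHERE room_nid = ANY($1) GROUP BY target_nid`,
		pq.Array(roomNIDs),
	)
	if err != nil {
		return nil, err
	}
	defer rows.Close() // nolint: errcheck
	result := map[int64]int{}
	for rows.Next() {
		var nid int64
		var count int
		if err := rows.Scan(&nid, &count); err != nil {
			return nil, err
		}
		result[nid] = count
	}
	return result, rows.Err()
}
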
@ -154,6 +160,7 @@ type membershipStatements struct {
	selectLocalMembershipsFromRoomStmt      *sql.Stmt
	updateMembershipStmt                    *sql.Stmt
	selectRoomsWithMembershipStmt           *sql.Stmt
	selectJoinedUsersSetForRoomsAndUserStmt *sql.Stmt
	selectJoinedUsersSetForRoomsStmt        *sql.Stmt
	selectKnownUsersStmt                    *sql.Stmt
	updateMembershipForgetRoomStmt          *sql.Stmt
@ -187,6 +194,7 @@ func PrepareMembershipTable(db *sql.DB) (tables.Membership, error) {
	{&s.selectLocalMembershipsFromRoomStmt, selectLocalMembershipsFromRoomSQL},
	{&s.updateMembershipStmt, updateMembershipSQL},
	{&s.selectRoomsWithMembershipStmt, selectRoomsWithMembershipSQL},
	{&s.selectJoinedUsersSetForRoomsAndUserStmt, selectJoinedUsersSetForRoomsAndUserSQL},
	{&s.selectJoinedUsersSetForRoomsStmt, selectJoinedUsersSetForRoomsSQL},
	{&s.selectKnownUsersStmt, selectKnownUsersSQL},
	{&s.updateMembershipForgetRoomStmt, updateMembershipForgetRoom},
@ -322,8 +330,18 @@ func (s *membershipStatements) SelectJoinedUsersSetForRooms(
	roomNIDs []types.RoomNID,
	userNIDs []types.EventStateKeyNID,
) (map[types.EventStateKeyNID]int, error) {
	var (
		rows *sql.Rows
		err  error
	)
	stmt := sqlutil.TxStmt(txn, s.selectJoinedUsersSetForRoomsStmt)
	rows, err := stmt.QueryContext(ctx, pq.Array(roomNIDs), pq.Array(userNIDs))
	if len(userNIDs) > 0 {
		stmt = sqlutil.TxStmt(txn, s.selectJoinedUsersSetForRoomsAndUserStmt)
		rows, err = stmt.QueryContext(ctx, pq.Array(roomNIDs), pq.Array(userNIDs))
	} else {
		rows, err = stmt.QueryContext(ctx, pq.Array(roomNIDs))
	}

	if err != nil {
		return nil, err
	}
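sqlutil.TxStmt is used above to bind whichever prepared statement was chosen to the current transaction. A sketch of what such a helper plausibly does, inferred from its call sites here rather than copied from the actual sqlutil source:

package sketch

import "database/sql"

// txStmt mirrors the presumed behaviour of sqlutil.TxStmt: when a
// transaction is in progress, rebind the prepared statement to it so
// the query participates in the transaction; otherwise use it as-is.
func txStmt(txn *sql.Tx, stmt *sql.Stmt) *sql.Stmt {
	if txn != nil {
		return txn.Stmt(stmt)
	}
	return stmt
}
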
@ -263,6 +263,12 @@ func (d *Database) snapshotNIDFromEventID(
	ctx context.Context, txn *sql.Tx, eventID string,
) (types.StateSnapshotNID, error) {
	_, stateNID, err := d.EventsTable.SelectEvent(ctx, txn, eventID)
	if err != nil {
		return 0, err
	}
	if stateNID == 0 {
		return 0, sql.ErrNoRows // effectively there's no state entry
	}
	return stateNID, err
}

@ -828,6 +834,9 @@ func (d *Database) handleRedactions(
	if err != nil {
		return nil, "", fmt.Errorf("d.GetStateEvent: %w", err)
	}
	if powerLevels == nil {
		return nil, "", fmt.Errorf("unable to fetch m.room.power_levels event from database for room %s", event.RoomID())
	}
	pl, err := powerLevels.PowerLevels()
	if err != nil {
		return nil, "", fmt.Errorf("unable to get powerlevels for room: %w", err)
@ -1214,6 +1223,13 @@ func (d *Database) JoinedUsersSetInRooms(ctx context.Context, roomIDs, userIDs [
		stateKeyNIDs[i] = nid
		i++
	}
	// If we didn't have any userIDs to look up, get the UserIDs for the returned userNIDToCount now
	if len(userIDs) == 0 {
		nidToUserID, err = d.EventStateKeys(ctx, stateKeyNIDs)
		if err != nil {
			return nil, err
		}
	}
	result := make(map[string]int, len(userNIDToCount))
	for nid, count := range userNIDToCount {
		result[nidToUserID[nid]] = count
@ -42,12 +42,18 @@ const membershipSchema = `
);
`

var selectJoinedUsersSetForRoomsSQL = "" +
var selectJoinedUsersSetForRoomsAndUserSQL = "" +
	"SELECT target_nid, COUNT(room_nid) FROM roomserver_membership" +
	" WHERE room_nid IN ($1) AND target_nid IN ($2) AND" +
	" membership_nid = " + fmt.Sprintf("%d", tables.MembershipStateJoin) + " and forgotten = false" +
	" GROUP BY target_nid"

var selectJoinedUsersSetForRoomsSQL = "" +
	"SELECT target_nid, COUNT(room_nid) FROM roomserver_membership" +
	" WHERE room_nid IN ($1) AND " +
	" membership_nid = " + fmt.Sprintf("%d", tables.MembershipStateJoin) + " and forgotten = false" +
	" GROUP BY target_nid"

// Insert a row in to membership table so that it can be locked by the
// SELECT FOR UPDATE
const insertMembershipSQL = "" +
@ -302,8 +308,12 @@ func (s *membershipStatements) SelectJoinedUsersSetForRooms(ctx context.Context,
	for _, v := range userNIDs {
		params = append(params, v)
	}

	query := strings.Replace(selectJoinedUsersSetForRoomsSQL, "($1)", sqlutil.QueryVariadic(len(roomNIDs)), 1)
	query = strings.Replace(query, "($2)", sqlutil.QueryVariadicOffset(len(userNIDs), len(roomNIDs)), 1)
	if len(userNIDs) > 0 {
		query = strings.Replace(selectJoinedUsersSetForRoomsAndUserSQL, "($1)", sqlutil.QueryVariadic(len(roomNIDs)), 1)
		query = strings.Replace(query, "($2)", sqlutil.QueryVariadicOffset(len(userNIDs), len(roomNIDs)), 1)
	}
	var rows *sql.Rows
	var err error
	if txn != nil {
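SQLite has no array bind type, so the query text itself is rewritten with one placeholder per value before being prepared. A sketch of what the QueryVariadic/QueryVariadicOffset helpers appear to produce, inferred from how they are spliced into the SQL above rather than copied from sqlutil:

package main

import (
	"fmt"
	"strings"
)

// queryVariadicOffset builds a placeholder list with count entries
// starting after offset, so two lists in one query get contiguous
// numbering (name and behaviour inferred, not the actual sqlutil code).
func queryVariadicOffset(count, offset int) string {
	b := &strings.Builder{}
	b.WriteString("(")
	for i := 0; i < count; i++ {
		if i > 0 {
			b.WriteString(", ")
		}
		fmt.Fprintf(b, "$%d", i+offset+1)
	}
	b.WriteString(")")
	return b.String()
}

func main() {
	fmt.Println(queryVariadicOffset(3, 0)) // ($1, $2, $3) for the room NIDs
	fmt.Println(queryVariadicOffset(2, 3)) // ($4, $5) for the user NIDs
}
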
@ -187,7 +187,7 @@ func loadAppServices(config *AppServiceAPI, derived *Derived) error {
	}

	// Load the config data into our struct
	if err = yaml.UnmarshalStrict(configData, &appservice); err != nil {
	if err = yaml.Unmarshal(configData, &appservice); err != nil {
		return err
	}

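Dropping UnmarshalStrict means unknown keys in a registration file no longer abort loading; with the strict variant, any key that has no matching struct field is an error. A minimal sketch of the difference, assuming gopkg.in/yaml.v2 and a hypothetical one-field struct:

package main

import (
	"fmt"

	yaml "gopkg.in/yaml.v2"
)

type appService struct {
	ID string `yaml:"id"`
}

func main() {
	// "rate_limited" has no matching field on appService.
	data := []byte("id: mybridge\nrate_limited: false\n")
	var as appService
	fmt.Println(yaml.Unmarshal(data, &as))       // <nil> - unknown key ignored
	fmt.Println(yaml.UnmarshalStrict(data, &as)) // error - unknown key rejected
}
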
@ -315,6 +315,20 @@ func checkErrors(config *AppServiceAPI, derived *Derived) (err error) {
		}
	}

	// Check required fields
	if appservice.ID == "" {
		return ConfigErrors([]string{"Application service ID is required"})
	}
	if appservice.ASToken == "" {
		return ConfigErrors([]string{"Application service Token is required"})
	}
	if appservice.HSToken == "" {
		return ConfigErrors([]string{"Homeserver Token is required"})
	}
	if appservice.SenderLocalpart == "" {
		return ConfigErrors([]string{"Sender Localpart is required"})
	}

	// Check if the url has trailing /'s. If so, remove them
	appservice.URL = strings.TrimRight(appservice.URL, "/")

@ -138,7 +138,7 @@ func (s *PresenceConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
	presence := msg.Header.Get("presence")
	timestamp := msg.Header.Get("last_active_ts")
	fromSync, _ := strconv.ParseBool(msg.Header.Get("from_sync"))
	logrus.Debugf("syncAPI received presence event: %+v", msg.Header)
	logrus.Tracef("syncAPI received presence event: %+v", msg.Header)

	if fromSync { // do not process local presence changes; we already did this synchronously.
		return true
@ -87,7 +87,7 @@ func (s *OutputReceiptEventConsumer) onMessage(ctx context.Context, msg *nats.Ms
		Type: msg.Header.Get("type"),
	}

	timestamp, err := strconv.Atoi(msg.Header.Get("timestamp"))
	timestamp, err := strconv.ParseUint(msg.Header.Get("timestamp"), 10, 64)
	if err != nil {
		// If the message was invalid, log it and move on to the next message in the stream
		log.WithError(err).Errorf("output log: message parse failure")
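strconv.Atoi parses into a platform-sized int, which cannot hold a millisecond timestamp on 32-bit builds; ParseUint with bitSize 64 always yields the full 64-bit value. A small sketch:

package main

import (
	"fmt"
	"strconv"
)

func main() {
	const header = "1656669600000" // milliseconds since the epoch, > 2^31-1

	// On a 32-bit platform Atoi would fail with a range error here;
	// ParseUint(..., 10, 64) parses the same header on any platform.
	ts, err := strconv.ParseUint(header, 10, 64)
	if err != nil {
		fmt.Println("invalid timestamp:", err)
		return
	}
	fmt.Println(ts)
}
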
@ -15,6 +15,7 @@
package routing

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
@ -25,6 +26,7 @@ import (
	"github.com/matrix-org/dendrite/internal/caching"
	roomserver "github.com/matrix-org/dendrite/roomserver/api"
	"github.com/matrix-org/dendrite/syncapi/storage"
	"github.com/matrix-org/dendrite/syncapi/types"
	userapi "github.com/matrix-org/dendrite/userapi/api"
	"github.com/matrix-org/gomatrixserverlib"
	"github.com/matrix-org/util"
@ -149,13 +151,30 @@ func Context(
	if len(response.State) > filter.Limit {
		response.State = response.State[len(response.State)-filter.Limit:]
	}

	start, end, err := getStartEnd(ctx, syncDB, eventsBefore, eventsAfter)
	if err == nil {
		response.End = end.String()
		response.Start = start.String()
	}
	return util.JSONResponse{
		Code: http.StatusOK,
		JSON: response,
	}
}

func getStartEnd(ctx context.Context, syncDB storage.Database, startEvents, endEvents []*gomatrixserverlib.HeaderedEvent) (start, end types.TopologyToken, err error) {
	if len(startEvents) > 0 {
		start, err = syncDB.EventPositionInTopology(ctx, startEvents[0].EventID())
		if err != nil {
			return
		}
	}
	if len(endEvents) > 0 {
		end, err = syncDB.EventPositionInTopology(ctx, endEvents[0].EventID())
	}
	return
}

func applyLazyLoadMembers(
	device *userapi.Device,
	filter *gomatrixserverlib.RoomEventFilter,
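With start and end now populated, a client can treat a /context response as a starting point for pagination by feeding the returned tokens into /messages. A hedged sketch of the request flow; the homeserver, room and event IDs are placeholders, and the URL shapes follow the Matrix client-server API:

package main

import "fmt"

func main() {
	const (
		hs      = "https://matrix.example.com" // placeholder homeserver
		roomID  = "!abc123:example.com"        // placeholder room ID
		eventID = "$event123:example.com"      // placeholder event ID
	)
	// 1. Fetch the events around eventID; the response now carries
	//    "start" and "end" topology tokens alongside the events.
	fmt.Printf("GET %s/_matrix/client/v3/rooms/%s/context/%s?limit=10\n", hs, roomID, eventID)
	// 2. Page backwards in time from the returned "start" token...
	fmt.Printf("GET %s/_matrix/client/v3/rooms/%s/messages?dir=b&from={start}\n", hs, roomID)
	// 3. ...or forwards from the returned "end" token.
	fmt.Printf("GET %s/_matrix/client/v3/rooms/%s/messages?dir=f&from={end}\n", hs, roomID)
}
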
@ -50,7 +50,7 @@ type messagesReq struct {
type messagesResp struct {
	Start       string                          `json:"start"`
	StartStream string                          `json:"start_stream,omitempty"` // NOTSPEC: used by Cerulean, so clients can hit /messages then immediately /sync with a latest sync token
	End         string                          `json:"end"`
	End         string                          `json:"end,omitempty"`
	Chunk       []gomatrixserverlib.ClientEvent `json:"chunk"`
	State       []gomatrixserverlib.ClientEvent `json:"state"`
}

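Adding omitempty to End means a zero-value string is dropped from the marshalled JSON entirely, which is how the handler below signals that there is nothing further to paginate. A minimal sketch (token values are illustrative):

package main

import (
	"encoding/json"
	"fmt"
)

type resp struct {
	Start string `json:"start"`
	End   string `json:"end,omitempty"`
}

func main() {
	b, err := json.Marshal(resp{Start: "t42-100_0_0"}) // End left empty
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b)) // {"start":"t42-100_0_0"} - no "end" key at all
}
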
@ -200,30 +200,6 @@ func OnIncomingMessagesRequest(
		return jsonerror.InternalServerError()
	}

	// at least fetch the membership events for the users returned in chunk if LazyLoadMembers is set
	state := []gomatrixserverlib.ClientEvent{}
	if filter.LazyLoadMembers {
		membershipToUser := make(map[string]*gomatrixserverlib.HeaderedEvent)
		for _, evt := range clientEvents {
			// Don't add membership events the client should already know about
			if _, cached := lazyLoadCache.IsLazyLoadedUserCached(device, roomID, evt.Sender); cached {
				continue
			}
			membership, err := db.GetStateEvent(req.Context(), roomID, gomatrixserverlib.MRoomMember, evt.Sender)
			if err != nil {
				util.GetLogger(req.Context()).WithError(err).Error("failed to get membership event for user")
				continue
			}
			if membership != nil {
				membershipToUser[evt.Sender] = membership
				lazyLoadCache.StoreLazyLoadedUser(device, roomID, evt.Sender, membership.EventID())
			}
		}
		for _, evt := range membershipToUser {
			state = append(state, gomatrixserverlib.HeaderedToClientEvent(evt, gomatrixserverlib.FormatSync))
		}
	}

	util.GetLogger(req.Context()).WithFields(logrus.Fields{
		"from": from.String(),
		"to":   to.String(),
@ -237,7 +213,13 @@ func OnIncomingMessagesRequest(
		Chunk: clientEvents,
		Start: start.String(),
		End:   end.String(),
		State: state,
	}
	res.applyLazyLoadMembers(req.Context(), db, roomID, device, filter.LazyLoadMembers, lazyLoadCache)

	// If we didn't return any events, set the end to an empty string, so it will be omitted
	// in the response JSON.
	if len(res.Chunk) == 0 {
		res.End = ""
	}
	if fromStream != nil {
		res.StartStream = fromStream.String()
@ -250,6 +232,40 @@ func OnIncomingMessagesRequest(
	}
}

// applyLazyLoadMembers loads membership events for users returned in Chunk, if the filter has
// LazyLoadMembers enabled.
func (m *messagesResp) applyLazyLoadMembers(
	ctx context.Context,
	db storage.Database,
	roomID string,
	device *userapi.Device,
	lazyLoad bool,
	lazyLoadCache caching.LazyLoadCache,
) {
	if !lazyLoad {
		return
	}
	membershipToUser := make(map[string]*gomatrixserverlib.HeaderedEvent)
	for _, evt := range m.Chunk {
		// Don't add membership events the client should already know about
		if _, cached := lazyLoadCache.IsLazyLoadedUserCached(device, roomID, evt.Sender); cached {
			continue
		}
		membership, err := db.GetStateEvent(ctx, roomID, gomatrixserverlib.MRoomMember, evt.Sender)
		if err != nil {
			util.GetLogger(ctx).WithError(err).Error("failed to get membership event for user")
			continue
		}
		if membership != nil {
			membershipToUser[evt.Sender] = membership
			lazyLoadCache.StoreLazyLoadedUser(device, roomID, evt.Sender, membership.EventID())
		}
	}
	for _, evt := range membershipToUser {
		m.State = append(m.State, gomatrixserverlib.HeaderedToClientEvent(evt, gomatrixserverlib.FormatSync))
	}
}

func checkIsRoomForgotten(ctx context.Context, roomID, userID string, rsAPI api.SyncRoomserverAPI) (forgotten bool, exists bool, err error) {
	req := api.QueryMembershipForUserRequest{
		RoomID: roomID,

@ -111,7 +111,7 @@ func (p *PresenceStreamProvider) IncrementalSync(
		currentlyActive := prevPresence.CurrentlyActive()
		skip := prevPresence.Equals(presence) && currentlyActive && req.Device.UserID != presence.UserID
		if skip {
			req.Log.Debugf("Skipping presence, no change (%s)", presence.UserID)
			req.Log.Tracef("Skipping presence, no change (%s)", presence.UserID)
			continue
		}
	}

@ -48,3 +48,4 @@ Notifications can be viewed with GET /notifications
# More flakey

If remote user leaves room we no longer receive device updates
Guest users can join guest_access rooms

@ -241,7 +241,6 @@ Inbound federation can receive v2 /send_join
Message history can be paginated
Backfill works correctly with history visibility set to joined
Guest user cannot call /events globally
Guest users can join guest_access rooms
Guest user can set display names
Guest user cannot upgrade other users
Guest non-joined user cannot call /events on shared room

@ -33,6 +33,7 @@ import (
	"github.com/matrix-org/dendrite/internal/pushrules"
	"github.com/matrix-org/dendrite/internal/sqlutil"
	keyapi "github.com/matrix-org/dendrite/keyserver/api"
	rsapi "github.com/matrix-org/dendrite/roomserver/api"
	"github.com/matrix-org/dendrite/setup/config"
	"github.com/matrix-org/dendrite/userapi/api"
	"github.com/matrix-org/dendrite/userapi/producers"
@ -49,6 +50,7 @@ type UserInternalAPI struct {
	// AppServices is the list of all registered AS
	AppServices []config.ApplicationService
	KeyAPI      keyapi.UserKeyAPI
	RSAPI       rsapi.UserRoomserverAPI
}

func (a *UserInternalAPI) InputAccountData(ctx context.Context, req *api.InputAccountDataRequest, res *api.InputAccountDataResponse) error {
@ -452,6 +454,30 @@ func (a *UserInternalAPI) queryAppServiceToken(ctx context.Context, token, appSe

// PerformAccountDeactivation deactivates the user's account, removing all ability for the user to login again.
func (a *UserInternalAPI) PerformAccountDeactivation(ctx context.Context, req *api.PerformAccountDeactivationRequest, res *api.PerformAccountDeactivationResponse) error {
	evacuateReq := &rsapi.PerformAdminEvacuateUserRequest{
		UserID: fmt.Sprintf("@%s:%s", req.Localpart, a.ServerName),
	}
	evacuateRes := &rsapi.PerformAdminEvacuateUserResponse{}
	a.RSAPI.PerformAdminEvacuateUser(ctx, evacuateReq, evacuateRes)
	if err := evacuateRes.Error; err != nil {
		logrus.WithError(err).Errorf("Failed to evacuate user after account deactivation")
	}

	deviceReq := &api.PerformDeviceDeletionRequest{
		UserID: fmt.Sprintf("@%s:%s", req.Localpart, a.ServerName),
	}
	deviceRes := &api.PerformDeviceDeletionResponse{}
	if err := a.PerformDeviceDeletion(ctx, deviceReq, deviceRes); err != nil {
		return err
	}

	pusherReq := &api.PerformPusherDeletionRequest{
		Localpart: req.Localpart,
	}
	if err := a.PerformPusherDeletion(ctx, pusherReq, &struct{}{}); err != nil {
		return err
	}

	err := a.DB.DeactivateAccount(ctx, req.Localpart)
	res.AccountDeactivated = err == nil
	return err
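Deactivation drives the same evacuation path that the /_dendrite/admin/evacuateUser/{userID} admin endpoint exposes. A hedged sketch of calling that endpoint directly from Go; the homeserver, token and user are placeholders, and the HTTP method is an assumption here, so check the admin routing registration for the verb your build expects:

package main

import (
	"fmt"
	"net/http"
)

func main() {
	const (
		hs     = "https://matrix.example.com" // placeholder homeserver
		token  = "ADMIN_ACCESS_TOKEN"         // placeholder admin token
		userID = "@alice:example.com"         // placeholder user to evacuate
	)
	url := fmt.Sprintf("%s/_dendrite/admin/evacuateUser/%s", hs, userID)
	// Method is assumed; consult the routing registration for your version.
	req, err := http.NewRequest(http.MethodPost, url, nil)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Authorization", "Bearer "+token)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close() // nolint: errcheck
	fmt.Println(resp.Status)
}
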
@ -78,6 +78,7 @@ func NewInternalAPI(
		ServerName:           cfg.Matrix.ServerName,
		AppServices:          appServices,
		KeyAPI:               keyAPI,
		RSAPI:                rsAPI,
		DisableTLSValidation: cfg.PushGatewayDisableTLSValidation,
	}