diff --git a/.github/workflows/dendrite.yml b/.github/workflows/dendrite.yml index 5d60301c7..6ebef4e13 100644 --- a/.github/workflows/dendrite.yml +++ b/.github/workflows/dendrite.yml @@ -17,6 +17,7 @@ jobs: name: WASM build test timeout-minutes: 5 runs-on: ubuntu-latest + if: ${{ false }} # disable for now steps: - uses: actions/checkout@v2 diff --git a/CHANGES.md b/CHANGES.md index 0db25f05a..3df03b2f6 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,22 @@ # Changelog +## Dendrite 0.8.9 (2022-07-01) + +### Features + +* Incoming device list updates over federation are now queued in JetStream for processing so that they will no longer block incoming federation transactions and should never end up dropped, which will hopefully help E2EE reliability +* The `/context` endpoint now returns `"start"` and `"end"` parameters to allow pagination from a context call +* The `/messages` endpoint will no longer return `"end"` when there are no more messages remaining +* Deactivated user accounts will now leave all rooms automatically +* New admin endpoint `/_dendrite/admin/evacuateUser/{userID}` has been added for forcing a local user to leave all joined rooms +* Dendrite will now automatically attempt to raise the file descriptor limit at startup if it is too low + +### Fixes + +* A rare crash when retrieving remote device lists has been fixed +* Fixes a bug where events were not redacted properly over federation +* The `/invite` endpoints will now return an error instead of silently proceeding if the user ID is obviously malformed + ## Dendrite 0.8.8 (2022-06-09) ### Features diff --git a/build.sh b/build.sh index 700e6434f..f8b5001bf 100755 --- a/build.sh +++ b/build.sh @@ -21,4 +21,4 @@ mkdir -p bin CGO_ENABLED=1 go build -trimpath -ldflags "$FLAGS" -v -o "bin/" ./cmd/... -CGO_ENABLED=0 GOOS=js GOARCH=wasm go build -trimpath -ldflags "$FLAGS" -o bin/main.wasm ./cmd/dendritejs-pinecone +# CGO_ENABLED=0 GOOS=js GOARCH=wasm go build -trimpath -ldflags "$FLAGS" -o bin/main.wasm ./cmd/dendritejs-pinecone diff --git a/clientapi/producers/syncapi.go b/clientapi/producers/syncapi.go index 48b1ae88d..0ac637793 100644 --- a/clientapi/producers/syncapi.go +++ b/clientapi/producers/syncapi.go @@ -17,6 +17,7 @@ package producers import ( "context" "encoding/json" + "fmt" "strconv" "time" @@ -83,7 +84,7 @@ func (p *SyncAPIProducer) SendReceipt( m.Header.Set(jetstream.RoomID, roomID) m.Header.Set(jetstream.EventID, eventID) m.Header.Set("type", receiptType) - m.Header.Set("timestamp", strconv.Itoa(int(timestamp))) + m.Header.Set("timestamp", fmt.Sprintf("%d", timestamp)) log.WithFields(log.Fields{}).Tracef("Producing to topic '%s'", p.TopicReceiptEvent) _, err := p.JetStream.PublishMsg(m, nats.Context(ctx)) diff --git a/clientapi/routing/register.go b/clientapi/routing/register.go index f8fa0dad3..af8d14f13 100644 --- a/clientapi/routing/register.go +++ b/clientapi/routing/register.go @@ -156,6 +156,13 @@ func (d *sessionsDict) startTimer(duration time.Duration, sessionID string) { }) } +func (d *sessionsDict) hasSession(sessionID string) bool { + d.RLock() + defer d.RUnlock() + _, ok := d.sessions[sessionID] + return ok +} + // addCompletedSessionStage records that a session has completed an auth stage // also starts a timer to delete the session once done. func (d *sessionsDict) addCompletedSessionStage(sessionID string, stage authtypes.LoginType) { diff --git a/clientapi/routing/register_publickey.go b/clientapi/routing/register_publickey.go index 258a47249..f5c972bb1 100644 --- a/clientapi/routing/register_publickey.go +++ b/clientapi/routing/register_publickey.go @@ -62,7 +62,7 @@ func handlePublicKeyRegistration( return false, authtypes.LoginStagePublicKeyNewRegistration, nil } - if _, ok := sessions.sessions[authHandler.GetSession()]; !ok { + if !sessions.hasSession(authHandler.GetSession()) { return false, "", &util.JSONResponse{ Code: http.StatusUnauthorized, JSON: jsonerror.Unknown("the session ID is missing or unknown."), diff --git a/federationapi/consumers/receipts.go b/federationapi/consumers/receipts.go index 9300451eb..2c9d79bcb 100644 --- a/federationapi/consumers/receipts.go +++ b/federationapi/consumers/receipts.go @@ -90,7 +90,7 @@ func (t *OutputReceiptConsumer) onMessage(ctx context.Context, msg *nats.Msg) bo return true } - timestamp, err := strconv.Atoi(msg.Header.Get("timestamp")) + timestamp, err := strconv.ParseUint(msg.Header.Get("timestamp"), 10, 64) if err != nil { // If the message was invalid, log it and move on to the next message in the stream log.WithError(err).Errorf("EDU output log: message parse failure") diff --git a/federationapi/producers/syncapi.go b/federationapi/producers/syncapi.go index e371baaaa..43dd08dd8 100644 --- a/federationapi/producers/syncapi.go +++ b/federationapi/producers/syncapi.go @@ -53,7 +53,7 @@ func (p *SyncAPIProducer) SendReceipt( m.Header.Set(jetstream.RoomID, roomID) m.Header.Set(jetstream.EventID, eventID) m.Header.Set("type", receiptType) - m.Header.Set("timestamp", strconv.Itoa(int(timestamp))) + m.Header.Set("timestamp", fmt.Sprintf("%d", timestamp)) log.WithFields(log.Fields{}).Tracef("Producing to topic '%s'", p.TopicReceiptEvent) _, err := p.JetStream.PublishMsg(m, nats.Context(ctx)) diff --git a/go.mod b/go.mod index 7d3e354ae..20f444e3e 100644 --- a/go.mod +++ b/go.mod @@ -34,7 +34,7 @@ require ( github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91 github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 - github.com/matrix-org/gomatrixserverlib v0.0.0-20220613132209-aedb3fbb511a + github.com/matrix-org/gomatrixserverlib v0.0.0-20220701090733-da53994b0c7f github.com/matrix-org/pinecone v0.0.0-20220408153826-2999ea29ed48 github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 github.com/mattn/go-sqlite3 v1.14.13 diff --git a/go.sum b/go.sum index c452bcfe2..0e1785c47 100644 --- a/go.sum +++ b/go.sum @@ -549,8 +549,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= -github.com/matrix-org/gomatrixserverlib v0.0.0-20220613132209-aedb3fbb511a h1:jOkrb6twViAGTHHadA51sQwdloHT0Vx1MCptk9InTHo= -github.com/matrix-org/gomatrixserverlib v0.0.0-20220613132209-aedb3fbb511a/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk= +github.com/matrix-org/gomatrixserverlib v0.0.0-20220701090733-da53994b0c7f h1:XF2+J6sOq07yhK1I7ItwsgRwXorjj7gqiCvgZ4dn8W8= +github.com/matrix-org/gomatrixserverlib v0.0.0-20220701090733-da53994b0c7f/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk= github.com/matrix-org/pinecone v0.0.0-20220408153826-2999ea29ed48 h1:W0sjjC6yjskHX4mb0nk3p0fXAlbU5bAFUFeEtlrPASE= github.com/matrix-org/pinecone v0.0.0-20220408153826-2999ea29ed48/go.mod h1:ulJzsVOTssIVp1j/m5eI//4VpAGDkMt5NrRuAVX7wpc= github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U= diff --git a/internal/version.go b/internal/version.go index e29996f36..9568f08cb 100644 --- a/internal/version.go +++ b/internal/version.go @@ -17,7 +17,7 @@ var build string const ( VersionMajor = 0 VersionMinor = 8 - VersionPatch = 8 + VersionPatch = 9 VersionTag = "" // example: "rc1" ) diff --git a/keyserver/storage/storage_test.go b/keyserver/storage/storage_test.go index 9940eac60..44cfb5f2a 100644 --- a/keyserver/storage/storage_test.go +++ b/keyserver/storage/storage_test.go @@ -1,36 +1,26 @@ -package storage +package storage_test import ( "context" - "fmt" - "io/ioutil" - "log" - "os" "reflect" "testing" "github.com/matrix-org/dendrite/keyserver/api" + "github.com/matrix-org/dendrite/keyserver/storage" "github.com/matrix-org/dendrite/keyserver/types" - "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/test" + "github.com/matrix-org/dendrite/test/testrig" ) var ctx = context.Background() -func MustCreateDatabase(t *testing.T) (Database, func()) { - tmpfile, err := ioutil.TempFile("", "keyserver_storage_test") +func MustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, func()) { + base, close := testrig.CreateBaseDendrite(t, dbType) + db, err := storage.NewDatabase(base, &base.Cfg.KeyServer.Database) if err != nil { - log.Fatal(err) - } - t.Logf("Database %s", tmpfile.Name()) - db, err := NewDatabase(nil, &config.DatabaseOptions{ - ConnectionString: config.DataSource(fmt.Sprintf("file://%s", tmpfile.Name())), - }) - if err != nil { - t.Fatalf("Failed to NewDatabase: %s", err) - } - return db, func() { - os.Remove(tmpfile.Name()) + t.Fatalf("failed to create new database: %v", err) } + return db, close } func MustNotError(t *testing.T, err error) { @@ -42,151 +32,159 @@ func MustNotError(t *testing.T, err error) { } func TestKeyChanges(t *testing.T) { - db, clean := MustCreateDatabase(t) - defer clean() - _, err := db.StoreKeyChange(ctx, "@alice:localhost") - MustNotError(t, err) - deviceChangeIDB, err := db.StoreKeyChange(ctx, "@bob:localhost") - MustNotError(t, err) - deviceChangeIDC, err := db.StoreKeyChange(ctx, "@charlie:localhost") - MustNotError(t, err) - userIDs, latest, err := db.KeyChanges(ctx, deviceChangeIDB, types.OffsetNewest) - if err != nil { - t.Fatalf("Failed to KeyChanges: %s", err) - } - if latest != deviceChangeIDC { - t.Fatalf("KeyChanges: got latest=%d want %d", latest, deviceChangeIDC) - } - if !reflect.DeepEqual(userIDs, []string{"@charlie:localhost"}) { - t.Fatalf("KeyChanges: wrong user_ids: %v", userIDs) - } + test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { + db, clean := MustCreateDatabase(t, dbType) + defer clean() + _, err := db.StoreKeyChange(ctx, "@alice:localhost") + MustNotError(t, err) + deviceChangeIDB, err := db.StoreKeyChange(ctx, "@bob:localhost") + MustNotError(t, err) + deviceChangeIDC, err := db.StoreKeyChange(ctx, "@charlie:localhost") + MustNotError(t, err) + userIDs, latest, err := db.KeyChanges(ctx, deviceChangeIDB, types.OffsetNewest) + if err != nil { + t.Fatalf("Failed to KeyChanges: %s", err) + } + if latest != deviceChangeIDC { + t.Fatalf("KeyChanges: got latest=%d want %d", latest, deviceChangeIDC) + } + if !reflect.DeepEqual(userIDs, []string{"@charlie:localhost"}) { + t.Fatalf("KeyChanges: wrong user_ids: %v", userIDs) + } + }) } func TestKeyChangesNoDupes(t *testing.T) { - db, clean := MustCreateDatabase(t) - defer clean() - deviceChangeIDA, err := db.StoreKeyChange(ctx, "@alice:localhost") - MustNotError(t, err) - deviceChangeIDB, err := db.StoreKeyChange(ctx, "@alice:localhost") - MustNotError(t, err) - if deviceChangeIDA == deviceChangeIDB { - t.Fatalf("Expected change ID to be different even when inserting key change for the same user, got %d for both changes", deviceChangeIDA) - } - deviceChangeID, err := db.StoreKeyChange(ctx, "@alice:localhost") - MustNotError(t, err) - userIDs, latest, err := db.KeyChanges(ctx, 0, types.OffsetNewest) - if err != nil { - t.Fatalf("Failed to KeyChanges: %s", err) - } - if latest != deviceChangeID { - t.Fatalf("KeyChanges: got latest=%d want %d", latest, deviceChangeID) - } - if !reflect.DeepEqual(userIDs, []string{"@alice:localhost"}) { - t.Fatalf("KeyChanges: wrong user_ids: %v", userIDs) - } + test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { + db, clean := MustCreateDatabase(t, dbType) + defer clean() + deviceChangeIDA, err := db.StoreKeyChange(ctx, "@alice:localhost") + MustNotError(t, err) + deviceChangeIDB, err := db.StoreKeyChange(ctx, "@alice:localhost") + MustNotError(t, err) + if deviceChangeIDA == deviceChangeIDB { + t.Fatalf("Expected change ID to be different even when inserting key change for the same user, got %d for both changes", deviceChangeIDA) + } + deviceChangeID, err := db.StoreKeyChange(ctx, "@alice:localhost") + MustNotError(t, err) + userIDs, latest, err := db.KeyChanges(ctx, 0, types.OffsetNewest) + if err != nil { + t.Fatalf("Failed to KeyChanges: %s", err) + } + if latest != deviceChangeID { + t.Fatalf("KeyChanges: got latest=%d want %d", latest, deviceChangeID) + } + if !reflect.DeepEqual(userIDs, []string{"@alice:localhost"}) { + t.Fatalf("KeyChanges: wrong user_ids: %v", userIDs) + } + }) } func TestKeyChangesUpperLimit(t *testing.T) { - db, clean := MustCreateDatabase(t) - defer clean() - deviceChangeIDA, err := db.StoreKeyChange(ctx, "@alice:localhost") - MustNotError(t, err) - deviceChangeIDB, err := db.StoreKeyChange(ctx, "@bob:localhost") - MustNotError(t, err) - _, err = db.StoreKeyChange(ctx, "@charlie:localhost") - MustNotError(t, err) - userIDs, latest, err := db.KeyChanges(ctx, deviceChangeIDA, deviceChangeIDB) - if err != nil { - t.Fatalf("Failed to KeyChanges: %s", err) - } - if latest != deviceChangeIDB { - t.Fatalf("KeyChanges: got latest=%d want %d", latest, deviceChangeIDB) - } - if !reflect.DeepEqual(userIDs, []string{"@bob:localhost"}) { - t.Fatalf("KeyChanges: wrong user_ids: %v", userIDs) - } + test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { + db, clean := MustCreateDatabase(t, dbType) + defer clean() + deviceChangeIDA, err := db.StoreKeyChange(ctx, "@alice:localhost") + MustNotError(t, err) + deviceChangeIDB, err := db.StoreKeyChange(ctx, "@bob:localhost") + MustNotError(t, err) + _, err = db.StoreKeyChange(ctx, "@charlie:localhost") + MustNotError(t, err) + userIDs, latest, err := db.KeyChanges(ctx, deviceChangeIDA, deviceChangeIDB) + if err != nil { + t.Fatalf("Failed to KeyChanges: %s", err) + } + if latest != deviceChangeIDB { + t.Fatalf("KeyChanges: got latest=%d want %d", latest, deviceChangeIDB) + } + if !reflect.DeepEqual(userIDs, []string{"@bob:localhost"}) { + t.Fatalf("KeyChanges: wrong user_ids: %v", userIDs) + } + }) } // The purpose of this test is to make sure that the storage layer is generating sequential stream IDs per user, // and that they are returned correctly when querying for device keys. func TestDeviceKeysStreamIDGeneration(t *testing.T) { var err error - db, clean := MustCreateDatabase(t) - defer clean() - alice := "@alice:TestDeviceKeysStreamIDGeneration" - bob := "@bob:TestDeviceKeysStreamIDGeneration" - msgs := []api.DeviceMessage{ - { - Type: api.TypeDeviceKeyUpdate, - DeviceKeys: &api.DeviceKeys{ - DeviceID: "AAA", - UserID: alice, - KeyJSON: []byte(`{"key":"v1"}`), + test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { + db, clean := MustCreateDatabase(t, dbType) + defer clean() + alice := "@alice:TestDeviceKeysStreamIDGeneration" + bob := "@bob:TestDeviceKeysStreamIDGeneration" + msgs := []api.DeviceMessage{ + { + Type: api.TypeDeviceKeyUpdate, + DeviceKeys: &api.DeviceKeys{ + DeviceID: "AAA", + UserID: alice, + KeyJSON: []byte(`{"key":"v1"}`), + }, + // StreamID: 1 }, - // StreamID: 1 - }, - { - Type: api.TypeDeviceKeyUpdate, - DeviceKeys: &api.DeviceKeys{ - DeviceID: "AAA", - UserID: bob, - KeyJSON: []byte(`{"key":"v1"}`), + { + Type: api.TypeDeviceKeyUpdate, + DeviceKeys: &api.DeviceKeys{ + DeviceID: "AAA", + UserID: bob, + KeyJSON: []byte(`{"key":"v1"}`), + }, + // StreamID: 1 as this is a different user }, - // StreamID: 1 as this is a different user - }, - { - Type: api.TypeDeviceKeyUpdate, - DeviceKeys: &api.DeviceKeys{ - DeviceID: "another_device", - UserID: alice, - KeyJSON: []byte(`{"key":"v1"}`), + { + Type: api.TypeDeviceKeyUpdate, + DeviceKeys: &api.DeviceKeys{ + DeviceID: "another_device", + UserID: alice, + KeyJSON: []byte(`{"key":"v1"}`), + }, + // StreamID: 2 as this is a 2nd device key }, - // StreamID: 2 as this is a 2nd device key - }, - } - MustNotError(t, db.StoreLocalDeviceKeys(ctx, msgs)) - if msgs[0].StreamID != 1 { - t.Fatalf("Expected StoreLocalDeviceKeys to set StreamID=1 but got %d", msgs[0].StreamID) - } - if msgs[1].StreamID != 1 { - t.Fatalf("Expected StoreLocalDeviceKeys to set StreamID=1 (different user) but got %d", msgs[1].StreamID) - } - if msgs[2].StreamID != 2 { - t.Fatalf("Expected StoreLocalDeviceKeys to set StreamID=2 (another device) but got %d", msgs[2].StreamID) - } - - // updating a device sets the next stream ID for that user - msgs = []api.DeviceMessage{ - { - Type: api.TypeDeviceKeyUpdate, - DeviceKeys: &api.DeviceKeys{ - DeviceID: "AAA", - UserID: alice, - KeyJSON: []byte(`{"key":"v2"}`), - }, - // StreamID: 3 - }, - } - MustNotError(t, db.StoreLocalDeviceKeys(ctx, msgs)) - if msgs[0].StreamID != 3 { - t.Fatalf("Expected StoreLocalDeviceKeys to set StreamID=3 (new key same device) but got %d", msgs[0].StreamID) - } - - // Querying for device keys returns the latest stream IDs - msgs, err = db.DeviceKeysForUser(ctx, alice, []string{"AAA", "another_device"}, false) - if err != nil { - t.Fatalf("DeviceKeysForUser returned error: %s", err) - } - wantStreamIDs := map[string]int64{ - "AAA": 3, - "another_device": 2, - } - if len(msgs) != len(wantStreamIDs) { - t.Fatalf("DeviceKeysForUser: wrong number of devices, got %d want %d", len(msgs), len(wantStreamIDs)) - } - for _, m := range msgs { - if m.StreamID != wantStreamIDs[m.DeviceID] { - t.Errorf("DeviceKeysForUser: wrong returned stream ID for key, got %d want %d", m.StreamID, wantStreamIDs[m.DeviceID]) } - } + MustNotError(t, db.StoreLocalDeviceKeys(ctx, msgs)) + if msgs[0].StreamID != 1 { + t.Fatalf("Expected StoreLocalDeviceKeys to set StreamID=1 but got %d", msgs[0].StreamID) + } + if msgs[1].StreamID != 1 { + t.Fatalf("Expected StoreLocalDeviceKeys to set StreamID=1 (different user) but got %d", msgs[1].StreamID) + } + if msgs[2].StreamID != 2 { + t.Fatalf("Expected StoreLocalDeviceKeys to set StreamID=2 (another device) but got %d", msgs[2].StreamID) + } + + // updating a device sets the next stream ID for that user + msgs = []api.DeviceMessage{ + { + Type: api.TypeDeviceKeyUpdate, + DeviceKeys: &api.DeviceKeys{ + DeviceID: "AAA", + UserID: alice, + KeyJSON: []byte(`{"key":"v2"}`), + }, + // StreamID: 3 + }, + } + MustNotError(t, db.StoreLocalDeviceKeys(ctx, msgs)) + if msgs[0].StreamID != 3 { + t.Fatalf("Expected StoreLocalDeviceKeys to set StreamID=3 (new key same device) but got %d", msgs[0].StreamID) + } + + // Querying for device keys returns the latest stream IDs + msgs, err = db.DeviceKeysForUser(ctx, alice, []string{"AAA", "another_device"}, false) + if err != nil { + t.Fatalf("DeviceKeysForUser returned error: %s", err) + } + wantStreamIDs := map[string]int64{ + "AAA": 3, + "another_device": 2, + } + if len(msgs) != len(wantStreamIDs) { + t.Fatalf("DeviceKeysForUser: wrong number of devices, got %d want %d", len(msgs), len(wantStreamIDs)) + } + for _, m := range msgs { + if m.StreamID != wantStreamIDs[m.DeviceID] { + t.Errorf("DeviceKeysForUser: wrong returned stream ID for key, got %d want %d", m.StreamID, wantStreamIDs[m.DeviceID]) + } + } + }) } diff --git a/roomserver/internal/alias.go b/roomserver/internal/alias.go index f47ae47fe..175bb9310 100644 --- a/roomserver/internal/alias.go +++ b/roomserver/internal/alias.go @@ -216,11 +216,10 @@ func (r *RoomserverInternalAPI) RemoveRoomAlias( return err } - err = api.SendEvents(ctx, r.RSAPI, api.KindNew, []*gomatrixserverlib.HeaderedEvent{newEvent}, r.ServerName, r.ServerName, nil, false) + err = api.SendEvents(ctx, r, api.KindNew, []*gomatrixserverlib.HeaderedEvent{newEvent}, r.ServerName, r.ServerName, nil, false) if err != nil { return err } - } } diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go index acdaeef6f..1a11586a5 100644 --- a/roomserver/internal/api.go +++ b/roomserver/internal/api.go @@ -12,8 +12,11 @@ import ( "github.com/matrix-org/dendrite/roomserver/internal/input" "github.com/matrix-org/dendrite/roomserver/internal/perform" "github.com/matrix-org/dendrite/roomserver/internal/query" + "github.com/matrix-org/dendrite/roomserver/producers" "github.com/matrix-org/dendrite/roomserver/storage" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" @@ -37,6 +40,7 @@ type RoomserverInternalAPI struct { *perform.Upgrader *perform.Admin ProcessContext *process.ProcessContext + Base *base.BaseDendrite DB storage.Database Cfg *config.RoomServer Cache caching.RoomServerCaches @@ -49,34 +53,43 @@ type RoomserverInternalAPI struct { JetStream nats.JetStreamContext Durable string InputRoomEventTopic string // JetStream topic for new input room events - OutputRoomEventTopic string // JetStream topic for new output room events + OutputProducer *producers.RoomEventProducer PerspectiveServerNames []gomatrixserverlib.ServerName } func NewRoomserverAPI( - processCtx *process.ProcessContext, cfg *config.RoomServer, roomserverDB storage.Database, - consumer nats.JetStreamContext, nc *nats.Conn, - inputRoomEventTopic, outputRoomEventTopic string, - caches caching.RoomServerCaches, perspectiveServerNames []gomatrixserverlib.ServerName, + base *base.BaseDendrite, roomserverDB storage.Database, + js nats.JetStreamContext, nc *nats.Conn, ) *RoomserverInternalAPI { + var perspectiveServerNames []gomatrixserverlib.ServerName + for _, kp := range base.Cfg.FederationAPI.KeyPerspectives { + perspectiveServerNames = append(perspectiveServerNames, kp.ServerName) + } + serverACLs := acls.NewServerACLs(roomserverDB) + producer := &producers.RoomEventProducer{ + Topic: string(base.Cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent)), + JetStream: js, + ACLs: serverACLs, + } a := &RoomserverInternalAPI{ - ProcessContext: processCtx, + ProcessContext: base.ProcessContext, DB: roomserverDB, - Cfg: cfg, - Cache: caches, - ServerName: cfg.Matrix.ServerName, + Base: base, + Cfg: &base.Cfg.RoomServer, + Cache: base.Caches, + ServerName: base.Cfg.Global.ServerName, PerspectiveServerNames: perspectiveServerNames, - InputRoomEventTopic: inputRoomEventTopic, - OutputRoomEventTopic: outputRoomEventTopic, - JetStream: consumer, + InputRoomEventTopic: base.Cfg.Global.JetStream.Prefixed(jetstream.InputRoomEvent), + OutputProducer: producer, + JetStream: js, NATSClient: nc, - Durable: cfg.Matrix.JetStream.Durable("RoomserverInputConsumer"), + Durable: base.Cfg.Global.JetStream.Durable("RoomserverInputConsumer"), ServerACLs: serverACLs, Queryer: &query.Queryer{ DB: roomserverDB, - Cache: caches, - ServerName: cfg.Matrix.ServerName, + Cache: base.Caches, + ServerName: base.Cfg.Global.ServerName, ServerACLs: serverACLs, }, // perform-er structs get initialised when we have a federation sender to use @@ -92,19 +105,20 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.RoomserverFederatio r.KeyRing = keyRing r.Inputer = &input.Inputer{ - Cfg: r.Cfg, - ProcessContext: r.ProcessContext, - DB: r.DB, - InputRoomEventTopic: r.InputRoomEventTopic, - OutputRoomEventTopic: r.OutputRoomEventTopic, - JetStream: r.JetStream, - NATSClient: r.NATSClient, - Durable: nats.Durable(r.Durable), - ServerName: r.Cfg.Matrix.ServerName, - FSAPI: fsAPI, - KeyRing: keyRing, - ACLs: r.ServerACLs, - Queryer: r.Queryer, + Cfg: &r.Base.Cfg.RoomServer, + Base: r.Base, + ProcessContext: r.Base.ProcessContext, + DB: r.DB, + InputRoomEventTopic: r.InputRoomEventTopic, + OutputProducer: r.OutputProducer, + JetStream: r.JetStream, + NATSClient: r.NATSClient, + Durable: nats.Durable(r.Durable), + ServerName: r.Cfg.Matrix.ServerName, + FSAPI: fsAPI, + KeyRing: keyRing, + ACLs: r.ServerACLs, + Queryer: r.Queryer, } r.Inviter = &perform.Inviter{ DB: r.DB, @@ -199,7 +213,7 @@ func (r *RoomserverInternalAPI) PerformInvite( if len(outputEvents) == 0 { return nil } - return r.WriteOutputEvents(req.Event.RoomID(), outputEvents) + return r.OutputProducer.ProduceRoomEvents(req.Event.RoomID(), outputEvents) } func (r *RoomserverInternalAPI) PerformLeave( @@ -215,7 +229,7 @@ func (r *RoomserverInternalAPI) PerformLeave( if len(outputEvents) == 0 { return nil } - return r.WriteOutputEvents(req.RoomID, outputEvents) + return r.OutputProducer.ProduceRoomEvents(req.RoomID, outputEvents) } func (r *RoomserverInternalAPI) PerformForget( diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 600994c5a..ecd4ecbb5 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -29,7 +29,9 @@ import ( "github.com/matrix-org/dendrite/roomserver/acls" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/internal/query" + "github.com/matrix-org/dendrite/roomserver/producers" "github.com/matrix-org/dendrite/roomserver/storage" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" @@ -37,16 +39,8 @@ import ( "github.com/nats-io/nats.go" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" - log "github.com/sirupsen/logrus" - "github.com/tidwall/gjson" ) -var keyContentFields = map[string]string{ - "m.room.join_rules": "join_rule", - "m.room.history_visibility": "history_visibility", - "m.room.member": "membership", -} - // Inputer is responsible for consuming from the roomserver input // streams and processing the events. All input events are queued // into a single NATS stream and the order is preserved strictly. @@ -75,19 +69,20 @@ var keyContentFields = map[string]string{ // up, so they will do nothing until a new event comes in for B // or C. type Inputer struct { - Cfg *config.RoomServer - ProcessContext *process.ProcessContext - DB storage.Database - NATSClient *nats.Conn - JetStream nats.JetStreamContext - Durable nats.SubOpt - ServerName gomatrixserverlib.ServerName - FSAPI fedapi.RoomserverFederationAPI - KeyRing gomatrixserverlib.JSONVerifier - ACLs *acls.ServerACLs - InputRoomEventTopic string - OutputRoomEventTopic string - workers sync.Map // room ID -> *worker + Cfg *config.RoomServer + Base *base.BaseDendrite + ProcessContext *process.ProcessContext + DB storage.Database + NATSClient *nats.Conn + JetStream nats.JetStreamContext + Durable nats.SubOpt + ServerName gomatrixserverlib.ServerName + FSAPI fedapi.RoomserverFederationAPI + KeyRing gomatrixserverlib.JSONVerifier + ACLs *acls.ServerACLs + InputRoomEventTopic string + OutputProducer *producers.RoomEventProducer + workers sync.Map // room ID -> *worker Queryer *query.Queryer } @@ -167,7 +162,9 @@ func (r *Inputer) startWorkerForRoom(roomID string) { // will look to see if we have a worker for that room which has its // own consumer. If we don't, we'll start one. func (r *Inputer) Start() error { - prometheus.MustRegister(roomserverInputBackpressure, processRoomEventDuration) + if r.Base.EnableMetrics { + prometheus.MustRegister(roomserverInputBackpressure, processRoomEventDuration) + } _, err := r.JetStream.Subscribe( "", // This is blank because we specified it in BindStream. func(m *nats.Msg) { @@ -370,58 +367,6 @@ func (r *Inputer) InputRoomEvents( } } -// WriteOutputEvents implements OutputRoomEventWriter -func (r *Inputer) WriteOutputEvents(roomID string, updates []api.OutputEvent) error { - var err error - for _, update := range updates { - msg := &nats.Msg{ - Subject: r.OutputRoomEventTopic, - Header: nats.Header{}, - } - msg.Header.Set(jetstream.RoomID, roomID) - msg.Data, err = json.Marshal(update) - if err != nil { - return err - } - logger := log.WithFields(log.Fields{ - "room_id": roomID, - "type": update.Type, - }) - if update.NewRoomEvent != nil { - eventType := update.NewRoomEvent.Event.Type() - logger = logger.WithFields(log.Fields{ - "event_type": eventType, - "event_id": update.NewRoomEvent.Event.EventID(), - "adds_state": len(update.NewRoomEvent.AddsStateEventIDs), - "removes_state": len(update.NewRoomEvent.RemovesStateEventIDs), - "send_as_server": update.NewRoomEvent.SendAsServer, - "sender": update.NewRoomEvent.Event.Sender(), - }) - if update.NewRoomEvent.Event.StateKey() != nil { - logger = logger.WithField("state_key", *update.NewRoomEvent.Event.StateKey()) - } - contentKey := keyContentFields[eventType] - if contentKey != "" { - value := gjson.GetBytes(update.NewRoomEvent.Event.Content(), contentKey) - if value.Exists() { - logger = logger.WithField("content_value", value.String()) - } - } - - if eventType == "m.room.server_acl" && update.NewRoomEvent.Event.StateKeyEquals("") { - ev := update.NewRoomEvent.Event.Unwrap() - defer r.ACLs.OnServerACLUpdate(ev) - } - } - logger.Tracef("Producing to topic '%s'", r.OutputRoomEventTopic) - if _, err := r.JetStream.PublishMsg(msg); err != nil { - logger.WithError(err).Errorf("Failed to produce to topic '%s': %s", r.OutputRoomEventTopic, err) - return err - } - } - return nil -} - var roomserverInputBackpressure = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "dendrite", diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index ff05f798c..743b1efe6 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -381,7 +381,7 @@ func (r *Inputer) processRoomEvent( return fmt.Errorf("r.updateLatestEvents: %w", err) } case api.KindOld: - err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{ + err = r.OutputProducer.ProduceRoomEvents(event.RoomID(), []api.OutputEvent{ { Type: api.OutputTypeOldRoomEvent, OldRoomEvent: &api.OutputOldRoomEvent{ @@ -400,7 +400,7 @@ func (r *Inputer) processRoomEvent( // so notify downstream components to redact this event - they should have it if they've // been tracking our output log. if redactedEventID != "" { - err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{ + err = r.OutputProducer.ProduceRoomEvents(event.RoomID(), []api.OutputEvent{ { Type: api.OutputTypeRedactedEvent, RedactedEvent: &api.OutputRedactedEvent{ diff --git a/roomserver/internal/input/input_latest_events.go b/roomserver/internal/input/input_latest_events.go index e76f4ba8d..f7d15fdb5 100644 --- a/roomserver/internal/input/input_latest_events.go +++ b/roomserver/internal/input/input_latest_events.go @@ -192,7 +192,7 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error { // send the event asynchronously but we would need to ensure that 1) the events are written to the log in // the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the // necessary bookkeeping we'll keep the event sending synchronous for now. - if err = u.api.WriteOutputEvents(u.event.RoomID(), updates); err != nil { + if err = u.api.OutputProducer.ProduceRoomEvents(u.event.RoomID(), updates); err != nil { return fmt.Errorf("u.api.WriteOutputEvents: %w", err) } diff --git a/roomserver/internal/perform/perform_admin.go b/roomserver/internal/perform/perform_admin.go index d3fb71099..1cb52966a 100644 --- a/roomserver/internal/perform/perform_admin.go +++ b/roomserver/internal/perform/perform_admin.go @@ -219,7 +219,7 @@ func (r *Admin) PerformAdminEvacuateUser( if len(outputEvents) == 0 { continue } - if err := r.Inputer.WriteOutputEvents(roomID, outputEvents); err != nil { + if err := r.Inputer.OutputProducer.ProduceRoomEvents(roomID, outputEvents); err != nil { res.Error = &api.PerformError{ Code: api.PerformErrorBadRequest, Msg: fmt.Sprintf("r.Inputer.WriteOutputEvents: %s", err), diff --git a/roomserver/internal/perform/perform_backfill.go b/roomserver/internal/perform/perform_backfill.go index 1bc4c75ce..9eddca733 100644 --- a/roomserver/internal/perform/perform_backfill.go +++ b/roomserver/internal/perform/perform_backfill.go @@ -18,6 +18,7 @@ import ( "context" "fmt" + "github.com/getsentry/sentry-go" federationAPI "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/roomserver/api" @@ -206,8 +207,17 @@ func (r *Backfiller) fetchAndStoreMissingEvents(ctx context.Context, roomVer gom } logger.Infof("returned %d PDUs which made events %+v", len(res.PDUs), result) for _, res := range result { - if res.Error != nil { - logger.WithError(res.Error).Warn("event failed PDU checks") + switch err := res.Error.(type) { + case nil: + case gomatrixserverlib.SignatureErr: + // The signature of the event might not be valid anymore, for example if + // the key ID was reused with a different signature. + logger.WithError(err).Errorf("event failed PDU checks, storing anyway") + case gomatrixserverlib.AuthChainErr, gomatrixserverlib.AuthRulesErr: + logger.WithError(err).Warn("event failed PDU checks") + continue + default: + logger.WithError(err).Warn("event failed PDU checks") continue } missingMap[id] = res.Event @@ -306,6 +316,7 @@ FederationHit: b.eventIDToBeforeStateIDs[targetEvent.EventID()] = res return res, nil } + sentry.CaptureException(lastErr) // temporary to see if we might need to raise the server limit return nil, lastErr } @@ -366,19 +377,25 @@ func (b *backfillRequester) StateBeforeEvent(ctx context.Context, roomVer gomatr } } - c := gomatrixserverlib.FederatedStateProvider{ - FedClient: b.fsAPI, - RememberAuthEvents: false, - Server: b.servers[0], + var lastErr error + for _, srv := range b.servers { + c := gomatrixserverlib.FederatedStateProvider{ + FedClient: b.fsAPI, + RememberAuthEvents: false, + Server: srv, + } + result, err := c.StateBeforeEvent(ctx, roomVer, event, eventIDs) + if err != nil { + lastErr = err + continue + } + for eventID, ev := range result { + b.eventIDMap[eventID] = ev + } + return result, nil } - result, err := c.StateBeforeEvent(ctx, roomVer, event, eventIDs) - if err != nil { - return nil, err - } - for eventID, ev := range result { - b.eventIDMap[eventID] = ev - } - return result, nil + sentry.CaptureException(lastErr) // temporary to see if we might need to raise the server limit + return nil, lastErr } // ServersAtEvent is called when trying to determine which server to request from. diff --git a/roomserver/internal/perform/perform_inbound_peek.go b/roomserver/internal/perform/perform_inbound_peek.go index d19fc8386..32c81e849 100644 --- a/roomserver/internal/perform/perform_inbound_peek.go +++ b/roomserver/internal/perform/perform_inbound_peek.go @@ -113,7 +113,7 @@ func (r *InboundPeeker) PerformInboundPeek( response.AuthChainEvents = append(response.AuthChainEvents, event.Headered(info.RoomVersion)) } - err = r.Inputer.WriteOutputEvents(request.RoomID, []api.OutputEvent{ + err = r.Inputer.OutputProducer.ProduceRoomEvents(request.RoomID, []api.OutputEvent{ { Type: api.OutputTypeNewInboundPeek, NewInboundPeek: &api.OutputNewInboundPeek{ diff --git a/roomserver/internal/perform/perform_peek.go b/roomserver/internal/perform/perform_peek.go index 45e63888d..5560916b2 100644 --- a/roomserver/internal/perform/perform_peek.go +++ b/roomserver/internal/perform/perform_peek.go @@ -207,7 +207,7 @@ func (r *Peeker) performPeekRoomByID( // TODO: handle federated peeks - err = r.Inputer.WriteOutputEvents(roomID, []api.OutputEvent{ + err = r.Inputer.OutputProducer.ProduceRoomEvents(roomID, []api.OutputEvent{ { Type: api.OutputTypeNewPeek, NewPeek: &api.OutputNewPeek{ diff --git a/roomserver/internal/perform/perform_unpeek.go b/roomserver/internal/perform/perform_unpeek.go index 1057499cb..1fe8d5a0f 100644 --- a/roomserver/internal/perform/perform_unpeek.go +++ b/roomserver/internal/perform/perform_unpeek.go @@ -96,7 +96,7 @@ func (r *Unpeeker) performUnpeekRoomByID( // TODO: handle federated peeks - err = r.Inputer.WriteOutputEvents(req.RoomID, []api.OutputEvent{ + err = r.Inputer.OutputProducer.ProduceRoomEvents(req.RoomID, []api.OutputEvent{ { Type: api.OutputTypeRetirePeek, RetirePeek: &api.OutputRetirePeek{ diff --git a/roomserver/producers/roomevent.go b/roomserver/producers/roomevent.go new file mode 100644 index 000000000..987e6c942 --- /dev/null +++ b/roomserver/producers/roomevent.go @@ -0,0 +1,89 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package producers + +import ( + "encoding/json" + + "github.com/matrix-org/dendrite/roomserver/acls" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/nats-io/nats.go" + log "github.com/sirupsen/logrus" + "github.com/tidwall/gjson" +) + +var keyContentFields = map[string]string{ + "m.room.join_rules": "join_rule", + "m.room.history_visibility": "history_visibility", + "m.room.member": "membership", +} + +type RoomEventProducer struct { + Topic string + ACLs *acls.ServerACLs + JetStream nats.JetStreamContext +} + +func (r *RoomEventProducer) ProduceRoomEvents(roomID string, updates []api.OutputEvent) error { + var err error + for _, update := range updates { + msg := &nats.Msg{ + Subject: r.Topic, + Header: nats.Header{}, + } + msg.Header.Set(jetstream.RoomID, roomID) + msg.Data, err = json.Marshal(update) + if err != nil { + return err + } + logger := log.WithFields(log.Fields{ + "room_id": roomID, + "type": update.Type, + }) + if update.NewRoomEvent != nil { + eventType := update.NewRoomEvent.Event.Type() + logger = logger.WithFields(log.Fields{ + "event_type": eventType, + "event_id": update.NewRoomEvent.Event.EventID(), + "adds_state": len(update.NewRoomEvent.AddsStateEventIDs), + "removes_state": len(update.NewRoomEvent.RemovesStateEventIDs), + "send_as_server": update.NewRoomEvent.SendAsServer, + "sender": update.NewRoomEvent.Event.Sender(), + }) + if update.NewRoomEvent.Event.StateKey() != nil { + logger = logger.WithField("state_key", *update.NewRoomEvent.Event.StateKey()) + } + contentKey := keyContentFields[eventType] + if contentKey != "" { + value := gjson.GetBytes(update.NewRoomEvent.Event.Content(), contentKey) + if value.Exists() { + logger = logger.WithField("content_value", value.String()) + } + } + + if eventType == "m.room.server_acl" && update.NewRoomEvent.Event.StateKeyEquals("") { + ev := update.NewRoomEvent.Event.Unwrap() + defer r.ACLs.OnServerACLUpdate(ev) + } + } + logger.Tracef("Producing to topic '%s'", r.Topic) + if _, err := r.JetStream.PublishMsg(msg); err != nil { + logger.WithError(err).Errorf("Failed to produce to topic '%s': %s", r.Topic, err) + return err + } + } + return nil +} diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go index 1480e8942..1f707735b 100644 --- a/roomserver/roomserver.go +++ b/roomserver/roomserver.go @@ -17,13 +17,10 @@ package roomserver import ( "github.com/gorilla/mux" "github.com/matrix-org/dendrite/roomserver/api" - "github.com/matrix-org/dendrite/roomserver/inthttp" - "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/dendrite/roomserver/internal" + "github.com/matrix-org/dendrite/roomserver/inthttp" "github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/setup/base" - "github.com/matrix-org/dendrite/setup/jetstream" "github.com/sirupsen/logrus" ) @@ -40,11 +37,6 @@ func NewInternalAPI( ) api.RoomserverInternalAPI { cfg := &base.Cfg.RoomServer - var perspectiveServerNames []gomatrixserverlib.ServerName - for _, kp := range base.Cfg.FederationAPI.KeyPerspectives { - perspectiveServerNames = append(perspectiveServerNames, kp.ServerName) - } - roomserverDB, err := storage.Open(base, &cfg.Database, base.Caches) if err != nil { logrus.WithError(err).Panicf("failed to connect to room server db") @@ -53,9 +45,6 @@ func NewInternalAPI( js, nc := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) return internal.NewRoomserverAPI( - base.ProcessContext, cfg, roomserverDB, js, nc, - cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent), - cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent), - base.Caches, perspectiveServerNames, + base, roomserverDB, js, nc, ) } diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go new file mode 100644 index 000000000..4e98af853 --- /dev/null +++ b/roomserver/roomserver_test.go @@ -0,0 +1,69 @@ +package roomserver_test + +import ( + "context" + "testing" + + "github.com/matrix-org/dendrite/roomserver" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/roomserver/storage" + "github.com/matrix-org/dendrite/setup/base" + "github.com/matrix-org/dendrite/test" + "github.com/matrix-org/dendrite/test/testrig" + "github.com/matrix-org/gomatrixserverlib" +) + +func mustCreateDatabase(t *testing.T, dbType test.DBType) (*base.BaseDendrite, storage.Database, func()) { + base, close := testrig.CreateBaseDendrite(t, dbType) + db, err := storage.Open(base, &base.Cfg.KeyServer.Database, base.Caches) + if err != nil { + t.Fatalf("failed to create Database: %v", err) + } + return base, db, close +} + +func Test_SharedUsers(t *testing.T) { + alice := test.NewUser(t) + bob := test.NewUser(t) + room := test.NewRoom(t, alice, test.RoomPreset(test.PresetTrustedPrivateChat)) + + // Invite and join Bob + room.CreateAndInsert(t, alice, gomatrixserverlib.MRoomMember, map[string]interface{}{ + "membership": "invite", + }, test.WithStateKey(bob.ID)) + room.CreateAndInsert(t, bob, gomatrixserverlib.MRoomMember, map[string]interface{}{ + "membership": "join", + }, test.WithStateKey(bob.ID)) + + ctx := context.Background() + test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { + base, _, close := mustCreateDatabase(t, dbType) + defer close() + + rsAPI := roomserver.NewInternalAPI(base) + // SetFederationAPI starts the room event input consumer + rsAPI.SetFederationAPI(nil, nil) + // Create the room + if err := api.SendEvents(ctx, rsAPI, api.KindNew, room.Events(), "test", "test", nil, false); err != nil { + t.Fatalf("failed to send events: %v", err) + } + + // Query the shared users for Alice, there should only be Bob. + // This is used by the SyncAPI keychange consumer. + res := &api.QuerySharedUsersResponse{} + if err := rsAPI.QuerySharedUsers(ctx, &api.QuerySharedUsersRequest{UserID: alice.ID}, res); err != nil { + t.Fatalf("unable to query known users: %v", err) + } + if _, ok := res.UserIDsToCount[bob.ID]; !ok { + t.Fatalf("expected to find %s in shared users, but didn't: %+v", bob.ID, res.UserIDsToCount) + } + // Also verify that we get the expected result when specifying OtherUserIDs. + // This is used by the SyncAPI when getting device list changes. + if err := rsAPI.QuerySharedUsers(ctx, &api.QuerySharedUsersRequest{UserID: alice.ID, OtherUserIDs: []string{bob.ID}}, res); err != nil { + t.Fatalf("unable to query known users: %v", err) + } + if _, ok := res.UserIDsToCount[bob.ID]; !ok { + t.Fatalf("expected to find %s in shared users, but didn't: %+v", bob.ID, res.UserIDsToCount) + } + }) +} diff --git a/roomserver/state/state.go b/roomserver/state/state.go index 6c4e4b860..91f271652 100644 --- a/roomserver/state/state.go +++ b/roomserver/state/state.go @@ -110,7 +110,7 @@ func (v *StateResolution) LoadStateAtEvent( snapshotNID, err := v.db.SnapshotNIDFromEventID(ctx, eventID) if err != nil { - return nil, fmt.Errorf("LoadStateAtEvent.SnapshotNIDFromEventID failed for event %s : %s", eventID, err) + return nil, fmt.Errorf("LoadStateAtEvent.SnapshotNIDFromEventID failed for event %s : %w", eventID, err) } if snapshotNID == 0 { return nil, fmt.Errorf("LoadStateAtEvent.SnapshotNIDFromEventID(%s) returned 0 NID, was this event stored?", eventID) diff --git a/roomserver/storage/postgres/membership_table.go b/roomserver/storage/postgres/membership_table.go index c01753c3a..ce626ad1d 100644 --- a/roomserver/storage/postgres/membership_table.go +++ b/roomserver/storage/postgres/membership_table.go @@ -65,12 +65,18 @@ CREATE TABLE IF NOT EXISTS roomserver_membership ( ); ` -var selectJoinedUsersSetForRoomsSQL = "" + +var selectJoinedUsersSetForRoomsAndUserSQL = "" + "SELECT target_nid, COUNT(room_nid) FROM roomserver_membership" + " WHERE room_nid = ANY($1) AND target_nid = ANY($2) AND" + " membership_nid = " + fmt.Sprintf("%d", tables.MembershipStateJoin) + " and forgotten = false" + " GROUP BY target_nid" +var selectJoinedUsersSetForRoomsSQL = "" + + "SELECT target_nid, COUNT(room_nid) FROM roomserver_membership" + + " WHERE room_nid = ANY($1) AND" + + " membership_nid = " + fmt.Sprintf("%d", tables.MembershipStateJoin) + " and forgotten = false" + + " GROUP BY target_nid" + // Insert a row in to membership table so that it can be locked by the // SELECT FOR UPDATE const insertMembershipSQL = "" + @@ -153,6 +159,7 @@ type membershipStatements struct { selectLocalMembershipsFromRoomStmt *sql.Stmt updateMembershipStmt *sql.Stmt selectRoomsWithMembershipStmt *sql.Stmt + selectJoinedUsersSetForRoomsAndUserStmt *sql.Stmt selectJoinedUsersSetForRoomsStmt *sql.Stmt selectKnownUsersStmt *sql.Stmt updateMembershipForgetRoomStmt *sql.Stmt @@ -178,6 +185,7 @@ func PrepareMembershipTable(db *sql.DB) (tables.Membership, error) { {&s.selectLocalMembershipsFromRoomStmt, selectLocalMembershipsFromRoomSQL}, {&s.updateMembershipStmt, updateMembershipSQL}, {&s.selectRoomsWithMembershipStmt, selectRoomsWithMembershipSQL}, + {&s.selectJoinedUsersSetForRoomsAndUserStmt, selectJoinedUsersSetForRoomsAndUserSQL}, {&s.selectJoinedUsersSetForRoomsStmt, selectJoinedUsersSetForRoomsSQL}, {&s.selectKnownUsersStmt, selectKnownUsersSQL}, {&s.updateMembershipForgetRoomStmt, updateMembershipForgetRoom}, @@ -313,8 +321,18 @@ func (s *membershipStatements) SelectJoinedUsersSetForRooms( roomNIDs []types.RoomNID, userNIDs []types.EventStateKeyNID, ) (map[types.EventStateKeyNID]int, error) { + var ( + rows *sql.Rows + err error + ) stmt := sqlutil.TxStmt(txn, s.selectJoinedUsersSetForRoomsStmt) - rows, err := stmt.QueryContext(ctx, pq.Array(roomNIDs), pq.Array(userNIDs)) + if len(userNIDs) > 0 { + stmt = sqlutil.TxStmt(txn, s.selectJoinedUsersSetForRoomsAndUserStmt) + rows, err = stmt.QueryContext(ctx, pq.Array(roomNIDs), pq.Array(userNIDs)) + } else { + rows, err = stmt.QueryContext(ctx, pq.Array(roomNIDs)) + } + if err != nil { return nil, err } diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go index 67dcfdf38..3191280cb 100644 --- a/roomserver/storage/shared/storage.go +++ b/roomserver/storage/shared/storage.go @@ -263,6 +263,12 @@ func (d *Database) snapshotNIDFromEventID( ctx context.Context, txn *sql.Tx, eventID string, ) (types.StateSnapshotNID, error) { _, stateNID, err := d.EventsTable.SelectEvent(ctx, txn, eventID) + if err != nil { + return 0, err + } + if stateNID == 0 { + return 0, sql.ErrNoRows // effectively there's no state entry + } return stateNID, err } @@ -1214,6 +1220,13 @@ func (d *Database) JoinedUsersSetInRooms(ctx context.Context, roomIDs, userIDs [ stateKeyNIDs[i] = nid i++ } + // If we didn't have any userIDs to look up, get the UserIDs for the returned userNIDToCount now + if len(userIDs) == 0 { + nidToUserID, err = d.EventStateKeys(ctx, stateKeyNIDs) + if err != nil { + return nil, err + } + } result := make(map[string]int, len(userNIDToCount)) for nid, count := range userNIDToCount { result[nidToUserID[nid]] = count diff --git a/roomserver/storage/sqlite3/membership_table.go b/roomserver/storage/sqlite3/membership_table.go index 6f0fe8b64..570d3919c 100644 --- a/roomserver/storage/sqlite3/membership_table.go +++ b/roomserver/storage/sqlite3/membership_table.go @@ -41,12 +41,18 @@ const membershipSchema = ` ); ` -var selectJoinedUsersSetForRoomsSQL = "" + +var selectJoinedUsersSetForRoomsAndUserSQL = "" + "SELECT target_nid, COUNT(room_nid) FROM roomserver_membership" + " WHERE room_nid IN ($1) AND target_nid IN ($2) AND" + " membership_nid = " + fmt.Sprintf("%d", tables.MembershipStateJoin) + " and forgotten = false" + " GROUP BY target_nid" +var selectJoinedUsersSetForRoomsSQL = "" + + "SELECT target_nid, COUNT(room_nid) FROM roomserver_membership" + + " WHERE room_nid IN ($1) AND " + + " membership_nid = " + fmt.Sprintf("%d", tables.MembershipStateJoin) + " and forgotten = false" + + " GROUP BY target_nid" + // Insert a row in to membership table so that it can be locked by the // SELECT FOR UPDATE const insertMembershipSQL = "" + @@ -293,8 +299,12 @@ func (s *membershipStatements) SelectJoinedUsersSetForRooms(ctx context.Context, for _, v := range userNIDs { params = append(params, v) } + query := strings.Replace(selectJoinedUsersSetForRoomsSQL, "($1)", sqlutil.QueryVariadic(len(roomNIDs)), 1) - query = strings.Replace(query, "($2)", sqlutil.QueryVariadicOffset(len(userNIDs), len(roomNIDs)), 1) + if len(userNIDs) > 0 { + query = strings.Replace(selectJoinedUsersSetForRoomsAndUserSQL, "($1)", sqlutil.QueryVariadic(len(roomNIDs)), 1) + query = strings.Replace(query, "($2)", sqlutil.QueryVariadicOffset(len(userNIDs), len(roomNIDs)), 1) + } var rows *sql.Rows var err error if txn != nil { diff --git a/setup/config/config_appservice.go b/setup/config/config_appservice.go index ff3287714..9b89fc9af 100644 --- a/setup/config/config_appservice.go +++ b/setup/config/config_appservice.go @@ -187,7 +187,7 @@ func loadAppServices(config *AppServiceAPI, derived *Derived) error { } // Load the config data into our struct - if err = yaml.UnmarshalStrict(configData, &appservice); err != nil { + if err = yaml.Unmarshal(configData, &appservice); err != nil { return err } @@ -315,6 +315,20 @@ func checkErrors(config *AppServiceAPI, derived *Derived) (err error) { } } + // Check required fields + if appservice.ID == "" { + return ConfigErrors([]string{"Application service ID is required"}) + } + if appservice.ASToken == "" { + return ConfigErrors([]string{"Application service Token is required"}) + } + if appservice.HSToken == "" { + return ConfigErrors([]string{"Homeserver Token is required"}) + } + if appservice.SenderLocalpart == "" { + return ConfigErrors([]string{"Sender Localpart is required"}) + } + // Check if the url has trailing /'s. If so, remove them appservice.URL = strings.TrimRight(appservice.URL, "/") diff --git a/syncapi/consumers/receipts.go b/syncapi/consumers/receipts.go index 6bb0747f0..83156cf93 100644 --- a/syncapi/consumers/receipts.go +++ b/syncapi/consumers/receipts.go @@ -87,7 +87,7 @@ func (s *OutputReceiptEventConsumer) onMessage(ctx context.Context, msg *nats.Ms Type: msg.Header.Get("type"), } - timestamp, err := strconv.Atoi(msg.Header.Get("timestamp")) + timestamp, err := strconv.ParseUint(msg.Header.Get("timestamp"), 10, 64) if err != nil { // If the message was invalid, log it and move on to the next message in the stream log.WithError(err).Errorf("output log: message parse failure") diff --git a/syncapi/routing/context.go b/syncapi/routing/context.go index d021d365d..f6b4d15e0 100644 --- a/syncapi/routing/context.go +++ b/syncapi/routing/context.go @@ -15,6 +15,7 @@ package routing import ( + "context" "database/sql" "encoding/json" "fmt" @@ -25,6 +26,7 @@ import ( "github.com/matrix-org/dendrite/internal/caching" roomserver "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" @@ -149,13 +151,30 @@ func Context( if len(response.State) > filter.Limit { response.State = response.State[len(response.State)-filter.Limit:] } - + start, end, err := getStartEnd(ctx, syncDB, eventsBefore, eventsAfter) + if err == nil { + response.End = end.String() + response.Start = start.String() + } return util.JSONResponse{ Code: http.StatusOK, JSON: response, } } +func getStartEnd(ctx context.Context, syncDB storage.Database, startEvents, endEvents []*gomatrixserverlib.HeaderedEvent) (start, end types.TopologyToken, err error) { + if len(startEvents) > 0 { + start, err = syncDB.EventPositionInTopology(ctx, startEvents[0].EventID()) + if err != nil { + return + } + } + if len(endEvents) > 0 { + end, err = syncDB.EventPositionInTopology(ctx, endEvents[0].EventID()) + } + return +} + func applyLazyLoadMembers( device *userapi.Device, filter *gomatrixserverlib.RoomEventFilter, diff --git a/sytest-blacklist b/sytest-blacklist index 56142797b..bcc345f6e 100644 --- a/sytest-blacklist +++ b/sytest-blacklist @@ -48,11 +48,4 @@ Notifications can be viewed with GET /notifications # More flakey If remote user leaves room we no longer receive device updates - -# User sees their own presence in a sync - -# Inbound /v1/send_join rejects joins from other servers - -# Some changes regressed this test. Disabling for now while investigating - Guest users can join guest_access rooms