diff --git a/CHANGES.md b/CHANGES.md index 57e3a3d4f..97ec7bec4 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,29 @@ # Changelog +## Dendrite 0.13.5 (2023-12-12) + +Upgrading to this version is **highly** recommended, as it fixes several long-standing bugs in +our CanonicalJSON implementation. + +### Fixes + +- Convert unicode escapes to lowercase (gomatrixserverlib) +- Fix canonical json utf-16 surrogate pair detection logic (gomatrixserverlib) +- Handle negative zero and exponential numbers in Canonical JSON verification (gomatrixserverlib) +- Avoid logging unnecessary messages when unable to fetch server keys if multiple fetchers are used (gomatrixserverlib) +- Issues around the device list updater have been fixed, which should ensure that there are always + workers available to process incoming device list updates. +- A panic in the `/hierarchy` endpoints used for spaces has been fixed (client-server and server-server API) +- Fixes around the way we handle database transactions (including a potential connection leak) +- ACLs are now updated when received as outliers +- A race condition, which could lead to bridges instantly leaving a room after joining it, between the SyncAPI and + Appservices has been fixed + +### Features + +- **Appservice login is now supported!** +- Users can now kick themselves (used by some bridges) + ## Dendrite 0.13.4 (2023-10-25) Upgrading to this version is **highly** recommended, as it fixes a long-standing bug in the state resolution diff --git a/Dockerfile b/Dockerfile index 8c8f1588f..523857ccc 100644 --- a/Dockerfile +++ b/Dockerfile @@ -3,7 +3,8 @@ # # base installs required dependencies and runs go mod download to cache dependencies # -FROM --platform=${BUILDPLATFORM} docker.io/golang:1.21-alpine AS base +# Pinned to alpine3.18 until https://github.com/mattn/go-sqlite3/issues/1164 is solved +FROM --platform=${BUILDPLATFORM} docker.io/golang:1.21-alpine3.18 AS base RUN apk --update --no-cache add bash build-base curl git # diff --git a/appservice/appservice_test.go b/appservice/appservice_test.go index eca63371d..aa2af9f24 100644 --- a/appservice/appservice_test.go +++ b/appservice/appservice_test.go @@ -14,8 +14,18 @@ import ( "testing" "time" + "github.com/matrix-org/dendrite/clientapi" + "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/federationapi/statistics" + "github.com/matrix-org/dendrite/internal/httputil" + "github.com/matrix-org/dendrite/roomserver/types" + "github.com/matrix-org/dendrite/syncapi" + uapi "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" + "github.com/nats-io/nats.go" "github.com/stretchr/testify/assert" + "github.com/tidwall/gjson" "github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/appservice/api" @@ -407,3 +417,190 @@ func TestRoomserverConsumerOneInvite(t *testing.T) { close(evChan) }) } + +// Note: If this test panics, it is because we timed out waiting for the +// join event to come through to the appservice and we close the DB/shutdown Dendrite. This makes the +// syncAPI unhappy, as it is unable to write to the database. +func TestOutputAppserviceEvent(t *testing.T) { + alice := test.NewUser(t) + bob := test.NewUser(t) + + test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { + cfg, processCtx, closeDB := testrig.CreateConfig(t, dbType) + defer closeDB() + cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions) + natsInstance := &jetstream.NATSInstance{} + + evChan := make(chan struct{}) + + caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics) + // Create required internal APIs + rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, natsInstance, caches, caching.DisableMetrics) + rsAPI.SetFederationAPI(nil, nil) + + // Create the router, so we can hit `/joined_members` + routers := httputil.NewRouters() + + accessTokens := map[*test.User]userDevice{ + bob: {}, + } + + usrAPI := userapi.NewInternalAPI(processCtx, cfg, cm, natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff) + clientapi.AddPublicRoutes(processCtx, routers, cfg, natsInstance, nil, rsAPI, nil, nil, nil, usrAPI, nil, nil, caching.DisableMetrics) + createAccessTokens(t, accessTokens, usrAPI, processCtx.Context(), routers) + + room := test.NewRoom(t, alice) + + // Invite Bob + room.CreateAndInsert(t, alice, spec.MRoomMember, map[string]interface{}{ + "membership": "invite", + }, test.WithStateKey(bob.ID)) + + // create a dummy AS url, handling the events + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var txn consumers.ApplicationServiceTransaction + err := json.NewDecoder(r.Body).Decode(&txn) + if err != nil { + t.Fatal(err) + } + for _, ev := range txn.Events { + if ev.Type != spec.MRoomMember { + continue + } + if ev.StateKey != nil && *ev.StateKey == bob.ID { + membership := gjson.GetBytes(ev.Content, "membership").Str + t.Logf("Processing membership: %s", membership) + switch membership { + case spec.Invite: + // Accept the invite + joinEv := room.CreateAndInsert(t, bob, spec.MRoomMember, map[string]interface{}{ + "membership": "join", + }, test.WithStateKey(bob.ID)) + + if err := rsapi.SendEvents(context.Background(), rsAPI, rsapi.KindNew, []*types.HeaderedEvent{joinEv}, "test", "test", "test", nil, false); err != nil { + t.Fatalf("failed to send events: %v", err) + } + case spec.Join: // the AS has received the join event, now hit `/joined_members` to validate that + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/_matrix/client/v3/rooms/"+room.ID+"/joined_members", nil) + req.Header.Set("Authorization", "Bearer "+accessTokens[bob].accessToken) + routers.Client.ServeHTTP(rec, req) + if rec.Code != http.StatusOK { + t.Fatalf("expected HTTP 200, got %d: %s", rec.Code, rec.Body.String()) + } + + // Both Alice and Bob should be joined. If not, we have a race condition + if !gjson.GetBytes(rec.Body.Bytes(), "joined."+alice.ID).Exists() { + t.Errorf("Alice is not joined to the room") // in theory should not happen + } + if !gjson.GetBytes(rec.Body.Bytes(), "joined."+bob.ID).Exists() { + t.Errorf("Bob is not joined to the room") + } + evChan <- struct{}{} + default: + t.Fatalf("Unexpected membership: %s", membership) + } + } + } + })) + defer srv.Close() + + as := &config.ApplicationService{ + ID: "someID", + URL: srv.URL, + ASToken: "", + HSToken: "", + SenderLocalpart: "senderLocalPart", + NamespaceMap: map[string][]config.ApplicationServiceNamespace{ + "users": {{RegexpObject: regexp.MustCompile(bob.ID)}}, + "aliases": {{RegexpObject: regexp.MustCompile(room.ID)}}, + }, + } + as.CreateHTTPClient(cfg.AppServiceAPI.DisableTLSValidation) + + // Create a dummy application service + cfg.AppServiceAPI.Derived.ApplicationServices = []config.ApplicationService{*as} + + // Prepare AS Streams on the old topic to validate that they get deleted + jsCtx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream) + + token := jetstream.Tokenise(as.ID) + if err := jetstream.JetStreamConsumer( + processCtx.Context(), jsCtx, cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent), + cfg.Global.JetStream.Durable("Appservice_"+token), + 50, // maximum number of events to send in a single transaction + func(ctx context.Context, msgs []*nats.Msg) bool { + return true + }, + ); err != nil { + t.Fatal(err) + } + + // Start the syncAPI to have `/joined_members` available + syncapi.AddPublicRoutes(processCtx, routers, cfg, cm, natsInstance, usrAPI, rsAPI, caches, caching.DisableMetrics) + + // start the consumer + appservice.NewInternalAPI(processCtx, cfg, natsInstance, usrAPI, rsAPI) + + // At this point, the old JetStream consumers should be deleted + for consumer := range jsCtx.Consumers(cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent)) { + if consumer.Name == cfg.Global.JetStream.Durable("Appservice_"+token)+"Pull" { + t.Fatalf("Consumer still exists") + } + } + + // Create the room, this triggers the AS to receive an invite for Bob. + if err := rsapi.SendEvents(context.Background(), rsAPI, rsapi.KindNew, room.Events(), "test", "test", "test", nil, false); err != nil { + t.Fatalf("failed to send events: %v", err) + } + + select { + // Pretty generous timeout duration... + case <-time.After(time.Millisecond * 1000): // wait for the AS to process the events + t.Errorf("Timed out waiting for join event") + case <-evChan: + } + close(evChan) + }) +} + +type userDevice struct { + accessToken string + deviceID string + password string +} + +func createAccessTokens(t *testing.T, accessTokens map[*test.User]userDevice, userAPI uapi.UserInternalAPI, ctx context.Context, routers httputil.Routers) { + t.Helper() + for u := range accessTokens { + localpart, serverName, _ := gomatrixserverlib.SplitID('@', u.ID) + userRes := &uapi.PerformAccountCreationResponse{} + password := util.RandomString(8) + if err := userAPI.PerformAccountCreation(ctx, &uapi.PerformAccountCreationRequest{ + AccountType: u.AccountType, + Localpart: localpart, + ServerName: serverName, + Password: password, + }, userRes); err != nil { + t.Errorf("failed to create account: %s", err) + } + req := test.NewRequest(t, http.MethodPost, "/_matrix/client/v3/login", test.WithJSONBody(t, map[string]interface{}{ + "type": authtypes.LoginTypePassword, + "identifier": map[string]interface{}{ + "type": "m.id.user", + "user": u.ID, + }, + "password": password, + })) + rec := httptest.NewRecorder() + routers.Client.ServeHTTP(rec, req) + if rec.Code != http.StatusOK { + t.Fatalf("failed to login: %s", rec.Body.String()) + } + accessTokens[u] = userDevice{ + accessToken: gjson.GetBytes(rec.Body.Bytes(), "access_token").String(), + deviceID: gjson.GetBytes(rec.Body.Bytes(), "device_id").String(), + password: password, + } + } +} diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index e8b9211c4..b7fc1f698 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -71,13 +71,14 @@ func NewOutputRoomEventConsumer( ctx: process.Context(), cfg: cfg, jetstream: js, - topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent), + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputAppserviceEvent), rsAPI: rsAPI, } } // Start consuming from room servers func (s *OutputRoomEventConsumer) Start() error { + durableNames := make([]string, 0, len(s.cfg.Derived.ApplicationServices)) for _, as := range s.cfg.Derived.ApplicationServices { appsvc := as state := &appserviceState{ @@ -95,6 +96,15 @@ func (s *OutputRoomEventConsumer) Start() error { ); err != nil { return fmt.Errorf("failed to create %q consumer: %w", token, err) } + durableNames = append(durableNames, s.cfg.Matrix.JetStream.Durable("Appservice_"+token)) + } + // Cleanup any consumers still existing on the OutputRoomEvent stream + // to avoid messages not being deleted + for _, consumerName := range durableNames { + err := s.jetstream.DeleteConsumer(s.cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent), consumerName+"Pull") + if err != nil && err != nats.ErrConsumerNotFound { + return err + } } return nil } diff --git a/build/docker/Dockerfile.demo-pinecone b/build/docker/Dockerfile.demo-pinecone index ab50cf318..fae8ca036 100644 --- a/build/docker/Dockerfile.demo-pinecone +++ b/build/docker/Dockerfile.demo-pinecone @@ -1,4 +1,5 @@ -FROM docker.io/golang:1.21-alpine AS base +# Pinned to alpine3.18 until https://github.com/mattn/go-sqlite3/issues/1164 is solved +FROM docker.io/golang:1.21-alpine3.18 AS base # # Needs to be separate from the main Dockerfile for OpenShift, diff --git a/build/docker/Dockerfile.demo-yggdrasil b/build/docker/Dockerfile.demo-yggdrasil index b9e387666..502233ea9 100644 --- a/build/docker/Dockerfile.demo-yggdrasil +++ b/build/docker/Dockerfile.demo-yggdrasil @@ -1,4 +1,5 @@ -FROM docker.io/golang:1.21-alpine AS base +# Pinned to alpine3.18 until https://github.com/mattn/go-sqlite3/issues/1164 is solved +FROM docker.io/golang:1.21-alpine3.18 AS base # # Needs to be separate from the main Dockerfile for OpenShift, diff --git a/go.mod b/go.mod index 7ce1720a8..0d02eac8a 100644 --- a/go.mod +++ b/go.mod @@ -22,12 +22,12 @@ require ( github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91 github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530 - github.com/matrix-org/gomatrixserverlib v0.0.0-20231122130434-2beadf141c98 + github.com/matrix-org/gomatrixserverlib v0.0.0-20231212115925-41497b7563eb github.com/matrix-org/pinecone v0.11.1-0.20230810010612-ea4c33717fd7 github.com/matrix-org/util v0.0.0-20221111132719-399730281e66 github.com/mattn/go-sqlite3 v1.14.17 - github.com/nats-io/nats-server/v2 v2.9.23 - github.com/nats-io/nats.go v1.28.0 + github.com/nats-io/nats-server/v2 v2.10.7 + github.com/nats-io/nats.go v1.31.0 github.com/neilalexander/utp v0.1.1-0.20210727203401-54ae7b1cd5f9 github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 github.com/opentracing/opentracing-go v1.2.0 @@ -42,12 +42,12 @@ require ( github.com/uber/jaeger-lib v2.4.1+incompatible github.com/yggdrasil-network/yggdrasil-go v0.4.6 go.uber.org/atomic v1.10.0 - golang.org/x/crypto v0.14.0 + golang.org/x/crypto v0.17.0 golang.org/x/exp v0.0.0-20230809150735-7b3493d9a819 golang.org/x/image v0.10.0 golang.org/x/mobile v0.0.0-20221020085226-b36e6246172e golang.org/x/sync v0.3.0 - golang.org/x/term v0.13.0 + golang.org/x/term v0.15.0 gopkg.in/h2non/bimg.v1 v1.1.9 gopkg.in/yaml.v2 v2.4.0 gotest.tools/v3 v3.4.0 @@ -99,7 +99,7 @@ require ( github.com/json-iterator/go v1.1.12 // indirect github.com/juju/errors v1.0.0 // indirect github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect - github.com/klauspost/compress v1.16.7 // indirect + github.com/klauspost/compress v1.17.4 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.17 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect @@ -109,7 +109,7 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/morikuni/aec v1.0.0 // indirect github.com/mschoch/smat v0.2.0 // indirect - github.com/nats-io/jwt/v2 v2.5.0 // indirect + github.com/nats-io/jwt/v2 v2.5.3 // indirect github.com/nats-io/nkeys v0.4.6 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/onsi/ginkgo/v2 v2.11.0 // indirect @@ -129,9 +129,9 @@ require ( go.etcd.io/bbolt v1.3.6 // indirect golang.org/x/mod v0.12.0 // indirect golang.org/x/net v0.17.0 // indirect - golang.org/x/sys v0.13.0 // indirect - golang.org/x/text v0.13.0 // indirect - golang.org/x/time v0.3.0 // indirect + golang.org/x/sys v0.15.0 // indirect + golang.org/x/text v0.14.0 // indirect + golang.org/x/time v0.5.0 // indirect golang.org/x/tools v0.12.0 // indirect google.golang.org/protobuf v1.30.0 // indirect gopkg.in/macaroon.v2 v2.1.0 // indirect diff --git a/go.sum b/go.sum index 0339fa170..be5804567 100644 --- a/go.sum +++ b/go.sum @@ -221,8 +221,8 @@ github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:C github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= -github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= -github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4= +github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -239,8 +239,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91 h1:s7fexw github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1:e+cg2q7C7yE5QnAXgzo512tgFh1RbQLC0+jozuegKgo= github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530 h1:kHKxCOLcHH8r4Fzarl4+Y3K5hjothkVW5z7T1dUM11U= github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= -github.com/matrix-org/gomatrixserverlib v0.0.0-20231122130434-2beadf141c98 h1:+GsF24sG9WJccf5k54cZj9tLgwW3UON1ujz5u+8SHxc= -github.com/matrix-org/gomatrixserverlib v0.0.0-20231122130434-2beadf141c98/go.mod h1:M8m7seOroO5ePlgxA7AFZymnG90Cnh94rYQyngSrZkk= +github.com/matrix-org/gomatrixserverlib v0.0.0-20231212115925-41497b7563eb h1:Nn+Fr96oi7bIfdOwX5A2L6A2MZCM+lqwLe4/+3+nYj8= +github.com/matrix-org/gomatrixserverlib v0.0.0-20231212115925-41497b7563eb/go.mod h1:M8m7seOroO5ePlgxA7AFZymnG90Cnh94rYQyngSrZkk= github.com/matrix-org/pinecone v0.11.1-0.20230810010612-ea4c33717fd7 h1:6t8kJr8i1/1I5nNttw6nn1ryQJgzVlBmSGgPiiaTdw4= github.com/matrix-org/pinecone v0.11.1-0.20230810010612-ea4c33717fd7/go.mod h1:ReWMS/LoVnOiRAdq9sNUC2NZnd1mZkMNB52QhpTRWjg= github.com/matrix-org/util v0.0.0-20221111132719-399730281e66 h1:6z4KxomXSIGWqhHcfzExgkH3Z3UkIXry4ibJS4Aqz2Y= @@ -273,12 +273,12 @@ github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7P github.com/mschoch/smat v0.0.0-20160514031455-90eadee771ae/go.mod h1:qAyveg+e4CE+eKJXWVjKXM4ck2QobLqTDytGJbLLhJg= github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM= github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw= -github.com/nats-io/jwt/v2 v2.5.0 h1:WQQ40AAlqqfx+f6ku+i0pOVm+ASirD4fUh+oQsiE9Ak= -github.com/nats-io/jwt/v2 v2.5.0/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI= -github.com/nats-io/nats-server/v2 v2.9.23 h1:6Wj6H6QpP9FMlpCyWUaNu2yeZ/qGj+mdRkZ1wbikExU= -github.com/nats-io/nats-server/v2 v2.9.23/go.mod h1:wEjrEy9vnqIGE4Pqz4/c75v9Pmaq7My2IgFmnykc4C0= -github.com/nats-io/nats.go v1.28.0 h1:Th4G6zdsz2d0OqXdfzKLClo6bOfoI/b1kInhRtFIy5c= -github.com/nats-io/nats.go v1.28.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc= +github.com/nats-io/jwt/v2 v2.5.3 h1:/9SWvzc6hTfamcgXJ3uYRpgj+QuY2aLNqRiqrKcrpEo= +github.com/nats-io/jwt/v2 v2.5.3/go.mod h1:iysuPemFcc7p4IoYots3IuELSI4EDe9Y0bQMe+I3Bf4= +github.com/nats-io/nats-server/v2 v2.10.7 h1:f5VDy+GMu7JyuFA0Fef+6TfulfCs5nBTgq7MMkFJx5Y= +github.com/nats-io/nats-server/v2 v2.10.7/go.mod h1:V2JHOvPiPdtfDXTuEUsthUnCvSDeFrK4Xn9hRo6du7c= +github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E= +github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8= github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY= github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADymtkpts= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= @@ -391,8 +391,8 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc= -golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= +golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= +golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -461,13 +461,13 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= -golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= -golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek= -golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= +golang.org/x/term v0.15.0 h1:y/Oo/a/q3IXu26lQgl04j/gjuBDOBlx7X6Om1j2CPW4= +golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= @@ -475,11 +475,11 @@ golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= -golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= -golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= -golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= +golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190206041539-40960b6deb8e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/helm/dendrite/Chart.yaml b/helm/dendrite/Chart.yaml index 32f479960..f36f457c5 100644 --- a/helm/dendrite/Chart.yaml +++ b/helm/dendrite/Chart.yaml @@ -1,7 +1,7 @@ apiVersion: v2 name: dendrite -version: "0.13.5" -appVersion: "0.13.4" +version: "0.13.6" +appVersion: "0.13.5" description: Dendrite Matrix Homeserver type: application keywords: diff --git a/helm/dendrite/README.md b/helm/dendrite/README.md index 22daa1813..f5f824927 100644 --- a/helm/dendrite/README.md +++ b/helm/dendrite/README.md @@ -1,7 +1,7 @@ # dendrite -![Version: 0.13.5](https://img.shields.io/badge/Version-0.13.5-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) ![AppVersion: 0.13.4](https://img.shields.io/badge/AppVersion-0.13.4-informational?style=flat-square) +![Version: 0.13.6](https://img.shields.io/badge/Version-0.13.6-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) ![AppVersion: 0.13.5](https://img.shields.io/badge/AppVersion-0.13.5-informational?style=flat-square) Dendrite Matrix Homeserver Status: **NOT PRODUCTION READY** diff --git a/internal/version.go b/internal/version.go index 55726ddcb..3fc52e376 100644 --- a/internal/version.go +++ b/internal/version.go @@ -18,7 +18,7 @@ var build string const ( VersionMajor = 0 VersionMinor = 13 - VersionPatch = 4 + VersionPatch = 5 VersionTag = "" // example: "rc1" gitRevLen = 7 // 7 matches the displayed characters on github.com diff --git a/mediaapi/routing/download.go b/mediaapi/routing/download.go index 51afa1f9f..fa1c417aa 100644 --- a/mediaapi/routing/download.go +++ b/mediaapi/routing/download.go @@ -63,6 +63,40 @@ type downloadRequest struct { DownloadFilename string } +// Taken from: https://github.com/matrix-org/synapse/blob/c3627d0f99ed5a23479305dc2bd0e71ca25ce2b1/synapse/media/_base.py#L53C1-L84 +// A list of all content types that are "safe" to be rendered inline in a browser. +var allowInlineTypes = map[types.ContentType]struct{}{ + "text/css": {}, + "text/plain": {}, + "text/csv": {}, + "application/json": {}, + "application/ld+json": {}, + // We allow some media files deemed as safe, which comes from the matrix-react-sdk. + // https://github.com/matrix-org/matrix-react-sdk/blob/a70fcfd0bcf7f8c85986da18001ea11597989a7c/src/utils/blobs.ts#L51 + // SVGs are *intentionally* omitted. + "image/jpeg": {}, + "image/gif": {}, + "image/png": {}, + "image/apng": {}, + "image/webp": {}, + "image/avif": {}, + "video/mp4": {}, + "video/webm": {}, + "video/ogg": {}, + "video/quicktime": {}, + "audio/mp4": {}, + "audio/webm": {}, + "audio/aac": {}, + "audio/mpeg": {}, + "audio/ogg": {}, + "audio/wave": {}, + "audio/wav": {}, + "audio/x-wav": {}, + "audio/x-pn-wav": {}, + "audio/flac": {}, + "audio/x-flac": {}, +} + // Download implements GET /download and GET /thumbnail // Files from this server (i.e. origin == cfg.ServerName) are served directly // Files from remote servers (i.e. origin != cfg.ServerName) are cached locally. @@ -353,7 +387,7 @@ func (r *downloadRequest) addDownloadFilenameToHeaders( } if len(filename) == 0 { - w.Header().Set("Content-Disposition", "attachment") + w.Header().Set("Content-Disposition", contentDispositionFor("")) return nil } @@ -383,20 +417,21 @@ func (r *downloadRequest) addDownloadFilenameToHeaders( unescaped = strings.ReplaceAll(unescaped, `\`, `\\"`) unescaped = strings.ReplaceAll(unescaped, `"`, `\"`) + disposition := contentDispositionFor(responseMetadata.ContentType) if isASCII { // For ASCII filenames, we should only quote the filename if // it needs to be done, e.g. it contains a space or a character // that would otherwise be parsed as a control character in the // Content-Disposition header w.Header().Set("Content-Disposition", fmt.Sprintf( - `attachment; filename=%s%s%s`, - quote, unescaped, quote, + `%s; filename=%s%s%s`, + disposition, quote, unescaped, quote, )) } else { // For UTF-8 filenames, we quote always, as that's the standard w.Header().Set("Content-Disposition", fmt.Sprintf( - `attachment; filename*=utf-8''%s`, - url.QueryEscape(unescaped), + `%s; filename*=utf-8''%s`, + disposition, url.QueryEscape(unescaped), )) } @@ -808,3 +843,12 @@ func (r *downloadRequest) fetchRemoteFile( return types.Path(finalPath), duplicate, nil } + +// contentDispositionFor returns the Content-Disposition for a given +// content type. +func contentDispositionFor(contentType types.ContentType) string { + if _, ok := allowInlineTypes[contentType]; ok { + return "inline" + } + return "attachment" +} diff --git a/mediaapi/routing/download_test.go b/mediaapi/routing/download_test.go new file mode 100644 index 000000000..21f6bfc2c --- /dev/null +++ b/mediaapi/routing/download_test.go @@ -0,0 +1,13 @@ +package routing + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_dispositionFor(t *testing.T) { + assert.Equal(t, "attachment", contentDispositionFor(""), "empty content type") + assert.Equal(t, "attachment", contentDispositionFor("image/svg"), "image/svg") + assert.Equal(t, "inline", contentDispositionFor("image/jpeg"), "image/jpg") +} diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 404751532..20d2cfc7a 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -119,9 +119,14 @@ func (r *Inputer) startWorkerForRoom(roomID string) { w.Lock() defer w.Unlock() if !loaded || w.subscription == nil { + streamName := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent) consumer := r.Cfg.Matrix.JetStream.Prefixed("RoomInput" + jetstream.Tokenise(w.roomID)) subject := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEventSubj(w.roomID)) + logger := logrus.WithFields(logrus.Fields{ + "stream_name": streamName, + "consumer": consumer, + }) // Create the consumer. We do this as a specific step rather than // letting PullSubscribe create it for us because we need the consumer // to outlive the subscription. If we do it this way, we can Bind in the @@ -135,21 +140,62 @@ func (r *Inputer) startWorkerForRoom(roomID string) { // before it. This is necessary because otherwise our consumer will never // acknowledge things we filtered out for other subjects and therefore they // will linger around forever. - if _, err := w.r.JetStream.AddConsumer( - r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent), - &nats.ConsumerConfig{ - Durable: consumer, - AckPolicy: nats.AckAllPolicy, - DeliverPolicy: nats.DeliverAllPolicy, - FilterSubject: subject, - AckWait: MaximumMissingProcessingTime + (time.Second * 10), - InactiveThreshold: inactiveThreshold, - }, - ); err != nil { - logrus.WithError(err).Errorf("Failed to create consumer for room %q", w.roomID) + + info, err := w.r.JetStream.ConsumerInfo(streamName, consumer) + if err != nil && !errors.Is(err, nats.ErrConsumerNotFound) { + // log and return, we will retry anyway + logger.WithError(err).Errorf("failed to get consumer info") return } + consumerConfig := &nats.ConsumerConfig{ + Durable: consumer, + AckPolicy: nats.AckExplicitPolicy, + DeliverPolicy: nats.DeliverAllPolicy, + FilterSubject: subject, + AckWait: MaximumMissingProcessingTime + (time.Second * 10), + InactiveThreshold: inactiveThreshold, + } + + // The consumer already exists, try to update if necessary. + if info != nil { + // Not using reflect.DeepEqual here, since consumerConfig does not explicitly set + // e.g. the consumerName, which is added by NATS later. So this would result + // in constantly updating/recreating the consumer. + switch { + case info.Config.AckWait.Nanoseconds() != consumerConfig.AckWait.Nanoseconds(): + // Initially we had a AckWait of 2m 10s, now we have 5m 10s, so we need to update + // existing consumers. + fallthrough + case info.Config.AckPolicy != consumerConfig.AckPolicy: + // We've changed the AckPolicy from AckAll to AckExplicit, this needs a + // recreation of the consumer. (Note: Only a few changes actually need a recreat) + logger.Warn("Consumer already exists, trying to update it.") + // Try updating the consumer first + if _, err = w.r.JetStream.UpdateConsumer(streamName, consumerConfig); err != nil { + // We failed to update the consumer, recreate it + logger.WithError(err).Warn("Unable to update consumer, recreating...") + if err = w.r.JetStream.DeleteConsumer(streamName, consumer); err != nil { + logger.WithError(err).Fatal("Unable to delete consumer") + return + } + // Set info to nil, so it can be recreated with the correct config. + info = nil + } + } + } + + if info == nil { + // Create the consumer with the correct config + if _, err = w.r.JetStream.AddConsumer( + r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent), + consumerConfig, + ); err != nil { + logger.WithError(err).Errorf("Failed to create consumer for room %q", w.roomID) + return + } + } + // Bind to our durable consumer. We want to receive all messages waiting // for this subject and we want to manually acknowledge them, so that we // can ensure they are only cleaned up when we are done processing them. @@ -162,7 +208,7 @@ func (r *Inputer) startWorkerForRoom(roomID string) { nats.InactiveThreshold(inactiveThreshold), ) if err != nil { - logrus.WithError(err).Errorf("Failed to subscribe to stream for room %q", w.roomID) + logger.WithError(err).Errorf("Failed to subscribe to stream for room %q", w.roomID) return } @@ -263,21 +309,23 @@ func (w *worker) _next() { return } + // Since we either Ack() or Term() the message at this point, we can defer decrementing the room backpressure + defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Dec() + // Try to unmarshal the input room event. If the JSON unmarshalling // fails then we'll terminate the message — this notifies NATS that // we are done with the message and never want to see it again. msg := msgs[0] var inputRoomEvent api.InputRoomEvent if err = json.Unmarshal(msg.Data, &inputRoomEvent); err != nil { - _ = msg.Term() + // using AckWait here makes the call synchronous; 5 seconds is the default value used by NATS + _ = msg.Term(nats.AckWait(time.Second * 5)) return } if scope := sentry.CurrentHub().Scope(); scope != nil { scope.SetTag("event_id", inputRoomEvent.Event.EventID()) } - roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Inc() - defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Dec() // Process the room event. If something goes wrong then we'll tell // NATS to terminate the message. We'll store the error result as @@ -307,10 +355,15 @@ func (w *worker) _next() { "type": inputRoomEvent.Event.Type(), }).Warn("Roomserver failed to process event") } - _ = msg.Term() + // Even though we failed to process this message (e.g. due to Dendrite restarting and receiving a context canceled), + // the message may already have been queued for redelivery or will be, so this makes sure that we still reprocess the msg + // after restarting. We only Ack if the context was not yet canceled. + if w.r.ProcessContext.Context().Err() == nil { + _ = msg.AckSync() + } errString = err.Error() } else { - _ = msg.Ack() + _ = msg.AckSync() } // If it was a synchronous input request then the "sync" field @@ -381,6 +434,9 @@ func (r *Inputer) queueInputRoomEvents( }).Error("Roomserver failed to queue async event") return nil, fmt.Errorf("r.JetStream.PublishMsg: %w", err) } + + // Now that the event is queued, increment the room backpressure + roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc() } return } diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index 520f82a80..1d9208434 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -48,8 +48,10 @@ import ( "github.com/matrix-org/dendrite/roomserver/types" ) -// TODO: Does this value make sense? -const MaximumMissingProcessingTime = time.Minute * 2 +// MaximumMissingProcessingTime is the maximum time we allow "processRoomEvent" to fetch +// e.g. missing auth/prev events. This duration is used for AckWait, and if it is exceeded +// NATS queues the event for redelivery. +const MaximumMissingProcessingTime = time.Minute * 5 var processRoomEventDuration = prometheus.NewHistogramVec( prometheus.HistogramOpts{ diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go index e9cd926d7..88e335711 100644 --- a/roomserver/roomserver_test.go +++ b/roomserver/roomserver_test.go @@ -12,7 +12,9 @@ import ( "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/roomserver/internal/input" "github.com/matrix-org/gomatrixserverlib/spec" + "github.com/nats-io/nats.go" "github.com/stretchr/testify/assert" "github.com/tidwall/gjson" @@ -1231,3 +1233,54 @@ func TestNewServerACLs(t *testing.T) { assert.Equal(t, false, banned) }) } + +// Validate that changing the AckPolicy/AckWait of room consumers +// results in their recreation +func TestRoomConsumerRecreation(t *testing.T) { + + alice := test.NewUser(t) + room := test.NewRoom(t, alice) + + // As this is DB unrelated, just use SQLite + cfg, processCtx, closeDB := testrig.CreateConfig(t, test.DBTypeSQLite) + defer closeDB() + cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions) + natsInstance := &jetstream.NATSInstance{} + + // Prepare a stream and consumer using the old configuration + jsCtx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream) + + streamName := cfg.Global.JetStream.Prefixed(jetstream.InputRoomEvent) + consumer := cfg.Global.JetStream.Prefixed("RoomInput" + jetstream.Tokenise(room.ID)) + subject := cfg.Global.JetStream.Prefixed(jetstream.InputRoomEventSubj(room.ID)) + + consumerConfig := &nats.ConsumerConfig{ + Durable: consumer, + AckPolicy: nats.AckAllPolicy, + DeliverPolicy: nats.DeliverAllPolicy, + FilterSubject: subject, + AckWait: (time.Minute * 2) + (time.Second * 10), + InactiveThreshold: time.Hour * 24, + } + + // Create the consumer with the old config + _, err := jsCtx.AddConsumer(streamName, consumerConfig) + assert.NoError(t, err) + + caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics) + // start JetStream listeners + rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, natsInstance, caches, caching.DisableMetrics) + rsAPI.SetFederationAPI(nil, nil) + + // let the RS create the events, this also recreates the Consumers + err = api.SendEvents(context.Background(), rsAPI, api.KindNew, room.Events(), "test", "test", "test", nil, false) + assert.NoError(t, err) + + // Validate that AckPolicy and AckWait has changed + info, err := jsCtx.ConsumerInfo(streamName, consumer) + assert.NoError(t, err) + assert.Equal(t, nats.AckExplicitPolicy, info.Config.AckPolicy) + + wantAckWait := input.MaximumMissingProcessingTime + (time.Second * 10) + assert.Equal(t, wantAckWait, info.Config.AckWait) +} diff --git a/setup/jetstream/streams.go b/setup/jetstream/streams.go index 741407926..1dc9f4cec 100644 --- a/setup/jetstream/streams.go +++ b/setup/jetstream/streams.go @@ -20,6 +20,7 @@ var ( InputDeviceListUpdate = "InputDeviceListUpdate" InputSigningKeyUpdate = "InputSigningKeyUpdate" OutputRoomEvent = "OutputRoomEvent" + OutputAppserviceEvent = "OutputAppserviceEvent" OutputSendToDeviceEvent = "OutputSendToDeviceEvent" OutputKeyChangeEvent = "OutputKeyChangeEvent" OutputTypingEvent = "OutputTypingEvent" @@ -65,6 +66,11 @@ var streams = []*nats.StreamConfig{ Retention: nats.InterestPolicy, Storage: nats.FileStorage, }, + { + Name: OutputAppserviceEvent, + Retention: nats.InterestPolicy, + Storage: nats.FileStorage, + }, { Name: OutputSendToDeviceEvent, Retention: nats.InterestPolicy, diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 666f900d7..81c532f19 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -31,6 +31,7 @@ import ( "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/syncapi/notifier" + "github.com/matrix-org/dendrite/syncapi/producers" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/streams" "github.com/matrix-org/dendrite/syncapi/synctypes" @@ -55,6 +56,7 @@ type OutputRoomEventConsumer struct { inviteStream streams.StreamProvider notifier *notifier.Notifier fts fulltext.Indexer + asProducer *producers.AppserviceEventProducer } // NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers. @@ -68,6 +70,7 @@ func NewOutputRoomEventConsumer( inviteStream streams.StreamProvider, rsAPI api.SyncRoomserverAPI, fts *fulltext.Search, + asProducer *producers.AppserviceEventProducer, ) *OutputRoomEventConsumer { return &OutputRoomEventConsumer{ ctx: process.Context(), @@ -81,6 +84,7 @@ func NewOutputRoomEventConsumer( inviteStream: inviteStream, rsAPI: rsAPI, fts: fts, + asProducer: asProducer, } } @@ -119,6 +123,11 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms } } err = s.onNewRoomEvent(s.ctx, *output.NewRoomEvent) + if err == nil && s.asProducer != nil { + if err = s.asProducer.ProduceRoomEvents(msg); err != nil { + log.WithError(err).Warn("failed to produce OutputAppserviceEvent") + } + } case api.OutputTypeOldRoomEvent: err = s.onOldRoomEvent(s.ctx, *output.OldRoomEvent) case api.OutputTypeNewInviteEvent: diff --git a/syncapi/producers/appservices.go b/syncapi/producers/appservices.go new file mode 100644 index 000000000..bb0dbb9cf --- /dev/null +++ b/syncapi/producers/appservices.go @@ -0,0 +1,33 @@ +// Copyright 2023 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package producers + +import ( + "github.com/nats-io/nats.go" +) + +// AppserviceEventProducer produces events for the appservice API to consume +type AppserviceEventProducer struct { + Topic string + JetStream nats.JetStreamContext +} + +func (a *AppserviceEventProducer) ProduceRoomEvents( + msg *nats.Msg, +) error { + msg.Subject = a.Topic + _, err := a.JetStream.PublishMsg(msg) + return err +} diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 091e3db41..0418ffc05 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -100,9 +100,16 @@ func AddPublicRoutes( logrus.WithError(err).Panicf("failed to start key change consumer") } + var asProducer *producers.AppserviceEventProducer + if len(dendriteCfg.AppServiceAPI.Derived.ApplicationServices) > 0 { + asProducer = &producers.AppserviceEventProducer{ + JetStream: js, Topic: dendriteCfg.Global.JetStream.Prefixed(jetstream.OutputAppserviceEvent), + } + } + roomConsumer := consumers.NewOutputRoomEventConsumer( processContext, &dendriteCfg.SyncAPI, js, syncDB, notifier, streams.PDUStreamProvider, - streams.InviteStreamProvider, rsAPI, fts, + streams.InviteStreamProvider, rsAPI, fts, asProducer, ) if err = roomConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start room server consumer")