diff --git a/CHANGES.md b/CHANGES.md index 57e3a3d4f..97ec7bec4 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,29 @@ # Changelog +## Dendrite 0.13.5 (2023-12-12) + +Upgrading to this version is **highly** recommended, as it fixes several long-standing bugs in +our CanonicalJSON implementation. + +### Fixes + +- Convert unicode escapes to lowercase (gomatrixserverlib) +- Fix canonical json utf-16 surrogate pair detection logic (gomatrixserverlib) +- Handle negative zero and exponential numbers in Canonical JSON verification (gomatrixserverlib) +- Avoid logging unnecessary messages when unable to fetch server keys if multiple fetchers are used (gomatrixserverlib) +- Issues around the device list updater have been fixed, which should ensure that there are always + workers available to process incoming device list updates. +- A panic in the `/hierarchy` endpoints used for spaces has been fixed (client-server and server-server API) +- Fixes around the way we handle database transactions (including a potential connection leak) +- ACLs are now updated when received as outliers +- A race condition, which could lead to bridges instantly leaving a room after joining it, between the SyncAPI and + Appservices has been fixed + +### Features + +- **Appservice login is now supported!** +- Users can now kick themselves (used by some bridges) + ## Dendrite 0.13.4 (2023-10-25) Upgrading to this version is **highly** recommended, as it fixes a long-standing bug in the state resolution diff --git a/Dockerfile b/Dockerfile index 8c8f1588f..523857ccc 100644 --- a/Dockerfile +++ b/Dockerfile @@ -3,7 +3,8 @@ # # base installs required dependencies and runs go mod download to cache dependencies # -FROM --platform=${BUILDPLATFORM} docker.io/golang:1.21-alpine AS base +# Pinned to alpine3.18 until https://github.com/mattn/go-sqlite3/issues/1164 is solved +FROM --platform=${BUILDPLATFORM} docker.io/golang:1.21-alpine3.18 AS base RUN apk --update --no-cache add bash build-base curl git # diff --git a/appservice/appservice_test.go b/appservice/appservice_test.go index eca63371d..aa2af9f24 100644 --- a/appservice/appservice_test.go +++ b/appservice/appservice_test.go @@ -14,8 +14,18 @@ import ( "testing" "time" + "github.com/matrix-org/dendrite/clientapi" + "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/federationapi/statistics" + "github.com/matrix-org/dendrite/internal/httputil" + "github.com/matrix-org/dendrite/roomserver/types" + "github.com/matrix-org/dendrite/syncapi" + uapi "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" + "github.com/nats-io/nats.go" "github.com/stretchr/testify/assert" + "github.com/tidwall/gjson" "github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/appservice/api" @@ -407,3 +417,190 @@ func TestRoomserverConsumerOneInvite(t *testing.T) { close(evChan) }) } + +// Note: If this test panics, it is because we timed out waiting for the +// join event to come through to the appservice and we close the DB/shutdown Dendrite. This makes the +// syncAPI unhappy, as it is unable to write to the database. +func TestOutputAppserviceEvent(t *testing.T) { + alice := test.NewUser(t) + bob := test.NewUser(t) + + test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { + cfg, processCtx, closeDB := testrig.CreateConfig(t, dbType) + defer closeDB() + cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions) + natsInstance := &jetstream.NATSInstance{} + + evChan := make(chan struct{}) + + caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics) + // Create required internal APIs + rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, natsInstance, caches, caching.DisableMetrics) + rsAPI.SetFederationAPI(nil, nil) + + // Create the router, so we can hit `/joined_members` + routers := httputil.NewRouters() + + accessTokens := map[*test.User]userDevice{ + bob: {}, + } + + usrAPI := userapi.NewInternalAPI(processCtx, cfg, cm, natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff) + clientapi.AddPublicRoutes(processCtx, routers, cfg, natsInstance, nil, rsAPI, nil, nil, nil, usrAPI, nil, nil, caching.DisableMetrics) + createAccessTokens(t, accessTokens, usrAPI, processCtx.Context(), routers) + + room := test.NewRoom(t, alice) + + // Invite Bob + room.CreateAndInsert(t, alice, spec.MRoomMember, map[string]interface{}{ + "membership": "invite", + }, test.WithStateKey(bob.ID)) + + // create a dummy AS url, handling the events + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var txn consumers.ApplicationServiceTransaction + err := json.NewDecoder(r.Body).Decode(&txn) + if err != nil { + t.Fatal(err) + } + for _, ev := range txn.Events { + if ev.Type != spec.MRoomMember { + continue + } + if ev.StateKey != nil && *ev.StateKey == bob.ID { + membership := gjson.GetBytes(ev.Content, "membership").Str + t.Logf("Processing membership: %s", membership) + switch membership { + case spec.Invite: + // Accept the invite + joinEv := room.CreateAndInsert(t, bob, spec.MRoomMember, map[string]interface{}{ + "membership": "join", + }, test.WithStateKey(bob.ID)) + + if err := rsapi.SendEvents(context.Background(), rsAPI, rsapi.KindNew, []*types.HeaderedEvent{joinEv}, "test", "test", "test", nil, false); err != nil { + t.Fatalf("failed to send events: %v", err) + } + case spec.Join: // the AS has received the join event, now hit `/joined_members` to validate that + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/_matrix/client/v3/rooms/"+room.ID+"/joined_members", nil) + req.Header.Set("Authorization", "Bearer "+accessTokens[bob].accessToken) + routers.Client.ServeHTTP(rec, req) + if rec.Code != http.StatusOK { + t.Fatalf("expected HTTP 200, got %d: %s", rec.Code, rec.Body.String()) + } + + // Both Alice and Bob should be joined. If not, we have a race condition + if !gjson.GetBytes(rec.Body.Bytes(), "joined."+alice.ID).Exists() { + t.Errorf("Alice is not joined to the room") // in theory should not happen + } + if !gjson.GetBytes(rec.Body.Bytes(), "joined."+bob.ID).Exists() { + t.Errorf("Bob is not joined to the room") + } + evChan <- struct{}{} + default: + t.Fatalf("Unexpected membership: %s", membership) + } + } + } + })) + defer srv.Close() + + as := &config.ApplicationService{ + ID: "someID", + URL: srv.URL, + ASToken: "", + HSToken: "", + SenderLocalpart: "senderLocalPart", + NamespaceMap: map[string][]config.ApplicationServiceNamespace{ + "users": {{RegexpObject: regexp.MustCompile(bob.ID)}}, + "aliases": {{RegexpObject: regexp.MustCompile(room.ID)}}, + }, + } + as.CreateHTTPClient(cfg.AppServiceAPI.DisableTLSValidation) + + // Create a dummy application service + cfg.AppServiceAPI.Derived.ApplicationServices = []config.ApplicationService{*as} + + // Prepare AS Streams on the old topic to validate that they get deleted + jsCtx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream) + + token := jetstream.Tokenise(as.ID) + if err := jetstream.JetStreamConsumer( + processCtx.Context(), jsCtx, cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent), + cfg.Global.JetStream.Durable("Appservice_"+token), + 50, // maximum number of events to send in a single transaction + func(ctx context.Context, msgs []*nats.Msg) bool { + return true + }, + ); err != nil { + t.Fatal(err) + } + + // Start the syncAPI to have `/joined_members` available + syncapi.AddPublicRoutes(processCtx, routers, cfg, cm, natsInstance, usrAPI, rsAPI, caches, caching.DisableMetrics) + + // start the consumer + appservice.NewInternalAPI(processCtx, cfg, natsInstance, usrAPI, rsAPI) + + // At this point, the old JetStream consumers should be deleted + for consumer := range jsCtx.Consumers(cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent)) { + if consumer.Name == cfg.Global.JetStream.Durable("Appservice_"+token)+"Pull" { + t.Fatalf("Consumer still exists") + } + } + + // Create the room, this triggers the AS to receive an invite for Bob. + if err := rsapi.SendEvents(context.Background(), rsAPI, rsapi.KindNew, room.Events(), "test", "test", "test", nil, false); err != nil { + t.Fatalf("failed to send events: %v", err) + } + + select { + // Pretty generous timeout duration... + case <-time.After(time.Millisecond * 1000): // wait for the AS to process the events + t.Errorf("Timed out waiting for join event") + case <-evChan: + } + close(evChan) + }) +} + +type userDevice struct { + accessToken string + deviceID string + password string +} + +func createAccessTokens(t *testing.T, accessTokens map[*test.User]userDevice, userAPI uapi.UserInternalAPI, ctx context.Context, routers httputil.Routers) { + t.Helper() + for u := range accessTokens { + localpart, serverName, _ := gomatrixserverlib.SplitID('@', u.ID) + userRes := &uapi.PerformAccountCreationResponse{} + password := util.RandomString(8) + if err := userAPI.PerformAccountCreation(ctx, &uapi.PerformAccountCreationRequest{ + AccountType: u.AccountType, + Localpart: localpart, + ServerName: serverName, + Password: password, + }, userRes); err != nil { + t.Errorf("failed to create account: %s", err) + } + req := test.NewRequest(t, http.MethodPost, "/_matrix/client/v3/login", test.WithJSONBody(t, map[string]interface{}{ + "type": authtypes.LoginTypePassword, + "identifier": map[string]interface{}{ + "type": "m.id.user", + "user": u.ID, + }, + "password": password, + })) + rec := httptest.NewRecorder() + routers.Client.ServeHTTP(rec, req) + if rec.Code != http.StatusOK { + t.Fatalf("failed to login: %s", rec.Body.String()) + } + accessTokens[u] = userDevice{ + accessToken: gjson.GetBytes(rec.Body.Bytes(), "access_token").String(), + deviceID: gjson.GetBytes(rec.Body.Bytes(), "device_id").String(), + password: password, + } + } +} diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index e8b9211c4..b7fc1f698 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -71,13 +71,14 @@ func NewOutputRoomEventConsumer( ctx: process.Context(), cfg: cfg, jetstream: js, - topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent), + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputAppserviceEvent), rsAPI: rsAPI, } } // Start consuming from room servers func (s *OutputRoomEventConsumer) Start() error { + durableNames := make([]string, 0, len(s.cfg.Derived.ApplicationServices)) for _, as := range s.cfg.Derived.ApplicationServices { appsvc := as state := &appserviceState{ @@ -95,6 +96,15 @@ func (s *OutputRoomEventConsumer) Start() error { ); err != nil { return fmt.Errorf("failed to create %q consumer: %w", token, err) } + durableNames = append(durableNames, s.cfg.Matrix.JetStream.Durable("Appservice_"+token)) + } + // Cleanup any consumers still existing on the OutputRoomEvent stream + // to avoid messages not being deleted + for _, consumerName := range durableNames { + err := s.jetstream.DeleteConsumer(s.cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent), consumerName+"Pull") + if err != nil && err != nats.ErrConsumerNotFound { + return err + } } return nil } diff --git a/build/docker/Dockerfile.demo-pinecone b/build/docker/Dockerfile.demo-pinecone index ab50cf318..fae8ca036 100644 --- a/build/docker/Dockerfile.demo-pinecone +++ b/build/docker/Dockerfile.demo-pinecone @@ -1,4 +1,5 @@ -FROM docker.io/golang:1.21-alpine AS base +# Pinned to alpine3.18 until https://github.com/mattn/go-sqlite3/issues/1164 is solved +FROM docker.io/golang:1.21-alpine3.18 AS base # # Needs to be separate from the main Dockerfile for OpenShift, diff --git a/build/docker/Dockerfile.demo-yggdrasil b/build/docker/Dockerfile.demo-yggdrasil index b9e387666..502233ea9 100644 --- a/build/docker/Dockerfile.demo-yggdrasil +++ b/build/docker/Dockerfile.demo-yggdrasil @@ -1,4 +1,5 @@ -FROM docker.io/golang:1.21-alpine AS base +# Pinned to alpine3.18 until https://github.com/mattn/go-sqlite3/issues/1164 is solved +FROM docker.io/golang:1.21-alpine3.18 AS base # # Needs to be separate from the main Dockerfile for OpenShift, diff --git a/go.mod b/go.mod index 387e99d8c..91d8cd3b4 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91 github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530 - github.com/matrix-org/gomatrixserverlib v0.0.0-20231122130434-2beadf141c98 + github.com/matrix-org/gomatrixserverlib v0.0.0-20231212115925-41497b7563eb github.com/matrix-org/pinecone v0.11.1-0.20230810010612-ea4c33717fd7 github.com/matrix-org/util v0.0.0-20221111132719-399730281e66 github.com/mattn/go-sqlite3 v1.14.17 diff --git a/go.sum b/go.sum index 0dbd3ac48..fba25076d 100644 --- a/go.sum +++ b/go.sum @@ -208,8 +208,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91 h1:s7fexw github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1:e+cg2q7C7yE5QnAXgzo512tgFh1RbQLC0+jozuegKgo= github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530 h1:kHKxCOLcHH8r4Fzarl4+Y3K5hjothkVW5z7T1dUM11U= github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= -github.com/matrix-org/gomatrixserverlib v0.0.0-20231122130434-2beadf141c98 h1:+GsF24sG9WJccf5k54cZj9tLgwW3UON1ujz5u+8SHxc= -github.com/matrix-org/gomatrixserverlib v0.0.0-20231122130434-2beadf141c98/go.mod h1:M8m7seOroO5ePlgxA7AFZymnG90Cnh94rYQyngSrZkk= +github.com/matrix-org/gomatrixserverlib v0.0.0-20231212115925-41497b7563eb h1:Nn+Fr96oi7bIfdOwX5A2L6A2MZCM+lqwLe4/+3+nYj8= +github.com/matrix-org/gomatrixserverlib v0.0.0-20231212115925-41497b7563eb/go.mod h1:M8m7seOroO5ePlgxA7AFZymnG90Cnh94rYQyngSrZkk= github.com/matrix-org/pinecone v0.11.1-0.20230810010612-ea4c33717fd7 h1:6t8kJr8i1/1I5nNttw6nn1ryQJgzVlBmSGgPiiaTdw4= github.com/matrix-org/pinecone v0.11.1-0.20230810010612-ea4c33717fd7/go.mod h1:ReWMS/LoVnOiRAdq9sNUC2NZnd1mZkMNB52QhpTRWjg= github.com/matrix-org/util v0.0.0-20221111132719-399730281e66 h1:6z4KxomXSIGWqhHcfzExgkH3Z3UkIXry4ibJS4Aqz2Y= diff --git a/helm/dendrite/Chart.yaml b/helm/dendrite/Chart.yaml index 32f479960..f36f457c5 100644 --- a/helm/dendrite/Chart.yaml +++ b/helm/dendrite/Chart.yaml @@ -1,7 +1,7 @@ apiVersion: v2 name: dendrite -version: "0.13.5" -appVersion: "0.13.4" +version: "0.13.6" +appVersion: "0.13.5" description: Dendrite Matrix Homeserver type: application keywords: diff --git a/helm/dendrite/README.md b/helm/dendrite/README.md index 22daa1813..f5f824927 100644 --- a/helm/dendrite/README.md +++ b/helm/dendrite/README.md @@ -1,7 +1,7 @@ # dendrite -![Version: 0.13.5](https://img.shields.io/badge/Version-0.13.5-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) ![AppVersion: 0.13.4](https://img.shields.io/badge/AppVersion-0.13.4-informational?style=flat-square) +![Version: 0.13.6](https://img.shields.io/badge/Version-0.13.6-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) ![AppVersion: 0.13.5](https://img.shields.io/badge/AppVersion-0.13.5-informational?style=flat-square) Dendrite Matrix Homeserver Status: **NOT PRODUCTION READY** diff --git a/internal/version.go b/internal/version.go index 55726ddcb..3fc52e376 100644 --- a/internal/version.go +++ b/internal/version.go @@ -18,7 +18,7 @@ var build string const ( VersionMajor = 0 VersionMinor = 13 - VersionPatch = 4 + VersionPatch = 5 VersionTag = "" // example: "rc1" gitRevLen = 7 // 7 matches the displayed characters on github.com diff --git a/mediaapi/routing/download.go b/mediaapi/routing/download.go index 51afa1f9f..fa1c417aa 100644 --- a/mediaapi/routing/download.go +++ b/mediaapi/routing/download.go @@ -63,6 +63,40 @@ type downloadRequest struct { DownloadFilename string } +// Taken from: https://github.com/matrix-org/synapse/blob/c3627d0f99ed5a23479305dc2bd0e71ca25ce2b1/synapse/media/_base.py#L53C1-L84 +// A list of all content types that are "safe" to be rendered inline in a browser. +var allowInlineTypes = map[types.ContentType]struct{}{ + "text/css": {}, + "text/plain": {}, + "text/csv": {}, + "application/json": {}, + "application/ld+json": {}, + // We allow some media files deemed as safe, which comes from the matrix-react-sdk. + // https://github.com/matrix-org/matrix-react-sdk/blob/a70fcfd0bcf7f8c85986da18001ea11597989a7c/src/utils/blobs.ts#L51 + // SVGs are *intentionally* omitted. + "image/jpeg": {}, + "image/gif": {}, + "image/png": {}, + "image/apng": {}, + "image/webp": {}, + "image/avif": {}, + "video/mp4": {}, + "video/webm": {}, + "video/ogg": {}, + "video/quicktime": {}, + "audio/mp4": {}, + "audio/webm": {}, + "audio/aac": {}, + "audio/mpeg": {}, + "audio/ogg": {}, + "audio/wave": {}, + "audio/wav": {}, + "audio/x-wav": {}, + "audio/x-pn-wav": {}, + "audio/flac": {}, + "audio/x-flac": {}, +} + // Download implements GET /download and GET /thumbnail // Files from this server (i.e. origin == cfg.ServerName) are served directly // Files from remote servers (i.e. origin != cfg.ServerName) are cached locally. @@ -353,7 +387,7 @@ func (r *downloadRequest) addDownloadFilenameToHeaders( } if len(filename) == 0 { - w.Header().Set("Content-Disposition", "attachment") + w.Header().Set("Content-Disposition", contentDispositionFor("")) return nil } @@ -383,20 +417,21 @@ func (r *downloadRequest) addDownloadFilenameToHeaders( unescaped = strings.ReplaceAll(unescaped, `\`, `\\"`) unescaped = strings.ReplaceAll(unescaped, `"`, `\"`) + disposition := contentDispositionFor(responseMetadata.ContentType) if isASCII { // For ASCII filenames, we should only quote the filename if // it needs to be done, e.g. it contains a space or a character // that would otherwise be parsed as a control character in the // Content-Disposition header w.Header().Set("Content-Disposition", fmt.Sprintf( - `attachment; filename=%s%s%s`, - quote, unescaped, quote, + `%s; filename=%s%s%s`, + disposition, quote, unescaped, quote, )) } else { // For UTF-8 filenames, we quote always, as that's the standard w.Header().Set("Content-Disposition", fmt.Sprintf( - `attachment; filename*=utf-8''%s`, - url.QueryEscape(unescaped), + `%s; filename*=utf-8''%s`, + disposition, url.QueryEscape(unescaped), )) } @@ -808,3 +843,12 @@ func (r *downloadRequest) fetchRemoteFile( return types.Path(finalPath), duplicate, nil } + +// contentDispositionFor returns the Content-Disposition for a given +// content type. +func contentDispositionFor(contentType types.ContentType) string { + if _, ok := allowInlineTypes[contentType]; ok { + return "inline" + } + return "attachment" +} diff --git a/mediaapi/routing/download_test.go b/mediaapi/routing/download_test.go new file mode 100644 index 000000000..21f6bfc2c --- /dev/null +++ b/mediaapi/routing/download_test.go @@ -0,0 +1,13 @@ +package routing + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_dispositionFor(t *testing.T) { + assert.Equal(t, "attachment", contentDispositionFor(""), "empty content type") + assert.Equal(t, "attachment", contentDispositionFor("image/svg"), "image/svg") + assert.Equal(t, "inline", contentDispositionFor("image/jpeg"), "image/jpg") +} diff --git a/setup/jetstream/streams.go b/setup/jetstream/streams.go index 741407926..1dc9f4cec 100644 --- a/setup/jetstream/streams.go +++ b/setup/jetstream/streams.go @@ -20,6 +20,7 @@ var ( InputDeviceListUpdate = "InputDeviceListUpdate" InputSigningKeyUpdate = "InputSigningKeyUpdate" OutputRoomEvent = "OutputRoomEvent" + OutputAppserviceEvent = "OutputAppserviceEvent" OutputSendToDeviceEvent = "OutputSendToDeviceEvent" OutputKeyChangeEvent = "OutputKeyChangeEvent" OutputTypingEvent = "OutputTypingEvent" @@ -65,6 +66,11 @@ var streams = []*nats.StreamConfig{ Retention: nats.InterestPolicy, Storage: nats.FileStorage, }, + { + Name: OutputAppserviceEvent, + Retention: nats.InterestPolicy, + Storage: nats.FileStorage, + }, { Name: OutputSendToDeviceEvent, Retention: nats.InterestPolicy, diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 666f900d7..81c532f19 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -31,6 +31,7 @@ import ( "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/syncapi/notifier" + "github.com/matrix-org/dendrite/syncapi/producers" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/streams" "github.com/matrix-org/dendrite/syncapi/synctypes" @@ -55,6 +56,7 @@ type OutputRoomEventConsumer struct { inviteStream streams.StreamProvider notifier *notifier.Notifier fts fulltext.Indexer + asProducer *producers.AppserviceEventProducer } // NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers. @@ -68,6 +70,7 @@ func NewOutputRoomEventConsumer( inviteStream streams.StreamProvider, rsAPI api.SyncRoomserverAPI, fts *fulltext.Search, + asProducer *producers.AppserviceEventProducer, ) *OutputRoomEventConsumer { return &OutputRoomEventConsumer{ ctx: process.Context(), @@ -81,6 +84,7 @@ func NewOutputRoomEventConsumer( inviteStream: inviteStream, rsAPI: rsAPI, fts: fts, + asProducer: asProducer, } } @@ -119,6 +123,11 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms } } err = s.onNewRoomEvent(s.ctx, *output.NewRoomEvent) + if err == nil && s.asProducer != nil { + if err = s.asProducer.ProduceRoomEvents(msg); err != nil { + log.WithError(err).Warn("failed to produce OutputAppserviceEvent") + } + } case api.OutputTypeOldRoomEvent: err = s.onOldRoomEvent(s.ctx, *output.OldRoomEvent) case api.OutputTypeNewInviteEvent: diff --git a/syncapi/producers/appservices.go b/syncapi/producers/appservices.go new file mode 100644 index 000000000..bb0dbb9cf --- /dev/null +++ b/syncapi/producers/appservices.go @@ -0,0 +1,33 @@ +// Copyright 2023 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package producers + +import ( + "github.com/nats-io/nats.go" +) + +// AppserviceEventProducer produces events for the appservice API to consume +type AppserviceEventProducer struct { + Topic string + JetStream nats.JetStreamContext +} + +func (a *AppserviceEventProducer) ProduceRoomEvents( + msg *nats.Msg, +) error { + msg.Subject = a.Topic + _, err := a.JetStream.PublishMsg(msg) + return err +} diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 091e3db41..0418ffc05 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -100,9 +100,16 @@ func AddPublicRoutes( logrus.WithError(err).Panicf("failed to start key change consumer") } + var asProducer *producers.AppserviceEventProducer + if len(dendriteCfg.AppServiceAPI.Derived.ApplicationServices) > 0 { + asProducer = &producers.AppserviceEventProducer{ + JetStream: js, Topic: dendriteCfg.Global.JetStream.Prefixed(jetstream.OutputAppserviceEvent), + } + } + roomConsumer := consumers.NewOutputRoomEventConsumer( processContext, &dendriteCfg.SyncAPI, js, syncDB, notifier, streams.PDUStreamProvider, - streams.InviteStreamProvider, rsAPI, fts, + streams.InviteStreamProvider, rsAPI, fts, asProducer, ) if err = roomConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start room server consumer")