From 185ad6b00de8079fb6a638ed63e392dfe9f2b49f Mon Sep 17 00:00:00 2001 From: Till <2353100+S7evinK@users.noreply.github.com> Date: Tue, 12 Dec 2023 11:15:50 +0100 Subject: [PATCH 1/6] Allow some content types to be inlined (#3274) "Shamelessly" stolen from https://github.com/matrix-org/synapse/pull/15988 --- mediaapi/routing/download.go | 54 ++++++++++++++++++++++++++++--- mediaapi/routing/download_test.go | 13 ++++++++ 2 files changed, 62 insertions(+), 5 deletions(-) create mode 100644 mediaapi/routing/download_test.go diff --git a/mediaapi/routing/download.go b/mediaapi/routing/download.go index 51afa1f9f..fa1c417aa 100644 --- a/mediaapi/routing/download.go +++ b/mediaapi/routing/download.go @@ -63,6 +63,40 @@ type downloadRequest struct { DownloadFilename string } +// Taken from: https://github.com/matrix-org/synapse/blob/c3627d0f99ed5a23479305dc2bd0e71ca25ce2b1/synapse/media/_base.py#L53C1-L84 +// A list of all content types that are "safe" to be rendered inline in a browser. +var allowInlineTypes = map[types.ContentType]struct{}{ + "text/css": {}, + "text/plain": {}, + "text/csv": {}, + "application/json": {}, + "application/ld+json": {}, + // We allow some media files deemed as safe, which comes from the matrix-react-sdk. + // https://github.com/matrix-org/matrix-react-sdk/blob/a70fcfd0bcf7f8c85986da18001ea11597989a7c/src/utils/blobs.ts#L51 + // SVGs are *intentionally* omitted. + "image/jpeg": {}, + "image/gif": {}, + "image/png": {}, + "image/apng": {}, + "image/webp": {}, + "image/avif": {}, + "video/mp4": {}, + "video/webm": {}, + "video/ogg": {}, + "video/quicktime": {}, + "audio/mp4": {}, + "audio/webm": {}, + "audio/aac": {}, + "audio/mpeg": {}, + "audio/ogg": {}, + "audio/wave": {}, + "audio/wav": {}, + "audio/x-wav": {}, + "audio/x-pn-wav": {}, + "audio/flac": {}, + "audio/x-flac": {}, +} + // Download implements GET /download and GET /thumbnail // Files from this server (i.e. origin == cfg.ServerName) are served directly // Files from remote servers (i.e. origin != cfg.ServerName) are cached locally. @@ -353,7 +387,7 @@ func (r *downloadRequest) addDownloadFilenameToHeaders( } if len(filename) == 0 { - w.Header().Set("Content-Disposition", "attachment") + w.Header().Set("Content-Disposition", contentDispositionFor("")) return nil } @@ -383,20 +417,21 @@ func (r *downloadRequest) addDownloadFilenameToHeaders( unescaped = strings.ReplaceAll(unescaped, `\`, `\\"`) unescaped = strings.ReplaceAll(unescaped, `"`, `\"`) + disposition := contentDispositionFor(responseMetadata.ContentType) if isASCII { // For ASCII filenames, we should only quote the filename if // it needs to be done, e.g. it contains a space or a character // that would otherwise be parsed as a control character in the // Content-Disposition header w.Header().Set("Content-Disposition", fmt.Sprintf( - `attachment; filename=%s%s%s`, - quote, unescaped, quote, + `%s; filename=%s%s%s`, + disposition, quote, unescaped, quote, )) } else { // For UTF-8 filenames, we quote always, as that's the standard w.Header().Set("Content-Disposition", fmt.Sprintf( - `attachment; filename*=utf-8''%s`, - url.QueryEscape(unescaped), + `%s; filename*=utf-8''%s`, + disposition, url.QueryEscape(unescaped), )) } @@ -808,3 +843,12 @@ func (r *downloadRequest) fetchRemoteFile( return types.Path(finalPath), duplicate, nil } + +// contentDispositionFor returns the Content-Disposition for a given +// content type. +func contentDispositionFor(contentType types.ContentType) string { + if _, ok := allowInlineTypes[contentType]; ok { + return "inline" + } + return "attachment" +} diff --git a/mediaapi/routing/download_test.go b/mediaapi/routing/download_test.go new file mode 100644 index 000000000..21f6bfc2c --- /dev/null +++ b/mediaapi/routing/download_test.go @@ -0,0 +1,13 @@ +package routing + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_dispositionFor(t *testing.T) { + assert.Equal(t, "attachment", contentDispositionFor(""), "empty content type") + assert.Equal(t, "attachment", contentDispositionFor("image/svg"), "image/svg") + assert.Equal(t, "inline", contentDispositionFor("image/jpeg"), "image/jpg") +} From 1555b3542d7da8243abc21e6a29758ebe419e5b5 Mon Sep 17 00:00:00 2001 From: Till <2353100+S7evinK@users.noreply.github.com> Date: Tue, 12 Dec 2023 12:13:55 +0100 Subject: [PATCH 2/6] Introduce a new stream for the appservice consumer (#3277) This introduces a new stream the syncAPI produces to once it processed a `OutputRoomEvent` and the appservices consumes. This is to work around a race condition where appservices receive an event before the syncAPI has handled it, this can result in e.g. calls to `/joined_members` returning a wrong membership list. --- appservice/appservice_test.go | 197 +++++++++++++++++++++++++++++ appservice/consumers/roomserver.go | 12 +- setup/jetstream/streams.go | 6 + syncapi/consumers/roomserver.go | 9 ++ syncapi/producers/appservices.go | 33 +++++ syncapi/syncapi.go | 9 +- 6 files changed, 264 insertions(+), 2 deletions(-) create mode 100644 syncapi/producers/appservices.go diff --git a/appservice/appservice_test.go b/appservice/appservice_test.go index eca63371d..aa2af9f24 100644 --- a/appservice/appservice_test.go +++ b/appservice/appservice_test.go @@ -14,8 +14,18 @@ import ( "testing" "time" + "github.com/matrix-org/dendrite/clientapi" + "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/federationapi/statistics" + "github.com/matrix-org/dendrite/internal/httputil" + "github.com/matrix-org/dendrite/roomserver/types" + "github.com/matrix-org/dendrite/syncapi" + uapi "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" + "github.com/nats-io/nats.go" "github.com/stretchr/testify/assert" + "github.com/tidwall/gjson" "github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/appservice/api" @@ -407,3 +417,190 @@ func TestRoomserverConsumerOneInvite(t *testing.T) { close(evChan) }) } + +// Note: If this test panics, it is because we timed out waiting for the +// join event to come through to the appservice and we close the DB/shutdown Dendrite. This makes the +// syncAPI unhappy, as it is unable to write to the database. +func TestOutputAppserviceEvent(t *testing.T) { + alice := test.NewUser(t) + bob := test.NewUser(t) + + test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { + cfg, processCtx, closeDB := testrig.CreateConfig(t, dbType) + defer closeDB() + cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions) + natsInstance := &jetstream.NATSInstance{} + + evChan := make(chan struct{}) + + caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics) + // Create required internal APIs + rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, natsInstance, caches, caching.DisableMetrics) + rsAPI.SetFederationAPI(nil, nil) + + // Create the router, so we can hit `/joined_members` + routers := httputil.NewRouters() + + accessTokens := map[*test.User]userDevice{ + bob: {}, + } + + usrAPI := userapi.NewInternalAPI(processCtx, cfg, cm, natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff) + clientapi.AddPublicRoutes(processCtx, routers, cfg, natsInstance, nil, rsAPI, nil, nil, nil, usrAPI, nil, nil, caching.DisableMetrics) + createAccessTokens(t, accessTokens, usrAPI, processCtx.Context(), routers) + + room := test.NewRoom(t, alice) + + // Invite Bob + room.CreateAndInsert(t, alice, spec.MRoomMember, map[string]interface{}{ + "membership": "invite", + }, test.WithStateKey(bob.ID)) + + // create a dummy AS url, handling the events + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var txn consumers.ApplicationServiceTransaction + err := json.NewDecoder(r.Body).Decode(&txn) + if err != nil { + t.Fatal(err) + } + for _, ev := range txn.Events { + if ev.Type != spec.MRoomMember { + continue + } + if ev.StateKey != nil && *ev.StateKey == bob.ID { + membership := gjson.GetBytes(ev.Content, "membership").Str + t.Logf("Processing membership: %s", membership) + switch membership { + case spec.Invite: + // Accept the invite + joinEv := room.CreateAndInsert(t, bob, spec.MRoomMember, map[string]interface{}{ + "membership": "join", + }, test.WithStateKey(bob.ID)) + + if err := rsapi.SendEvents(context.Background(), rsAPI, rsapi.KindNew, []*types.HeaderedEvent{joinEv}, "test", "test", "test", nil, false); err != nil { + t.Fatalf("failed to send events: %v", err) + } + case spec.Join: // the AS has received the join event, now hit `/joined_members` to validate that + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/_matrix/client/v3/rooms/"+room.ID+"/joined_members", nil) + req.Header.Set("Authorization", "Bearer "+accessTokens[bob].accessToken) + routers.Client.ServeHTTP(rec, req) + if rec.Code != http.StatusOK { + t.Fatalf("expected HTTP 200, got %d: %s", rec.Code, rec.Body.String()) + } + + // Both Alice and Bob should be joined. If not, we have a race condition + if !gjson.GetBytes(rec.Body.Bytes(), "joined."+alice.ID).Exists() { + t.Errorf("Alice is not joined to the room") // in theory should not happen + } + if !gjson.GetBytes(rec.Body.Bytes(), "joined."+bob.ID).Exists() { + t.Errorf("Bob is not joined to the room") + } + evChan <- struct{}{} + default: + t.Fatalf("Unexpected membership: %s", membership) + } + } + } + })) + defer srv.Close() + + as := &config.ApplicationService{ + ID: "someID", + URL: srv.URL, + ASToken: "", + HSToken: "", + SenderLocalpart: "senderLocalPart", + NamespaceMap: map[string][]config.ApplicationServiceNamespace{ + "users": {{RegexpObject: regexp.MustCompile(bob.ID)}}, + "aliases": {{RegexpObject: regexp.MustCompile(room.ID)}}, + }, + } + as.CreateHTTPClient(cfg.AppServiceAPI.DisableTLSValidation) + + // Create a dummy application service + cfg.AppServiceAPI.Derived.ApplicationServices = []config.ApplicationService{*as} + + // Prepare AS Streams on the old topic to validate that they get deleted + jsCtx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream) + + token := jetstream.Tokenise(as.ID) + if err := jetstream.JetStreamConsumer( + processCtx.Context(), jsCtx, cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent), + cfg.Global.JetStream.Durable("Appservice_"+token), + 50, // maximum number of events to send in a single transaction + func(ctx context.Context, msgs []*nats.Msg) bool { + return true + }, + ); err != nil { + t.Fatal(err) + } + + // Start the syncAPI to have `/joined_members` available + syncapi.AddPublicRoutes(processCtx, routers, cfg, cm, natsInstance, usrAPI, rsAPI, caches, caching.DisableMetrics) + + // start the consumer + appservice.NewInternalAPI(processCtx, cfg, natsInstance, usrAPI, rsAPI) + + // At this point, the old JetStream consumers should be deleted + for consumer := range jsCtx.Consumers(cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent)) { + if consumer.Name == cfg.Global.JetStream.Durable("Appservice_"+token)+"Pull" { + t.Fatalf("Consumer still exists") + } + } + + // Create the room, this triggers the AS to receive an invite for Bob. + if err := rsapi.SendEvents(context.Background(), rsAPI, rsapi.KindNew, room.Events(), "test", "test", "test", nil, false); err != nil { + t.Fatalf("failed to send events: %v", err) + } + + select { + // Pretty generous timeout duration... + case <-time.After(time.Millisecond * 1000): // wait for the AS to process the events + t.Errorf("Timed out waiting for join event") + case <-evChan: + } + close(evChan) + }) +} + +type userDevice struct { + accessToken string + deviceID string + password string +} + +func createAccessTokens(t *testing.T, accessTokens map[*test.User]userDevice, userAPI uapi.UserInternalAPI, ctx context.Context, routers httputil.Routers) { + t.Helper() + for u := range accessTokens { + localpart, serverName, _ := gomatrixserverlib.SplitID('@', u.ID) + userRes := &uapi.PerformAccountCreationResponse{} + password := util.RandomString(8) + if err := userAPI.PerformAccountCreation(ctx, &uapi.PerformAccountCreationRequest{ + AccountType: u.AccountType, + Localpart: localpart, + ServerName: serverName, + Password: password, + }, userRes); err != nil { + t.Errorf("failed to create account: %s", err) + } + req := test.NewRequest(t, http.MethodPost, "/_matrix/client/v3/login", test.WithJSONBody(t, map[string]interface{}{ + "type": authtypes.LoginTypePassword, + "identifier": map[string]interface{}{ + "type": "m.id.user", + "user": u.ID, + }, + "password": password, + })) + rec := httptest.NewRecorder() + routers.Client.ServeHTTP(rec, req) + if rec.Code != http.StatusOK { + t.Fatalf("failed to login: %s", rec.Body.String()) + } + accessTokens[u] = userDevice{ + accessToken: gjson.GetBytes(rec.Body.Bytes(), "access_token").String(), + deviceID: gjson.GetBytes(rec.Body.Bytes(), "device_id").String(), + password: password, + } + } +} diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index e8b9211c4..b7fc1f698 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -71,13 +71,14 @@ func NewOutputRoomEventConsumer( ctx: process.Context(), cfg: cfg, jetstream: js, - topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent), + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputAppserviceEvent), rsAPI: rsAPI, } } // Start consuming from room servers func (s *OutputRoomEventConsumer) Start() error { + durableNames := make([]string, 0, len(s.cfg.Derived.ApplicationServices)) for _, as := range s.cfg.Derived.ApplicationServices { appsvc := as state := &appserviceState{ @@ -95,6 +96,15 @@ func (s *OutputRoomEventConsumer) Start() error { ); err != nil { return fmt.Errorf("failed to create %q consumer: %w", token, err) } + durableNames = append(durableNames, s.cfg.Matrix.JetStream.Durable("Appservice_"+token)) + } + // Cleanup any consumers still existing on the OutputRoomEvent stream + // to avoid messages not being deleted + for _, consumerName := range durableNames { + err := s.jetstream.DeleteConsumer(s.cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent), consumerName+"Pull") + if err != nil && err != nats.ErrConsumerNotFound { + return err + } } return nil } diff --git a/setup/jetstream/streams.go b/setup/jetstream/streams.go index 741407926..1dc9f4cec 100644 --- a/setup/jetstream/streams.go +++ b/setup/jetstream/streams.go @@ -20,6 +20,7 @@ var ( InputDeviceListUpdate = "InputDeviceListUpdate" InputSigningKeyUpdate = "InputSigningKeyUpdate" OutputRoomEvent = "OutputRoomEvent" + OutputAppserviceEvent = "OutputAppserviceEvent" OutputSendToDeviceEvent = "OutputSendToDeviceEvent" OutputKeyChangeEvent = "OutputKeyChangeEvent" OutputTypingEvent = "OutputTypingEvent" @@ -65,6 +66,11 @@ var streams = []*nats.StreamConfig{ Retention: nats.InterestPolicy, Storage: nats.FileStorage, }, + { + Name: OutputAppserviceEvent, + Retention: nats.InterestPolicy, + Storage: nats.FileStorage, + }, { Name: OutputSendToDeviceEvent, Retention: nats.InterestPolicy, diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 666f900d7..81c532f19 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -31,6 +31,7 @@ import ( "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/syncapi/notifier" + "github.com/matrix-org/dendrite/syncapi/producers" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/streams" "github.com/matrix-org/dendrite/syncapi/synctypes" @@ -55,6 +56,7 @@ type OutputRoomEventConsumer struct { inviteStream streams.StreamProvider notifier *notifier.Notifier fts fulltext.Indexer + asProducer *producers.AppserviceEventProducer } // NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers. @@ -68,6 +70,7 @@ func NewOutputRoomEventConsumer( inviteStream streams.StreamProvider, rsAPI api.SyncRoomserverAPI, fts *fulltext.Search, + asProducer *producers.AppserviceEventProducer, ) *OutputRoomEventConsumer { return &OutputRoomEventConsumer{ ctx: process.Context(), @@ -81,6 +84,7 @@ func NewOutputRoomEventConsumer( inviteStream: inviteStream, rsAPI: rsAPI, fts: fts, + asProducer: asProducer, } } @@ -119,6 +123,11 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms } } err = s.onNewRoomEvent(s.ctx, *output.NewRoomEvent) + if err == nil && s.asProducer != nil { + if err = s.asProducer.ProduceRoomEvents(msg); err != nil { + log.WithError(err).Warn("failed to produce OutputAppserviceEvent") + } + } case api.OutputTypeOldRoomEvent: err = s.onOldRoomEvent(s.ctx, *output.OldRoomEvent) case api.OutputTypeNewInviteEvent: diff --git a/syncapi/producers/appservices.go b/syncapi/producers/appservices.go new file mode 100644 index 000000000..bb0dbb9cf --- /dev/null +++ b/syncapi/producers/appservices.go @@ -0,0 +1,33 @@ +// Copyright 2023 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package producers + +import ( + "github.com/nats-io/nats.go" +) + +// AppserviceEventProducer produces events for the appservice API to consume +type AppserviceEventProducer struct { + Topic string + JetStream nats.JetStreamContext +} + +func (a *AppserviceEventProducer) ProduceRoomEvents( + msg *nats.Msg, +) error { + msg.Subject = a.Topic + _, err := a.JetStream.PublishMsg(msg) + return err +} diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 091e3db41..0418ffc05 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -100,9 +100,16 @@ func AddPublicRoutes( logrus.WithError(err).Panicf("failed to start key change consumer") } + var asProducer *producers.AppserviceEventProducer + if len(dendriteCfg.AppServiceAPI.Derived.ApplicationServices) > 0 { + asProducer = &producers.AppserviceEventProducer{ + JetStream: js, Topic: dendriteCfg.Global.JetStream.Prefixed(jetstream.OutputAppserviceEvent), + } + } + roomConsumer := consumers.NewOutputRoomEventConsumer( processContext, &dendriteCfg.SyncAPI, js, syncDB, notifier, streams.PDUStreamProvider, - streams.InviteStreamProvider, rsAPI, fts, + streams.InviteStreamProvider, rsAPI, fts, asProducer, ) if err = roomConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start room server consumer") From b7054f4274a0c78aba8d3b9e29ae23d2d6172655 Mon Sep 17 00:00:00 2001 From: Till <2353100+S7evinK@users.noreply.github.com> Date: Tue, 12 Dec 2023 16:55:03 +0100 Subject: [PATCH 3/6] Version 0.13.5 (#3285) --- CHANGES.md | 24 ++++++++++++++++++++++++ Dockerfile | 3 ++- go.mod | 2 +- go.sum | 4 ++-- helm/dendrite/Chart.yaml | 4 ++-- helm/dendrite/README.md | 2 +- internal/version.go | 2 +- 7 files changed, 33 insertions(+), 8 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 57e3a3d4f..97ec7bec4 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,29 @@ # Changelog +## Dendrite 0.13.5 (2023-12-12) + +Upgrading to this version is **highly** recommended, as it fixes several long-standing bugs in +our CanonicalJSON implementation. + +### Fixes + +- Convert unicode escapes to lowercase (gomatrixserverlib) +- Fix canonical json utf-16 surrogate pair detection logic (gomatrixserverlib) +- Handle negative zero and exponential numbers in Canonical JSON verification (gomatrixserverlib) +- Avoid logging unnecessary messages when unable to fetch server keys if multiple fetchers are used (gomatrixserverlib) +- Issues around the device list updater have been fixed, which should ensure that there are always + workers available to process incoming device list updates. +- A panic in the `/hierarchy` endpoints used for spaces has been fixed (client-server and server-server API) +- Fixes around the way we handle database transactions (including a potential connection leak) +- ACLs are now updated when received as outliers +- A race condition, which could lead to bridges instantly leaving a room after joining it, between the SyncAPI and + Appservices has been fixed + +### Features + +- **Appservice login is now supported!** +- Users can now kick themselves (used by some bridges) + ## Dendrite 0.13.4 (2023-10-25) Upgrading to this version is **highly** recommended, as it fixes a long-standing bug in the state resolution diff --git a/Dockerfile b/Dockerfile index 8c8f1588f..523857ccc 100644 --- a/Dockerfile +++ b/Dockerfile @@ -3,7 +3,8 @@ # # base installs required dependencies and runs go mod download to cache dependencies # -FROM --platform=${BUILDPLATFORM} docker.io/golang:1.21-alpine AS base +# Pinned to alpine3.18 until https://github.com/mattn/go-sqlite3/issues/1164 is solved +FROM --platform=${BUILDPLATFORM} docker.io/golang:1.21-alpine3.18 AS base RUN apk --update --no-cache add bash build-base curl git # diff --git a/go.mod b/go.mod index 387e99d8c..91d8cd3b4 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91 github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530 - github.com/matrix-org/gomatrixserverlib v0.0.0-20231122130434-2beadf141c98 + github.com/matrix-org/gomatrixserverlib v0.0.0-20231212115925-41497b7563eb github.com/matrix-org/pinecone v0.11.1-0.20230810010612-ea4c33717fd7 github.com/matrix-org/util v0.0.0-20221111132719-399730281e66 github.com/mattn/go-sqlite3 v1.14.17 diff --git a/go.sum b/go.sum index 0dbd3ac48..fba25076d 100644 --- a/go.sum +++ b/go.sum @@ -208,8 +208,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91 h1:s7fexw github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1:e+cg2q7C7yE5QnAXgzo512tgFh1RbQLC0+jozuegKgo= github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530 h1:kHKxCOLcHH8r4Fzarl4+Y3K5hjothkVW5z7T1dUM11U= github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= -github.com/matrix-org/gomatrixserverlib v0.0.0-20231122130434-2beadf141c98 h1:+GsF24sG9WJccf5k54cZj9tLgwW3UON1ujz5u+8SHxc= -github.com/matrix-org/gomatrixserverlib v0.0.0-20231122130434-2beadf141c98/go.mod h1:M8m7seOroO5ePlgxA7AFZymnG90Cnh94rYQyngSrZkk= +github.com/matrix-org/gomatrixserverlib v0.0.0-20231212115925-41497b7563eb h1:Nn+Fr96oi7bIfdOwX5A2L6A2MZCM+lqwLe4/+3+nYj8= +github.com/matrix-org/gomatrixserverlib v0.0.0-20231212115925-41497b7563eb/go.mod h1:M8m7seOroO5ePlgxA7AFZymnG90Cnh94rYQyngSrZkk= github.com/matrix-org/pinecone v0.11.1-0.20230810010612-ea4c33717fd7 h1:6t8kJr8i1/1I5nNttw6nn1ryQJgzVlBmSGgPiiaTdw4= github.com/matrix-org/pinecone v0.11.1-0.20230810010612-ea4c33717fd7/go.mod h1:ReWMS/LoVnOiRAdq9sNUC2NZnd1mZkMNB52QhpTRWjg= github.com/matrix-org/util v0.0.0-20221111132719-399730281e66 h1:6z4KxomXSIGWqhHcfzExgkH3Z3UkIXry4ibJS4Aqz2Y= diff --git a/helm/dendrite/Chart.yaml b/helm/dendrite/Chart.yaml index 32f479960..f36f457c5 100644 --- a/helm/dendrite/Chart.yaml +++ b/helm/dendrite/Chart.yaml @@ -1,7 +1,7 @@ apiVersion: v2 name: dendrite -version: "0.13.5" -appVersion: "0.13.4" +version: "0.13.6" +appVersion: "0.13.5" description: Dendrite Matrix Homeserver type: application keywords: diff --git a/helm/dendrite/README.md b/helm/dendrite/README.md index 22daa1813..f5f824927 100644 --- a/helm/dendrite/README.md +++ b/helm/dendrite/README.md @@ -1,7 +1,7 @@ # dendrite -![Version: 0.13.5](https://img.shields.io/badge/Version-0.13.5-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) ![AppVersion: 0.13.4](https://img.shields.io/badge/AppVersion-0.13.4-informational?style=flat-square) +![Version: 0.13.6](https://img.shields.io/badge/Version-0.13.6-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) ![AppVersion: 0.13.5](https://img.shields.io/badge/AppVersion-0.13.5-informational?style=flat-square) Dendrite Matrix Homeserver Status: **NOT PRODUCTION READY** diff --git a/internal/version.go b/internal/version.go index 55726ddcb..3fc52e376 100644 --- a/internal/version.go +++ b/internal/version.go @@ -18,7 +18,7 @@ var build string const ( VersionMajor = 0 VersionMinor = 13 - VersionPatch = 4 + VersionPatch = 5 VersionTag = "" // example: "rc1" gitRevLen = 7 // 7 matches the displayed characters on github.com From d65449c7822e89b506bf2caa7a098e38970f6f27 Mon Sep 17 00:00:00 2001 From: Till Faelligen <2353100+S7evinK@users.noreply.github.com> Date: Tue, 12 Dec 2023 17:31:36 +0100 Subject: [PATCH 4/6] Also pin Pinecone and Yggdrasil demo --- build/docker/Dockerfile.demo-pinecone | 3 ++- build/docker/Dockerfile.demo-yggdrasil | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/build/docker/Dockerfile.demo-pinecone b/build/docker/Dockerfile.demo-pinecone index ab50cf318..fae8ca036 100644 --- a/build/docker/Dockerfile.demo-pinecone +++ b/build/docker/Dockerfile.demo-pinecone @@ -1,4 +1,5 @@ -FROM docker.io/golang:1.21-alpine AS base +# Pinned to alpine3.18 until https://github.com/mattn/go-sqlite3/issues/1164 is solved +FROM docker.io/golang:1.21-alpine3.18 AS base # # Needs to be separate from the main Dockerfile for OpenShift, diff --git a/build/docker/Dockerfile.demo-yggdrasil b/build/docker/Dockerfile.demo-yggdrasil index b9e387666..502233ea9 100644 --- a/build/docker/Dockerfile.demo-yggdrasil +++ b/build/docker/Dockerfile.demo-yggdrasil @@ -1,4 +1,5 @@ -FROM docker.io/golang:1.21-alpine AS base +# Pinned to alpine3.18 until https://github.com/mattn/go-sqlite3/issues/1164 is solved +FROM docker.io/golang:1.21-alpine3.18 AS base # # Needs to be separate from the main Dockerfile for OpenShift, From f93d1c4790cf2f9b6d5fd838f113e8d5f3da8cb1 Mon Sep 17 00:00:00 2001 From: Till <2353100+S7evinK@users.noreply.github.com> Date: Tue, 19 Dec 2023 08:25:47 +0100 Subject: [PATCH 5/6] Use `AckExplicitPolicy` instead of `AckAllPolicy` (#3288) Fixes https://github.com/matrix-org/dendrite/issues/3240 and potentially a root cause for state resets. While testing, I've had added some more debug logging: ``` time="2023-12-16T18:13:11.319458084Z" level=warning msg="already processed event" event_id="$qFYMl_F2vb1N0yxmvlFAMhqhGhLKq4kA-o_YCQKH7tQ" kind=KindNew times=2 time="2023-12-16T18:13:14.537389126Z" level=warning msg="already processed event" event_id="$EU-LTsKErT6Mt1k12-p_3xOHfiLaK6gtwVDlZ35lSuo" kind=KindNew times=5 time="2023-12-16T18:13:16.789551206Z" level=warning msg="already processed event" event_id="$dIPuAfTL5x0VyG873LKPslQeljCSxFT1WKxUtjIMUGE" kind=KindNew times=5 time="2023-12-16T18:13:17.383838767Z" level=warning msg="already processed event" event_id="$7noSZiCkzerpkz_UBO3iatpRnaOiPx-3IXc0GPDQVGE" kind=KindNew times=2 time="2023-12-16T18:13:22.091946597Z" level=warning msg="already processed event" event_id="$3Lvo3Wbi2ol9-nNbQ93N-E2MuGQCJZo5397KkFH-W6E" kind=KindNew times=1 time="2023-12-16T18:13:23.026417446Z" level=warning msg="already processed event" event_id="$lj1xS46zsLBCChhKOLJEG-bu7z-_pq9i_Y2DUIjzGy4" kind=KindNew times=4 ``` So we did receive the same event over and over again. Given they are `KindNew`, we don't short circuit if we already processed them, which potentially caused the state to be calculated with a now wrong state snapshot. Also fixes the back pressure metric. We now correctly increment the counter once we sent the message to NATS and decrement it once we actually processed an event. --- go.mod | 18 ++--- go.sum | 36 ++++----- roomserver/internal/input/input.go | 92 ++++++++++++++++++----- roomserver/internal/input/input_events.go | 6 +- roomserver/roomserver_test.go | 53 +++++++++++++ 5 files changed, 158 insertions(+), 47 deletions(-) diff --git a/go.mod b/go.mod index 91d8cd3b4..d8b319d13 100644 --- a/go.mod +++ b/go.mod @@ -26,8 +26,8 @@ require ( github.com/matrix-org/pinecone v0.11.1-0.20230810010612-ea4c33717fd7 github.com/matrix-org/util v0.0.0-20221111132719-399730281e66 github.com/mattn/go-sqlite3 v1.14.17 - github.com/nats-io/nats-server/v2 v2.9.23 - github.com/nats-io/nats.go v1.28.0 + github.com/nats-io/nats-server/v2 v2.10.7 + github.com/nats-io/nats.go v1.31.0 github.com/neilalexander/utp v0.1.1-0.20210727203401-54ae7b1cd5f9 github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 github.com/opentracing/opentracing-go v1.2.0 @@ -42,12 +42,12 @@ require ( github.com/uber/jaeger-lib v2.4.1+incompatible github.com/yggdrasil-network/yggdrasil-go v0.4.6 go.uber.org/atomic v1.10.0 - golang.org/x/crypto v0.14.0 + golang.org/x/crypto v0.16.0 golang.org/x/exp v0.0.0-20230809150735-7b3493d9a819 golang.org/x/image v0.10.0 golang.org/x/mobile v0.0.0-20221020085226-b36e6246172e golang.org/x/sync v0.3.0 - golang.org/x/term v0.13.0 + golang.org/x/term v0.15.0 gopkg.in/h2non/bimg.v1 v1.1.9 gopkg.in/yaml.v2 v2.4.0 gotest.tools/v3 v3.4.0 @@ -94,7 +94,7 @@ require ( github.com/json-iterator/go v1.1.12 // indirect github.com/juju/errors v1.0.0 // indirect github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect - github.com/klauspost/compress v1.16.7 // indirect + github.com/klauspost/compress v1.17.4 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.17 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect @@ -104,7 +104,7 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/morikuni/aec v1.0.0 // indirect github.com/mschoch/smat v0.2.0 // indirect - github.com/nats-io/jwt/v2 v2.5.0 // indirect + github.com/nats-io/jwt/v2 v2.5.3 // indirect github.com/nats-io/nkeys v0.4.6 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/onsi/ginkgo/v2 v2.11.0 // indirect @@ -124,9 +124,9 @@ require ( go.etcd.io/bbolt v1.3.6 // indirect golang.org/x/mod v0.12.0 // indirect golang.org/x/net v0.17.0 // indirect - golang.org/x/sys v0.13.0 // indirect - golang.org/x/text v0.13.0 // indirect - golang.org/x/time v0.3.0 // indirect + golang.org/x/sys v0.15.0 // indirect + golang.org/x/text v0.14.0 // indirect + golang.org/x/time v0.5.0 // indirect golang.org/x/tools v0.12.0 // indirect google.golang.org/protobuf v1.30.0 // indirect gopkg.in/macaroon.v2 v2.1.0 // indirect diff --git a/go.sum b/go.sum index fba25076d..543b8b4ae 100644 --- a/go.sum +++ b/go.sum @@ -190,8 +190,8 @@ github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:C github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= -github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= -github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4= +github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -242,12 +242,12 @@ github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7P github.com/mschoch/smat v0.0.0-20160514031455-90eadee771ae/go.mod h1:qAyveg+e4CE+eKJXWVjKXM4ck2QobLqTDytGJbLLhJg= github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM= github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw= -github.com/nats-io/jwt/v2 v2.5.0 h1:WQQ40AAlqqfx+f6ku+i0pOVm+ASirD4fUh+oQsiE9Ak= -github.com/nats-io/jwt/v2 v2.5.0/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI= -github.com/nats-io/nats-server/v2 v2.9.23 h1:6Wj6H6QpP9FMlpCyWUaNu2yeZ/qGj+mdRkZ1wbikExU= -github.com/nats-io/nats-server/v2 v2.9.23/go.mod h1:wEjrEy9vnqIGE4Pqz4/c75v9Pmaq7My2IgFmnykc4C0= -github.com/nats-io/nats.go v1.28.0 h1:Th4G6zdsz2d0OqXdfzKLClo6bOfoI/b1kInhRtFIy5c= -github.com/nats-io/nats.go v1.28.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc= +github.com/nats-io/jwt/v2 v2.5.3 h1:/9SWvzc6hTfamcgXJ3uYRpgj+QuY2aLNqRiqrKcrpEo= +github.com/nats-io/jwt/v2 v2.5.3/go.mod h1:iysuPemFcc7p4IoYots3IuELSI4EDe9Y0bQMe+I3Bf4= +github.com/nats-io/nats-server/v2 v2.10.7 h1:f5VDy+GMu7JyuFA0Fef+6TfulfCs5nBTgq7MMkFJx5Y= +github.com/nats-io/nats-server/v2 v2.10.7/go.mod h1:V2JHOvPiPdtfDXTuEUsthUnCvSDeFrK4Xn9hRo6du7c= +github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E= +github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8= github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY= github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADymtkpts= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= @@ -354,8 +354,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc= -golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= +golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY= +golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -422,24 +422,24 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= -golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= -golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek= -golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= +golang.org/x/term v0.15.0 h1:y/Oo/a/q3IXu26lQgl04j/gjuBDOBlx7X6Om1j2CPW4= +golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= -golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= -golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= -golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= +golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190206041539-40960b6deb8e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 404751532..20d2cfc7a 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -119,9 +119,14 @@ func (r *Inputer) startWorkerForRoom(roomID string) { w.Lock() defer w.Unlock() if !loaded || w.subscription == nil { + streamName := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent) consumer := r.Cfg.Matrix.JetStream.Prefixed("RoomInput" + jetstream.Tokenise(w.roomID)) subject := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEventSubj(w.roomID)) + logger := logrus.WithFields(logrus.Fields{ + "stream_name": streamName, + "consumer": consumer, + }) // Create the consumer. We do this as a specific step rather than // letting PullSubscribe create it for us because we need the consumer // to outlive the subscription. If we do it this way, we can Bind in the @@ -135,21 +140,62 @@ func (r *Inputer) startWorkerForRoom(roomID string) { // before it. This is necessary because otherwise our consumer will never // acknowledge things we filtered out for other subjects and therefore they // will linger around forever. - if _, err := w.r.JetStream.AddConsumer( - r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent), - &nats.ConsumerConfig{ - Durable: consumer, - AckPolicy: nats.AckAllPolicy, - DeliverPolicy: nats.DeliverAllPolicy, - FilterSubject: subject, - AckWait: MaximumMissingProcessingTime + (time.Second * 10), - InactiveThreshold: inactiveThreshold, - }, - ); err != nil { - logrus.WithError(err).Errorf("Failed to create consumer for room %q", w.roomID) + + info, err := w.r.JetStream.ConsumerInfo(streamName, consumer) + if err != nil && !errors.Is(err, nats.ErrConsumerNotFound) { + // log and return, we will retry anyway + logger.WithError(err).Errorf("failed to get consumer info") return } + consumerConfig := &nats.ConsumerConfig{ + Durable: consumer, + AckPolicy: nats.AckExplicitPolicy, + DeliverPolicy: nats.DeliverAllPolicy, + FilterSubject: subject, + AckWait: MaximumMissingProcessingTime + (time.Second * 10), + InactiveThreshold: inactiveThreshold, + } + + // The consumer already exists, try to update if necessary. + if info != nil { + // Not using reflect.DeepEqual here, since consumerConfig does not explicitly set + // e.g. the consumerName, which is added by NATS later. So this would result + // in constantly updating/recreating the consumer. + switch { + case info.Config.AckWait.Nanoseconds() != consumerConfig.AckWait.Nanoseconds(): + // Initially we had a AckWait of 2m 10s, now we have 5m 10s, so we need to update + // existing consumers. + fallthrough + case info.Config.AckPolicy != consumerConfig.AckPolicy: + // We've changed the AckPolicy from AckAll to AckExplicit, this needs a + // recreation of the consumer. (Note: Only a few changes actually need a recreat) + logger.Warn("Consumer already exists, trying to update it.") + // Try updating the consumer first + if _, err = w.r.JetStream.UpdateConsumer(streamName, consumerConfig); err != nil { + // We failed to update the consumer, recreate it + logger.WithError(err).Warn("Unable to update consumer, recreating...") + if err = w.r.JetStream.DeleteConsumer(streamName, consumer); err != nil { + logger.WithError(err).Fatal("Unable to delete consumer") + return + } + // Set info to nil, so it can be recreated with the correct config. + info = nil + } + } + } + + if info == nil { + // Create the consumer with the correct config + if _, err = w.r.JetStream.AddConsumer( + r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent), + consumerConfig, + ); err != nil { + logger.WithError(err).Errorf("Failed to create consumer for room %q", w.roomID) + return + } + } + // Bind to our durable consumer. We want to receive all messages waiting // for this subject and we want to manually acknowledge them, so that we // can ensure they are only cleaned up when we are done processing them. @@ -162,7 +208,7 @@ func (r *Inputer) startWorkerForRoom(roomID string) { nats.InactiveThreshold(inactiveThreshold), ) if err != nil { - logrus.WithError(err).Errorf("Failed to subscribe to stream for room %q", w.roomID) + logger.WithError(err).Errorf("Failed to subscribe to stream for room %q", w.roomID) return } @@ -263,21 +309,23 @@ func (w *worker) _next() { return } + // Since we either Ack() or Term() the message at this point, we can defer decrementing the room backpressure + defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Dec() + // Try to unmarshal the input room event. If the JSON unmarshalling // fails then we'll terminate the message — this notifies NATS that // we are done with the message and never want to see it again. msg := msgs[0] var inputRoomEvent api.InputRoomEvent if err = json.Unmarshal(msg.Data, &inputRoomEvent); err != nil { - _ = msg.Term() + // using AckWait here makes the call synchronous; 5 seconds is the default value used by NATS + _ = msg.Term(nats.AckWait(time.Second * 5)) return } if scope := sentry.CurrentHub().Scope(); scope != nil { scope.SetTag("event_id", inputRoomEvent.Event.EventID()) } - roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Inc() - defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Dec() // Process the room event. If something goes wrong then we'll tell // NATS to terminate the message. We'll store the error result as @@ -307,10 +355,15 @@ func (w *worker) _next() { "type": inputRoomEvent.Event.Type(), }).Warn("Roomserver failed to process event") } - _ = msg.Term() + // Even though we failed to process this message (e.g. due to Dendrite restarting and receiving a context canceled), + // the message may already have been queued for redelivery or will be, so this makes sure that we still reprocess the msg + // after restarting. We only Ack if the context was not yet canceled. + if w.r.ProcessContext.Context().Err() == nil { + _ = msg.AckSync() + } errString = err.Error() } else { - _ = msg.Ack() + _ = msg.AckSync() } // If it was a synchronous input request then the "sync" field @@ -381,6 +434,9 @@ func (r *Inputer) queueInputRoomEvents( }).Error("Roomserver failed to queue async event") return nil, fmt.Errorf("r.JetStream.PublishMsg: %w", err) } + + // Now that the event is queued, increment the room backpressure + roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc() } return } diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index 520f82a80..1d9208434 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -48,8 +48,10 @@ import ( "github.com/matrix-org/dendrite/roomserver/types" ) -// TODO: Does this value make sense? -const MaximumMissingProcessingTime = time.Minute * 2 +// MaximumMissingProcessingTime is the maximum time we allow "processRoomEvent" to fetch +// e.g. missing auth/prev events. This duration is used for AckWait, and if it is exceeded +// NATS queues the event for redelivery. +const MaximumMissingProcessingTime = time.Minute * 5 var processRoomEventDuration = prometheus.NewHistogramVec( prometheus.HistogramOpts{ diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go index e9cd926d7..88e335711 100644 --- a/roomserver/roomserver_test.go +++ b/roomserver/roomserver_test.go @@ -12,7 +12,9 @@ import ( "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/roomserver/internal/input" "github.com/matrix-org/gomatrixserverlib/spec" + "github.com/nats-io/nats.go" "github.com/stretchr/testify/assert" "github.com/tidwall/gjson" @@ -1231,3 +1233,54 @@ func TestNewServerACLs(t *testing.T) { assert.Equal(t, false, banned) }) } + +// Validate that changing the AckPolicy/AckWait of room consumers +// results in their recreation +func TestRoomConsumerRecreation(t *testing.T) { + + alice := test.NewUser(t) + room := test.NewRoom(t, alice) + + // As this is DB unrelated, just use SQLite + cfg, processCtx, closeDB := testrig.CreateConfig(t, test.DBTypeSQLite) + defer closeDB() + cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions) + natsInstance := &jetstream.NATSInstance{} + + // Prepare a stream and consumer using the old configuration + jsCtx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream) + + streamName := cfg.Global.JetStream.Prefixed(jetstream.InputRoomEvent) + consumer := cfg.Global.JetStream.Prefixed("RoomInput" + jetstream.Tokenise(room.ID)) + subject := cfg.Global.JetStream.Prefixed(jetstream.InputRoomEventSubj(room.ID)) + + consumerConfig := &nats.ConsumerConfig{ + Durable: consumer, + AckPolicy: nats.AckAllPolicy, + DeliverPolicy: nats.DeliverAllPolicy, + FilterSubject: subject, + AckWait: (time.Minute * 2) + (time.Second * 10), + InactiveThreshold: time.Hour * 24, + } + + // Create the consumer with the old config + _, err := jsCtx.AddConsumer(streamName, consumerConfig) + assert.NoError(t, err) + + caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics) + // start JetStream listeners + rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, natsInstance, caches, caching.DisableMetrics) + rsAPI.SetFederationAPI(nil, nil) + + // let the RS create the events, this also recreates the Consumers + err = api.SendEvents(context.Background(), rsAPI, api.KindNew, room.Events(), "test", "test", "test", nil, false) + assert.NoError(t, err) + + // Validate that AckPolicy and AckWait has changed + info, err := jsCtx.ConsumerInfo(streamName, consumer) + assert.NoError(t, err) + assert.Equal(t, nats.AckExplicitPolicy, info.Config.AckPolicy) + + wantAckWait := input.MaximumMissingProcessingTime + (time.Second * 10) + assert.Equal(t, wantAckWait, info.Config.AckWait) +} From 9a5a56718e52793bbdf672ff1d19b37410010f77 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 19 Dec 2023 08:39:22 +0100 Subject: [PATCH 6/6] Bump golang.org/x/crypto from 0.14.0 to 0.17.0 (#3290) Bumps [golang.org/x/crypto](https://github.com/golang/crypto) from 0.14.0 to 0.17.0.
Commits

[![Dependabot compatibility score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=golang.org/x/crypto&package-manager=go_modules&previous-version=0.14.0&new-version=0.17.0)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores) Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting `@dependabot rebase`. [//]: # (dependabot-automerge-start) [//]: # (dependabot-automerge-end) ---
Dependabot commands and options
You can trigger Dependabot actions by commenting on this PR: - `@dependabot rebase` will rebase this PR - `@dependabot recreate` will recreate this PR, overwriting any edits that have been made to it - `@dependabot merge` will merge this PR after your CI passes on it - `@dependabot squash and merge` will squash and merge this PR after your CI passes on it - `@dependabot cancel merge` will cancel a previously requested merge and block automerging - `@dependabot reopen` will reopen this PR if it is closed - `@dependabot close` will close this PR and stop Dependabot recreating it. You can achieve the same result by closing it manually - `@dependabot show ignore conditions` will show all of the ignore conditions of the specified dependency - `@dependabot ignore this major version` will close this PR and stop Dependabot creating any more for this major version (unless you reopen the PR or upgrade to it yourself) - `@dependabot ignore this minor version` will close this PR and stop Dependabot creating any more for this minor version (unless you reopen the PR or upgrade to it yourself) - `@dependabot ignore this dependency` will close this PR and stop Dependabot creating any more for this dependency (unless you reopen the PR or upgrade to it yourself) You can disable automated security fix PRs for this repo from the [Security Alerts page](https://github.com/matrix-org/dendrite/network/alerts).
Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index d8b319d13..d9efae7fb 100644 --- a/go.mod +++ b/go.mod @@ -42,7 +42,7 @@ require ( github.com/uber/jaeger-lib v2.4.1+incompatible github.com/yggdrasil-network/yggdrasil-go v0.4.6 go.uber.org/atomic v1.10.0 - golang.org/x/crypto v0.16.0 + golang.org/x/crypto v0.17.0 golang.org/x/exp v0.0.0-20230809150735-7b3493d9a819 golang.org/x/image v0.10.0 golang.org/x/mobile v0.0.0-20221020085226-b36e6246172e diff --git a/go.sum b/go.sum index 543b8b4ae..0e3367fcb 100644 --- a/go.sum +++ b/go.sum @@ -354,8 +354,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY= -golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= +golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= +golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=