From 62afb936a5d282d0f500645c3442c1c294d37bdb Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 12 Sep 2022 15:27:45 +0100 Subject: [PATCH 01/10] Update to matrix-org/gomatrixserverlib@7b96db4 --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index a3e52c1a4..d151b2995 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91 github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 - github.com/matrix-org/gomatrixserverlib v0.0.0-20220911125436-dec87dbaa407 + github.com/matrix-org/gomatrixserverlib v0.0.0-20220912142654-7b96db48888a github.com/matrix-org/pinecone v0.0.0-20220912093434-b215925d5534 github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 github.com/mattn/go-sqlite3 v1.14.13 diff --git a/go.sum b/go.sum index 4ab56010c..3cad3f530 100644 --- a/go.sum +++ b/go.sum @@ -388,8 +388,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= -github.com/matrix-org/gomatrixserverlib v0.0.0-20220911125436-dec87dbaa407 h1:UciyfR3UTWnpqFBvEAMwGmZpjjO2hemFkWmwa/tB+fw= -github.com/matrix-org/gomatrixserverlib v0.0.0-20220911125436-dec87dbaa407/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk= +github.com/matrix-org/gomatrixserverlib v0.0.0-20220912142654-7b96db48888a h1:XeGBDZZsUe4kgj3myl0EiuDNVWxszJecMTrON3Wn9sI= +github.com/matrix-org/gomatrixserverlib v0.0.0-20220912142654-7b96db48888a/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk= github.com/matrix-org/pinecone v0.0.0-20220912093434-b215925d5534 h1:XuJYAJNkdG3zj9cO0yQSvL+Sp2xogsTOuZRx7PwdtoA= github.com/matrix-org/pinecone v0.0.0-20220912093434-b215925d5534/go.mod h1:K0N1ixHQxXoCyqolDqVxPM3ArrDtcMs8yegOx2Lfv9k= github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U= From 100fa9b2354efa05b4fbff3a3cb745ea7783d41c Mon Sep 17 00:00:00 2001 From: Till <2353100+S7evinK@users.noreply.github.com> Date: Tue, 13 Sep 2022 08:07:43 +0200 Subject: [PATCH 02/10] Check unique constraint errors when manually inserting migrations (#2712) This should avoid unnecessary logging on startup if the migration (were we need `InsertMigration`) was already executed. This now checks for "unique constraint errors" for SQLite and Postgres and fails the startup process if the migration couldn't be manually inserted for some other reason. --- internal/sqlutil/migrate.go | 5 +++++ ...{postgres_wasm.go => unique_constraint.go} | 21 ++++++++++++++++--- ...{postgres.go => unique_constraint_wasm.go} | 17 +++++++++------ .../storage/postgres/key_changes_table.go | 6 ++---- .../storage/sqlite3/key_changes_table.go | 6 ++---- roomserver/storage/postgres/storage.go | 5 +---- roomserver/storage/sqlite3/storage.go | 4 +--- 7 files changed, 40 insertions(+), 24 deletions(-) rename internal/sqlutil/{postgres_wasm.go => unique_constraint.go} (62%) rename internal/sqlutil/{postgres.go => unique_constraint_wasm.go} (74%) diff --git a/internal/sqlutil/migrate.go b/internal/sqlutil/migrate.go index 98e7d8938..b6a8b1f25 100644 --- a/internal/sqlutil/migrate.go +++ b/internal/sqlutil/migrate.go @@ -155,5 +155,10 @@ func InsertMigration(ctx context.Context, db *sql.DB, migrationName string) erro time.Now().Format(time.RFC3339), internal.VersionString(), ) + // If the migration was already executed, we'll get a unique constraint error, + // return nil instead, to avoid unnecessary logging. + if IsUniqueConstraintViolationErr(err) { + return nil + } return err } diff --git a/internal/sqlutil/postgres_wasm.go b/internal/sqlutil/unique_constraint.go similarity index 62% rename from internal/sqlutil/postgres_wasm.go rename to internal/sqlutil/unique_constraint.go index 34086f450..4a1b7fd94 100644 --- a/internal/sqlutil/postgres_wasm.go +++ b/internal/sqlutil/unique_constraint.go @@ -12,12 +12,27 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build wasm -// +build wasm +//go:build !wasm +// +build !wasm package sqlutil -// IsUniqueConstraintViolationErr no-ops for this architecture +import ( + "github.com/lib/pq" + "github.com/mattn/go-sqlite3" +) + +// IsUniqueConstraintViolationErr returns true if the error is an unique_violation error func IsUniqueConstraintViolationErr(err error) bool { + switch e := err.(type) { + case *pq.Error: + return e.Code == "23505" + case pq.Error: + return e.Code == "23505" + case *sqlite3.Error: + return e.Code == sqlite3.ErrConstraint + case sqlite3.Error: + return e.Code == sqlite3.ErrConstraint + } return false } diff --git a/internal/sqlutil/postgres.go b/internal/sqlutil/unique_constraint_wasm.go similarity index 74% rename from internal/sqlutil/postgres.go rename to internal/sqlutil/unique_constraint_wasm.go index 5e656b1da..02ceb5851 100644 --- a/internal/sqlutil/postgres.go +++ b/internal/sqlutil/unique_constraint_wasm.go @@ -12,15 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build !wasm -// +build !wasm +//go:build wasm +// +build wasm package sqlutil -import "github.com/lib/pq" +import "github.com/mattn/go-sqlite3" -// IsUniqueConstraintViolationErr returns true if the error is a postgresql unique_violation error +// IsUniqueConstraintViolationErr returns true if the error is an unique_violation error func IsUniqueConstraintViolationErr(err error) bool { - pqErr, ok := err.(*pq.Error) - return ok && pqErr.Code == "23505" + switch e := err.(type) { + case *sqlite3.Error: + return e.Code == sqlite3.ErrConstraint + case sqlite3.Error: + return e.Code == sqlite3.ErrConstraint + } + return false } diff --git a/keyserver/storage/postgres/key_changes_table.go b/keyserver/storage/postgres/key_changes_table.go index e91b048d5..c0e3429c7 100644 --- a/keyserver/storage/postgres/key_changes_table.go +++ b/keyserver/storage/postgres/key_changes_table.go @@ -18,8 +18,7 @@ import ( "context" "database/sql" "errors" - - "github.com/sirupsen/logrus" + "fmt" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" @@ -81,8 +80,7 @@ func executeMigration(ctx context.Context, db *sql.DB) error { if err != nil { if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil { - // not a fatal error, log and continue - logrus.WithError(err).Warnf("unable to manually insert migration '%s'", migrationName) + return fmt.Errorf("unable to manually insert migration '%s': %w", migrationName, err) } return nil } diff --git a/keyserver/storage/sqlite3/key_changes_table.go b/keyserver/storage/sqlite3/key_changes_table.go index b68df5552..0c844d67a 100644 --- a/keyserver/storage/sqlite3/key_changes_table.go +++ b/keyserver/storage/sqlite3/key_changes_table.go @@ -18,8 +18,7 @@ import ( "context" "database/sql" "errors" - - "github.com/sirupsen/logrus" + "fmt" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" @@ -80,8 +79,7 @@ func executeMigration(ctx context.Context, db *sql.DB) error { if err != nil { if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil { - // not a fatal error, log and continue - logrus.WithError(err).Warnf("unable to manually insert migration '%s'", migrationName) + return fmt.Errorf("unable to manually insert migration '%s': %w", migrationName, err) } return nil } diff --git a/roomserver/storage/postgres/storage.go b/roomserver/storage/postgres/storage.go index 26178df83..23a5f79eb 100644 --- a/roomserver/storage/postgres/storage.go +++ b/roomserver/storage/postgres/storage.go @@ -21,8 +21,6 @@ import ( "errors" "fmt" - "github.com/sirupsen/logrus" - // Import the postgres database driver. _ "github.com/lib/pq" @@ -79,8 +77,7 @@ func executeMigration(ctx context.Context, db *sql.DB) error { if err != nil { if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil { - // not a fatal error, log and continue - logrus.WithError(err).Warnf("unable to manually insert migration '%s'", migrationName) + return fmt.Errorf("unable to manually insert migration '%s': %w", migrationName, err) } return nil } diff --git a/roomserver/storage/sqlite3/storage.go b/roomserver/storage/sqlite3/storage.go index c7bc00393..01c3f879c 100644 --- a/roomserver/storage/sqlite3/storage.go +++ b/roomserver/storage/sqlite3/storage.go @@ -22,7 +22,6 @@ import ( "fmt" "github.com/matrix-org/gomatrixserverlib" - "github.com/sirupsen/logrus" "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/sqlutil" @@ -87,8 +86,7 @@ func executeMigration(ctx context.Context, db *sql.DB) error { if err != nil { if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil { - // not a fatal error, log and continue - logrus.WithError(err).Warnf("unable to manually insert migration '%s'", migrationName) + return fmt.Errorf("unable to manually insert migration '%s': %w", migrationName, err) } return nil } From c366ccdfcaf1ea820bef4744fda90aff4db1c67d Mon Sep 17 00:00:00 2001 From: Till <2353100+S7evinK@users.noreply.github.com> Date: Tue, 13 Sep 2022 09:35:45 +0200 Subject: [PATCH 03/10] Send-to-device consumer/producer tweaks (#2713) Some tweaks for the send-to-device consumers/producers: - use `json.RawMessage` without marshalling it first - try further devices (if available) if we failed to `PublishMsg` in the producers - some logging changes (to better debug E2EE issues) --- clientapi/producers/syncapi.go | 32 ++++++++++++++---------------- federationapi/producers/syncapi.go | 24 ++++++++++------------ syncapi/consumers/sendtodevice.go | 15 ++++++++------ syncapi/syncapi_test.go | 4 +--- 4 files changed, 35 insertions(+), 40 deletions(-) diff --git a/clientapi/producers/syncapi.go b/clientapi/producers/syncapi.go index 5933ce1a8..2dc0c4843 100644 --- a/clientapi/producers/syncapi.go +++ b/clientapi/producers/syncapi.go @@ -21,12 +21,13 @@ import ( "strconv" "time" - "github.com/matrix-org/dendrite/setup/jetstream" - "github.com/matrix-org/dendrite/syncapi/types" - userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" "github.com/nats-io/nats.go" log "github.com/sirupsen/logrus" + + "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/matrix-org/dendrite/syncapi/types" + userapi "github.com/matrix-org/dendrite/userapi/api" ) // SyncAPIProducer produces events for the sync API server to consume @@ -61,7 +62,7 @@ func (p *SyncAPIProducer) SendReceipt( func (p *SyncAPIProducer) SendToDevice( ctx context.Context, sender, userID, deviceID, eventType string, - message interface{}, + message json.RawMessage, ) error { devices := []string{} _, domain, err := gomatrixserverlib.SplitID('@', userID) @@ -89,24 +90,19 @@ func (p *SyncAPIProducer) SendToDevice( devices = append(devices, deviceID) } - js, err := json.Marshal(message) - if err != nil { - return err - } - log.WithFields(log.Fields{ "user_id": userID, "num_devices": len(devices), "type": eventType, }).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent) - for _, device := range devices { + for i, device := range devices { ote := &types.OutputSendToDeviceEvent{ UserID: userID, DeviceID: device, SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{ Sender: sender, Type: eventType, - Content: js, + Content: message, }, } @@ -115,15 +111,17 @@ func (p *SyncAPIProducer) SendToDevice( log.WithError(err).Error("sendToDevice failed json.Marshal") return err } - m := &nats.Msg{ - Subject: p.TopicSendToDeviceEvent, - Data: eventJSON, - Header: nats.Header{}, - } + m := nats.NewMsg(p.TopicSendToDeviceEvent) + m.Data = eventJSON m.Header.Set("sender", sender) m.Header.Set(jetstream.UserID, userID) + if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil { - log.WithError(err).Error("sendToDevice failed t.Producer.SendMessage") + if i < len(devices)-1 { + log.WithError(err).Warn("sendToDevice failed to PublishMsg, trying further devices") + continue + } + log.WithError(err).Error("sendToDevice failed to PublishMsg for all devices") return err } } diff --git a/federationapi/producers/syncapi.go b/federationapi/producers/syncapi.go index 86c8c10a3..4abd3fbe5 100644 --- a/federationapi/producers/syncapi.go +++ b/federationapi/producers/syncapi.go @@ -64,7 +64,7 @@ func (p *SyncAPIProducer) SendReceipt( func (p *SyncAPIProducer) SendToDevice( ctx context.Context, sender, userID, deviceID, eventType string, - message interface{}, + message json.RawMessage, ) error { devices := []string{} _, domain, err := gomatrixserverlib.SplitID('@', userID) @@ -92,24 +92,19 @@ func (p *SyncAPIProducer) SendToDevice( devices = append(devices, deviceID) } - js, err := json.Marshal(message) - if err != nil { - return err - } - log.WithFields(log.Fields{ "user_id": userID, "num_devices": len(devices), "type": eventType, }).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent) - for _, device := range devices { + for i, device := range devices { ote := &types.OutputSendToDeviceEvent{ UserID: userID, DeviceID: device, SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{ Sender: sender, Type: eventType, - Content: js, + Content: message, }, } @@ -118,16 +113,17 @@ func (p *SyncAPIProducer) SendToDevice( log.WithError(err).Error("sendToDevice failed json.Marshal") return err } - m := &nats.Msg{ - Subject: p.TopicSendToDeviceEvent, - Data: eventJSON, - Header: nats.Header{}, - } + m := nats.NewMsg(p.TopicSendToDeviceEvent) + m.Data = eventJSON m.Header.Set("sender", sender) m.Header.Set(jetstream.UserID, userID) if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil { - log.WithError(err).Error("sendToDevice failed t.Producer.SendMessage") + if i < len(devices)-1 { + log.WithError(err).Warn("sendToDevice failed to PublishMsg, trying further devices") + continue + } + log.WithError(err).Error("sendToDevice failed to PublishMsg for all devices") return err } } diff --git a/syncapi/consumers/sendtodevice.go b/syncapi/consumers/sendtodevice.go index 89b01d7e5..7d6aae597 100644 --- a/syncapi/consumers/sendtodevice.go +++ b/syncapi/consumers/sendtodevice.go @@ -19,16 +19,17 @@ import ( "encoding/json" "github.com/getsentry/sentry-go" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" + "github.com/nats-io/nats.go" + log "github.com/sirupsen/logrus" + "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" - "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/util" - "github.com/nats-io/nats.go" - log "github.com/sirupsen/logrus" ) // OutputSendToDeviceEventConsumer consumes events that originated in the EDU server. @@ -79,16 +80,18 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msgs [] _, domain, err := gomatrixserverlib.SplitID('@', userID) if err != nil { sentry.CaptureException(err) + log.WithError(err).Errorf("send-to-device: failed to split user id, dropping message") return true } if domain != s.serverName { + log.Tracef("ignoring send-to-device event with destination %s", domain) return true } var output types.OutputSendToDeviceEvent if err = json.Unmarshal(msg.Data, &output); err != nil { // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Errorf("output log: message parse failure") + log.WithError(err).Errorf("send-to-device: message parse failure") sentry.CaptureException(err) return true } @@ -105,7 +108,7 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msgs [] ) if err != nil { sentry.CaptureException(err) - log.WithError(err).Errorf("failed to store send-to-device message") + log.WithError(err).Errorf("send-to-device: failed to store message") return false } diff --git a/syncapi/syncapi_test.go b/syncapi/syncapi_test.go index c81256aa7..a4985dbf4 100644 --- a/syncapi/syncapi_test.go +++ b/syncapi/syncapi_test.go @@ -624,9 +624,7 @@ func testSendToDevice(t *testing.T, dbType test.DBType) { // Send to-device messages of type "m.dendrite.test" with content `{"dummy":"message $counter"}` for i := 0; i < tc.sendMessagesCount; i++ { msgCounter++ - msg := map[string]string{ - "dummy": fmt.Sprintf("message %d", msgCounter), - } + msg := json.RawMessage(fmt.Sprintf(`{"dummy":"message %d"}`, msgCounter)) if err := producer.SendToDevice(ctx, user.ID, user.ID, alice.ID, "m.dendrite.test", msg); err != nil { t.Fatalf("unable to send to device message: %v", err) } From 3e55856254f6bc918f5075d7adb85d53da92c7c8 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 13 Sep 2022 09:37:38 +0100 Subject: [PATCH 04/10] Always resolve state in `QueryStateAfterEvents` --- roomserver/internal/query/query.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/roomserver/internal/query/query.go b/roomserver/internal/query/query.go index 6dce2bc3e..d08c5c491 100644 --- a/roomserver/internal/query/query.go +++ b/roomserver/internal/query/query.go @@ -106,7 +106,7 @@ func (r *Queryer) QueryStateAfterEvents( return err } - if len(request.PrevEventIDs) > 1 && len(request.StateToFetch) == 0 { + if len(request.PrevEventIDs) > 1 { var authEventIDs []string for _, e := range stateEvents { authEventIDs = append(authEventIDs, e.AuthEventIDs()...) From b05e028f7d8befc045dcb62e87d09ad462bb277c Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 13 Sep 2022 12:52:09 +0100 Subject: [PATCH 05/10] Tweak `LoadMembershipAtEvent` behaviour when state not known (#2716) Previously `LoadMembershipAtEvent` would fail if the state before one of the events was not known, i.e. because it was an outlier. This modifies it so that it gracefully handles not knowing the state and returns no memberships instead, so that history visibility doesn't freak out and kill `/sync` requests dead. --- roomserver/api/query.go | 3 ++- roomserver/internal/query/query.go | 9 ++++++++- roomserver/state/state.go | 8 ++++++-- syncapi/streams/stream_pdu.go | 5 ++--- 4 files changed, 18 insertions(+), 7 deletions(-) diff --git a/roomserver/api/query.go b/roomserver/api/query.go index 32d63bb51..aa7dc4735 100644 --- a/roomserver/api/query.go +++ b/roomserver/api/query.go @@ -439,6 +439,7 @@ type QueryMembershipAtEventRequest struct { // QueryMembershipAtEventResponse is the response to QueryMembershipAtEventRequest. type QueryMembershipAtEventResponse struct { - // Memberships is a map from eventID to a list of events (if any). + // Memberships is a map from eventID to a list of events (if any). Events that + // do not have known state will return an empty array here. Memberships map[string][]*gomatrixserverlib.HeaderedEvent `json:"memberships"` } diff --git a/roomserver/internal/query/query.go b/roomserver/internal/query/query.go index d08c5c491..b41a92e94 100644 --- a/roomserver/internal/query/query.go +++ b/roomserver/internal/query/query.go @@ -208,6 +208,9 @@ func (r *Queryer) QueryMembershipForUser( return err } +// QueryMembershipAtEvent returns the known memberships at a given event. +// If the state before an event is not known, an empty list will be returned +// for that event instead. func (r *Queryer) QueryMembershipAtEvent( ctx context.Context, request *api.QueryMembershipAtEventRequest, @@ -237,7 +240,11 @@ func (r *Queryer) QueryMembershipAtEvent( } for _, eventID := range request.EventIDs { - stateEntry := stateEntries[eventID] + stateEntry, ok := stateEntries[eventID] + if !ok { + response.Memberships[eventID] = []*gomatrixserverlib.HeaderedEvent{} + continue + } memberships, err := helpers.GetMembershipsAtState(ctx, r.DB, stateEntry, false) if err != nil { return fmt.Errorf("unable to get memberships at state: %w", err) diff --git a/roomserver/state/state.go b/roomserver/state/state.go index a40a2e9ba..cb96d83ec 100644 --- a/roomserver/state/state.go +++ b/roomserver/state/state.go @@ -18,6 +18,7 @@ package state import ( "context" + "database/sql" "fmt" "sort" "sync" @@ -134,11 +135,14 @@ func (v *StateResolution) LoadMembershipAtEvent( for i := range eventIDs { eventID := eventIDs[i] snapshotNID, err := v.db.SnapshotNIDFromEventID(ctx, eventID) - if err != nil { + if err != nil && err != sql.ErrNoRows { return nil, fmt.Errorf("LoadStateAtEvent.SnapshotNIDFromEventID failed for event %s : %w", eventID, err) } if snapshotNID == 0 { - return nil, fmt.Errorf("LoadStateAtEvent.SnapshotNIDFromEventID(%s) returned 0 NID, was this event stored?", eventID) + // If we don't know a state snapshot for this event then we can't calculate + // memberships at the time of the event, so skip over it. This means that + // it isn't guaranteed that the response map will contain every single event. + continue } snapshotNIDMap[snapshotNID] = append(snapshotNIDMap[snapshotNID], eventID) } diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index ffcf64df6..0ab6de886 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -388,7 +388,7 @@ func applyHistoryVisibilityFilter( if err != nil { // Not a fatal error, we can continue without the stateEvents, // they are only needed if there are state events in the timeline. - logrus.WithError(err).Warnf("failed to get current room state") + logrus.WithError(err).Warnf("Failed to get current room state for history visibility") } alwaysIncludeIDs := make(map[string]struct{}, len(stateEvents)) for _, ev := range stateEvents { @@ -397,7 +397,6 @@ func applyHistoryVisibilityFilter( startTime := time.Now() events, err := internal.ApplyHistoryVisibilityFilter(ctx, db, rsAPI, recentEvents, alwaysIncludeIDs, userID, "sync") if err != nil { - return nil, err } logrus.WithFields(logrus.Fields{ @@ -405,7 +404,7 @@ func applyHistoryVisibilityFilter( "room_id": roomID, "before": len(recentEvents), "after": len(events), - }).Debug("applied history visibility (sync)") + }).Trace("Applied history visibility (sync)") return events, nil } From 482914aef4a7d637a8c468d46904fde9f478b5d1 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 13 Sep 2022 15:25:02 +0100 Subject: [PATCH 06/10] Use `AckNone` on the ephemeral room input consumer --- roomserver/internal/input/input.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index a8a3e0248..1cb980dc3 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -172,11 +172,10 @@ func (r *Inputer) Start() error { func(m *nats.Msg) { roomID := m.Header.Get(jetstream.RoomID) r.startWorkerForRoom(roomID) - _ = m.Ack() }, nats.HeadersOnly(), nats.DeliverAll(), - nats.AckAll(), + nats.AckNone(), nats.BindStream(r.InputRoomEventTopic), ) return err From 7f89fed1e45cc6ea65c420e32a017df634f4164f Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 14 Sep 2022 09:55:50 +0100 Subject: [PATCH 07/10] Revert 482914aef4a7d637a8c468d46904fde9f478b5d1 --- roomserver/internal/input/input.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 1cb980dc3..c47793f0a 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -172,10 +172,12 @@ func (r *Inputer) Start() error { func(m *nats.Msg) { roomID := m.Header.Get(jetstream.RoomID) r.startWorkerForRoom(roomID) + _ = m.Ack() }, nats.HeadersOnly(), nats.DeliverAll(), - nats.AckNone(), + nats.AckExplicit(), + nats.ReplayInstant(), nats.BindStream(r.InputRoomEventTopic), ) return err From e6960d0b15b6cb216f1d566a7a47a891348fd48f Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 14 Sep 2022 14:25:25 +0100 Subject: [PATCH 08/10] Update to matrix-org/pinecone@608215eb1b2920f3510b56c4a36a87ed9e75779f --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index d151b2995..263a0065d 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91 github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 github.com/matrix-org/gomatrixserverlib v0.0.0-20220912142654-7b96db48888a - github.com/matrix-org/pinecone v0.0.0-20220912093434-b215925d5534 + github.com/matrix-org/pinecone v0.0.0-20220914131522-608215eb1b29 github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 github.com/mattn/go-sqlite3 v1.14.13 github.com/nats-io/nats-server/v2 v2.9.0 diff --git a/go.sum b/go.sum index 3cad3f530..abdc11665 100644 --- a/go.sum +++ b/go.sum @@ -390,8 +390,8 @@ github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5d github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= github.com/matrix-org/gomatrixserverlib v0.0.0-20220912142654-7b96db48888a h1:XeGBDZZsUe4kgj3myl0EiuDNVWxszJecMTrON3Wn9sI= github.com/matrix-org/gomatrixserverlib v0.0.0-20220912142654-7b96db48888a/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk= -github.com/matrix-org/pinecone v0.0.0-20220912093434-b215925d5534 h1:XuJYAJNkdG3zj9cO0yQSvL+Sp2xogsTOuZRx7PwdtoA= -github.com/matrix-org/pinecone v0.0.0-20220912093434-b215925d5534/go.mod h1:K0N1ixHQxXoCyqolDqVxPM3ArrDtcMs8yegOx2Lfv9k= +github.com/matrix-org/pinecone v0.0.0-20220914131522-608215eb1b29 h1:/AIaqhK1BBi2sMEVQdgZRV8H8sNloAGCgztLZhsPqD0= +github.com/matrix-org/pinecone v0.0.0-20220914131522-608215eb1b29/go.mod h1:K0N1ixHQxXoCyqolDqVxPM3ArrDtcMs8yegOx2Lfv9k= github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U= github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk= github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U= From 0ea948c70548875601a544c57a588271af3adce4 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 14 Sep 2022 14:26:24 +0100 Subject: [PATCH 09/10] Fix Pinecone demo build errors after Pinecone update --- build/gobind-pinecone/monolith.go | 2 +- cmd/dendrite-demo-pinecone/main.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/build/gobind-pinecone/monolith.go b/build/gobind-pinecone/monolith.go index 91bbb6873..500403ae4 100644 --- a/build/gobind-pinecone/monolith.go +++ b/build/gobind-pinecone/monolith.go @@ -255,7 +255,7 @@ func (m *DendriteMonolith) Start() { m.logger.SetOutput(BindLogger{}) logrus.SetOutput(BindLogger{}) - m.PineconeRouter = pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk, false) + m.PineconeRouter = pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk) m.PineconeQUIC = pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), m.PineconeRouter, []string{"matrix"}) m.PineconeMulticast = pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), m.PineconeRouter) m.PineconeManager = pineconeConnections.NewConnectionManager(m.PineconeRouter, nil) diff --git a/cmd/dendrite-demo-pinecone/main.go b/cmd/dendrite-demo-pinecone/main.go index ff9cb5aa1..99e597e59 100644 --- a/cmd/dendrite-demo-pinecone/main.go +++ b/cmd/dendrite-demo-pinecone/main.go @@ -151,7 +151,7 @@ func main() { base := base.NewBaseDendrite(cfg, "Monolith") defer base.Close() // nolint: errcheck - pRouter := pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk, false) + pRouter := pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk) pQUIC := pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), pRouter, []string{"matrix"}) pMulticast := pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), pRouter) pManager := pineconeConnections.NewConnectionManager(pRouter, nil) From a5f8c07184921b4cfdea42c872852ca5e5e5a4ce Mon Sep 17 00:00:00 2001 From: Till <2353100+S7evinK@users.noreply.github.com> Date: Thu, 15 Sep 2022 07:26:26 +0200 Subject: [PATCH 10/10] Hopefully fix `upgrade-tests` (#2717) Wait for events to come down `/sync` before ending the test. --- cmd/dendrite-upgrade-tests/tests.go | 45 +++++++++++++++++++++++++++-- 1 file changed, 43 insertions(+), 2 deletions(-) diff --git a/cmd/dendrite-upgrade-tests/tests.go b/cmd/dendrite-upgrade-tests/tests.go index ff1e09dda..5c9589df2 100644 --- a/cmd/dendrite-upgrade-tests/tests.go +++ b/cmd/dendrite-upgrade-tests/tests.go @@ -4,6 +4,7 @@ import ( "fmt" "log" "strings" + "time" "github.com/matrix-org/gomatrix" "github.com/matrix-org/gomatrixserverlib" @@ -81,11 +82,14 @@ func runTests(baseURL, branchName string) error { client: users[1].client, text: "4: " + branchName, }, } + wantEventIDs := make(map[string]struct{}, 8) for _, msg := range msgs { - _, err = msg.client.SendText(dmRoomID, msg.text) + var resp *gomatrix.RespSendEvent + resp, err = msg.client.SendText(dmRoomID, msg.text) if err != nil { return fmt.Errorf("failed to send text in dm room: %s", err) } + wantEventIDs[resp.EventID] = struct{}{} } // attempt to create/join the shared public room @@ -113,11 +117,48 @@ func runTests(baseURL, branchName string) error { } // send messages for _, msg := range msgs { - _, err = msg.client.SendText(publicRoomID, "public "+msg.text) + resp, err := msg.client.SendText(publicRoomID, "public "+msg.text) if err != nil { return fmt.Errorf("failed to send text in public room: %s", err) } + wantEventIDs[resp.EventID] = struct{}{} } + + // Sync until we have all expected messages + doneCh := make(chan struct{}) + go func() { + syncClient := users[0].client + since := "" + for len(wantEventIDs) > 0 { + select { + case <-doneCh: + return + default: + } + syncResp, err := syncClient.SyncRequest(1000, since, "1", false, "") + if err != nil { + continue + } + for _, room := range syncResp.Rooms.Join { + for _, ev := range room.Timeline.Events { + if ev.Type != "m.room.message" { + continue + } + delete(wantEventIDs, ev.ID) + } + } + since = syncResp.NextBatch + } + close(doneCh) + }() + + select { + case <-time.After(time.Second * 10): + close(doneCh) + return fmt.Errorf("failed to receive all expected messages: %+v", wantEventIDs) + case <-doneCh: + } + log.Printf("OK! rooms(public=%s, dm=%s) users(%s, %s)\n", publicRoomID, dmRoomID, users[0].userID, users[1].userID) return nil }