From 3e55856254f6bc918f5075d7adb85d53da92c7c8 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 13 Sep 2022 09:37:38 +0100 Subject: [PATCH 1/7] Always resolve state in `QueryStateAfterEvents` --- roomserver/internal/query/query.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/roomserver/internal/query/query.go b/roomserver/internal/query/query.go index 6dce2bc3e..d08c5c491 100644 --- a/roomserver/internal/query/query.go +++ b/roomserver/internal/query/query.go @@ -106,7 +106,7 @@ func (r *Queryer) QueryStateAfterEvents( return err } - if len(request.PrevEventIDs) > 1 && len(request.StateToFetch) == 0 { + if len(request.PrevEventIDs) > 1 { var authEventIDs []string for _, e := range stateEvents { authEventIDs = append(authEventIDs, e.AuthEventIDs()...) From b05e028f7d8befc045dcb62e87d09ad462bb277c Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 13 Sep 2022 12:52:09 +0100 Subject: [PATCH 2/7] Tweak `LoadMembershipAtEvent` behaviour when state not known (#2716) Previously `LoadMembershipAtEvent` would fail if the state before one of the events was not known, i.e. because it was an outlier. This modifies it so that it gracefully handles not knowing the state and returns no memberships instead, so that history visibility doesn't freak out and kill `/sync` requests dead. --- roomserver/api/query.go | 3 ++- roomserver/internal/query/query.go | 9 ++++++++- roomserver/state/state.go | 8 ++++++-- syncapi/streams/stream_pdu.go | 5 ++--- 4 files changed, 18 insertions(+), 7 deletions(-) diff --git a/roomserver/api/query.go b/roomserver/api/query.go index 32d63bb51..aa7dc4735 100644 --- a/roomserver/api/query.go +++ b/roomserver/api/query.go @@ -439,6 +439,7 @@ type QueryMembershipAtEventRequest struct { // QueryMembershipAtEventResponse is the response to QueryMembershipAtEventRequest. type QueryMembershipAtEventResponse struct { - // Memberships is a map from eventID to a list of events (if any). + // Memberships is a map from eventID to a list of events (if any). Events that + // do not have known state will return an empty array here. Memberships map[string][]*gomatrixserverlib.HeaderedEvent `json:"memberships"` } diff --git a/roomserver/internal/query/query.go b/roomserver/internal/query/query.go index d08c5c491..b41a92e94 100644 --- a/roomserver/internal/query/query.go +++ b/roomserver/internal/query/query.go @@ -208,6 +208,9 @@ func (r *Queryer) QueryMembershipForUser( return err } +// QueryMembershipAtEvent returns the known memberships at a given event. +// If the state before an event is not known, an empty list will be returned +// for that event instead. func (r *Queryer) QueryMembershipAtEvent( ctx context.Context, request *api.QueryMembershipAtEventRequest, @@ -237,7 +240,11 @@ func (r *Queryer) QueryMembershipAtEvent( } for _, eventID := range request.EventIDs { - stateEntry := stateEntries[eventID] + stateEntry, ok := stateEntries[eventID] + if !ok { + response.Memberships[eventID] = []*gomatrixserverlib.HeaderedEvent{} + continue + } memberships, err := helpers.GetMembershipsAtState(ctx, r.DB, stateEntry, false) if err != nil { return fmt.Errorf("unable to get memberships at state: %w", err) diff --git a/roomserver/state/state.go b/roomserver/state/state.go index a40a2e9ba..cb96d83ec 100644 --- a/roomserver/state/state.go +++ b/roomserver/state/state.go @@ -18,6 +18,7 @@ package state import ( "context" + "database/sql" "fmt" "sort" "sync" @@ -134,11 +135,14 @@ func (v *StateResolution) LoadMembershipAtEvent( for i := range eventIDs { eventID := eventIDs[i] snapshotNID, err := v.db.SnapshotNIDFromEventID(ctx, eventID) - if err != nil { + if err != nil && err != sql.ErrNoRows { return nil, fmt.Errorf("LoadStateAtEvent.SnapshotNIDFromEventID failed for event %s : %w", eventID, err) } if snapshotNID == 0 { - return nil, fmt.Errorf("LoadStateAtEvent.SnapshotNIDFromEventID(%s) returned 0 NID, was this event stored?", eventID) + // If we don't know a state snapshot for this event then we can't calculate + // memberships at the time of the event, so skip over it. This means that + // it isn't guaranteed that the response map will contain every single event. + continue } snapshotNIDMap[snapshotNID] = append(snapshotNIDMap[snapshotNID], eventID) } diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index ffcf64df6..0ab6de886 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -388,7 +388,7 @@ func applyHistoryVisibilityFilter( if err != nil { // Not a fatal error, we can continue without the stateEvents, // they are only needed if there are state events in the timeline. - logrus.WithError(err).Warnf("failed to get current room state") + logrus.WithError(err).Warnf("Failed to get current room state for history visibility") } alwaysIncludeIDs := make(map[string]struct{}, len(stateEvents)) for _, ev := range stateEvents { @@ -397,7 +397,6 @@ func applyHistoryVisibilityFilter( startTime := time.Now() events, err := internal.ApplyHistoryVisibilityFilter(ctx, db, rsAPI, recentEvents, alwaysIncludeIDs, userID, "sync") if err != nil { - return nil, err } logrus.WithFields(logrus.Fields{ @@ -405,7 +404,7 @@ func applyHistoryVisibilityFilter( "room_id": roomID, "before": len(recentEvents), "after": len(events), - }).Debug("applied history visibility (sync)") + }).Trace("Applied history visibility (sync)") return events, nil } From 482914aef4a7d637a8c468d46904fde9f478b5d1 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 13 Sep 2022 15:25:02 +0100 Subject: [PATCH 3/7] Use `AckNone` on the ephemeral room input consumer --- roomserver/internal/input/input.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index a8a3e0248..1cb980dc3 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -172,11 +172,10 @@ func (r *Inputer) Start() error { func(m *nats.Msg) { roomID := m.Header.Get(jetstream.RoomID) r.startWorkerForRoom(roomID) - _ = m.Ack() }, nats.HeadersOnly(), nats.DeliverAll(), - nats.AckAll(), + nats.AckNone(), nats.BindStream(r.InputRoomEventTopic), ) return err From 7f89fed1e45cc6ea65c420e32a017df634f4164f Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 14 Sep 2022 09:55:50 +0100 Subject: [PATCH 4/7] Revert 482914aef4a7d637a8c468d46904fde9f478b5d1 --- roomserver/internal/input/input.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 1cb980dc3..c47793f0a 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -172,10 +172,12 @@ func (r *Inputer) Start() error { func(m *nats.Msg) { roomID := m.Header.Get(jetstream.RoomID) r.startWorkerForRoom(roomID) + _ = m.Ack() }, nats.HeadersOnly(), nats.DeliverAll(), - nats.AckNone(), + nats.AckExplicit(), + nats.ReplayInstant(), nats.BindStream(r.InputRoomEventTopic), ) return err From e6960d0b15b6cb216f1d566a7a47a891348fd48f Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 14 Sep 2022 14:25:25 +0100 Subject: [PATCH 5/7] Update to matrix-org/pinecone@608215eb1b2920f3510b56c4a36a87ed9e75779f --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index d151b2995..263a0065d 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91 github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 github.com/matrix-org/gomatrixserverlib v0.0.0-20220912142654-7b96db48888a - github.com/matrix-org/pinecone v0.0.0-20220912093434-b215925d5534 + github.com/matrix-org/pinecone v0.0.0-20220914131522-608215eb1b29 github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 github.com/mattn/go-sqlite3 v1.14.13 github.com/nats-io/nats-server/v2 v2.9.0 diff --git a/go.sum b/go.sum index 3cad3f530..abdc11665 100644 --- a/go.sum +++ b/go.sum @@ -390,8 +390,8 @@ github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5d github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= github.com/matrix-org/gomatrixserverlib v0.0.0-20220912142654-7b96db48888a h1:XeGBDZZsUe4kgj3myl0EiuDNVWxszJecMTrON3Wn9sI= github.com/matrix-org/gomatrixserverlib v0.0.0-20220912142654-7b96db48888a/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk= -github.com/matrix-org/pinecone v0.0.0-20220912093434-b215925d5534 h1:XuJYAJNkdG3zj9cO0yQSvL+Sp2xogsTOuZRx7PwdtoA= -github.com/matrix-org/pinecone v0.0.0-20220912093434-b215925d5534/go.mod h1:K0N1ixHQxXoCyqolDqVxPM3ArrDtcMs8yegOx2Lfv9k= +github.com/matrix-org/pinecone v0.0.0-20220914131522-608215eb1b29 h1:/AIaqhK1BBi2sMEVQdgZRV8H8sNloAGCgztLZhsPqD0= +github.com/matrix-org/pinecone v0.0.0-20220914131522-608215eb1b29/go.mod h1:K0N1ixHQxXoCyqolDqVxPM3ArrDtcMs8yegOx2Lfv9k= github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U= github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk= github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U= From 0ea948c70548875601a544c57a588271af3adce4 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 14 Sep 2022 14:26:24 +0100 Subject: [PATCH 6/7] Fix Pinecone demo build errors after Pinecone update --- build/gobind-pinecone/monolith.go | 2 +- cmd/dendrite-demo-pinecone/main.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/build/gobind-pinecone/monolith.go b/build/gobind-pinecone/monolith.go index 91bbb6873..500403ae4 100644 --- a/build/gobind-pinecone/monolith.go +++ b/build/gobind-pinecone/monolith.go @@ -255,7 +255,7 @@ func (m *DendriteMonolith) Start() { m.logger.SetOutput(BindLogger{}) logrus.SetOutput(BindLogger{}) - m.PineconeRouter = pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk, false) + m.PineconeRouter = pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk) m.PineconeQUIC = pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), m.PineconeRouter, []string{"matrix"}) m.PineconeMulticast = pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), m.PineconeRouter) m.PineconeManager = pineconeConnections.NewConnectionManager(m.PineconeRouter, nil) diff --git a/cmd/dendrite-demo-pinecone/main.go b/cmd/dendrite-demo-pinecone/main.go index ff9cb5aa1..99e597e59 100644 --- a/cmd/dendrite-demo-pinecone/main.go +++ b/cmd/dendrite-demo-pinecone/main.go @@ -151,7 +151,7 @@ func main() { base := base.NewBaseDendrite(cfg, "Monolith") defer base.Close() // nolint: errcheck - pRouter := pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk, false) + pRouter := pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk) pQUIC := pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), pRouter, []string{"matrix"}) pMulticast := pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), pRouter) pManager := pineconeConnections.NewConnectionManager(pRouter, nil) From a5f8c07184921b4cfdea42c872852ca5e5e5a4ce Mon Sep 17 00:00:00 2001 From: Till <2353100+S7evinK@users.noreply.github.com> Date: Thu, 15 Sep 2022 07:26:26 +0200 Subject: [PATCH 7/7] Hopefully fix `upgrade-tests` (#2717) Wait for events to come down `/sync` before ending the test. --- cmd/dendrite-upgrade-tests/tests.go | 45 +++++++++++++++++++++++++++-- 1 file changed, 43 insertions(+), 2 deletions(-) diff --git a/cmd/dendrite-upgrade-tests/tests.go b/cmd/dendrite-upgrade-tests/tests.go index ff1e09dda..5c9589df2 100644 --- a/cmd/dendrite-upgrade-tests/tests.go +++ b/cmd/dendrite-upgrade-tests/tests.go @@ -4,6 +4,7 @@ import ( "fmt" "log" "strings" + "time" "github.com/matrix-org/gomatrix" "github.com/matrix-org/gomatrixserverlib" @@ -81,11 +82,14 @@ func runTests(baseURL, branchName string) error { client: users[1].client, text: "4: " + branchName, }, } + wantEventIDs := make(map[string]struct{}, 8) for _, msg := range msgs { - _, err = msg.client.SendText(dmRoomID, msg.text) + var resp *gomatrix.RespSendEvent + resp, err = msg.client.SendText(dmRoomID, msg.text) if err != nil { return fmt.Errorf("failed to send text in dm room: %s", err) } + wantEventIDs[resp.EventID] = struct{}{} } // attempt to create/join the shared public room @@ -113,11 +117,48 @@ func runTests(baseURL, branchName string) error { } // send messages for _, msg := range msgs { - _, err = msg.client.SendText(publicRoomID, "public "+msg.text) + resp, err := msg.client.SendText(publicRoomID, "public "+msg.text) if err != nil { return fmt.Errorf("failed to send text in public room: %s", err) } + wantEventIDs[resp.EventID] = struct{}{} } + + // Sync until we have all expected messages + doneCh := make(chan struct{}) + go func() { + syncClient := users[0].client + since := "" + for len(wantEventIDs) > 0 { + select { + case <-doneCh: + return + default: + } + syncResp, err := syncClient.SyncRequest(1000, since, "1", false, "") + if err != nil { + continue + } + for _, room := range syncResp.Rooms.Join { + for _, ev := range room.Timeline.Events { + if ev.Type != "m.room.message" { + continue + } + delete(wantEventIDs, ev.ID) + } + } + since = syncResp.NextBatch + } + close(doneCh) + }() + + select { + case <-time.After(time.Second * 10): + close(doneCh) + return fmt.Errorf("failed to receive all expected messages: %+v", wantEventIDs) + case <-doneCh: + } + log.Printf("OK! rooms(public=%s, dm=%s) users(%s, %s)\n", publicRoomID, dmRoomID, users[0].userID, users[1].userID) return nil }