diff --git a/build/gobind-pinecone/monolith.go b/build/gobind-pinecone/monolith.go index 91bbb6873..500403ae4 100644 --- a/build/gobind-pinecone/monolith.go +++ b/build/gobind-pinecone/monolith.go @@ -255,7 +255,7 @@ func (m *DendriteMonolith) Start() { m.logger.SetOutput(BindLogger{}) logrus.SetOutput(BindLogger{}) - m.PineconeRouter = pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk, false) + m.PineconeRouter = pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk) m.PineconeQUIC = pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), m.PineconeRouter, []string{"matrix"}) m.PineconeMulticast = pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), m.PineconeRouter) m.PineconeManager = pineconeConnections.NewConnectionManager(m.PineconeRouter, nil) diff --git a/cmd/dendrite-demo-pinecone/main.go b/cmd/dendrite-demo-pinecone/main.go index ff9cb5aa1..99e597e59 100644 --- a/cmd/dendrite-demo-pinecone/main.go +++ b/cmd/dendrite-demo-pinecone/main.go @@ -151,7 +151,7 @@ func main() { base := base.NewBaseDendrite(cfg, "Monolith") defer base.Close() // nolint: errcheck - pRouter := pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk, false) + pRouter := pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk) pQUIC := pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), pRouter, []string{"matrix"}) pMulticast := pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), pRouter) pManager := pineconeConnections.NewConnectionManager(pRouter, nil) diff --git a/cmd/dendrite-upgrade-tests/tests.go b/cmd/dendrite-upgrade-tests/tests.go index ff1e09dda..5c9589df2 100644 --- a/cmd/dendrite-upgrade-tests/tests.go +++ b/cmd/dendrite-upgrade-tests/tests.go @@ -4,6 +4,7 @@ import ( "fmt" "log" "strings" + "time" "github.com/matrix-org/gomatrix" "github.com/matrix-org/gomatrixserverlib" @@ -81,11 +82,14 @@ func runTests(baseURL, branchName string) error { client: users[1].client, text: "4: " + branchName, }, } + wantEventIDs := make(map[string]struct{}, 8) for _, msg := range msgs { - _, err = msg.client.SendText(dmRoomID, msg.text) + var resp *gomatrix.RespSendEvent + resp, err = msg.client.SendText(dmRoomID, msg.text) if err != nil { return fmt.Errorf("failed to send text in dm room: %s", err) } + wantEventIDs[resp.EventID] = struct{}{} } // attempt to create/join the shared public room @@ -113,11 +117,48 @@ func runTests(baseURL, branchName string) error { } // send messages for _, msg := range msgs { - _, err = msg.client.SendText(publicRoomID, "public "+msg.text) + resp, err := msg.client.SendText(publicRoomID, "public "+msg.text) if err != nil { return fmt.Errorf("failed to send text in public room: %s", err) } + wantEventIDs[resp.EventID] = struct{}{} } + + // Sync until we have all expected messages + doneCh := make(chan struct{}) + go func() { + syncClient := users[0].client + since := "" + for len(wantEventIDs) > 0 { + select { + case <-doneCh: + return + default: + } + syncResp, err := syncClient.SyncRequest(1000, since, "1", false, "") + if err != nil { + continue + } + for _, room := range syncResp.Rooms.Join { + for _, ev := range room.Timeline.Events { + if ev.Type != "m.room.message" { + continue + } + delete(wantEventIDs, ev.ID) + } + } + since = syncResp.NextBatch + } + close(doneCh) + }() + + select { + case <-time.After(time.Second * 10): + close(doneCh) + return fmt.Errorf("failed to receive all expected messages: %+v", wantEventIDs) + case <-doneCh: + } + log.Printf("OK! rooms(public=%s, dm=%s) users(%s, %s)\n", publicRoomID, dmRoomID, users[0].userID, users[1].userID) return nil } diff --git a/go.mod b/go.mod index d151b2995..263a0065d 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91 github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 github.com/matrix-org/gomatrixserverlib v0.0.0-20220912142654-7b96db48888a - github.com/matrix-org/pinecone v0.0.0-20220912093434-b215925d5534 + github.com/matrix-org/pinecone v0.0.0-20220914131522-608215eb1b29 github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 github.com/mattn/go-sqlite3 v1.14.13 github.com/nats-io/nats-server/v2 v2.9.0 diff --git a/go.sum b/go.sum index 3cad3f530..abdc11665 100644 --- a/go.sum +++ b/go.sum @@ -390,8 +390,8 @@ github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5d github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= github.com/matrix-org/gomatrixserverlib v0.0.0-20220912142654-7b96db48888a h1:XeGBDZZsUe4kgj3myl0EiuDNVWxszJecMTrON3Wn9sI= github.com/matrix-org/gomatrixserverlib v0.0.0-20220912142654-7b96db48888a/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk= -github.com/matrix-org/pinecone v0.0.0-20220912093434-b215925d5534 h1:XuJYAJNkdG3zj9cO0yQSvL+Sp2xogsTOuZRx7PwdtoA= -github.com/matrix-org/pinecone v0.0.0-20220912093434-b215925d5534/go.mod h1:K0N1ixHQxXoCyqolDqVxPM3ArrDtcMs8yegOx2Lfv9k= +github.com/matrix-org/pinecone v0.0.0-20220914131522-608215eb1b29 h1:/AIaqhK1BBi2sMEVQdgZRV8H8sNloAGCgztLZhsPqD0= +github.com/matrix-org/pinecone v0.0.0-20220914131522-608215eb1b29/go.mod h1:K0N1ixHQxXoCyqolDqVxPM3ArrDtcMs8yegOx2Lfv9k= github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U= github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk= github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U= diff --git a/roomserver/internal/input/input_latest_events.go b/roomserver/internal/input/input_latest_events.go index 205a33e83..a223820ef 100644 --- a/roomserver/internal/input/input_latest_events.go +++ b/roomserver/internal/input/input_latest_events.go @@ -264,16 +264,27 @@ func (u *latestEventsUpdater) latestState() error { return fmt.Errorf("roomState.CalculateAndStoreStateAfterEvents: %w", err) } - // Now that we have a new state snapshot based on the latest events, - // we can compare that new snapshot to the previous one and see what - // has changed. This gives us one list of removed state events and - // another list of added ones. Replacing a value for a state-key tuple - // will result one removed (the old event) and one added (the new event). - u.removed, u.added, err = roomState.DifferenceBetweeenStateSnapshots( - ctx, u.oldStateNID, u.newStateNID, - ) - if err != nil { - return fmt.Errorf("roomState.DifferenceBetweenStateSnapshots: %w", err) + // Include information about what changed in the state transition. If the + // event rewrites the state (i.e. is a federated join) then we will simply + // include the entire state snapshot as added events, as the "RewritesState" + // flag in the output event signals downstream components to purge their + // room state first. If it doesn't rewrite the state then we will work out + // what the difference is between the state snapshots and send that. In all + // cases where a state event is being replaced, the old state event will + // appear in "removed" and the replacement will appear in "added". + if u.rewritesState { + u.removed = []types.StateEntry{} + u.added, err = roomState.LoadStateAtSnapshot(ctx, u.newStateNID) + if err != nil { + return fmt.Errorf("roomState.LoadStateAtSnapshot: %w", err) + } + } else { + u.removed, u.added, err = roomState.DifferenceBetweeenStateSnapshots( + ctx, u.oldStateNID, u.newStateNID, + ) + if err != nil { + return fmt.Errorf("roomState.DifferenceBetweenStateSnapshots: %w", err) + } } if removed := len(u.removed) - len(u.added); !u.rewritesState && removed > 0 {