diff --git a/build/gobind-pinecone/monolith.go b/build/gobind-pinecone/monolith.go index 91bbb6873..500403ae4 100644 --- a/build/gobind-pinecone/monolith.go +++ b/build/gobind-pinecone/monolith.go @@ -255,7 +255,7 @@ func (m *DendriteMonolith) Start() { m.logger.SetOutput(BindLogger{}) logrus.SetOutput(BindLogger{}) - m.PineconeRouter = pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk, false) + m.PineconeRouter = pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk) m.PineconeQUIC = pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), m.PineconeRouter, []string{"matrix"}) m.PineconeMulticast = pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), m.PineconeRouter) m.PineconeManager = pineconeConnections.NewConnectionManager(m.PineconeRouter, nil) diff --git a/cmd/dendrite-demo-pinecone/main.go b/cmd/dendrite-demo-pinecone/main.go index ff9cb5aa1..99e597e59 100644 --- a/cmd/dendrite-demo-pinecone/main.go +++ b/cmd/dendrite-demo-pinecone/main.go @@ -151,7 +151,7 @@ func main() { base := base.NewBaseDendrite(cfg, "Monolith") defer base.Close() // nolint: errcheck - pRouter := pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk, false) + pRouter := pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk) pQUIC := pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), pRouter, []string{"matrix"}) pMulticast := pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), pRouter) pManager := pineconeConnections.NewConnectionManager(pRouter, nil) diff --git a/cmd/dendrite-upgrade-tests/tests.go b/cmd/dendrite-upgrade-tests/tests.go index ff1e09dda..5c9589df2 100644 --- a/cmd/dendrite-upgrade-tests/tests.go +++ b/cmd/dendrite-upgrade-tests/tests.go @@ -4,6 +4,7 @@ import ( "fmt" "log" "strings" + "time" "github.com/matrix-org/gomatrix" "github.com/matrix-org/gomatrixserverlib" @@ -81,11 +82,14 @@ func runTests(baseURL, branchName string) error { client: users[1].client, text: "4: " + branchName, }, } + wantEventIDs := make(map[string]struct{}, 8) for _, msg := range msgs { - _, err = msg.client.SendText(dmRoomID, msg.text) + var resp *gomatrix.RespSendEvent + resp, err = msg.client.SendText(dmRoomID, msg.text) if err != nil { return fmt.Errorf("failed to send text in dm room: %s", err) } + wantEventIDs[resp.EventID] = struct{}{} } // attempt to create/join the shared public room @@ -113,11 +117,48 @@ func runTests(baseURL, branchName string) error { } // send messages for _, msg := range msgs { - _, err = msg.client.SendText(publicRoomID, "public "+msg.text) + resp, err := msg.client.SendText(publicRoomID, "public "+msg.text) if err != nil { return fmt.Errorf("failed to send text in public room: %s", err) } + wantEventIDs[resp.EventID] = struct{}{} } + + // Sync until we have all expected messages + doneCh := make(chan struct{}) + go func() { + syncClient := users[0].client + since := "" + for len(wantEventIDs) > 0 { + select { + case <-doneCh: + return + default: + } + syncResp, err := syncClient.SyncRequest(1000, since, "1", false, "") + if err != nil { + continue + } + for _, room := range syncResp.Rooms.Join { + for _, ev := range room.Timeline.Events { + if ev.Type != "m.room.message" { + continue + } + delete(wantEventIDs, ev.ID) + } + } + since = syncResp.NextBatch + } + close(doneCh) + }() + + select { + case <-time.After(time.Second * 10): + close(doneCh) + return fmt.Errorf("failed to receive all expected messages: %+v", wantEventIDs) + case <-doneCh: + } + log.Printf("OK! rooms(public=%s, dm=%s) users(%s, %s)\n", publicRoomID, dmRoomID, users[0].userID, users[1].userID) return nil } diff --git a/go.mod b/go.mod index d151b2995..263a0065d 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91 github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 github.com/matrix-org/gomatrixserverlib v0.0.0-20220912142654-7b96db48888a - github.com/matrix-org/pinecone v0.0.0-20220912093434-b215925d5534 + github.com/matrix-org/pinecone v0.0.0-20220914131522-608215eb1b29 github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 github.com/mattn/go-sqlite3 v1.14.13 github.com/nats-io/nats-server/v2 v2.9.0 diff --git a/go.sum b/go.sum index 3cad3f530..abdc11665 100644 --- a/go.sum +++ b/go.sum @@ -390,8 +390,8 @@ github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5d github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= github.com/matrix-org/gomatrixserverlib v0.0.0-20220912142654-7b96db48888a h1:XeGBDZZsUe4kgj3myl0EiuDNVWxszJecMTrON3Wn9sI= github.com/matrix-org/gomatrixserverlib v0.0.0-20220912142654-7b96db48888a/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk= -github.com/matrix-org/pinecone v0.0.0-20220912093434-b215925d5534 h1:XuJYAJNkdG3zj9cO0yQSvL+Sp2xogsTOuZRx7PwdtoA= -github.com/matrix-org/pinecone v0.0.0-20220912093434-b215925d5534/go.mod h1:K0N1ixHQxXoCyqolDqVxPM3ArrDtcMs8yegOx2Lfv9k= +github.com/matrix-org/pinecone v0.0.0-20220914131522-608215eb1b29 h1:/AIaqhK1BBi2sMEVQdgZRV8H8sNloAGCgztLZhsPqD0= +github.com/matrix-org/pinecone v0.0.0-20220914131522-608215eb1b29/go.mod h1:K0N1ixHQxXoCyqolDqVxPM3ArrDtcMs8yegOx2Lfv9k= github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U= github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk= github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U= diff --git a/roomserver/api/query.go b/roomserver/api/query.go index 32d63bb51..aa7dc4735 100644 --- a/roomserver/api/query.go +++ b/roomserver/api/query.go @@ -439,6 +439,7 @@ type QueryMembershipAtEventRequest struct { // QueryMembershipAtEventResponse is the response to QueryMembershipAtEventRequest. type QueryMembershipAtEventResponse struct { - // Memberships is a map from eventID to a list of events (if any). + // Memberships is a map from eventID to a list of events (if any). Events that + // do not have known state will return an empty array here. Memberships map[string][]*gomatrixserverlib.HeaderedEvent `json:"memberships"` } diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index a8a3e0248..c47793f0a 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -176,7 +176,8 @@ func (r *Inputer) Start() error { }, nats.HeadersOnly(), nats.DeliverAll(), - nats.AckAll(), + nats.AckExplicit(), + nats.ReplayInstant(), nats.BindStream(r.InputRoomEventTopic), ) return err diff --git a/roomserver/internal/query/query.go b/roomserver/internal/query/query.go index 6dce2bc3e..b41a92e94 100644 --- a/roomserver/internal/query/query.go +++ b/roomserver/internal/query/query.go @@ -106,7 +106,7 @@ func (r *Queryer) QueryStateAfterEvents( return err } - if len(request.PrevEventIDs) > 1 && len(request.StateToFetch) == 0 { + if len(request.PrevEventIDs) > 1 { var authEventIDs []string for _, e := range stateEvents { authEventIDs = append(authEventIDs, e.AuthEventIDs()...) @@ -208,6 +208,9 @@ func (r *Queryer) QueryMembershipForUser( return err } +// QueryMembershipAtEvent returns the known memberships at a given event. +// If the state before an event is not known, an empty list will be returned +// for that event instead. func (r *Queryer) QueryMembershipAtEvent( ctx context.Context, request *api.QueryMembershipAtEventRequest, @@ -237,7 +240,11 @@ func (r *Queryer) QueryMembershipAtEvent( } for _, eventID := range request.EventIDs { - stateEntry := stateEntries[eventID] + stateEntry, ok := stateEntries[eventID] + if !ok { + response.Memberships[eventID] = []*gomatrixserverlib.HeaderedEvent{} + continue + } memberships, err := helpers.GetMembershipsAtState(ctx, r.DB, stateEntry, false) if err != nil { return fmt.Errorf("unable to get memberships at state: %w", err) diff --git a/roomserver/state/state.go b/roomserver/state/state.go index a40a2e9ba..cb96d83ec 100644 --- a/roomserver/state/state.go +++ b/roomserver/state/state.go @@ -18,6 +18,7 @@ package state import ( "context" + "database/sql" "fmt" "sort" "sync" @@ -134,11 +135,14 @@ func (v *StateResolution) LoadMembershipAtEvent( for i := range eventIDs { eventID := eventIDs[i] snapshotNID, err := v.db.SnapshotNIDFromEventID(ctx, eventID) - if err != nil { + if err != nil && err != sql.ErrNoRows { return nil, fmt.Errorf("LoadStateAtEvent.SnapshotNIDFromEventID failed for event %s : %w", eventID, err) } if snapshotNID == 0 { - return nil, fmt.Errorf("LoadStateAtEvent.SnapshotNIDFromEventID(%s) returned 0 NID, was this event stored?", eventID) + // If we don't know a state snapshot for this event then we can't calculate + // memberships at the time of the event, so skip over it. This means that + // it isn't guaranteed that the response map will contain every single event. + continue } snapshotNIDMap[snapshotNID] = append(snapshotNIDMap[snapshotNID], eventID) } diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index ffcf64df6..0ab6de886 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -388,7 +388,7 @@ func applyHistoryVisibilityFilter( if err != nil { // Not a fatal error, we can continue without the stateEvents, // they are only needed if there are state events in the timeline. - logrus.WithError(err).Warnf("failed to get current room state") + logrus.WithError(err).Warnf("Failed to get current room state for history visibility") } alwaysIncludeIDs := make(map[string]struct{}, len(stateEvents)) for _, ev := range stateEvents { @@ -397,7 +397,6 @@ func applyHistoryVisibilityFilter( startTime := time.Now() events, err := internal.ApplyHistoryVisibilityFilter(ctx, db, rsAPI, recentEvents, alwaysIncludeIDs, userID, "sync") if err != nil { - return nil, err } logrus.WithFields(logrus.Fields{ @@ -405,7 +404,7 @@ func applyHistoryVisibilityFilter( "room_id": roomID, "before": len(recentEvents), "after": len(events), - }).Debug("applied history visibility (sync)") + }).Trace("Applied history visibility (sync)") return events, nil }