diff --git a/go.mod b/go.mod index 3559c5bb1..79ef5c3e8 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91 github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 - github.com/matrix-org/gomatrixserverlib v0.0.0-20220725104114-b6003e522771 + github.com/matrix-org/gomatrixserverlib v0.0.0-20220801083850-5ff38e2c2839 github.com/matrix-org/pinecone v0.0.0-20220803093810-b7a830c08fb9 github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 github.com/mattn/go-sqlite3 v1.14.13 @@ -34,7 +34,7 @@ require ( github.com/patrickmn/go-cache v2.1.0+incompatible github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.12.2 - github.com/sirupsen/logrus v1.8.1 + github.com/sirupsen/logrus v1.9.0 github.com/stretchr/testify v1.7.1 github.com/tidwall/gjson v1.14.1 github.com/tidwall/sjson v1.2.4 @@ -42,7 +42,7 @@ require ( github.com/uber/jaeger-lib v2.4.1+incompatible github.com/yggdrasil-network/yggdrasil-go v0.4.3 go.uber.org/atomic v1.9.0 - golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e + golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa golang.org/x/image v0.0.0-20220413100746-70e8d0d3baa9 golang.org/x/mobile v0.0.0-20220518205345-8578da9835fd golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e @@ -99,7 +99,7 @@ require ( github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.0 // indirect golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect - golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect + golang.org/x/sys v0.0.0-20220731174439-a90be440212d // indirect golang.org/x/text v0.3.8-0.20211004125949-5bd84dd9b33b // indirect golang.org/x/time v0.0.0-20220411224347-583f2d630306 // indirect golang.org/x/tools v0.1.10 // indirect diff --git a/go.sum b/go.sum index 2c8bb4f18..9d5c50d2c 100644 --- a/go.sum +++ b/go.sum @@ -343,8 +343,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= -github.com/matrix-org/gomatrixserverlib v0.0.0-20220725104114-b6003e522771 h1:ZIPHFIPNDS9dmEbPEiJbNmyCGJtn9exfpLC7JOcn/bE= -github.com/matrix-org/gomatrixserverlib v0.0.0-20220725104114-b6003e522771/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk= +github.com/matrix-org/gomatrixserverlib v0.0.0-20220801083850-5ff38e2c2839 h1:QEFxKWH8PlEt3ZQKl31yJNAm8lvpNUwT51IMNTl9v1k= +github.com/matrix-org/gomatrixserverlib v0.0.0-20220801083850-5ff38e2c2839/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk= github.com/matrix-org/pinecone v0.0.0-20220803093810-b7a830c08fb9 h1:ed8yvWhTLk7+sNeK/eOZRTvESFTOHDRevoRoyeqPtvY= github.com/matrix-org/pinecone v0.0.0-20220803093810-b7a830c08fb9/go.mod h1:P4MqPf+u83OPulPJ+XTbSDbbWrdFYNY4LZ/B1PIduFE= github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U= @@ -493,8 +493,8 @@ github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPx github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= -github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= -github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= +github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0= +github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v0.0.0-20181108003508-044398e4856c/go.mod h1:XDJAKZRPZ1CvBcN2aX5YOUTYGHki24fSF0Iv48Ibg0s= github.com/sourcegraph/annotate v0.0.0-20160123013949-f4cad6c6324d/go.mod h1:UdhH50NIW0fCiwBSr0co2m7BnFLdv4fQTgdqdJTHFeE= @@ -569,8 +569,8 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e h1:T8NU3HyQ8ClP4SEE+KbFlg6n0NhuTsN4MyznaarGsZM= -golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa h1:zuSxTR4o9y82ebqCUJYNGJbGPo6sKVl54f/TVDObg1c= +golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -748,8 +748,10 @@ golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211102192858-4dd72447c267/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220405052023-b1e9470b6e64/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220731174439-a90be440212d h1:Sv5ogFZatcgIMMtBSTTAgMYsicp25MXBubjXNDKwm80= +golang.org/x/sys v0.0.0-20220731174439-a90be440212d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.0.0-20220526004731-065cf7ba2467 h1:CBpWXWQpIRjzmkkA+M7q9Fqnwd2mZr3AFqexg8YTfoM= diff --git a/roomserver/api/api.go b/roomserver/api/api.go index ee0212ecf..baf63aa31 100644 --- a/roomserver/api/api.go +++ b/roomserver/api/api.go @@ -97,6 +97,14 @@ type SyncRoomserverAPI interface { req *PerformBackfillRequest, res *PerformBackfillResponse, ) error + + // QueryMembershipAtEvent queries the memberships at the given events. + // Returns a map from eventID to a slice of gomatrixserverlib.HeaderedEvent. + QueryMembershipAtEvent( + ctx context.Context, + request *QueryMembershipAtEventRequest, + response *QueryMembershipAtEventResponse, + ) error } type AppserviceRoomserverAPI interface { diff --git a/roomserver/api/api_trace.go b/roomserver/api/api_trace.go index 18a617331..8bef35379 100644 --- a/roomserver/api/api_trace.go +++ b/roomserver/api/api_trace.go @@ -382,6 +382,16 @@ func (t *RoomserverInternalAPITrace) QueryRestrictedJoinAllowed( return err } +func (t *RoomserverInternalAPITrace) QueryMembershipAtEvent( + ctx context.Context, + request *QueryMembershipAtEventRequest, + response *QueryMembershipAtEventResponse, +) error { + err := t.Impl.QueryMembershipAtEvent(ctx, request, response) + util.GetLogger(ctx).WithError(err).Infof("QueryMembershipAtEvent req=%+v res=%+v", js(request), js(response)) + return err +} + func js(thing interface{}) string { b, err := json.Marshal(thing) if err != nil { diff --git a/roomserver/api/query.go b/roomserver/api/query.go index f157a9025..c8e6f9dc6 100644 --- a/roomserver/api/query.go +++ b/roomserver/api/query.go @@ -427,3 +427,17 @@ func (r *QueryCurrentStateResponse) UnmarshalJSON(data []byte) error { } return nil } + +// QueryMembershipAtEventRequest requests the membership events for a user +// for a list of eventIDs. +type QueryMembershipAtEventRequest struct { + RoomID string + EventIDs []string + UserID string +} + +// QueryMembershipAtEventResponse is the response to QueryMembershipAtEventRequest. +type QueryMembershipAtEventResponse struct { + // Memberships is a map from eventID to a list of events (if any). + Memberships map[string][]*gomatrixserverlib.HeaderedEvent `json:"memberships"` +} diff --git a/roomserver/internal/helpers/helpers.go b/roomserver/internal/helpers/helpers.go index d61aa08cb..6091f8ec2 100644 --- a/roomserver/internal/helpers/helpers.go +++ b/roomserver/internal/helpers/helpers.go @@ -208,6 +208,12 @@ func StateBeforeEvent(ctx context.Context, db storage.Database, info *types.Room return roomState.LoadCombinedStateAfterEvents(ctx, prevState) } +func MembershipAtEvent(ctx context.Context, db storage.Database, info *types.RoomInfo, eventIDs []string, stateKeyNID types.EventStateKeyNID) (map[string][]types.StateEntry, error) { + roomState := state.NewStateResolution(db, info) + // Fetch the state as it was when this event was fired + return roomState.LoadMembershipAtEvent(ctx, eventIDs, stateKeyNID) +} + func LoadEvents( ctx context.Context, db storage.Database, eventNIDs []types.EventNID, ) ([]*gomatrixserverlib.Event, error) { diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index 866670d7a..81541260c 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -299,7 +299,7 @@ func (r *Inputer) processRoomEvent( // allowed at the time, and also to get the history visibility. We won't // bother doing this if the event was already rejected as it just ends up // burning CPU time. - historyVisibility := gomatrixserverlib.HistoryVisibilityJoined // Default to restrictive. + historyVisibility := gomatrixserverlib.HistoryVisibilityShared // Default to shared. if rejectionErr == nil && !isRejected && !softfail { var err error historyVisibility, rejectionErr, err = r.processStateBefore(ctx, input, missingPrev) @@ -429,7 +429,7 @@ func (r *Inputer) processStateBefore( input *api.InputRoomEvent, missingPrev bool, ) (historyVisibility gomatrixserverlib.HistoryVisibility, rejectionErr error, err error) { - historyVisibility = gomatrixserverlib.HistoryVisibilityJoined // Default to restrictive. + historyVisibility = gomatrixserverlib.HistoryVisibilityShared // Default to shared. event := input.Event.Unwrap() isCreateEvent := event.Type() == gomatrixserverlib.MRoomCreate && event.StateKeyEquals("") var stateBeforeEvent []*gomatrixserverlib.Event diff --git a/roomserver/internal/query/query.go b/roomserver/internal/query/query.go index 5ba59b8f5..c41e1ea67 100644 --- a/roomserver/internal/query/query.go +++ b/roomserver/internal/query/query.go @@ -204,6 +204,54 @@ func (r *Queryer) QueryMembershipForUser( return err } +func (r *Queryer) QueryMembershipAtEvent( + ctx context.Context, + request *api.QueryMembershipAtEventRequest, + response *api.QueryMembershipAtEventResponse, +) error { + response.Memberships = make(map[string][]*gomatrixserverlib.HeaderedEvent) + info, err := r.DB.RoomInfo(ctx, request.RoomID) + if err != nil { + return fmt.Errorf("unable to get roomInfo: %w", err) + } + if info == nil { + return fmt.Errorf("no roomInfo found") + } + + // get the users stateKeyNID + stateKeyNIDs, err := r.DB.EventStateKeyNIDs(ctx, []string{request.UserID}) + if err != nil { + return fmt.Errorf("unable to get stateKeyNIDs for %s: %w", request.UserID, err) + } + if _, ok := stateKeyNIDs[request.UserID]; !ok { + return fmt.Errorf("requested stateKeyNID for %s was not found", request.UserID) + } + + stateEntries, err := helpers.MembershipAtEvent(ctx, r.DB, info, request.EventIDs, stateKeyNIDs[request.UserID]) + if err != nil { + return fmt.Errorf("unable to get state before event: %w", err) + } + + for _, eventID := range request.EventIDs { + stateEntry := stateEntries[eventID] + memberships, err := helpers.GetMembershipsAtState(ctx, r.DB, stateEntry, false) + if err != nil { + return fmt.Errorf("unable to get memberships at state: %w", err) + } + res := make([]*gomatrixserverlib.HeaderedEvent, 0, len(memberships)) + + for i := range memberships { + ev := memberships[i] + if ev.Type() == gomatrixserverlib.MRoomMember && ev.StateKeyEquals(request.UserID) { + res = append(res, ev.Headered(info.RoomVersion)) + } + } + response.Memberships[eventID] = res + } + + return nil +} + // QueryMembershipsForRoom implements api.RoomserverInternalAPI func (r *Queryer) QueryMembershipsForRoom( ctx context.Context, diff --git a/roomserver/inthttp/client.go b/roomserver/inthttp/client.go index d16f67c69..e9387bd97 100644 --- a/roomserver/inthttp/client.go +++ b/roomserver/inthttp/client.go @@ -5,14 +5,14 @@ import ( "errors" "net/http" + "github.com/matrix-org/gomatrixserverlib" + asAPI "github.com/matrix-org/dendrite/appservice/api" fsInputAPI "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/roomserver/api" userapi "github.com/matrix-org/dendrite/userapi/api" - - "github.com/matrix-org/gomatrixserverlib" ) const ( @@ -61,6 +61,7 @@ const ( RoomserverQueryServerBannedFromRoomPath = "/roomserver/queryServerBannedFromRoom" RoomserverQueryAuthChainPath = "/roomserver/queryAuthChain" RoomserverQueryRestrictedJoinAllowed = "/roomserver/queryRestrictedJoinAllowed" + RoomserverQueryMembershipAtEventPath = "/roomserver/queryMembershipAtEvent" ) type httpRoomserverInternalAPI struct { @@ -529,3 +530,10 @@ func (h *httpRoomserverInternalAPI) PerformForget( ) } + +func (h *httpRoomserverInternalAPI) QueryMembershipAtEvent(ctx context.Context, request *api.QueryMembershipAtEventRequest, response *api.QueryMembershipAtEventResponse) error { + return httputil.CallInternalRPCAPI( + "QueryMembershiptAtEvent", h.roomserverURL+RoomserverQueryMembershipAtEventPath, + h.httpClient, ctx, request, response, + ) +} diff --git a/roomserver/inthttp/server.go b/roomserver/inthttp/server.go index e325d76a5..3b688174a 100644 --- a/roomserver/inthttp/server.go +++ b/roomserver/inthttp/server.go @@ -2,6 +2,7 @@ package inthttp import ( "github.com/gorilla/mux" + "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/roomserver/api" ) @@ -193,4 +194,8 @@ func AddRoutes(r api.RoomserverInternalAPI, internalAPIMux *mux.Router) { RoomserverQueryRestrictedJoinAllowed, httputil.MakeInternalRPCAPI("RoomserverQueryRestrictedJoinAllowed", r.QueryRestrictedJoinAllowed), ) + internalAPIMux.Handle( + RoomserverQueryMembershipAtEventPath, + httputil.MakeInternalRPCAPI("RoomserverQueryMembershipAtEventPath", r.QueryMembershipAtEvent), + ) } diff --git a/roomserver/state/state.go b/roomserver/state/state.go index ca0c69f27..a40a2e9ba 100644 --- a/roomserver/state/state.go +++ b/roomserver/state/state.go @@ -23,12 +23,11 @@ import ( "sync" "time" + "github.com/matrix-org/dendrite/roomserver/types" + "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" - - "github.com/matrix-org/dendrite/roomserver/types" - "github.com/matrix-org/gomatrixserverlib" ) type StateResolutionStorage interface { @@ -124,6 +123,61 @@ func (v *StateResolution) LoadStateAtEvent( return stateEntries, nil } +func (v *StateResolution) LoadMembershipAtEvent( + ctx context.Context, eventIDs []string, stateKeyNID types.EventStateKeyNID, +) (map[string][]types.StateEntry, error) { + span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.LoadMembershipAtEvent") + defer span.Finish() + + // De-dupe snapshotNIDs + snapshotNIDMap := make(map[types.StateSnapshotNID][]string) // map from snapshot NID to eventIDs + for i := range eventIDs { + eventID := eventIDs[i] + snapshotNID, err := v.db.SnapshotNIDFromEventID(ctx, eventID) + if err != nil { + return nil, fmt.Errorf("LoadStateAtEvent.SnapshotNIDFromEventID failed for event %s : %w", eventID, err) + } + if snapshotNID == 0 { + return nil, fmt.Errorf("LoadStateAtEvent.SnapshotNIDFromEventID(%s) returned 0 NID, was this event stored?", eventID) + } + snapshotNIDMap[snapshotNID] = append(snapshotNIDMap[snapshotNID], eventID) + } + + snapshotNIDs := make([]types.StateSnapshotNID, 0, len(snapshotNIDMap)) + for nid := range snapshotNIDMap { + snapshotNIDs = append(snapshotNIDs, nid) + } + + stateBlockNIDLists, err := v.db.StateBlockNIDs(ctx, snapshotNIDs) + if err != nil { + return nil, err + } + + result := make(map[string][]types.StateEntry) + for _, stateBlockNIDList := range stateBlockNIDLists { + // Query the membership event for the user at the given stateblocks + stateEntryLists, err := v.db.StateEntriesForTuples(ctx, stateBlockNIDList.StateBlockNIDs, []types.StateKeyTuple{ + { + EventTypeNID: types.MRoomMemberNID, + EventStateKeyNID: stateKeyNID, + }, + }) + if err != nil { + return nil, err + } + + evIDs := snapshotNIDMap[stateBlockNIDList.StateSnapshotNID] + + for _, evID := range evIDs { + for _, x := range stateEntryLists { + result[evID] = append(result[evID], x.StateEntries...) + } + } + } + + return result, nil +} + // LoadStateAtEvent loads the full state of a room before a particular event. func (v *StateResolution) LoadStateAtEventForHistoryVisibility( ctx context.Context, eventID string, diff --git a/syncapi/internal/history_visibility.go b/syncapi/internal/history_visibility.go new file mode 100644 index 000000000..e73c004e5 --- /dev/null +++ b/syncapi/internal/history_visibility.go @@ -0,0 +1,217 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "context" + "math" + "time" + + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/gomatrixserverlib" + "github.com/prometheus/client_golang/prometheus" + "github.com/tidwall/gjson" +) + +func init() { + prometheus.MustRegister(calculateHistoryVisibilityDuration) +} + +// calculateHistoryVisibilityDuration stores the time it takes to +// calculate the history visibility. In polylith mode the roundtrip +// to the roomserver is included in this time. +var calculateHistoryVisibilityDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "dendrite", + Subsystem: "syncapi", + Name: "calculateHistoryVisibility_duration_millis", + Help: "How long it takes to calculate the history visibility", + Buckets: []float64{ // milliseconds + 5, 10, 25, 50, 75, 100, 250, 500, + 1000, 2000, 3000, 4000, 5000, 6000, + 7000, 8000, 9000, 10000, 15000, 20000, + }, + }, + []string{"api"}, +) + +var historyVisibilityPriority = map[gomatrixserverlib.HistoryVisibility]uint8{ + gomatrixserverlib.WorldReadable: 0, + gomatrixserverlib.HistoryVisibilityShared: 1, + gomatrixserverlib.HistoryVisibilityInvited: 2, + gomatrixserverlib.HistoryVisibilityJoined: 3, +} + +// eventVisibility contains the history visibility and membership state at a given event +type eventVisibility struct { + visibility gomatrixserverlib.HistoryVisibility + membershipAtEvent string + membershipCurrent string +} + +// allowed checks the eventVisibility if the user is allowed to see the event. +// Rules as defined by https://spec.matrix.org/v1.3/client-server-api/#server-behaviour-5 +func (ev eventVisibility) allowed() (allowed bool) { + switch ev.visibility { + case gomatrixserverlib.HistoryVisibilityWorldReadable: + // If the history_visibility was set to world_readable, allow. + return true + case gomatrixserverlib.HistoryVisibilityJoined: + // If the user’s membership was join, allow. + if ev.membershipAtEvent == gomatrixserverlib.Join { + return true + } + return false + case gomatrixserverlib.HistoryVisibilityShared: + // If the user’s membership was join, allow. + // If history_visibility was set to shared, and the user joined the room at any point after the event was sent, allow. + if ev.membershipAtEvent == gomatrixserverlib.Join || ev.membershipCurrent == gomatrixserverlib.Join { + return true + } + return false + case gomatrixserverlib.HistoryVisibilityInvited: + // If the user’s membership was join, allow. + if ev.membershipAtEvent == gomatrixserverlib.Join { + return true + } + if ev.membershipAtEvent == gomatrixserverlib.Invite { + return true + } + return false + default: + return false + } +} + +// ApplyHistoryVisibilityFilter applies the room history visibility filter on gomatrixserverlib.HeaderedEvents. +// Returns the filtered events and an error, if any. +func ApplyHistoryVisibilityFilter( + ctx context.Context, + syncDB storage.Database, + rsAPI api.SyncRoomserverAPI, + events []*gomatrixserverlib.HeaderedEvent, + alwaysIncludeEventIDs map[string]struct{}, + userID, endpoint string, +) ([]*gomatrixserverlib.HeaderedEvent, error) { + if len(events) == 0 { + return events, nil + } + start := time.Now() + + // try to get the current membership of the user + membershipCurrent, _, err := syncDB.SelectMembershipForUser(ctx, events[0].RoomID(), userID, math.MaxInt64) + if err != nil { + return nil, err + } + + // Get the mapping from eventID -> eventVisibility + eventsFiltered := make([]*gomatrixserverlib.HeaderedEvent, 0, len(events)) + visibilities, err := visibilityForEvents(ctx, rsAPI, events, userID, events[0].RoomID()) + if err != nil { + return eventsFiltered, err + } + for _, ev := range events { + evVis := visibilities[ev.EventID()] + evVis.membershipCurrent = membershipCurrent + // Always include specific state events for /sync responses + if alwaysIncludeEventIDs != nil { + if _, ok := alwaysIncludeEventIDs[ev.EventID()]; ok { + eventsFiltered = append(eventsFiltered, ev) + continue + } + } + // NOTSPEC: Always allow user to see their own membership events (spec contains more "rules") + if ev.Type() == gomatrixserverlib.MRoomMember && ev.StateKeyEquals(userID) { + eventsFiltered = append(eventsFiltered, ev) + continue + } + // Always allow history evVis events on boundaries. This is done + // by setting the effective evVis to the least restrictive + // of the old vs new. + // https://spec.matrix.org/v1.3/client-server-api/#server-behaviour-5 + if hisVis, err := ev.HistoryVisibility(); err == nil { + prevHisVis := gjson.GetBytes(ev.Unsigned(), "prev_content.history_visibility").String() + oldPrio, ok := historyVisibilityPriority[gomatrixserverlib.HistoryVisibility(prevHisVis)] + // if we can't get the previous history visibility, default to shared. + if !ok { + oldPrio = historyVisibilityPriority[gomatrixserverlib.HistoryVisibilityShared] + } + // no OK check, since this should have been validated when setting the value + newPrio := historyVisibilityPriority[hisVis] + if oldPrio < newPrio { + evVis.visibility = gomatrixserverlib.HistoryVisibility(prevHisVis) + } + } + // do the actual check + allowed := evVis.allowed() + if allowed { + eventsFiltered = append(eventsFiltered, ev) + } + } + calculateHistoryVisibilityDuration.With(prometheus.Labels{"api": endpoint}).Observe(float64(time.Since(start).Milliseconds())) + return eventsFiltered, nil +} + +// visibilityForEvents returns a map from eventID to eventVisibility containing the visibility and the membership +// of `userID` at the given event. +// Returns an error if the roomserver can't calculate the memberships. +func visibilityForEvents( + ctx context.Context, + rsAPI api.SyncRoomserverAPI, + events []*gomatrixserverlib.HeaderedEvent, + userID, roomID string, +) (map[string]eventVisibility, error) { + eventIDs := make([]string, len(events)) + for i := range events { + eventIDs[i] = events[i].EventID() + } + + result := make(map[string]eventVisibility, len(eventIDs)) + + // get the membership events for all eventIDs + membershipResp := &api.QueryMembershipAtEventResponse{} + err := rsAPI.QueryMembershipAtEvent(ctx, &api.QueryMembershipAtEventRequest{ + RoomID: roomID, + EventIDs: eventIDs, + UserID: userID, + }, membershipResp) + if err != nil { + return result, err + } + + // Create a map from eventID -> eventVisibility + for _, event := range events { + eventID := event.EventID() + vis := eventVisibility{ + membershipAtEvent: gomatrixserverlib.Leave, // default to leave, to not expose events by accident + visibility: event.Visibility, + } + membershipEvs, ok := membershipResp.Memberships[eventID] + if !ok { + result[eventID] = vis + continue + } + for _, ev := range membershipEvs { + membership, err := ev.Membership() + if err != nil { + return result, err + } + vis.membershipAtEvent = membership + } + result[eventID] = vis + } + return result, nil +} diff --git a/syncapi/routing/context.go b/syncapi/routing/context.go index f6b4d15e0..13c4e9d89 100644 --- a/syncapi/routing/context.go +++ b/syncapi/routing/context.go @@ -21,10 +21,12 @@ import ( "fmt" "net/http" "strconv" + "time" "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/internal/caching" roomserver "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/syncapi/internal" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" @@ -95,24 +97,6 @@ func Context( ContainsURL: filter.ContainsURL, } - // TODO: Get the actual state at the last event returned by SelectContextAfterEvent - state, _ := syncDB.CurrentState(ctx, roomID, &stateFilter, nil) - // verify the user is allowed to see the context for this room/event - for _, x := range state { - var hisVis gomatrixserverlib.HistoryVisibility - hisVis, err = x.HistoryVisibility() - if err != nil { - continue - } - allowed := hisVis == gomatrixserverlib.WorldReadable || membershipRes.Membership == gomatrixserverlib.Join - if !allowed { - return util.JSONResponse{ - Code: http.StatusForbidden, - JSON: jsonerror.Forbidden("User is not allowed to query context"), - } - } - } - id, requestedEvent, err := syncDB.SelectContextEvent(ctx, roomID, eventID) if err != nil { if err == sql.ErrNoRows { @@ -125,6 +109,24 @@ func Context( return jsonerror.InternalServerError() } + // verify the user is allowed to see the context for this room/event + startTime := time.Now() + filteredEvents, err := internal.ApplyHistoryVisibilityFilter(ctx, syncDB, rsAPI, []*gomatrixserverlib.HeaderedEvent{&requestedEvent}, nil, device.UserID, "context") + if err != nil { + logrus.WithError(err).Error("unable to apply history visibility filter") + return jsonerror.InternalServerError() + } + logrus.WithFields(logrus.Fields{ + "duration": time.Since(startTime), + "room_id": roomID, + }).Debug("applied history visibility (context)") + if len(filteredEvents) == 0 { + return util.JSONResponse{ + Code: http.StatusForbidden, + JSON: jsonerror.Forbidden("User is not allowed to query context"), + } + } + eventsBefore, err := syncDB.SelectContextBeforeEvent(ctx, id, roomID, filter) if err != nil && err != sql.ErrNoRows { logrus.WithError(err).Error("unable to fetch before events") @@ -137,8 +139,27 @@ func Context( return jsonerror.InternalServerError() } - eventsBeforeClient := gomatrixserverlib.HeaderedToClientEvents(eventsBefore, gomatrixserverlib.FormatAll) - eventsAfterClient := gomatrixserverlib.HeaderedToClientEvents(eventsAfter, gomatrixserverlib.FormatAll) + startTime = time.Now() + eventsBeforeFiltered, eventsAfterFiltered, err := applyHistoryVisibilityOnContextEvents(ctx, syncDB, rsAPI, eventsBefore, eventsAfter, device.UserID) + if err != nil { + logrus.WithError(err).Error("unable to apply history visibility filter") + return jsonerror.InternalServerError() + } + + logrus.WithFields(logrus.Fields{ + "duration": time.Since(startTime), + "room_id": roomID, + }).Debug("applied history visibility (context eventsBefore/eventsAfter)") + + // TODO: Get the actual state at the last event returned by SelectContextAfterEvent + state, err := syncDB.CurrentState(ctx, roomID, &stateFilter, nil) + if err != nil { + logrus.WithError(err).Error("unable to fetch current room state") + return jsonerror.InternalServerError() + } + + eventsBeforeClient := gomatrixserverlib.HeaderedToClientEvents(eventsBeforeFiltered, gomatrixserverlib.FormatAll) + eventsAfterClient := gomatrixserverlib.HeaderedToClientEvents(eventsAfterFiltered, gomatrixserverlib.FormatAll) newState := applyLazyLoadMembers(device, filter, eventsAfterClient, eventsBeforeClient, state, lazyLoadCache) response := ContextRespsonse{ @@ -162,6 +183,44 @@ func Context( } } +// applyHistoryVisibilityOnContextEvents is a helper function to avoid roundtrips to the roomserver +// by combining the events before and after the context event. Returns the filtered events, +// and an error, if any. +func applyHistoryVisibilityOnContextEvents( + ctx context.Context, syncDB storage.Database, rsAPI roomserver.SyncRoomserverAPI, + eventsBefore, eventsAfter []*gomatrixserverlib.HeaderedEvent, + userID string, +) (filteredBefore, filteredAfter []*gomatrixserverlib.HeaderedEvent, err error) { + eventIDsBefore := make(map[string]struct{}, len(eventsBefore)) + eventIDsAfter := make(map[string]struct{}, len(eventsAfter)) + + // Remember before/after eventIDs, so we can restore them + // after applying history visibility checks + for _, ev := range eventsBefore { + eventIDsBefore[ev.EventID()] = struct{}{} + } + for _, ev := range eventsAfter { + eventIDsAfter[ev.EventID()] = struct{}{} + } + + allEvents := append(eventsBefore, eventsAfter...) + filteredEvents, err := internal.ApplyHistoryVisibilityFilter(ctx, syncDB, rsAPI, allEvents, nil, userID, "context") + if err != nil { + return nil, nil, err + } + + // "Restore" events in the correct context + for _, ev := range filteredEvents { + if _, ok := eventIDsBefore[ev.EventID()]; ok { + filteredBefore = append(filteredBefore, ev) + } + if _, ok := eventIDsAfter[ev.EventID()]; ok { + filteredAfter = append(filteredAfter, ev) + } + } + return filteredBefore, filteredAfter, nil +} + func getStartEnd(ctx context.Context, syncDB storage.Database, startEvents, endEvents []*gomatrixserverlib.HeaderedEvent) (start, end types.TopologyToken, err error) { if len(startEvents) > 0 { start, err = syncDB.EventPositionInTopology(ctx, startEvents[0].EventID()) diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index b4c9a5423..9db3d8e17 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -19,6 +19,7 @@ import ( "fmt" "net/http" "sort" + "time" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" @@ -28,6 +29,7 @@ import ( "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/syncapi/internal" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/sync" "github.com/matrix-org/dendrite/syncapi/types" @@ -324,6 +326,9 @@ func (r *messagesReq) retrieveEvents() ( // reliable way to define it), it would be easier and less troublesome to // only have to change it in one place, i.e. the database. start, end, err = r.getStartEnd(events) + if err != nil { + return []gomatrixserverlib.ClientEvent{}, *r.from, *r.to, err + } // Sort the events to ensure we send them in the right order. if r.backwardOrdering { @@ -337,97 +342,18 @@ func (r *messagesReq) retrieveEvents() ( } events = reversed(events) } - events = r.filterHistoryVisible(events) if len(events) == 0 { return []gomatrixserverlib.ClientEvent{}, *r.from, *r.to, nil } - // Convert all of the events into client events. - clientEvents = gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatAll) - return clientEvents, start, end, err -} - -func (r *messagesReq) filterHistoryVisible(events []*gomatrixserverlib.HeaderedEvent) []*gomatrixserverlib.HeaderedEvent { - // TODO FIXME: We don't fully implement history visibility yet. To avoid leaking events which the - // user shouldn't see, we check the recent events and remove any prior to the join event of the user - // which is equiv to history_visibility: joined - joinEventIndex := -1 - for i, ev := range events { - if ev.Type() == gomatrixserverlib.MRoomMember && ev.StateKeyEquals(r.device.UserID) { - membership, _ := ev.Membership() - if membership == "join" { - joinEventIndex = i - break - } - } - } - - var result []*gomatrixserverlib.HeaderedEvent - var eventsToCheck []*gomatrixserverlib.HeaderedEvent - if joinEventIndex != -1 { - if r.backwardOrdering { - result = events[:joinEventIndex+1] - eventsToCheck = append(eventsToCheck, result[0]) - } else { - result = events[joinEventIndex:] - eventsToCheck = append(eventsToCheck, result[len(result)-1]) - } - } else { - eventsToCheck = []*gomatrixserverlib.HeaderedEvent{events[0], events[len(events)-1]} - result = events - } - // make sure the user was in the room for both the earliest and latest events, we need this because - // some backpagination results will not have the join event (e.g if they hit /messages at the join event itself) - wasJoined := true - for _, ev := range eventsToCheck { - var queryRes api.QueryStateAfterEventsResponse - err := r.rsAPI.QueryStateAfterEvents(r.ctx, &api.QueryStateAfterEventsRequest{ - RoomID: ev.RoomID(), - PrevEventIDs: ev.PrevEventIDs(), - StateToFetch: []gomatrixserverlib.StateKeyTuple{ - {EventType: gomatrixserverlib.MRoomMember, StateKey: r.device.UserID}, - {EventType: gomatrixserverlib.MRoomHistoryVisibility, StateKey: ""}, - }, - }, &queryRes) - if err != nil { - wasJoined = false - break - } - var hisVisEvent, membershipEvent *gomatrixserverlib.HeaderedEvent - for i := range queryRes.StateEvents { - switch queryRes.StateEvents[i].Type() { - case gomatrixserverlib.MRoomMember: - membershipEvent = queryRes.StateEvents[i] - case gomatrixserverlib.MRoomHistoryVisibility: - hisVisEvent = queryRes.StateEvents[i] - } - } - if hisVisEvent == nil { - return events // apply no filtering as it defaults to Shared. - } - hisVis, _ := hisVisEvent.HistoryVisibility() - if hisVis == "shared" || hisVis == "world_readable" { - return events // apply no filtering - } - if membershipEvent == nil { - wasJoined = false - break - } - membership, err := membershipEvent.Membership() - if err != nil { - wasJoined = false - break - } - if membership != "join" { - wasJoined = false - break - } - } - if !wasJoined { - util.GetLogger(r.ctx).WithField("num_events", len(events)).Warnf("%s was not joined to room during these events, omitting them", r.device.UserID) - return []*gomatrixserverlib.HeaderedEvent{} - } - return result + // Apply room history visibility filter + startTime := time.Now() + filteredEvents, err := internal.ApplyHistoryVisibilityFilter(r.ctx, r.db, r.rsAPI, events, nil, r.device.UserID, "messages") + logrus.WithFields(logrus.Fields{ + "duration": time.Since(startTime), + "room_id": r.roomID, + }).Debug("applied history visibility (messages)") + return gomatrixserverlib.HeaderedToClientEvents(filteredEvents, gomatrixserverlib.FormatAll), start, end, err } func (r *messagesReq) getStartEnd(events []*gomatrixserverlib.HeaderedEvent) (start, end types.TopologyToken, err error) { diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 9751670b2..43a75da95 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -161,6 +161,10 @@ type Database interface { IgnoresForUser(ctx context.Context, userID string) (*types.IgnoredUsers, error) UpdateIgnoresForUser(ctx context.Context, userID string, ignores *types.IgnoredUsers) error + // SelectMembershipForUser returns the membership of the user before and including the given position. If no membership can be found + // returns "leave", the topological position and no error. If an error occurs, other than sql.ErrNoRows, returns that and an empty + // string as the membership. + SelectMembershipForUser(ctx context.Context, roomID, userID string, pos int64) (membership string, topologicalPos int, err error) } type Presence interface { diff --git a/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go b/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go index 29008adef..d68ed8d5f 100644 --- a/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go +++ b/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go @@ -17,7 +17,10 @@ package deltas import ( "context" "database/sql" + "encoding/json" "fmt" + + "github.com/matrix-org/gomatrixserverlib" ) func UpAddHistoryVisibilityColumnOutputRoomEvents(ctx context.Context, tx *sql.Tx) error { @@ -31,6 +34,27 @@ func UpAddHistoryVisibilityColumnOutputRoomEvents(ctx context.Context, tx *sql.T return nil } +// UpSetHistoryVisibility sets the history visibility for already stored events. +// Requires current_room_state and output_room_events to be created. +func UpSetHistoryVisibility(ctx context.Context, tx *sql.Tx) error { + // get the current room history visibilities + historyVisibilities, err := currentHistoryVisibilities(ctx, tx) + if err != nil { + return err + } + + // update the history visibility + for roomID, hisVis := range historyVisibilities { + _, err = tx.ExecContext(ctx, `UPDATE syncapi_output_room_events SET history_visibility = $1 + WHERE type IN ('m.room.message', 'm.room.encrypted') AND room_id = $2 AND history_visibility <> $1`, hisVis, roomID) + if err != nil { + return fmt.Errorf("failed to update history visibility: %w", err) + } + } + + return nil +} + func UpAddHistoryVisibilityColumnCurrentRoomState(ctx context.Context, tx *sql.Tx) error { _, err := tx.ExecContext(ctx, ` ALTER TABLE syncapi_current_room_state ADD COLUMN IF NOT EXISTS history_visibility SMALLINT NOT NULL DEFAULT 2; @@ -39,9 +63,40 @@ func UpAddHistoryVisibilityColumnCurrentRoomState(ctx context.Context, tx *sql.T if err != nil { return fmt.Errorf("failed to execute upgrade: %w", err) } + return nil } +// currentHistoryVisibilities returns a map from roomID to current history visibility. +// If the history visibility was changed after room creation, defaults to joined. +func currentHistoryVisibilities(ctx context.Context, tx *sql.Tx) (map[string]gomatrixserverlib.HistoryVisibility, error) { + rows, err := tx.QueryContext(ctx, `SELECT DISTINCT room_id, headered_event_json FROM syncapi_current_room_state + WHERE type = 'm.room.history_visibility' AND state_key = ''; +`) + if err != nil { + return nil, fmt.Errorf("failed to query current room state: %w", err) + } + defer rows.Close() // nolint: errcheck + var eventBytes []byte + var roomID string + var event gomatrixserverlib.HeaderedEvent + var hisVis gomatrixserverlib.HistoryVisibility + historyVisibilities := make(map[string]gomatrixserverlib.HistoryVisibility) + for rows.Next() { + if err = rows.Scan(&roomID, &eventBytes); err != nil { + return nil, fmt.Errorf("failed to scan row: %w", err) + } + if err = json.Unmarshal(eventBytes, &event); err != nil { + return nil, fmt.Errorf("failed to unmarshal event: %w", err) + } + historyVisibilities[roomID] = gomatrixserverlib.HistoryVisibilityJoined + if hisVis, err = event.HistoryVisibility(); err == nil && event.Depth() < 10 { + historyVisibilities[roomID] = hisVis + } + } + return historyVisibilities, nil +} + func DownAddHistoryVisibilityColumn(ctx context.Context, tx *sql.Tx) error { _, err := tx.ExecContext(ctx, ` ALTER TABLE syncapi_output_room_events DROP COLUMN IF EXISTS history_visibility; diff --git a/syncapi/storage/postgres/memberships_table.go b/syncapi/storage/postgres/memberships_table.go index 00223c57a..939d6b3f5 100644 --- a/syncapi/storage/postgres/memberships_table.go +++ b/syncapi/storage/postgres/memberships_table.go @@ -66,10 +66,14 @@ const selectMembershipCountSQL = "" + const selectHeroesSQL = "" + "SELECT DISTINCT user_id FROM syncapi_memberships WHERE room_id = $1 AND user_id != $2 AND membership = ANY($3) LIMIT 5" +const selectMembershipBeforeSQL = "" + + "SELECT membership, topological_pos FROM syncapi_memberships WHERE room_id = $1 and user_id = $2 AND topological_pos <= $3 ORDER BY topological_pos DESC LIMIT 1" + type membershipsStatements struct { - upsertMembershipStmt *sql.Stmt - selectMembershipCountStmt *sql.Stmt - selectHeroesStmt *sql.Stmt + upsertMembershipStmt *sql.Stmt + selectMembershipCountStmt *sql.Stmt + selectHeroesStmt *sql.Stmt + selectMembershipForUserStmt *sql.Stmt } func NewPostgresMembershipsTable(db *sql.DB) (tables.Memberships, error) { @@ -82,6 +86,7 @@ func NewPostgresMembershipsTable(db *sql.DB) (tables.Memberships, error) { {&s.upsertMembershipStmt, upsertMembershipSQL}, {&s.selectMembershipCountStmt, selectMembershipCountSQL}, {&s.selectHeroesStmt, selectHeroesSQL}, + {&s.selectMembershipForUserStmt, selectMembershipBeforeSQL}, }.Prepare(db) } @@ -132,3 +137,20 @@ func (s *membershipsStatements) SelectHeroes( } return heroes, rows.Err() } + +// SelectMembershipForUser returns the membership of the user before and including the given position. If no membership can be found +// returns "leave", the topological position and no error. If an error occurs, other than sql.ErrNoRows, returns that and an empty +// string as the membership. +func (s *membershipsStatements) SelectMembershipForUser( + ctx context.Context, txn *sql.Tx, roomID, userID string, pos int64, +) (membership string, topologyPos int, err error) { + stmt := sqlutil.TxStmt(txn, s.selectMembershipForUserStmt) + err = stmt.QueryRowContext(ctx, roomID, userID, pos).Scan(&membership, &topologyPos) + if err != nil { + if err == sql.ErrNoRows { + return "leave", 0, nil + } + return "", 0, err + } + return membership, topologyPos, nil +} diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index 34ff6700f..8f633640e 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -191,10 +191,12 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) { } m := sqlutil.NewMigrator(db) - m.AddMigrations(sqlutil.Migration{ - Version: "syncapi: add history visibility column (output_room_events)", - Up: deltas.UpAddHistoryVisibilityColumnOutputRoomEvents, - }) + m.AddMigrations( + sqlutil.Migration{ + Version: "syncapi: add history visibility column (output_room_events)", + Up: deltas.UpAddHistoryVisibilityColumnOutputRoomEvents, + }, + ) err = m.Up(context.Background()) if err != nil { return nil, err diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index a044716ce..979ff6647 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -23,6 +23,7 @@ import ( "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas" "github.com/matrix-org/dendrite/syncapi/storage/shared" ) @@ -97,6 +98,20 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) if err != nil { return nil, err } + + // apply migrations which need multiple tables + m := sqlutil.NewMigrator(d.db) + m.AddMigrations( + sqlutil.Migration{ + Version: "syncapi: set history visibility for existing events", + Up: deltas.UpSetHistoryVisibility, // Requires current_room_state and output_room_events to be created. + }, + ) + err = m.Up(base.Context()) + if err != nil { + return nil, err + } + d.Database = shared.Database{ DB: d.db, Writer: d.writer, diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 932bda164..a46e55256 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -231,7 +231,7 @@ func (d *Database) AddPeek( return } -// DeletePeeks tracks the fact that a user has stopped peeking from the specified +// DeletePeek tracks the fact that a user has stopped peeking from the specified // device. If the peeks was successfully deleted this returns the stream ID it was // stored at. Returns an error if there was a problem communicating with the database. func (d *Database) DeletePeek( @@ -372,6 +372,7 @@ func (d *Database) WriteEvent( ) (pduPosition types.StreamPosition, returnErr error) { returnErr = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { var err error + ev.Visibility = historyVisibility pos, err := d.OutputEvents.InsertEvent( ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID, excludeFromSync, historyVisibility, ) @@ -563,7 +564,7 @@ func (d *Database) RedactEvent(ctx context.Context, redactedEventID string, reda return err } -// Retrieve the backward topology position, i.e. the position of the +// GetBackwardTopologyPos retrieves the backward topology position, i.e. the position of the // oldest event in the room's topology. func (d *Database) GetBackwardTopologyPos( ctx context.Context, @@ -674,7 +675,7 @@ func (d *Database) fetchMissingStateEvents( return events, nil } -// getStateDeltas returns the state deltas between fromPos and toPos, +// GetStateDeltas returns the state deltas between fromPos and toPos, // exclusive of oldPos, inclusive of newPos, for the rooms in which // the user has new membership events. // A list of joined room IDs is also returned in case the caller needs it. @@ -812,7 +813,7 @@ func (d *Database) GetStateDeltas( return deltas, joinedRoomIDs, nil } -// getStateDeltasForFullStateSync is a variant of getStateDeltas used for /sync +// GetStateDeltasForFullStateSync is a variant of getStateDeltas used for /sync // requests with full_state=true. // Fetches full state for all joined rooms and uses selectStateInRange to get // updates for other rooms. @@ -1039,37 +1040,41 @@ func (d *Database) GetUserUnreadNotificationCounts(ctx context.Context, userID s return d.NotificationData.SelectUserUnreadCounts(ctx, userID, from, to) } -func (s *Database) SelectContextEvent(ctx context.Context, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error) { - return s.OutputEvents.SelectContextEvent(ctx, nil, roomID, eventID) +func (d *Database) SelectContextEvent(ctx context.Context, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error) { + return d.OutputEvents.SelectContextEvent(ctx, nil, roomID, eventID) } -func (s *Database) SelectContextBeforeEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error) { - return s.OutputEvents.SelectContextBeforeEvent(ctx, nil, id, roomID, filter) +func (d *Database) SelectContextBeforeEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error) { + return d.OutputEvents.SelectContextBeforeEvent(ctx, nil, id, roomID, filter) } -func (s *Database) SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error) { - return s.OutputEvents.SelectContextAfterEvent(ctx, nil, id, roomID, filter) +func (d *Database) SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error) { + return d.OutputEvents.SelectContextAfterEvent(ctx, nil, id, roomID, filter) } -func (s *Database) IgnoresForUser(ctx context.Context, userID string) (*types.IgnoredUsers, error) { - return s.Ignores.SelectIgnores(ctx, userID) +func (d *Database) IgnoresForUser(ctx context.Context, userID string) (*types.IgnoredUsers, error) { + return d.Ignores.SelectIgnores(ctx, userID) } -func (s *Database) UpdateIgnoresForUser(ctx context.Context, userID string, ignores *types.IgnoredUsers) error { - return s.Ignores.UpsertIgnores(ctx, userID, ignores) +func (d *Database) UpdateIgnoresForUser(ctx context.Context, userID string, ignores *types.IgnoredUsers) error { + return d.Ignores.UpsertIgnores(ctx, userID, ignores) } -func (s *Database) UpdatePresence(ctx context.Context, userID string, presence types.Presence, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error) { - return s.Presence.UpsertPresence(ctx, nil, userID, statusMsg, presence, lastActiveTS, fromSync) +func (d *Database) UpdatePresence(ctx context.Context, userID string, presence types.Presence, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error) { + return d.Presence.UpsertPresence(ctx, nil, userID, statusMsg, presence, lastActiveTS, fromSync) } -func (s *Database) GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error) { - return s.Presence.GetPresenceForUser(ctx, nil, userID) +func (d *Database) GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error) { + return d.Presence.GetPresenceForUser(ctx, nil, userID) } -func (s *Database) PresenceAfter(ctx context.Context, after types.StreamPosition, filter gomatrixserverlib.EventFilter) (map[string]*types.PresenceInternal, error) { - return s.Presence.GetPresenceAfter(ctx, nil, after, filter) +func (d *Database) PresenceAfter(ctx context.Context, after types.StreamPosition, filter gomatrixserverlib.EventFilter) (map[string]*types.PresenceInternal, error) { + return d.Presence.GetPresenceAfter(ctx, nil, after, filter) } -func (s *Database) MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error) { - return s.Presence.GetMaxPresenceID(ctx, nil) +func (d *Database) MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error) { + return d.Presence.GetMaxPresenceID(ctx, nil) +} + +func (d *Database) SelectMembershipForUser(ctx context.Context, roomID, userID string, pos int64) (membership string, topologicalPos int, err error) { + return d.Memberships.SelectMembershipForUser(ctx, nil, roomID, userID, pos) } diff --git a/syncapi/storage/sqlite3/deltas/2022061412000000_history_visibility_column.go b/syncapi/storage/sqlite3/deltas/2022061412000000_history_visibility_column.go index 079177217..d23f07566 100644 --- a/syncapi/storage/sqlite3/deltas/2022061412000000_history_visibility_column.go +++ b/syncapi/storage/sqlite3/deltas/2022061412000000_history_visibility_column.go @@ -17,7 +17,10 @@ package deltas import ( "context" "database/sql" + "encoding/json" "fmt" + + "github.com/matrix-org/gomatrixserverlib" ) func UpAddHistoryVisibilityColumnOutputRoomEvents(ctx context.Context, tx *sql.Tx) error { @@ -37,6 +40,27 @@ func UpAddHistoryVisibilityColumnOutputRoomEvents(ctx context.Context, tx *sql.T return nil } +// UpSetHistoryVisibility sets the history visibility for already stored events. +// Requires current_room_state and output_room_events to be created. +func UpSetHistoryVisibility(ctx context.Context, tx *sql.Tx) error { + // get the current room history visibilities + historyVisibilities, err := currentHistoryVisibilities(ctx, tx) + if err != nil { + return err + } + + // update the history visibility + for roomID, hisVis := range historyVisibilities { + _, err = tx.ExecContext(ctx, `UPDATE syncapi_output_room_events SET history_visibility = $1 + WHERE type IN ('m.room.message', 'm.room.encrypted') AND room_id = $2 AND history_visibility <> $1`, hisVis, roomID) + if err != nil { + return fmt.Errorf("failed to update history visibility: %w", err) + } + } + + return nil +} + func UpAddHistoryVisibilityColumnCurrentRoomState(ctx context.Context, tx *sql.Tx) error { // SQLite doesn't have "if exists", so check if the column exists. If the query doesn't return an error, it already exists. // Required for unit tests, as otherwise a duplicate column error will show up. @@ -51,9 +75,40 @@ func UpAddHistoryVisibilityColumnCurrentRoomState(ctx context.Context, tx *sql.T if err != nil { return fmt.Errorf("failed to execute upgrade: %w", err) } + return nil } +// currentHistoryVisibilities returns a map from roomID to current history visibility. +// If the history visibility was changed after room creation, defaults to joined. +func currentHistoryVisibilities(ctx context.Context, tx *sql.Tx) (map[string]gomatrixserverlib.HistoryVisibility, error) { + rows, err := tx.QueryContext(ctx, `SELECT DISTINCT room_id, headered_event_json FROM syncapi_current_room_state + WHERE type = 'm.room.history_visibility' AND state_key = ''; +`) + if err != nil { + return nil, fmt.Errorf("failed to query current room state: %w", err) + } + defer rows.Close() // nolint: errcheck + var eventBytes []byte + var roomID string + var event gomatrixserverlib.HeaderedEvent + var hisVis gomatrixserverlib.HistoryVisibility + historyVisibilities := make(map[string]gomatrixserverlib.HistoryVisibility) + for rows.Next() { + if err = rows.Scan(&roomID, &eventBytes); err != nil { + return nil, fmt.Errorf("failed to scan row: %w", err) + } + if err = json.Unmarshal(eventBytes, &event); err != nil { + return nil, fmt.Errorf("failed to unmarshal event: %w", err) + } + historyVisibilities[roomID] = gomatrixserverlib.HistoryVisibilityJoined + if hisVis, err = event.HistoryVisibility(); err == nil && event.Depth() < 10 { + historyVisibilities[roomID] = hisVis + } + } + return historyVisibilities, nil +} + func DownAddHistoryVisibilityColumn(ctx context.Context, tx *sql.Tx) error { // SQLite doesn't have "if exists", so check if the column exists. _, err := tx.QueryContext(ctx, "SELECT history_visibility FROM syncapi_output_room_events LIMIT 1") diff --git a/syncapi/storage/sqlite3/memberships_table.go b/syncapi/storage/sqlite3/memberships_table.go index e4daa99c1..0c966fca0 100644 --- a/syncapi/storage/sqlite3/memberships_table.go +++ b/syncapi/storage/sqlite3/memberships_table.go @@ -66,11 +66,15 @@ const selectMembershipCountSQL = "" + const selectHeroesSQL = "" + "SELECT DISTINCT user_id FROM syncapi_memberships WHERE room_id = $1 AND user_id != $2 AND membership IN ($3) LIMIT 5" +const selectMembershipBeforeSQL = "" + + "SELECT membership, topological_pos FROM syncapi_memberships WHERE room_id = $1 and user_id = $2 AND topological_pos <= $3 ORDER BY topological_pos DESC LIMIT 1" + type membershipsStatements struct { db *sql.DB upsertMembershipStmt *sql.Stmt selectMembershipCountStmt *sql.Stmt //selectHeroesStmt *sql.Stmt - prepared at runtime due to variadic + selectMembershipForUserStmt *sql.Stmt } func NewSqliteMembershipsTable(db *sql.DB) (tables.Memberships, error) { @@ -84,6 +88,7 @@ func NewSqliteMembershipsTable(db *sql.DB) (tables.Memberships, error) { return s, sqlutil.StatementList{ {&s.upsertMembershipStmt, upsertMembershipSQL}, {&s.selectMembershipCountStmt, selectMembershipCountSQL}, + {&s.selectMembershipForUserStmt, selectMembershipBeforeSQL}, // {&s.selectHeroesStmt, selectHeroesSQL}, - prepared at runtime due to variadic }.Prepare(db) } @@ -148,3 +153,20 @@ func (s *membershipsStatements) SelectHeroes( } return heroes, rows.Err() } + +// SelectMembershipForUser returns the membership of the user before and including the given position. If no membership can be found +// returns "leave", the topological position and no error. If an error occurs, other than sql.ErrNoRows, returns that and an empty +// string as the membership. +func (s *membershipsStatements) SelectMembershipForUser( + ctx context.Context, txn *sql.Tx, roomID, userID string, pos int64, +) (membership string, topologyPos int, err error) { + stmt := sqlutil.TxStmt(txn, s.selectMembershipForUserStmt) + err = stmt.QueryRowContext(ctx, roomID, userID, pos).Scan(&membership, &topologyPos) + if err != nil { + if err == sql.ErrNoRows { + return "leave", 0, nil + } + return "", 0, err + } + return membership, topologyPos, nil +} diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go index de389fa9b..91fd35b5b 100644 --- a/syncapi/storage/sqlite3/output_room_events_table.go +++ b/syncapi/storage/sqlite3/output_room_events_table.go @@ -139,10 +139,12 @@ func NewSqliteEventsTable(db *sql.DB, streamID *StreamIDStatements) (tables.Even } m := sqlutil.NewMigrator(db) - m.AddMigrations(sqlutil.Migration{ - Version: "syncapi: add history visibility column (output_room_events)", - Up: deltas.UpAddHistoryVisibilityColumnOutputRoomEvents, - }) + m.AddMigrations( + sqlutil.Migration{ + Version: "syncapi: add history visibility column (output_room_events)", + Up: deltas.UpAddHistoryVisibilityColumnOutputRoomEvents, + }, + ) err = m.Up(context.Background()) if err != nil { return nil, err diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index 5c5eb0f55..a84e2bd16 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -16,12 +16,14 @@ package sqlite3 import ( + "context" "database/sql" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/storage/shared" + "github.com/matrix-org/dendrite/syncapi/storage/sqlite3/deltas" ) // SyncServerDatasource represents a sync server datasource which manages @@ -41,13 +43,13 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) if d.db, d.writer, err = base.DatabaseConnection(dbProperties, sqlutil.NewExclusiveWriter()); err != nil { return nil, err } - if err = d.prepare(); err != nil { + if err = d.prepare(base.Context()); err != nil { return nil, err } return &d, nil } -func (d *SyncServerDatasource) prepare() (err error) { +func (d *SyncServerDatasource) prepare(ctx context.Context) (err error) { if err = d.streamID.Prepare(d.db); err != nil { return err } @@ -107,6 +109,19 @@ func (d *SyncServerDatasource) prepare() (err error) { if err != nil { return err } + + // apply migrations which need multiple tables + m := sqlutil.NewMigrator(d.db) + m.AddMigrations( + sqlutil.Migration{ + Version: "syncapi: set history visibility for existing events", + Up: deltas.UpSetHistoryVisibility, // Requires current_room_state and output_room_events to be created. + }, + ) + err = m.Up(ctx) + if err != nil { + return err + } d.Database = shared.Database{ DB: d.db, Writer: d.writer, diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go index eda5ef3e6..a62818e9b 100644 --- a/syncapi/storage/storage_test.go +++ b/syncapi/storage/storage_test.go @@ -12,20 +12,22 @@ import ( "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/test" + "github.com/matrix-org/dendrite/test/testrig" "github.com/matrix-org/gomatrixserverlib" ) var ctx = context.Background() -func MustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, func()) { +func MustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, func(), func()) { connStr, close := test.PrepareDBConnectionString(t, dbType) - db, err := storage.NewSyncServerDatasource(nil, &config.DatabaseOptions{ + base, closeBase := testrig.CreateBaseDendrite(t, dbType) + db, err := storage.NewSyncServerDatasource(base, &config.DatabaseOptions{ ConnectionString: config.DataSource(connStr), }) if err != nil { t.Fatalf("NewSyncServerDatasource returned %s", err) } - return db, close + return db, close, closeBase } func MustWriteEvents(t *testing.T, db storage.Database, events []*gomatrixserverlib.HeaderedEvent) (positions []types.StreamPosition) { @@ -51,8 +53,9 @@ func TestWriteEvents(t *testing.T) { test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { alice := test.NewUser(t) r := test.NewRoom(t, alice) - db, close := MustCreateDatabase(t, dbType) + db, close, closeBase := MustCreateDatabase(t, dbType) defer close() + defer closeBase() MustWriteEvents(t, db, r.Events()) }) } @@ -60,8 +63,9 @@ func TestWriteEvents(t *testing.T) { // These tests assert basic functionality of RecentEvents for PDUs func TestRecentEventsPDU(t *testing.T) { test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { - db, close := MustCreateDatabase(t, dbType) + db, close, closeBase := MustCreateDatabase(t, dbType) defer close() + defer closeBase() alice := test.NewUser(t) // dummy room to make sure SQL queries are filtering on room ID MustWriteEvents(t, db, test.NewRoom(t, alice).Events()) @@ -163,8 +167,9 @@ func TestRecentEventsPDU(t *testing.T) { // The purpose of this test is to ensure that backfill does indeed go backwards, using a topology token func TestGetEventsInRangeWithTopologyToken(t *testing.T) { test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { - db, close := MustCreateDatabase(t, dbType) + db, close, closeBase := MustCreateDatabase(t, dbType) defer close() + defer closeBase() alice := test.NewUser(t) r := test.NewRoom(t, alice) for i := 0; i < 10; i++ { @@ -404,8 +409,9 @@ func TestSendToDeviceBehaviour(t *testing.T) { bob := test.NewUser(t) deviceID := "one" test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { - db, close := MustCreateDatabase(t, dbType) + db, close, closeBase := MustCreateDatabase(t, dbType) defer close() + defer closeBase() // At this point there should be no messages. We haven't sent anything // yet. _, events, err := db.SendToDeviceUpdatesForSync(ctx, alice.ID, deviceID, 0, 100) diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index d68351d4c..468d26aca 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -185,6 +185,7 @@ type Memberships interface { UpsertMembership(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, streamPos, topologicalPos types.StreamPosition) error SelectMembershipCount(ctx context.Context, txn *sql.Tx, roomID, membership string, pos types.StreamPosition) (count int, err error) SelectHeroes(ctx context.Context, txn *sql.Tx, roomID, userID string, memberships []string) (heroes []string, err error) + SelectMembershipForUser(ctx context.Context, txn *sql.Tx, roomID, userID string, pos int64) (membership string, topologicalPos int, err error) } type NotificationData interface { diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 1ad3adc4c..136cbea5a 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -10,10 +10,13 @@ import ( "github.com/matrix-org/dendrite/internal/caching" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/syncapi/internal" + "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" "github.com/tidwall/gjson" "go.uber.org/atomic" @@ -123,7 +126,7 @@ func (p *PDUStreamProvider) CompleteSync( defer reqWaitGroup.Done() jr, jerr := p.getJoinResponseForCompleteSync( - ctx, roomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, + ctx, roomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, false, ) if jerr != nil { req.Log.WithError(jerr).Error("p.getJoinResponseForCompleteSync failed") @@ -149,7 +152,7 @@ func (p *PDUStreamProvider) CompleteSync( if !peek.Deleted { var jr *types.JoinResponse jr, err = p.getJoinResponseForCompleteSync( - ctx, peek.RoomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, + ctx, peek.RoomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, true, ) if err != nil { req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed") @@ -281,12 +284,6 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( } } } - if len(recentEvents) > 0 { - updateLatestPosition(recentEvents[len(recentEvents)-1].EventID()) - } - if len(delta.StateEvents) > 0 { - updateLatestPosition(delta.StateEvents[len(delta.StateEvents)-1].EventID()) - } if stateFilter.LazyLoadMembers { delta.StateEvents, err = p.lazyLoadMembers( @@ -306,6 +303,19 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( } } + // Applies the history visibility rules + events, err := applyHistoryVisibilityFilter(ctx, p.DB, p.rsAPI, delta.RoomID, device.UserID, eventFilter.Limit, recentEvents) + if err != nil { + logrus.WithError(err).Error("unable to apply history visibility filter") + } + + if len(events) > 0 { + updateLatestPosition(events[len(events)-1].EventID()) + } + if len(delta.StateEvents) > 0 { + updateLatestPosition(delta.StateEvents[len(delta.StateEvents)-1].EventID()) + } + switch delta.Membership { case gomatrixserverlib.Join: jr := types.NewJoinResponse() @@ -313,14 +323,17 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( p.addRoomSummary(ctx, jr, delta.RoomID, device.UserID, latestPosition) } jr.Timeline.PrevBatch = &prevBatch - jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) - jr.Timeline.Limited = limited + jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatSync) + // If we are limited by the filter AND the history visibility filter + // didn't "remove" events, return that the response is limited. + jr.Timeline.Limited = limited && len(events) == len(recentEvents) jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync) res.Rooms.Join[delta.RoomID] = *jr case gomatrixserverlib.Peek: jr := types.NewJoinResponse() jr.Timeline.PrevBatch = &prevBatch + // TODO: Apply history visibility on peeked rooms jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) jr.Timeline.Limited = limited jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync) @@ -330,12 +343,12 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( fallthrough // transitions to leave are the same as ban case gomatrixserverlib.Ban: - // TODO: recentEvents may contain events that this user is not allowed to see because they are - // no longer in the room. lr := types.NewLeaveResponse() lr.Timeline.PrevBatch = &prevBatch - lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) - lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true + lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatSync) + // If we are limited by the filter AND the history visibility filter + // didn't "remove" events, return that the response is limited. + lr.Timeline.Limited = limited && len(events) == len(recentEvents) lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync) res.Rooms.Leave[delta.RoomID] = *lr } @@ -343,6 +356,41 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( return latestPosition, nil } +// applyHistoryVisibilityFilter gets the current room state and supplies it to ApplyHistoryVisibilityFilter, to make +// sure we always return the required events in the timeline. +func applyHistoryVisibilityFilter( + ctx context.Context, + db storage.Database, + rsAPI roomserverAPI.SyncRoomserverAPI, + roomID, userID string, + limit int, + recentEvents []*gomatrixserverlib.HeaderedEvent, +) ([]*gomatrixserverlib.HeaderedEvent, error) { + // We need to make sure we always include the latest states events, if they are in the timeline. + // We grep at least limit * 2 events, to ensure we really get the needed events. + stateEvents, err := db.CurrentState(ctx, roomID, &gomatrixserverlib.StateFilter{Limit: limit * 2}, nil) + if err != nil { + // Not a fatal error, we can continue without the stateEvents, + // they are only needed if there are state events in the timeline. + logrus.WithError(err).Warnf("failed to get current room state") + } + alwaysIncludeIDs := make(map[string]struct{}, len(stateEvents)) + for _, ev := range stateEvents { + alwaysIncludeIDs[ev.EventID()] = struct{}{} + } + startTime := time.Now() + events, err := internal.ApplyHistoryVisibilityFilter(ctx, db, rsAPI, recentEvents, alwaysIncludeIDs, userID, "sync") + if err != nil { + + return nil, err + } + logrus.WithFields(logrus.Fields{ + "duration": time.Since(startTime), + "room_id": roomID, + }).Debug("applied history visibility (sync)") + return events, nil +} + func (p *PDUStreamProvider) addRoomSummary(ctx context.Context, jr *types.JoinResponse, roomID, userID string, latestPosition types.StreamPosition) { // Work out how many members are in the room. joinedCount, _ := p.DB.MembershipCount(ctx, roomID, gomatrixserverlib.Join, latestPosition) @@ -390,6 +438,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( eventFilter *gomatrixserverlib.RoomEventFilter, wantFullState bool, device *userapi.Device, + isPeek bool, ) (jr *types.JoinResponse, err error) { jr = types.NewJoinResponse() // TODO: When filters are added, we may need to call this multiple times to get enough events. @@ -404,33 +453,6 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( return } - // TODO FIXME: We don't fully implement history visibility yet. To avoid leaking events which the - // user shouldn't see, we check the recent events and remove any prior to the join event of the user - // which is equiv to history_visibility: joined - joinEventIndex := -1 - for i := len(recentStreamEvents) - 1; i >= 0; i-- { - ev := recentStreamEvents[i] - if ev.Type() == gomatrixserverlib.MRoomMember && ev.StateKeyEquals(device.UserID) { - membership, _ := ev.Membership() - if membership == "join" { - joinEventIndex = i - if i > 0 { - // the create event happens before the first join, so we should cut it at that point instead - if recentStreamEvents[i-1].Type() == gomatrixserverlib.MRoomCreate && recentStreamEvents[i-1].StateKeyEquals("") { - joinEventIndex = i - 1 - break - } - } - break - } - } - } - if joinEventIndex != -1 { - // cut all events earlier than the join (but not the join itself) - recentStreamEvents = recentStreamEvents[joinEventIndex:] - limited = false // so clients know not to try to backpaginate - } - // Work our way through the timeline events and pick out the event IDs // of any state events that appear in the timeline. We'll specifically // exclude them at the next step, so that we don't get duplicate state @@ -474,6 +496,19 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents) stateEvents = removeDuplicates(stateEvents, recentEvents) + events := recentEvents + // Only apply history visibility checks if the response is for joined rooms + if !isPeek { + events, err = applyHistoryVisibilityFilter(ctx, p.DB, p.rsAPI, roomID, device.UserID, eventFilter.Limit, recentEvents) + if err != nil { + logrus.WithError(err).Error("unable to apply history visibility filter") + } + } + + // If we are limited by the filter AND the history visibility filter + // didn't "remove" events, return that the response is limited. + limited = limited && len(events) == len(recentEvents) + if stateFilter.LazyLoadMembers { if err != nil { return nil, err @@ -488,8 +523,10 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( } jr.Timeline.PrevBatch = prevBatch - jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) - jr.Timeline.Limited = limited + jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatSync) + // If we are limited by the filter AND the history visibility filter + // didn't "remove" events, return that the response is limited. + jr.Timeline.Limited = limited && len(events) == len(recentEvents) jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync) return jr, nil } diff --git a/syncapi/syncapi_test.go b/syncapi/syncapi_test.go index 931fef883..dc073a16e 100644 --- a/syncapi/syncapi_test.go +++ b/syncapi/syncapi_test.go @@ -12,6 +12,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/producers" keyapi "github.com/matrix-org/dendrite/keyserver/api" + "github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/roomserver/api" rsapi "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/base" @@ -54,6 +55,16 @@ func (s *syncRoomserverAPI) QueryBulkStateContent(ctx context.Context, req *rsap return nil } +func (s *syncRoomserverAPI) QueryMembershipForUser(ctx context.Context, req *rsapi.QueryMembershipForUserRequest, res *rsapi.QueryMembershipForUserResponse) error { + res.IsRoomForgotten = false + res.RoomExists = true + return nil +} + +func (s *syncRoomserverAPI) QueryMembershipAtEvent(ctx context.Context, req *rsapi.QueryMembershipAtEventRequest, res *rsapi.QueryMembershipAtEventResponse) error { + return nil +} + type syncUserAPI struct { userapi.SyncUserAPI accounts []userapi.Device @@ -107,7 +118,7 @@ func testSyncAccessTokens(t *testing.T, dbType test.DBType) { jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream) - msgs := toNATSMsgs(t, base, room.Events()) + msgs := toNATSMsgs(t, base, room.Events()...) AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, &syncKeyAPI{}) testrig.MustPublishMsgs(t, jsctx, msgs...) @@ -200,7 +211,7 @@ func testSyncAPICreateRoomSyncEarly(t *testing.T, dbType test.DBType) { // m.room.power_levels // m.room.join_rules // m.room.history_visibility - msgs := toNATSMsgs(t, base, room.Events()) + msgs := toNATSMsgs(t, base, room.Events()...) sinceTokens := make([]string, len(msgs)) AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, &syncKeyAPI{}) for i, msg := range msgs { @@ -315,6 +326,174 @@ func testSyncAPIUpdatePresenceImmediately(t *testing.T, dbType test.DBType) { } +// This is mainly what Sytest is doing in "test_history_visibility" +func TestMessageHistoryVisibility(t *testing.T) { + test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { + testHistoryVisibility(t, dbType) + }) +} + +func testHistoryVisibility(t *testing.T, dbType test.DBType) { + type result struct { + seeWithoutJoin bool + seeBeforeJoin bool + seeAfterInvite bool + } + + // create the users + alice := test.NewUser(t) + bob := test.NewUser(t) + + bobDev := userapi.Device{ + ID: "BOBID", + UserID: bob.ID, + AccessToken: "BOD_BEARER_TOKEN", + DisplayName: "BOB", + } + + ctx := context.Background() + // check guest and normal user accounts + for _, accType := range []userapi.AccountType{userapi.AccountTypeGuest, userapi.AccountTypeUser} { + testCases := []struct { + historyVisibility gomatrixserverlib.HistoryVisibility + wantResult result + }{ + { + historyVisibility: gomatrixserverlib.HistoryVisibilityWorldReadable, + wantResult: result{ + seeWithoutJoin: true, + seeBeforeJoin: true, + seeAfterInvite: true, + }, + }, + { + historyVisibility: gomatrixserverlib.HistoryVisibilityShared, + wantResult: result{ + seeWithoutJoin: false, + seeBeforeJoin: true, + seeAfterInvite: true, + }, + }, + { + historyVisibility: gomatrixserverlib.HistoryVisibilityInvited, + wantResult: result{ + seeWithoutJoin: false, + seeBeforeJoin: false, + seeAfterInvite: true, + }, + }, + { + historyVisibility: gomatrixserverlib.HistoryVisibilityJoined, + wantResult: result{ + seeWithoutJoin: false, + seeBeforeJoin: false, + seeAfterInvite: false, + }, + }, + } + + bobDev.AccountType = accType + userType := "guest" + if accType == userapi.AccountTypeUser { + userType = "real user" + } + + base, close := testrig.CreateBaseDendrite(t, dbType) + defer close() + + jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) + defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream) + + // Use the actual internal roomserver API + rsAPI := roomserver.NewInternalAPI(base) + rsAPI.SetFederationAPI(nil, nil) + + AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{bobDev}}, rsAPI, &syncKeyAPI{}) + + for _, tc := range testCases { + testname := fmt.Sprintf("%s - %s", tc.historyVisibility, userType) + t.Run(testname, func(t *testing.T) { + // create a room with the given visibility + room := test.NewRoom(t, alice, test.RoomHistoryVisibility(tc.historyVisibility)) + + // send the events/messages to NATS to create the rooms + beforeJoinEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": fmt.Sprintf("Before invite in a %s room", tc.historyVisibility)}) + eventsToSend := append(room.Events(), beforeJoinEv) + if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", nil, false); err != nil { + t.Fatalf("failed to send events: %v", err) + } + + // There is only one event, we expect only to be able to see this, if the room is world_readable + w := httptest.NewRecorder() + base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/messages", room.ID), test.WithQueryParams(map[string]string{ + "access_token": bobDev.AccessToken, + "dir": "b", + }))) + if w.Code != 200 { + t.Logf("%s", w.Body.String()) + t.Fatalf("got HTTP %d want %d", w.Code, 200) + } + // We only care about the returned events at this point + var res struct { + Chunk []gomatrixserverlib.ClientEvent `json:"chunk"` + } + if err := json.NewDecoder(w.Body).Decode(&res); err != nil { + t.Errorf("failed to decode response body: %s", err) + } + + verifyEventVisible(t, tc.wantResult.seeWithoutJoin, beforeJoinEv, res.Chunk) + + // Create invite, a message, join the room and create another message. + inviteEv := room.CreateAndInsert(t, alice, "m.room.member", map[string]interface{}{"membership": "invite"}, test.WithStateKey(bob.ID)) + afterInviteEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": fmt.Sprintf("After invite in a %s room", tc.historyVisibility)}) + joinEv := room.CreateAndInsert(t, bob, "m.room.member", map[string]interface{}{"membership": "join"}, test.WithStateKey(bob.ID)) + msgEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": fmt.Sprintf("After join in a %s room", tc.historyVisibility)}) + + eventsToSend = append([]*gomatrixserverlib.HeaderedEvent{}, inviteEv, afterInviteEv, joinEv, msgEv) + + if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", nil, false); err != nil { + t.Fatalf("failed to send events: %v", err) + } + + // Verify the messages after/before invite are visible or not + w = httptest.NewRecorder() + base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/messages", room.ID), test.WithQueryParams(map[string]string{ + "access_token": bobDev.AccessToken, + "dir": "b", + }))) + if w.Code != 200 { + t.Logf("%s", w.Body.String()) + t.Fatalf("got HTTP %d want %d", w.Code, 200) + } + if err := json.NewDecoder(w.Body).Decode(&res); err != nil { + t.Errorf("failed to decode response body: %s", err) + } + // verify results + verifyEventVisible(t, tc.wantResult.seeBeforeJoin, beforeJoinEv, res.Chunk) + verifyEventVisible(t, tc.wantResult.seeAfterInvite, afterInviteEv, res.Chunk) + }) + } + } +} + +func verifyEventVisible(t *testing.T, wantVisible bool, wantVisibleEvent *gomatrixserverlib.HeaderedEvent, chunk []gomatrixserverlib.ClientEvent) { + t.Helper() + if wantVisible { + for _, ev := range chunk { + if ev.EventID == wantVisibleEvent.EventID() { + return + } + } + t.Fatalf("expected to see event %s but didn't: %+v", wantVisibleEvent.EventID(), chunk) + } else { + for _, ev := range chunk { + if ev.EventID == wantVisibleEvent.EventID() { + t.Fatalf("expected not to see event %s: %+v", wantVisibleEvent.EventID(), string(ev.Content)) + } + } + } +} + func TestSendToDevice(t *testing.T) { test.WithAllDatabases(t, testSendToDevice) } @@ -448,7 +627,7 @@ func testSendToDevice(t *testing.T, dbType test.DBType) { } } -func toNATSMsgs(t *testing.T, base *base.BaseDendrite, input []*gomatrixserverlib.HeaderedEvent) []*nats.Msg { +func toNATSMsgs(t *testing.T, base *base.BaseDendrite, input ...*gomatrixserverlib.HeaderedEvent) []*nats.Msg { result := make([]*nats.Msg, len(input)) for i, ev := range input { var addsStateIDs []string @@ -460,6 +639,7 @@ func toNATSMsgs(t *testing.T, base *base.BaseDendrite, input []*gomatrixserverli NewRoomEvent: &rsapi.OutputNewRoomEvent{ Event: ev, AddsStateEventIDs: addsStateIDs, + HistoryVisibility: ev.Visibility, }, }) } diff --git a/sytest-whitelist b/sytest-whitelist index 2a145291f..88dfe920b 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -110,8 +110,6 @@ Newly joined room is included in an incremental sync User is offline if they set_presence=offline in their sync Changes to state are included in an incremental sync A change to displayname should appear in incremental /sync -Current state appears in timeline in private history -Current state appears in timeline in private history with many messages before Rooms a user is invited to appear in an initial sync Rooms a user is invited to appear in an incremental sync Sync can be polled for updates @@ -458,7 +456,6 @@ After changing password, a different session no longer works by default Read markers appear in incremental v2 /sync Read markers appear in initial v2 /sync Read markers can be updated -Local users can peek into world_readable rooms by room ID We can't peek into rooms with shared history_visibility We can't peek into rooms with invited history_visibility We can't peek into rooms with joined history_visibility @@ -720,4 +717,26 @@ Setting state twice is idempotent Joining room twice is idempotent Inbound federation can return missing events for shared visibility Inbound federation ignores redactions from invalid servers room > v3 +Joining room twice is idempotent +Getting messages going forward is limited for a departed room (SPEC-216) +m.room.history_visibility == "shared" allows/forbids appropriately for Guest users +m.room.history_visibility == "invited" allows/forbids appropriately for Guest users +m.room.history_visibility == "default" allows/forbids appropriately for Guest users +m.room.history_visibility == "shared" allows/forbids appropriately for Real users +m.room.history_visibility == "invited" allows/forbids appropriately for Real users +m.room.history_visibility == "default" allows/forbids appropriately for Real users +Guest users can sync from world_readable guest_access rooms if joined +Guest users can sync from shared guest_access rooms if joined +Guest users can sync from invited guest_access rooms if joined +Guest users can sync from joined guest_access rooms if joined +Guest users can sync from default guest_access rooms if joined +Real users can sync from world_readable guest_access rooms if joined +Real users can sync from shared guest_access rooms if joined +Real users can sync from invited guest_access rooms if joined +Real users can sync from joined guest_access rooms if joined +Real users can sync from default guest_access rooms if joined +Only see history_visibility changes on boundaries +Current state appears in timeline in private history +Current state appears in timeline in private history with many messages before +Local users can peek into world_readable rooms by room ID Newly joined room includes presence in incremental sync \ No newline at end of file diff --git a/test/room.go b/test/room.go index 6ae403b3f..94eb51bbe 100644 --- a/test/room.go +++ b/test/room.go @@ -37,10 +37,11 @@ var ( ) type Room struct { - ID string - Version gomatrixserverlib.RoomVersion - preset Preset - creator *User + ID string + Version gomatrixserverlib.RoomVersion + preset Preset + visibility gomatrixserverlib.HistoryVisibility + creator *User authEvents gomatrixserverlib.AuthEvents currentState map[string]*gomatrixserverlib.HeaderedEvent @@ -61,6 +62,7 @@ func NewRoom(t *testing.T, creator *User, modifiers ...roomModifier) *Room { preset: PresetPublicChat, Version: gomatrixserverlib.RoomVersionV9, currentState: make(map[string]*gomatrixserverlib.HeaderedEvent), + visibility: gomatrixserverlib.HistoryVisibilityShared, } for _, m := range modifiers { m(t, r) @@ -97,10 +99,14 @@ func (r *Room) insertCreateEvents(t *testing.T) { fallthrough case PresetPrivateChat: joinRule.JoinRule = "invite" - hisVis.HistoryVisibility = "shared" + hisVis.HistoryVisibility = gomatrixserverlib.HistoryVisibilityShared case PresetPublicChat: joinRule.JoinRule = "public" - hisVis.HistoryVisibility = "shared" + hisVis.HistoryVisibility = gomatrixserverlib.HistoryVisibilityShared + } + + if r.visibility != "" { + hisVis.HistoryVisibility = r.visibility } r.CreateAndInsert(t, r.creator, gomatrixserverlib.MRoomCreate, map[string]interface{}{ @@ -183,7 +189,9 @@ func (r *Room) CreateEvent(t *testing.T, creator *User, eventType string, conten if err = gomatrixserverlib.Allowed(ev, &r.authEvents); err != nil { t.Fatalf("CreateEvent[%s]: failed to verify event was allowed: %s", eventType, err) } - return ev.Headered(r.Version) + headeredEvent := ev.Headered(r.Version) + headeredEvent.Visibility = r.visibility + return headeredEvent } // Add a new event to this room DAG. Not thread-safe. @@ -242,6 +250,12 @@ func RoomPreset(p Preset) roomModifier { } } +func RoomHistoryVisibility(vis gomatrixserverlib.HistoryVisibility) roomModifier { + return func(t *testing.T, r *Room) { + r.visibility = vis + } +} + func RoomVersion(ver gomatrixserverlib.RoomVersion) roomModifier { return func(t *testing.T, r *Room) { r.Version = ver diff --git a/test/user.go b/test/user.go index 0020098a5..692eae351 100644 --- a/test/user.go +++ b/test/user.go @@ -20,6 +20,7 @@ import ( "sync/atomic" "testing" + "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" ) @@ -45,7 +46,8 @@ var ( ) type User struct { - ID string + ID string + accountType api.AccountType // key ID and private key of the server who has this user, if known. keyID gomatrixserverlib.KeyID privKey ed25519.PrivateKey @@ -62,6 +64,12 @@ func WithSigningServer(srvName gomatrixserverlib.ServerName, keyID gomatrixserve } } +func WithAccountType(accountType api.AccountType) UserOpt { + return func(u *User) { + u.accountType = accountType + } +} + func NewUser(t *testing.T, opts ...UserOpt) *User { counter := atomic.AddInt64(&userIDCounter, 1) var u User