From 79137599217808d9f9585628408a6c2aa26ab11d Mon Sep 17 00:00:00 2001 From: Kegsay Date: Mon, 7 Sep 2020 12:38:09 +0100 Subject: [PATCH] Remove QueryBulkStateContent from current state server (#1404) * Remove QueryBulkStateContent from current state server Expected fail due to db impl not existing * Implement query bulk state content * Fix up rejecting invites over federation * Fix bulk content marshalling --- clientapi/routing/directory_public.go | 14 ++-- clientapi/routing/routing.go | 2 +- cmd/dendrite-demo-libp2p/publicrooms.go | 2 +- currentstateserver/api/api.go | 32 --------- currentstateserver/api/wrapper.go | 89 ------------------------- currentstateserver/internal/api.go | 24 ------- currentstateserver/inthttp/client.go | 15 ----- currentstateserver/inthttp/server.go | 19 +----- federationapi/routing/publicrooms.go | 18 ++--- federationapi/routing/routing.go | 2 +- roomserver/api/query.go | 33 +++++++++ roomserver/internal/query/query.go | 3 + roomserver/storage/interface.go | 2 +- roomserver/storage/shared/storage.go | 87 +++++++++++++++++++++++- roomserver/storage/tables/interface.go | 43 ++++++++++++ syncapi/consumers/keychange.go | 4 +- syncapi/internal/keychange.go | 12 ++-- syncapi/internal/keychange_test.go | 34 ++-------- syncapi/sync/requestpool.go | 2 +- 19 files changed, 198 insertions(+), 239 deletions(-) delete mode 100644 currentstateserver/api/wrapper.go diff --git a/clientapi/routing/directory_public.go b/clientapi/routing/directory_public.go index bae7e49bb..fd7bc1e86 100644 --- a/clientapi/routing/directory_public.go +++ b/clientapi/routing/directory_public.go @@ -26,7 +26,6 @@ import ( "github.com/matrix-org/dendrite/clientapi/api" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" - currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api" "github.com/matrix-org/dendrite/internal/config" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" @@ -51,7 +50,7 @@ type filter struct { // GetPostPublicRooms implements GET and POST /publicRooms func GetPostPublicRooms( - req *http.Request, rsAPI roomserverAPI.RoomserverInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI, + req *http.Request, rsAPI roomserverAPI.RoomserverInternalAPI, extRoomsProvider api.ExtraPublicRoomsProvider, federation *gomatrixserverlib.FederationClient, cfg *config.ClientAPI, @@ -75,7 +74,7 @@ func GetPostPublicRooms( } } - response, err := publicRooms(req.Context(), request, rsAPI, stateAPI, extRoomsProvider) + response, err := publicRooms(req.Context(), request, rsAPI, extRoomsProvider) if err != nil { util.GetLogger(req.Context()).WithError(err).Errorf("failed to work out public rooms") return jsonerror.InternalServerError() @@ -86,8 +85,8 @@ func GetPostPublicRooms( } } -func publicRooms(ctx context.Context, request PublicRoomReq, rsAPI roomserverAPI.RoomserverInternalAPI, - stateAPI currentstateAPI.CurrentStateInternalAPI, extRoomsProvider api.ExtraPublicRoomsProvider, +func publicRooms( + ctx context.Context, request PublicRoomReq, rsAPI roomserverAPI.RoomserverInternalAPI, extRoomsProvider api.ExtraPublicRoomsProvider, ) (*gomatrixserverlib.RespPublicRooms, error) { response := gomatrixserverlib.RespPublicRooms{ @@ -110,7 +109,7 @@ func publicRooms(ctx context.Context, request PublicRoomReq, rsAPI roomserverAPI var rooms []gomatrixserverlib.PublicRoom if request.Since == "" { - rooms = refreshPublicRoomCache(ctx, rsAPI, extRoomsProvider, stateAPI) + rooms = refreshPublicRoomCache(ctx, rsAPI, extRoomsProvider) } else { rooms = getPublicRoomsFromCache() } @@ -226,7 +225,6 @@ func sliceInto(slice []gomatrixserverlib.PublicRoom, since int64, limit int16) ( func refreshPublicRoomCache( ctx context.Context, rsAPI roomserverAPI.RoomserverInternalAPI, extRoomsProvider api.ExtraPublicRoomsProvider, - stateAPI currentstateAPI.CurrentStateInternalAPI, ) []gomatrixserverlib.PublicRoom { cacheMu.Lock() defer cacheMu.Unlock() @@ -241,7 +239,7 @@ func refreshPublicRoomCache( util.GetLogger(ctx).WithError(err).Error("QueryPublishedRooms failed") return publicRoomsCache } - pubRooms, err := currentstateAPI.PopulatePublicRooms(ctx, queryRes.RoomIDs, stateAPI) + pubRooms, err := roomserverAPI.PopulatePublicRooms(ctx, queryRes.RoomIDs, rsAPI) if err != nil { util.GetLogger(ctx).WithError(err).Error("PopulatePublicRooms failed") return publicRoomsCache diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go index 0445852dd..97ab03e33 100644 --- a/clientapi/routing/routing.go +++ b/clientapi/routing/routing.go @@ -336,7 +336,7 @@ func Setup( ).Methods(http.MethodPut, http.MethodOptions) r0mux.Handle("/publicRooms", httputil.MakeExternalAPI("public_rooms", func(req *http.Request) util.JSONResponse { - return GetPostPublicRooms(req, rsAPI, stateAPI, extRoomsProvider, federation, cfg) + return GetPostPublicRooms(req, rsAPI, extRoomsProvider, federation, cfg) }), ).Methods(http.MethodGet, http.MethodPost, http.MethodOptions) diff --git a/cmd/dendrite-demo-libp2p/publicrooms.go b/cmd/dendrite-demo-libp2p/publicrooms.go index 2160ddefd..838ba77b6 100644 --- a/cmd/dendrite-demo-libp2p/publicrooms.go +++ b/cmd/dendrite-demo-libp2p/publicrooms.go @@ -106,7 +106,7 @@ func (p *publicRoomsProvider) AdvertiseRooms() error { util.GetLogger(ctx).WithError(err).Error("QueryPublishedRooms failed") return err } - ourRooms, err := currentstateAPI.PopulatePublicRooms(ctx, queryRes.RoomIDs, p.stateAPI) + ourRooms, err := roomserverAPI.PopulatePublicRooms(ctx, queryRes.RoomIDs, p.rsAPI) if err != nil { util.GetLogger(ctx).WithError(err).Error("PopulatePublicRooms failed") return err diff --git a/currentstateserver/api/api.go b/currentstateserver/api/api.go index f11422acf..536ae0eda 100644 --- a/currentstateserver/api/api.go +++ b/currentstateserver/api/api.go @@ -14,37 +14,5 @@ package api -import ( - "context" - - "github.com/matrix-org/gomatrixserverlib" -) - type CurrentStateInternalAPI interface { - // QueryBulkStateContent does a bulk query for state event content in the given rooms. - QueryBulkStateContent(ctx context.Context, req *QueryBulkStateContentRequest, res *QueryBulkStateContentResponse) error -} - -type QueryBulkStateContentRequest struct { - // Returns state events in these rooms - RoomIDs []string - // If true, treats the '*' StateKey as "all state events of this type" rather than a literal value of '*' - AllowWildcards bool - // The state events to return. Only a small subset of tuples are allowed in this request as only certain events - // have their content fields extracted. Specifically, the tuple Type must be one of: - // m.room.avatar - // m.room.create - // m.room.canonical_alias - // m.room.guest_access - // m.room.history_visibility - // m.room.join_rules - // m.room.member - // m.room.name - // m.room.topic - // Any other tuple type will result in the query failing. - StateTuples []gomatrixserverlib.StateKeyTuple -} -type QueryBulkStateContentResponse struct { - // map of room ID -> tuple -> content_value - Rooms map[string]map[gomatrixserverlib.StateKeyTuple]string } diff --git a/currentstateserver/api/wrapper.go b/currentstateserver/api/wrapper.go deleted file mode 100644 index 20fae825f..000000000 --- a/currentstateserver/api/wrapper.go +++ /dev/null @@ -1,89 +0,0 @@ -// Copyright 2020 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package api - -import ( - "context" - - "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/util" -) - -// PopulatePublicRooms extracts PublicRoom information for all the provided room IDs. The IDs are not checked to see if they are visible in the -// published room directory. -// due to lots of switches -// nolint:gocyclo -func PopulatePublicRooms(ctx context.Context, roomIDs []string, stateAPI CurrentStateInternalAPI) ([]gomatrixserverlib.PublicRoom, error) { - avatarTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.avatar", StateKey: ""} - nameTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.name", StateKey: ""} - canonicalTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomCanonicalAlias, StateKey: ""} - topicTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.topic", StateKey: ""} - guestTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.guest_access", StateKey: ""} - visibilityTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomHistoryVisibility, StateKey: ""} - joinRuleTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomJoinRules, StateKey: ""} - - var stateRes QueryBulkStateContentResponse - err := stateAPI.QueryBulkStateContent(ctx, &QueryBulkStateContentRequest{ - RoomIDs: roomIDs, - AllowWildcards: true, - StateTuples: []gomatrixserverlib.StateKeyTuple{ - nameTuple, canonicalTuple, topicTuple, guestTuple, visibilityTuple, joinRuleTuple, avatarTuple, - {EventType: gomatrixserverlib.MRoomMember, StateKey: "*"}, - }, - }, &stateRes) - if err != nil { - util.GetLogger(ctx).WithError(err).Error("QueryBulkStateContent failed") - return nil, err - } - chunk := make([]gomatrixserverlib.PublicRoom, len(roomIDs)) - i := 0 - for roomID, data := range stateRes.Rooms { - pub := gomatrixserverlib.PublicRoom{ - RoomID: roomID, - } - joinCount := 0 - var joinRule, guestAccess string - for tuple, contentVal := range data { - if tuple.EventType == gomatrixserverlib.MRoomMember && contentVal == "join" { - joinCount++ - continue - } - switch tuple { - case avatarTuple: - pub.AvatarURL = contentVal - case nameTuple: - pub.Name = contentVal - case topicTuple: - pub.Topic = contentVal - case canonicalTuple: - pub.CanonicalAlias = contentVal - case visibilityTuple: - pub.WorldReadable = contentVal == "world_readable" - // need both of these to determine whether guests can join - case joinRuleTuple: - joinRule = contentVal - case guestTuple: - guestAccess = contentVal - } - } - if joinRule == gomatrixserverlib.Public && guestAccess == "can_join" { - pub.GuestCanJoin = true - } - pub.JoinedMembersCount = joinCount - chunk[i] = pub - i++ - } - return chunk, nil -} diff --git a/currentstateserver/internal/api.go b/currentstateserver/internal/api.go index 2d6da1e6e..f218fa19c 100644 --- a/currentstateserver/internal/api.go +++ b/currentstateserver/internal/api.go @@ -15,33 +15,9 @@ package internal import ( - "context" - - "github.com/matrix-org/dendrite/currentstateserver/api" "github.com/matrix-org/dendrite/currentstateserver/storage" - "github.com/matrix-org/gomatrixserverlib" ) type CurrentStateInternalAPI struct { DB storage.Database } - -func (a *CurrentStateInternalAPI) QueryBulkStateContent(ctx context.Context, req *api.QueryBulkStateContentRequest, res *api.QueryBulkStateContentResponse) error { - events, err := a.DB.GetBulkStateContent(ctx, req.RoomIDs, req.StateTuples, req.AllowWildcards) - if err != nil { - return err - } - res.Rooms = make(map[string]map[gomatrixserverlib.StateKeyTuple]string) - for _, ev := range events { - if res.Rooms[ev.RoomID] == nil { - res.Rooms[ev.RoomID] = make(map[gomatrixserverlib.StateKeyTuple]string) - } - room := res.Rooms[ev.RoomID] - room[gomatrixserverlib.StateKeyTuple{ - EventType: ev.EventType, - StateKey: ev.StateKey, - }] = ev.ContentValue - res.Rooms[ev.RoomID] = room - } - return nil -} diff --git a/currentstateserver/inthttp/client.go b/currentstateserver/inthttp/client.go index 131307160..201768490 100644 --- a/currentstateserver/inthttp/client.go +++ b/currentstateserver/inthttp/client.go @@ -15,13 +15,10 @@ package inthttp import ( - "context" "errors" "net/http" "github.com/matrix-org/dendrite/currentstateserver/api" - "github.com/matrix-org/dendrite/internal/httputil" - "github.com/opentracing/opentracing-go" ) // HTTP paths for the internal HTTP APIs @@ -49,15 +46,3 @@ type httpCurrentStateInternalAPI struct { apiURL string httpClient *http.Client } - -func (h *httpCurrentStateInternalAPI) QueryBulkStateContent( - ctx context.Context, - request *api.QueryBulkStateContentRequest, - response *api.QueryBulkStateContentResponse, -) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "QueryBulkStateContent") - defer span.Finish() - - apiURL := h.apiURL + QueryBulkStateContentPath - return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) -} diff --git a/currentstateserver/inthttp/server.go b/currentstateserver/inthttp/server.go index 70d6ecfd5..3847344cb 100644 --- a/currentstateserver/inthttp/server.go +++ b/currentstateserver/inthttp/server.go @@ -15,27 +15,10 @@ package inthttp import ( - "encoding/json" - "net/http" - "github.com/gorilla/mux" "github.com/matrix-org/dendrite/currentstateserver/api" - "github.com/matrix-org/dendrite/internal/httputil" - "github.com/matrix-org/util" ) func AddRoutes(internalAPIMux *mux.Router, intAPI api.CurrentStateInternalAPI) { - internalAPIMux.Handle(QueryBulkStateContentPath, - httputil.MakeInternalAPI("queryBulkStateContent", func(req *http.Request) util.JSONResponse { - request := api.QueryBulkStateContentRequest{} - response := api.QueryBulkStateContentResponse{} - if err := json.NewDecoder(req.Body).Decode(&request); err != nil { - return util.MessageResponse(http.StatusBadRequest, err.Error()) - } - if err := intAPI.QueryBulkStateContent(req.Context(), &request, &response); err != nil { - return util.ErrorResponse(err) - } - return util.JSONResponse{Code: http.StatusOK, JSON: &response} - }), - ) + } diff --git a/federationapi/routing/publicrooms.go b/federationapi/routing/publicrooms.go index 3807a5183..d923a236a 100644 --- a/federationapi/routing/publicrooms.go +++ b/federationapi/routing/publicrooms.go @@ -7,7 +7,6 @@ import ( "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" - currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" @@ -24,7 +23,7 @@ type filter struct { } // GetPostPublicRooms implements GET and POST /publicRooms -func GetPostPublicRooms(req *http.Request, rsAPI roomserverAPI.RoomserverInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI) util.JSONResponse { +func GetPostPublicRooms(req *http.Request, rsAPI roomserverAPI.RoomserverInternalAPI) util.JSONResponse { var request PublicRoomReq if fillErr := fillPublicRoomsReq(req, &request); fillErr != nil { return *fillErr @@ -32,7 +31,7 @@ func GetPostPublicRooms(req *http.Request, rsAPI roomserverAPI.RoomserverInterna if request.Limit == 0 { request.Limit = 50 } - response, err := publicRooms(req.Context(), request, rsAPI, stateAPI) + response, err := publicRooms(req.Context(), request, rsAPI) if err != nil { return jsonerror.InternalServerError() } @@ -42,8 +41,9 @@ func GetPostPublicRooms(req *http.Request, rsAPI roomserverAPI.RoomserverInterna } } -func publicRooms(ctx context.Context, request PublicRoomReq, rsAPI roomserverAPI.RoomserverInternalAPI, - stateAPI currentstateAPI.CurrentStateInternalAPI) (*gomatrixserverlib.RespPublicRooms, error) { +func publicRooms( + ctx context.Context, request PublicRoomReq, rsAPI roomserverAPI.RoomserverInternalAPI, +) (*gomatrixserverlib.RespPublicRooms, error) { var response gomatrixserverlib.RespPublicRooms var limit int16 @@ -80,7 +80,7 @@ func publicRooms(ctx context.Context, request PublicRoomReq, rsAPI roomserverAPI nextIndex = len(queryRes.RoomIDs) } roomIDs := queryRes.RoomIDs[offset:nextIndex] - response.Chunk, err = fillInRooms(ctx, roomIDs, stateAPI) + response.Chunk, err = fillInRooms(ctx, roomIDs, rsAPI) return &response, err } @@ -112,7 +112,7 @@ func fillPublicRoomsReq(httpReq *http.Request, request *PublicRoomReq) *util.JSO // due to lots of switches // nolint:gocyclo -func fillInRooms(ctx context.Context, roomIDs []string, stateAPI currentstateAPI.CurrentStateInternalAPI) ([]gomatrixserverlib.PublicRoom, error) { +func fillInRooms(ctx context.Context, roomIDs []string, rsAPI roomserverAPI.RoomserverInternalAPI) ([]gomatrixserverlib.PublicRoom, error) { avatarTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.avatar", StateKey: ""} nameTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.name", StateKey: ""} canonicalTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomCanonicalAlias, StateKey: ""} @@ -121,8 +121,8 @@ func fillInRooms(ctx context.Context, roomIDs []string, stateAPI currentstateAPI visibilityTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomHistoryVisibility, StateKey: ""} joinRuleTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomJoinRules, StateKey: ""} - var stateRes currentstateAPI.QueryBulkStateContentResponse - err := stateAPI.QueryBulkStateContent(ctx, ¤tstateAPI.QueryBulkStateContentRequest{ + var stateRes roomserverAPI.QueryBulkStateContentResponse + err := rsAPI.QueryBulkStateContent(ctx, &roomserverAPI.QueryBulkStateContentRequest{ RoomIDs: roomIDs, AllowWildcards: true, StateTuples: []gomatrixserverlib.StateKeyTuple{ diff --git a/federationapi/routing/routing.go b/federationapi/routing/routing.go index 4c43be273..7d60d15e4 100644 --- a/federationapi/routing/routing.go +++ b/federationapi/routing/routing.go @@ -390,7 +390,7 @@ func Setup( v1fedmux.Handle("/publicRooms", httputil.MakeExternalAPI("federation_public_rooms", func(req *http.Request) util.JSONResponse { - return GetPostPublicRooms(req, rsAPI, stateAPI) + return GetPostPublicRooms(req, rsAPI) }), ).Methods(http.MethodGet) diff --git a/roomserver/api/query.go b/roomserver/api/query.go index d0d0474d8..67a217c82 100644 --- a/roomserver/api/query.go +++ b/roomserver/api/query.go @@ -303,6 +303,39 @@ type QueryServerBannedFromRoomResponse struct { Banned bool `json:"banned"` } +// MarshalJSON stringifies the room ID and StateKeyTuple keys so they can be sent over the wire in HTTP API mode. +func (r *QueryBulkStateContentResponse) MarshalJSON() ([]byte, error) { + se := make(map[string]string) + for roomID, tupleToEvent := range r.Rooms { + for tuple, event := range tupleToEvent { + // use 0x1F (unit separator) as the delimiter between room ID/type/state key, + se[fmt.Sprintf("%s\x1F%s\x1F%s", roomID, tuple.EventType, tuple.StateKey)] = event + } + } + return json.Marshal(se) +} + +func (r *QueryBulkStateContentResponse) UnmarshalJSON(data []byte) error { + wireFormat := make(map[string]string) + err := json.Unmarshal(data, &wireFormat) + if err != nil { + return err + } + r.Rooms = make(map[string]map[gomatrixserverlib.StateKeyTuple]string) + for roomTuple, value := range wireFormat { + fields := strings.Split(roomTuple, "\x1F") + roomID := fields[0] + if r.Rooms[roomID] == nil { + r.Rooms[roomID] = make(map[gomatrixserverlib.StateKeyTuple]string) + } + r.Rooms[roomID][gomatrixserverlib.StateKeyTuple{ + EventType: fields[1], + StateKey: fields[2], + }] = value + } + return nil +} + // MarshalJSON stringifies the StateKeyTuple keys so they can be sent over the wire in HTTP API mode. func (r *QueryCurrentStateResponse) MarshalJSON() ([]byte, error) { se := make(map[string]*gomatrixserverlib.HeaderedEvent, len(r.StateEvents)) diff --git a/roomserver/internal/query/query.go b/roomserver/internal/query/query.go index f76c93166..b34ae7701 100644 --- a/roomserver/internal/query/query.go +++ b/roomserver/internal/query/query.go @@ -140,6 +140,9 @@ func (r *Queryer) QueryMembershipForUser( if err != nil { return err } + if info == nil { + return fmt.Errorf("QueryMembershipForUser: unknown room %s", request.RoomID) + } membershipEventNID, stillInRoom, err := r.DB.GetMembership(ctx, info.RoomNID, request.UserID) if err != nil { diff --git a/roomserver/storage/interface.go b/roomserver/storage/interface.go index c4119f7ed..be724da62 100644 --- a/roomserver/storage/interface.go +++ b/roomserver/storage/interface.go @@ -17,9 +17,9 @@ package storage import ( "context" - "github.com/matrix-org/dendrite/currentstateserver/storage/tables" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/storage/shared" + "github.com/matrix-org/dendrite/roomserver/storage/tables" "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/gomatrixserverlib" ) diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go index a081603f0..5c18c7250 100644 --- a/roomserver/storage/shared/storage.go +++ b/roomserver/storage/shared/storage.go @@ -7,7 +7,6 @@ import ( "fmt" "sort" - csstables "github.com/matrix-org/dendrite/currentstateserver/storage/tables" "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/roomserver/api" @@ -799,8 +798,90 @@ func (d *Database) GetRoomsByMembership(ctx context.Context, userID, membership // GetBulkStateContent returns all state events which match a given room ID and a given state key tuple. Both must be satisfied for a match. // If a tuple has the StateKey of '*' and allowWildcards=true then all state events with the EventType should be returned. -func (d *Database) GetBulkStateContent(ctx context.Context, roomIDs []string, tuples []gomatrixserverlib.StateKeyTuple, allowWildcards bool) ([]csstables.StrippedEvent, error) { - return nil, fmt.Errorf("not implemented yet") +// nolint:gocyclo +func (d *Database) GetBulkStateContent(ctx context.Context, roomIDs []string, tuples []gomatrixserverlib.StateKeyTuple, allowWildcards bool) ([]tables.StrippedEvent, error) { + eventTypes := make([]string, 0, len(tuples)) + for _, tuple := range tuples { + eventTypes = append(eventTypes, tuple.EventType) + } + // we don't bother failing the request if we get asked for event types we don't know about, as all that would result in is no matches which + // isn't a failure. + eventTypeNIDMap, err := d.EventTypesTable.BulkSelectEventTypeNID(ctx, eventTypes) + if err != nil { + return nil, fmt.Errorf("GetBulkStateContent: failed to map event type nids: %w", err) + } + typeNIDSet := make(map[types.EventTypeNID]bool) + for _, nid := range eventTypeNIDMap { + typeNIDSet[nid] = true + } + + allowWildcard := make(map[types.EventTypeNID]bool) + eventStateKeys := make([]string, 0, len(tuples)) + for _, tuple := range tuples { + if allowWildcards && tuple.StateKey == "*" { + allowWildcard[eventTypeNIDMap[tuple.EventType]] = true + continue + } + eventStateKeys = append(eventStateKeys, tuple.StateKey) + + } + + eventStateKeyNIDMap, err := d.EventStateKeysTable.BulkSelectEventStateKeyNID(ctx, eventStateKeys) + if err != nil { + return nil, fmt.Errorf("GetBulkStateContent: failed to map state key nids: %w", err) + } + stateKeyNIDSet := make(map[types.EventStateKeyNID]bool) + for _, nid := range eventStateKeyNIDMap { + stateKeyNIDSet[nid] = true + } + + var eventNIDs []types.EventNID + eventNIDToVer := make(map[types.EventNID]gomatrixserverlib.RoomVersion) + // TODO: This feels like this is going to be really slow... + for _, roomID := range roomIDs { + roomInfo, err2 := d.RoomInfo(ctx, roomID) + if err2 != nil { + return nil, fmt.Errorf("GetBulkStateContent: failed to load room info for room %s : %w", roomID, err2) + } + // for unknown rooms or rooms which we don't have the current state, skip them. + if roomInfo == nil || roomInfo.IsStub { + continue + } + entries, err2 := d.loadStateAtSnapshot(ctx, roomInfo.StateSnapshotNID) + if err2 != nil { + return nil, fmt.Errorf("GetBulkStateContent: failed to load state for room %s : %w", roomID, err2) + } + for _, entry := range entries { + if typeNIDSet[entry.EventTypeNID] { + if allowWildcard[entry.EventTypeNID] || stateKeyNIDSet[entry.EventStateKeyNID] { + eventNIDs = append(eventNIDs, entry.EventNID) + eventNIDToVer[entry.EventNID] = roomInfo.RoomVersion + } + } + } + } + + events, err := d.EventJSONTable.BulkSelectEventJSON(ctx, eventNIDs) + if err != nil { + return nil, fmt.Errorf("GetBulkStateContent: failed to load event JSON for event nids: %w", err) + } + result := make([]tables.StrippedEvent, len(events)) + for i := range events { + roomVer := eventNIDToVer[events[i].EventNID] + ev, err := gomatrixserverlib.NewEventFromTrustedJSON(events[i].EventJSON, false, roomVer) + if err != nil { + return nil, fmt.Errorf("GetBulkStateContent: failed to load event JSON for event NID %v : %w", events[i].EventNID, err) + } + hev := ev.Headered(roomVer) + result[i] = tables.StrippedEvent{ + EventType: ev.Type(), + RoomID: ev.RoomID(), + StateKey: *ev.StateKey(), + ContentValue: tables.ExtractContentValue(&hev), + } + } + + return result, nil } // JoinedUsersSetInRooms returns all joined users in the rooms given, along with the count of how many times they appear. diff --git a/roomserver/storage/tables/interface.go b/roomserver/storage/tables/interface.go index a142f2b1a..adb06212a 100644 --- a/roomserver/storage/tables/interface.go +++ b/roomserver/storage/tables/interface.go @@ -6,6 +6,7 @@ import ( "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/gomatrixserverlib" + "github.com/tidwall/gjson" ) type EventJSONPair struct { @@ -155,3 +156,45 @@ type Redactions interface { // successfully redacted the event JSON. MarkRedactionValidated(ctx context.Context, txn *sql.Tx, redactionEventID string, validated bool) error } + +// StrippedEvent represents a stripped event for returning extracted content values. +type StrippedEvent struct { + RoomID string + EventType string + StateKey string + ContentValue string +} + +// ExtractContentValue from the given state event. For example, given an m.room.name event with: +// content: { name: "Foo" } +// this returns "Foo". +func ExtractContentValue(ev *gomatrixserverlib.HeaderedEvent) string { + content := ev.Content() + key := "" + switch ev.Type() { + case gomatrixserverlib.MRoomCreate: + key = "creator" + case gomatrixserverlib.MRoomCanonicalAlias: + key = "alias" + case gomatrixserverlib.MRoomHistoryVisibility: + key = "history_visibility" + case gomatrixserverlib.MRoomJoinRules: + key = "join_rule" + case gomatrixserverlib.MRoomMember: + key = "membership" + case gomatrixserverlib.MRoomName: + key = "name" + case "m.room.avatar": + key = "url" + case "m.room.topic": + key = "topic" + case "m.room.guest_access": + key = "guest_access" + } + result := gjson.GetBytes(content, key) + if !result.Exists() { + return "" + } + // this returns the empty string if this is not a string type + return result.Str +} diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go index 33797378e..5b50bac26 100644 --- a/syncapi/consumers/keychange.go +++ b/syncapi/consumers/keychange.go @@ -133,7 +133,7 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.HeaderedEvent) { // work out who we are now sharing rooms with which we previously were not and notify them about the joining // users keys: - changed, _, err := syncinternal.TrackChangedUsers(context.Background(), s.rsAPI, s.stateAPI, *ev.StateKey(), []string{ev.RoomID()}, nil) + changed, _, err := syncinternal.TrackChangedUsers(context.Background(), s.rsAPI, *ev.StateKey(), []string{ev.RoomID()}, nil) if err != nil { log.WithError(err).Error("OnJoinEvent: failed to work out changed users") return @@ -146,7 +146,7 @@ func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.Headere func (s *OutputKeyChangeEventConsumer) OnLeaveEvent(ev *gomatrixserverlib.HeaderedEvent) { // work out who we are no longer sharing any rooms with and notify them about the leaving user - _, left, err := syncinternal.TrackChangedUsers(context.Background(), s.rsAPI, s.stateAPI, *ev.StateKey(), nil, []string{ev.RoomID()}) + _, left, err := syncinternal.TrackChangedUsers(context.Background(), s.rsAPI, *ev.StateKey(), nil, []string{ev.RoomID()}) if err != nil { log.WithError(err).Error("OnLeaveEvent: failed to work out left users") return diff --git a/syncapi/internal/keychange.go b/syncapi/internal/keychange.go index f2f50aeff..090e0c658 100644 --- a/syncapi/internal/keychange.go +++ b/syncapi/internal/keychange.go @@ -19,7 +19,6 @@ import ( "strings" "github.com/Shopify/sarama" - currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api" "github.com/matrix-org/dendrite/keyserver/api" keyapi "github.com/matrix-org/dendrite/keyserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" @@ -50,7 +49,6 @@ func DeviceOTKCounts(ctx context.Context, keyAPI keyapi.KeyInternalAPI, userID, // nolint:gocyclo func DeviceListCatchup( ctx context.Context, keyAPI keyapi.KeyInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI, - stateAPI currentstateAPI.CurrentStateInternalAPI, userID string, res *types.Response, from, to types.StreamingToken, ) (hasNew bool, err error) { @@ -58,7 +56,7 @@ func DeviceListCatchup( newlyJoinedRooms := joinedRooms(res, userID) newlyLeftRooms := leftRooms(res) if len(newlyJoinedRooms) > 0 || len(newlyLeftRooms) > 0 { - changed, left, err := TrackChangedUsers(ctx, rsAPI, stateAPI, userID, newlyJoinedRooms, newlyLeftRooms) + changed, left, err := TrackChangedUsers(ctx, rsAPI, userID, newlyJoinedRooms, newlyLeftRooms) if err != nil { return false, err } @@ -144,7 +142,7 @@ func DeviceListCatchup( // TrackChangedUsers calculates the values of device_lists.changed|left in the /sync response. // nolint:gocyclo func TrackChangedUsers( - ctx context.Context, rsAPI roomserverAPI.RoomserverInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI, userID string, newlyJoinedRooms, newlyLeftRooms []string, + ctx context.Context, rsAPI roomserverAPI.RoomserverInternalAPI, userID string, newlyJoinedRooms, newlyLeftRooms []string, ) (changed, left []string, err error) { // process leaves first, then joins afterwards so if we join/leave/join/leave we err on the side of including users. @@ -161,8 +159,8 @@ func TrackChangedUsers( if err != nil { return nil, nil, err } - var stateRes currentstateAPI.QueryBulkStateContentResponse - err = stateAPI.QueryBulkStateContent(ctx, ¤tstateAPI.QueryBulkStateContentRequest{ + var stateRes roomserverAPI.QueryBulkStateContentResponse + err = rsAPI.QueryBulkStateContent(ctx, &roomserverAPI.QueryBulkStateContentRequest{ RoomIDs: newlyLeftRooms, StateTuples: []gomatrixserverlib.StateKeyTuple{ { @@ -202,7 +200,7 @@ func TrackChangedUsers( if err != nil { return nil, left, err } - err = stateAPI.QueryBulkStateContent(ctx, ¤tstateAPI.QueryBulkStateContentRequest{ + err = rsAPI.QueryBulkStateContent(ctx, &roomserverAPI.QueryBulkStateContentRequest{ RoomIDs: newlyJoinedRooms, StateTuples: []gomatrixserverlib.StateKeyTuple{ { diff --git a/syncapi/internal/keychange_test.go b/syncapi/internal/keychange_test.go index 7d7394302..c25011814 100644 --- a/syncapi/internal/keychange_test.go +++ b/syncapi/internal/keychange_test.go @@ -7,7 +7,6 @@ import ( "testing" "github.com/Shopify/sarama" - stateapi "github.com/matrix-org/dendrite/currentstateserver/api" keyapi "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/types" @@ -108,25 +107,6 @@ func (s *mockRoomserverAPI) QuerySharedUsers(ctx context.Context, req *api.Query return nil } -type mockStateAPI struct { - rsAPI *mockRoomserverAPI -} - -// QueryBulkStateContent does a bulk query for state event content in the given rooms. -func (s *mockStateAPI) QueryBulkStateContent(ctx context.Context, req *stateapi.QueryBulkStateContentRequest, res *stateapi.QueryBulkStateContentResponse) error { - var res2 api.QueryBulkStateContentResponse - err := s.rsAPI.QueryBulkStateContent(ctx, &api.QueryBulkStateContentRequest{ - RoomIDs: req.RoomIDs, - AllowWildcards: req.AllowWildcards, - StateTuples: req.StateTuples, - }, &res2) - if err != nil { - return err - } - res.Rooms = res2.Rooms - return nil -} - type wantCatchup struct { hasNew bool changed []string @@ -200,7 +180,7 @@ func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) { "!another:room": {syncingUser}, }, } - hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken) + hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken) if err != nil { t.Fatalf("DeviceListCatchup returned an error: %s", err) } @@ -223,7 +203,7 @@ func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) { "!another:room": {syncingUser}, }, } - hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken) + hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken) if err != nil { t.Fatalf("DeviceListCatchup returned an error: %s", err) } @@ -246,7 +226,7 @@ func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) { "!another:room": {syncingUser, existingUser}, }, } - hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken) + hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken) if err != nil { t.Fatalf("Catchup returned an error: %s", err) } @@ -268,7 +248,7 @@ func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) { "!another:room": {syncingUser, existingUser}, }, } - hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken) + hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken) if err != nil { t.Fatalf("DeviceListCatchup returned an error: %s", err) } @@ -327,7 +307,7 @@ func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) { roomID: {syncingUser, existingUser}, }, } - hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken) + hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken) if err != nil { t.Fatalf("DeviceListCatchup returned an error: %s", err) } @@ -355,7 +335,7 @@ func TestKeyChangeCatchupChangeAndLeft(t *testing.T) { "!another:room": {syncingUser}, }, } - hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken) + hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken) if err != nil { t.Fatalf("Catchup returned an error: %s", err) } @@ -441,7 +421,7 @@ func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) { }, } hasNew, err := DeviceListCatchup( - context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken, + context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken, ) if err != nil { t.Fatalf("DeviceListCatchup returned an error: %s", err) diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 2859da710..319a8149c 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -267,7 +267,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea func (rp *RequestPool) appendDeviceLists( data *types.Response, userID string, since, to types.StreamingToken, ) (*types.Response, error) { - _, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.rsAPI, rp.stateAPI, userID, data, since, to) + _, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.rsAPI, userID, data, since, to) if err != nil { return nil, fmt.Errorf("internal.DeviceListCatchup: %w", err) }