diff --git a/cmd/dendrite-federation-api-server/main.go b/cmd/dendrite-federation-api-server/main.go index e3bf5edc8..1bde56368 100644 --- a/cmd/dendrite-federation-api-server/main.go +++ b/cmd/dendrite-federation-api-server/main.go @@ -33,7 +33,7 @@ func main() { federationapi.AddPublicRoutes( base.PublicAPIMux, base.Cfg, userAPI, federation, keyRing, - rsAPI, fsAPI, base.EDUServerClient(), + rsAPI, fsAPI, base.EDUServerClient(), base.CurrentStateAPIClient(), ) base.SetupAndServeHTTP(string(base.Cfg.Bind.FederationAPI), string(base.Cfg.Listen.FederationAPI)) diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go index 11f339b0f..36ce9f65b 100644 --- a/cmd/dendritejs/main.go +++ b/cmd/dendritejs/main.go @@ -173,6 +173,7 @@ func main() { cfg.Database.RoomServer = "file:/idb/dendritejs_roomserver.db" cfg.Database.ServerKey = "file:/idb/dendritejs_serverkey.db" cfg.Database.SyncAPI = "file:/idb/dendritejs_syncapi.db" + cfg.Database.CurrentState = "file:/idb/dendritejs_currentstate.db" cfg.Kafka.Topics.OutputTypingEvent = "output_typing_event" cfg.Kafka.Topics.OutputSendToDeviceEvent = "output_send_to_device_event" cfg.Kafka.Topics.OutputClientData = "output_client_data" diff --git a/currentstateserver/storage/postgres/current_room_state_table.go b/currentstateserver/storage/postgres/current_room_state_table.go index 79a8d40f8..bd2e075f0 100644 --- a/currentstateserver/storage/postgres/current_room_state_table.go +++ b/currentstateserver/storage/postgres/current_room_state_table.go @@ -71,12 +71,20 @@ const selectStateEventSQL = "" + const selectEventsWithEventIDsSQL = "" + "SELECT headered_event_json FROM currentstate_current_room_state WHERE event_id = ANY($1)" +const selectBulkStateContentSQL = "" + + "SELECT room_id, type, state_key, content_value FROM currentstate_current_room_state WHERE room_id = ANY($1) AND type = ANY($2) AND state_key = ANY($3)" + +const selectBulkStateContentWildSQL = "" + + "SELECT room_id, type, state_key, content_value FROM currentstate_current_room_state WHERE room_id = ANY($1) AND type = ANY($2)" + type currentRoomStateStatements struct { upsertRoomStateStmt *sql.Stmt deleteRoomStateByEventIDStmt *sql.Stmt selectRoomIDsWithMembershipStmt *sql.Stmt selectEventsWithEventIDsStmt *sql.Stmt selectStateEventStmt *sql.Stmt + selectBulkStateContentStmt *sql.Stmt + selectBulkStateContentWildStmt *sql.Stmt } func NewPostgresCurrentRoomStateTable(db *sql.DB) (tables.CurrentRoomState, error) { @@ -100,6 +108,12 @@ func NewPostgresCurrentRoomStateTable(db *sql.DB) (tables.CurrentRoomState, erro if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil { return nil, err } + if s.selectBulkStateContentStmt, err = db.Prepare(selectBulkStateContentSQL); err != nil { + return nil, err + } + if s.selectBulkStateContentWildStmt, err = db.Prepare(selectBulkStateContentWildSQL); err != nil { + return nil, err + } return s, nil } @@ -207,5 +221,52 @@ func (s *currentRoomStateStatements) SelectStateEvent( func (s *currentRoomStateStatements) SelectBulkStateContent( ctx context.Context, roomIDs []string, tuples []gomatrixserverlib.StateKeyTuple, allowWildcards bool, ) ([]tables.StrippedEvent, error) { - return nil, nil + hasWildcards := false + eventTypeSet := make(map[string]bool) + stateKeySet := make(map[string]bool) + var eventTypes []string + var stateKeys []string + for _, tuple := range tuples { + if !eventTypeSet[tuple.EventType] { + eventTypeSet[tuple.EventType] = true + eventTypes = append(eventTypes, tuple.EventType) + } + if !stateKeySet[tuple.StateKey] { + stateKeySet[tuple.StateKey] = true + stateKeys = append(stateKeys, tuple.StateKey) + } + if tuple.StateKey == "*" { + hasWildcards = true + } + } + var rows *sql.Rows + var err error + if hasWildcards && allowWildcards { + rows, err = s.selectBulkStateContentWildStmt.QueryContext(ctx, pq.StringArray(roomIDs), pq.StringArray(eventTypes)) + } else { + rows, err = s.selectBulkStateContentStmt.QueryContext( + ctx, pq.StringArray(roomIDs), pq.StringArray(eventTypes), pq.StringArray(stateKeys), + ) + } + if err != nil { + return nil, err + } + strippedEvents := []tables.StrippedEvent{} + defer internal.CloseAndLogIfError(ctx, rows, "SelectBulkStateContent: rows.close() failed") + for rows.Next() { + var roomID string + var eventType string + var stateKey string + var contentVal string + if err = rows.Scan(&roomID, &eventType, &stateKey, &contentVal); err != nil { + return nil, err + } + strippedEvents = append(strippedEvents, tables.StrippedEvent{ + RoomID: roomID, + ContentValue: contentVal, + EventType: eventType, + StateKey: stateKey, + }) + } + return strippedEvents, rows.Err() } diff --git a/federationapi/federationapi.go b/federationapi/federationapi.go index c0c000434..7d1994b25 100644 --- a/federationapi/federationapi.go +++ b/federationapi/federationapi.go @@ -16,6 +16,7 @@ package federationapi import ( "github.com/gorilla/mux" + currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api" eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/internal/config" @@ -36,11 +37,12 @@ func AddPublicRoutes( rsAPI roomserverAPI.RoomserverInternalAPI, federationSenderAPI federationSenderAPI.FederationSenderInternalAPI, eduAPI eduserverAPI.EDUServerInputAPI, + stateAPI currentstateAPI.CurrentStateInternalAPI, ) { routing.Setup( router, cfg, rsAPI, eduAPI, federationSenderAPI, keyRing, - federation, userAPI, + federation, userAPI, stateAPI, ) } diff --git a/federationapi/federationapi_test.go b/federationapi/federationapi_test.go index cc85c61bf..6bbe9d80e 100644 --- a/federationapi/federationapi_test.go +++ b/federationapi/federationapi_test.go @@ -31,7 +31,7 @@ func TestRoomsV3URLEscapeDoNot404(t *testing.T) { fsAPI := base.FederationSenderHTTPClient() // TODO: This is pretty fragile, as if anything calls anything on these nils this test will break. // Unfortunately, it makes little sense to instantiate these dependencies when we just want to test routing. - federationapi.AddPublicRoutes(base.PublicAPIMux, cfg, nil, nil, keyRing, nil, fsAPI, nil) + federationapi.AddPublicRoutes(base.PublicAPIMux, cfg, nil, nil, keyRing, nil, fsAPI, nil, nil) httputil.SetupHTTPAPI( base.BaseMux, base.PublicAPIMux, diff --git a/federationapi/routing/publicrooms.go b/federationapi/routing/publicrooms.go new file mode 100644 index 000000000..3807a5183 --- /dev/null +++ b/federationapi/routing/publicrooms.go @@ -0,0 +1,178 @@ +package routing + +import ( + "context" + "net/http" + "strconv" + + "github.com/matrix-org/dendrite/clientapi/httputil" + "github.com/matrix-org/dendrite/clientapi/jsonerror" + currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api" + roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" +) + +type PublicRoomReq struct { + Since string `json:"since,omitempty"` + Limit int16 `json:"limit,omitempty"` + Filter filter `json:"filter,omitempty"` +} + +type filter struct { + SearchTerms string `json:"generic_search_term,omitempty"` +} + +// GetPostPublicRooms implements GET and POST /publicRooms +func GetPostPublicRooms(req *http.Request, rsAPI roomserverAPI.RoomserverInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI) util.JSONResponse { + var request PublicRoomReq + if fillErr := fillPublicRoomsReq(req, &request); fillErr != nil { + return *fillErr + } + if request.Limit == 0 { + request.Limit = 50 + } + response, err := publicRooms(req.Context(), request, rsAPI, stateAPI) + if err != nil { + return jsonerror.InternalServerError() + } + return util.JSONResponse{ + Code: http.StatusOK, + JSON: response, + } +} + +func publicRooms(ctx context.Context, request PublicRoomReq, rsAPI roomserverAPI.RoomserverInternalAPI, + stateAPI currentstateAPI.CurrentStateInternalAPI) (*gomatrixserverlib.RespPublicRooms, error) { + + var response gomatrixserverlib.RespPublicRooms + var limit int16 + var offset int64 + limit = request.Limit + offset, err := strconv.ParseInt(request.Since, 10, 64) + // ParseInt returns 0 and an error when trying to parse an empty string + // In that case, we want to assign 0 so we ignore the error + if err != nil && len(request.Since) > 0 { + util.GetLogger(ctx).WithError(err).Error("strconv.ParseInt failed") + return nil, err + } + + var queryRes roomserverAPI.QueryPublishedRoomsResponse + err = rsAPI.QueryPublishedRooms(ctx, &roomserverAPI.QueryPublishedRoomsRequest{}, &queryRes) + if err != nil { + util.GetLogger(ctx).WithError(err).Error("QueryPublishedRooms failed") + return nil, err + } + response.TotalRoomCountEstimate = len(queryRes.RoomIDs) + + if offset > 0 { + response.PrevBatch = strconv.Itoa(int(offset) - 1) + } + nextIndex := int(offset) + int(limit) + if response.TotalRoomCountEstimate > nextIndex { + response.NextBatch = strconv.Itoa(nextIndex) + } + + if offset < 0 { + offset = 0 + } + if nextIndex > len(queryRes.RoomIDs) { + nextIndex = len(queryRes.RoomIDs) + } + roomIDs := queryRes.RoomIDs[offset:nextIndex] + response.Chunk, err = fillInRooms(ctx, roomIDs, stateAPI) + return &response, err +} + +// fillPublicRoomsReq fills the Limit, Since and Filter attributes of a GET or POST request +// on /publicRooms by parsing the incoming HTTP request +// Filter is only filled for POST requests +func fillPublicRoomsReq(httpReq *http.Request, request *PublicRoomReq) *util.JSONResponse { + if httpReq.Method == http.MethodGet { + limit, err := strconv.Atoi(httpReq.FormValue("limit")) + // Atoi returns 0 and an error when trying to parse an empty string + // In that case, we want to assign 0 so we ignore the error + if err != nil && len(httpReq.FormValue("limit")) > 0 { + util.GetLogger(httpReq.Context()).WithError(err).Error("strconv.Atoi failed") + reqErr := jsonerror.InternalServerError() + return &reqErr + } + request.Limit = int16(limit) + request.Since = httpReq.FormValue("since") + return nil + } else if httpReq.Method == http.MethodPost { + return httputil.UnmarshalJSONRequest(httpReq, request) + } + + return &util.JSONResponse{ + Code: http.StatusMethodNotAllowed, + JSON: jsonerror.NotFound("Bad method"), + } +} + +// due to lots of switches +// nolint:gocyclo +func fillInRooms(ctx context.Context, roomIDs []string, stateAPI currentstateAPI.CurrentStateInternalAPI) ([]gomatrixserverlib.PublicRoom, error) { + avatarTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.avatar", StateKey: ""} + nameTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.name", StateKey: ""} + canonicalTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomCanonicalAlias, StateKey: ""} + topicTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.topic", StateKey: ""} + guestTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.guest_access", StateKey: ""} + visibilityTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomHistoryVisibility, StateKey: ""} + joinRuleTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomJoinRules, StateKey: ""} + + var stateRes currentstateAPI.QueryBulkStateContentResponse + err := stateAPI.QueryBulkStateContent(ctx, ¤tstateAPI.QueryBulkStateContentRequest{ + RoomIDs: roomIDs, + AllowWildcards: true, + StateTuples: []gomatrixserverlib.StateKeyTuple{ + nameTuple, canonicalTuple, topicTuple, guestTuple, visibilityTuple, joinRuleTuple, avatarTuple, + {EventType: gomatrixserverlib.MRoomMember, StateKey: "*"}, + }, + }, &stateRes) + if err != nil { + util.GetLogger(ctx).WithError(err).Error("QueryBulkStateContent failed") + return nil, err + } + util.GetLogger(ctx).Infof("room IDs: %+v", roomIDs) + util.GetLogger(ctx).Infof("State res: %+v", stateRes.Rooms) + chunk := make([]gomatrixserverlib.PublicRoom, len(roomIDs)) + i := 0 + for roomID, data := range stateRes.Rooms { + pub := gomatrixserverlib.PublicRoom{ + RoomID: roomID, + } + joinCount := 0 + var joinRule, guestAccess string + for tuple, contentVal := range data { + if tuple.EventType == gomatrixserverlib.MRoomMember && contentVal == "join" { + joinCount++ + continue + } + switch tuple { + case avatarTuple: + pub.AvatarURL = contentVal + case nameTuple: + pub.Name = contentVal + case topicTuple: + pub.Topic = contentVal + case canonicalTuple: + pub.CanonicalAlias = contentVal + case visibilityTuple: + pub.WorldReadable = contentVal == "world_readable" + // need both of these to determine whether guests can join + case joinRuleTuple: + joinRule = contentVal + case guestTuple: + guestAccess = contentVal + } + } + if joinRule == gomatrixserverlib.Public && guestAccess == "can_join" { + pub.GuestCanJoin = true + } + pub.JoinedMembersCount = joinCount + chunk[i] = pub + i++ + } + return chunk, nil +} diff --git a/federationapi/routing/routing.go b/federationapi/routing/routing.go index 0afea7d04..cd97f2978 100644 --- a/federationapi/routing/routing.go +++ b/federationapi/routing/routing.go @@ -19,6 +19,7 @@ import ( "github.com/gorilla/mux" "github.com/matrix-org/dendrite/clientapi/jsonerror" + currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api" eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/internal/config" @@ -52,6 +53,7 @@ func Setup( keys gomatrixserverlib.JSONVerifier, federation *gomatrixserverlib.FederationClient, userAPI userapi.UserInternalAPI, + stateAPI currentstateAPI.CurrentStateInternalAPI, ) { v2keysmux := publicAPIMux.PathPrefix(pathPrefixV2Keys).Subrouter() v1fedmux := publicAPIMux.PathPrefix(pathPrefixV1Federation).Subrouter() @@ -291,4 +293,10 @@ func Setup( return Backfill(httpReq, request, rsAPI, vars["roomID"], cfg) }, )).Methods(http.MethodGet) + + v1fedmux.Handle("/publicRooms", + httputil.MakeExternalAPI("federation_public_rooms", func(req *http.Request) util.JSONResponse { + return GetPostPublicRooms(req, rsAPI, stateAPI) + }), + ).Methods(http.MethodGet) } diff --git a/internal/setup/monolith.go b/internal/setup/monolith.go index 86275e28d..d4ae0915c 100644 --- a/internal/setup/monolith.go +++ b/internal/setup/monolith.go @@ -27,7 +27,6 @@ import ( "github.com/matrix-org/dendrite/internal/transactions" "github.com/matrix-org/dendrite/keyserver" "github.com/matrix-org/dendrite/mediaapi" - "github.com/matrix-org/dendrite/publicroomsapi" "github.com/matrix-org/dendrite/publicroomsapi/storage" "github.com/matrix-org/dendrite/publicroomsapi/types" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" @@ -81,13 +80,14 @@ func (m *Monolith) AddAllPublicRoutes(publicMux *mux.Router) { federationapi.AddPublicRoutes( publicMux, m.Config, m.UserAPI, m.FedClient, m.KeyRing, m.RoomserverAPI, m.FederationSenderAPI, - m.EDUInternalAPI, + m.EDUInternalAPI, m.StateAPI, ) mediaapi.AddPublicRoutes(publicMux, m.Config, m.UserAPI, m.Client) - publicroomsapi.AddPublicRoutes( - publicMux, m.Config, m.KafkaConsumer, m.UserAPI, m.PublicRoomsDB, m.RoomserverAPI, m.FedClient, - m.ExtPublicRoomsProvider, - ) + /* + publicroomsapi.AddPublicRoutes( + publicMux, m.Config, m.KafkaConsumer, m.UserAPI, m.PublicRoomsDB, m.RoomserverAPI, m.FedClient, + m.ExtPublicRoomsProvider, + ) */ syncapi.AddPublicRoutes( publicMux, m.KafkaConsumer, m.UserAPI, m.RoomserverAPI, m.FedClient, m.Config, )