From 31ee8c9201758f7acc4fbb7c3b0b112a7d5fccf4 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 3 Jul 2020 11:15:17 +0100 Subject: [PATCH] Make dendritejs work again --- build/docker/DendriteJS.Dockerfile | 2 +- clientapi/routing/directory_public.go | 160 ++++---------------------- cmd/dendritejs/main.go | 2 +- cmd/dendritejs/publicrooms.go | 85 +++++++++++++- 4 files changed, 104 insertions(+), 145 deletions(-) diff --git a/build/docker/DendriteJS.Dockerfile b/build/docker/DendriteJS.Dockerfile index 4467c9c70..90391c45c 100644 --- a/build/docker/DendriteJS.Dockerfile +++ b/build/docker/DendriteJS.Dockerfile @@ -5,7 +5,7 @@ # $ docker build -t dendritejs -f DendriteJS.Dockerfile . # $ docker run --rm -p 8888:80 dendritejs # Then visit http://localhost:8888 -FROM golang:1.13.7-alpine3.11 AS gobuild +FROM golang:1.14-alpine AS gobuild # Download and build dendrite WORKDIR /build diff --git a/clientapi/routing/directory_public.go b/clientapi/routing/directory_public.go index 366f89e80..de980792e 100644 --- a/clientapi/routing/directory_public.go +++ b/clientapi/routing/directory_public.go @@ -67,146 +67,6 @@ func GetPostPublicRooms( } } -// GetPostPublicRoomsWithExternal is the same as GetPostPublicRooms but also mixes in public rooms from the provider supplied. -func GetPostPublicRoomsWithExternal( - req *http.Request, rsAPI roomserverAPI.RoomserverInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI, - fedClient *gomatrixserverlib.FederationClient, extRoomsProvider api.ExtraPublicRoomsProvider, -) util.JSONResponse { - var request PublicRoomReq - if fillErr := fillPublicRoomsReq(req, &request); fillErr != nil { - return *fillErr - } - response, err := publicRooms(req.Context(), request, rsAPI, stateAPI, extRoomsProvider) - if err != nil { - return jsonerror.InternalServerError() - } - - if request.Since != "" { - // TODO: handle pagination tokens sensibly rather than ignoring them. - // ignore paginated requests since we don't handle them yet over federation. - // Only the initial request will contain federated rooms. - return util.JSONResponse{ - Code: http.StatusOK, - JSON: response, - } - } - - // If we have already hit the limit on the number of rooms, bail. - var limit int - if request.Limit > 0 { - limit = int(request.Limit) - len(response.Chunk) - if limit <= 0 { - return util.JSONResponse{ - Code: http.StatusOK, - JSON: response, - } - } - } - - // downcasting `limit` is safe as we know it isn't bigger than request.Limit which is int16 - //fedRooms := bulkFetchPublicRoomsFromServers(req.Context(), fedClient, extRoomsProvider.Homeservers(), int16(limit)) - //response.Chunk = append(response.Chunk, fedRooms...) - - // de-duplicate rooms with the same room ID. We can join the room via any of these aliases as we know these servers - // are alive and well, so we arbitrarily pick one (purposefully shuffling them to spread the load a bit) - var publicRooms []gomatrixserverlib.PublicRoom - haveRoomIDs := make(map[string]bool) - rand.Shuffle(len(response.Chunk), func(i, j int) { - response.Chunk[i], response.Chunk[j] = response.Chunk[j], response.Chunk[i] - }) - for _, r := range response.Chunk { - if haveRoomIDs[r.RoomID] { - continue - } - haveRoomIDs[r.RoomID] = true - publicRooms = append(publicRooms, r) - } - // sort by member count - sort.SliceStable(publicRooms, func(i, j int) bool { - return publicRooms[i].JoinedMembersCount > publicRooms[j].JoinedMembersCount - }) - - response.Chunk = publicRooms - - return util.JSONResponse{ - Code: http.StatusOK, - JSON: response, - } -} - -/* -// bulkFetchPublicRoomsFromServers fetches public rooms from the list of homeservers. -// Returns a list of public rooms up to the limit specified. -func bulkFetchPublicRoomsFromServers( - ctx context.Context, fedClient *gomatrixserverlib.FederationClient, homeservers []string, limit int16, -) (publicRooms []gomatrixserverlib.PublicRoom) { - // follow pipeline semantics, see https://blog.golang.org/pipelines for more info. - // goroutines send rooms to this channel - roomCh := make(chan gomatrixserverlib.PublicRoom, int(limit)) - // signalling channel to tell goroutines to stop sending rooms and quit - done := make(chan bool) - // signalling to say when we can close the room channel - var wg sync.WaitGroup - wg.Add(len(homeservers)) - // concurrently query for public rooms - for _, hs := range homeservers { - go func(homeserverDomain string) { - defer wg.Done() - util.GetLogger(ctx).WithField("hs", homeserverDomain).Info("Querying HS for public rooms") - fres, err := fedClient.GetPublicRooms(ctx, gomatrixserverlib.ServerName(homeserverDomain), int(limit), "", false, "") - if err != nil { - util.GetLogger(ctx).WithError(err).WithField("hs", homeserverDomain).Warn( - "bulkFetchPublicRoomsFromServers: failed to query hs", - ) - return - } - for _, room := range fres.Chunk { - // atomically send a room or stop - select { - case roomCh <- room: - case <-done: - util.GetLogger(ctx).WithError(err).WithField("hs", homeserverDomain).Info("Interrupted whilst sending rooms") - return - } - } - }(hs) - } - - // Close the room channel when the goroutines have quit so we don't leak, but don't let it stop the in-flight request. - // This also allows the request to fail fast if all HSes experience errors as it will cause the room channel to be - // closed. - go func() { - wg.Wait() - util.GetLogger(ctx).Info("Cleaning up resources") - close(roomCh) - }() - - // fan-in results with timeout. We stop when we reach the limit. -FanIn: - for len(publicRooms) < int(limit) || limit == 0 { - // add a room or timeout - select { - case room, ok := <-roomCh: - if !ok { - util.GetLogger(ctx).Info("All homeservers have been queried, returning results.") - break FanIn - } - publicRooms = append(publicRooms, room) - case <-time.After(15 * time.Second): // we've waited long enough, let's tell the client what we got. - util.GetLogger(ctx).Info("Waited 15s for federated public rooms, returning early") - break FanIn - case <-ctx.Done(): // the client hung up on us, let's stop. - util.GetLogger(ctx).Info("Client hung up, returning early") - break FanIn - } - } - // tell goroutines to stop - close(done) - - return publicRooms -} -*/ - func publicRooms(ctx context.Context, request PublicRoomReq, rsAPI roomserverAPI.RoomserverInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI, extRoomsProvider api.ExtraPublicRoomsProvider, ) (*gomatrixserverlib.RespPublicRooms, error) { @@ -408,6 +268,8 @@ func refreshPublicRoomCache( publicRoomsCache = []gomatrixserverlib.PublicRoom{} publicRoomsCache = append(publicRoomsCache, pubRooms...) publicRoomsCache = append(publicRoomsCache, extraRooms...) + publicRoomsCache = dedupeAndShuffle(publicRoomsCache) + // sort by total joined member count (big to small) sort.SliceStable(publicRoomsCache, func(i, j int) bool { return publicRoomsCache[i].JoinedMembersCount > publicRoomsCache[j].JoinedMembersCount @@ -420,3 +282,21 @@ func getPublicRoomsFromCache() []gomatrixserverlib.PublicRoom { defer cacheMu.Unlock() return publicRoomsCache } + +func dedupeAndShuffle(in []gomatrixserverlib.PublicRoom) []gomatrixserverlib.PublicRoom { + // de-duplicate rooms with the same room ID. We can join the room via any of these aliases as we know these servers + // are alive and well, so we arbitrarily pick one (purposefully shuffling them to spread the load a bit) + var publicRooms []gomatrixserverlib.PublicRoom + haveRoomIDs := make(map[string]bool) + rand.Shuffle(len(in), func(i, j int) { + in[i], in[j] = in[j], in[i] + }) + for _, r := range in { + if haveRoomIDs[r.RoomID] { + continue + } + haveRoomIDs[r.RoomID] = true + publicRooms = append(publicRooms, r) + } + return publicRooms +} diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go index 6e2bdafec..1443bc182 100644 --- a/cmd/dendritejs/main.go +++ b/cmd/dendritejs/main.go @@ -211,7 +211,7 @@ func main() { ) fedSenderAPI := federationsender.NewInternalAPI(base, federation, rsAPI, &keyRing) rsAPI.SetFederationSenderAPI(fedSenderAPI) - p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI) + p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI, federation) stateAPI := currentstateserver.NewInternalAPI(base.Cfg, base.KafkaConsumer) diff --git a/cmd/dendritejs/publicrooms.go b/cmd/dendritejs/publicrooms.go index f339f71ce..a4623ba32 100644 --- a/cmd/dendritejs/publicrooms.go +++ b/cmd/dendritejs/publicrooms.go @@ -18,22 +18,29 @@ package main import ( "context" + "sync" + "time" "github.com/matrix-org/dendrite/federationsender/api" go_http_js_libp2p "github.com/matrix-org/go-http-js-libp2p" "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" ) type libp2pPublicRoomsProvider struct { node *go_http_js_libp2p.P2pLocalNode providers []go_http_js_libp2p.PeerInfo fedSender api.FederationSenderInternalAPI + fedClient *gomatrixserverlib.FederationClient } -func NewLibP2PPublicRoomsProvider(node *go_http_js_libp2p.P2pLocalNode, fedSender api.FederationSenderInternalAPI) *libp2pPublicRoomsProvider { +func NewLibP2PPublicRoomsProvider( + node *go_http_js_libp2p.P2pLocalNode, fedSender api.FederationSenderInternalAPI, fedClient *gomatrixserverlib.FederationClient, +) *libp2pPublicRoomsProvider { p := &libp2pPublicRoomsProvider{ node: node, fedSender: fedSender, + fedClient: fedClient, } node.RegisterFoundProviders(p.foundProviders) return p @@ -63,13 +70,85 @@ func (p *libp2pPublicRoomsProvider) foundProviders(peerInfos []go_http_js_libp2p } func (p *libp2pPublicRoomsProvider) Rooms() []gomatrixserverlib.PublicRoom { - return nil + return bulkFetchPublicRoomsFromServers(context.Background(), p.fedClient, p.homeservers()) } -func (p *libp2pPublicRoomsProvider) Homeservers() []string { +func (p *libp2pPublicRoomsProvider) homeservers() []string { result := make([]string, len(p.providers)) for i := range p.providers { result[i] = p.providers[i].Id } return result } + +// bulkFetchPublicRoomsFromServers fetches public rooms from the list of homeservers. +// Returns a list of public rooms. +func bulkFetchPublicRoomsFromServers( + ctx context.Context, fedClient *gomatrixserverlib.FederationClient, homeservers []string, +) (publicRooms []gomatrixserverlib.PublicRoom) { + limit := 200 + // follow pipeline semantics, see https://blog.golang.org/pipelines for more info. + // goroutines send rooms to this channel + roomCh := make(chan gomatrixserverlib.PublicRoom, int(limit)) + // signalling channel to tell goroutines to stop sending rooms and quit + done := make(chan bool) + // signalling to say when we can close the room channel + var wg sync.WaitGroup + wg.Add(len(homeservers)) + // concurrently query for public rooms + for _, hs := range homeservers { + go func(homeserverDomain string) { + defer wg.Done() + util.GetLogger(ctx).WithField("hs", homeserverDomain).Info("Querying HS for public rooms") + fres, err := fedClient.GetPublicRooms(ctx, gomatrixserverlib.ServerName(homeserverDomain), int(limit), "", false, "") + if err != nil { + util.GetLogger(ctx).WithError(err).WithField("hs", homeserverDomain).Warn( + "bulkFetchPublicRoomsFromServers: failed to query hs", + ) + return + } + for _, room := range fres.Chunk { + // atomically send a room or stop + select { + case roomCh <- room: + case <-done: + util.GetLogger(ctx).WithError(err).WithField("hs", homeserverDomain).Info("Interrupted whilst sending rooms") + return + } + } + }(hs) + } + + // Close the room channel when the goroutines have quit so we don't leak, but don't let it stop the in-flight request. + // This also allows the request to fail fast if all HSes experience errors as it will cause the room channel to be + // closed. + go func() { + wg.Wait() + util.GetLogger(ctx).Info("Cleaning up resources") + close(roomCh) + }() + + // fan-in results with timeout. We stop when we reach the limit. +FanIn: + for len(publicRooms) < int(limit) || limit == 0 { + // add a room or timeout + select { + case room, ok := <-roomCh: + if !ok { + util.GetLogger(ctx).Info("All homeservers have been queried, returning results.") + break FanIn + } + publicRooms = append(publicRooms, room) + case <-time.After(15 * time.Second): // we've waited long enough, let's tell the client what we got. + util.GetLogger(ctx).Info("Waited 15s for federated public rooms, returning early") + break FanIn + case <-ctx.Done(): // the client hung up on us, let's stop. + util.GetLogger(ctx).Info("Client hung up, returning early") + break FanIn + } + } + // tell goroutines to stop + close(done) + + return publicRooms +}