diff --git a/build/gobind-pinecone/monolith.go b/build/gobind-pinecone/monolith.go index b8d014ed3..c9805f257 100644 --- a/build/gobind-pinecone/monolith.go +++ b/build/gobind-pinecone/monolith.go @@ -18,6 +18,7 @@ import ( "github.com/hjson/hjson-go" "github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/conn" + "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/rooms" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" "github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/eduserver/cache" @@ -223,7 +224,7 @@ func (m *DendriteMonolith) Start() { RoomserverAPI: rsAPI, UserAPI: userAPI, KeyAPI: keyAPI, - ExtPublicRoomsProvider: nil, + ExtPublicRoomsProvider: rooms.NewPineconeRoomProvider(pSwitch, pRouter, pQUIC, fsAPI, federation), } monolith.AddAllPublicRoutes( base.PublicClientAPIMux, diff --git a/cmd/dendrite-demo-pinecone/embed/embed_riotweb.go b/cmd/dendrite-demo-pinecone/embed/embed_riotweb.go index 9ee4e626b..d25745ca6 100644 --- a/cmd/dendrite-demo-pinecone/embed/embed_riotweb.go +++ b/cmd/dendrite-demo-pinecone/embed/embed_riotweb.go @@ -36,12 +36,12 @@ func (h mimeFixingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } func Embed(rootMux *mux.Router, listenPort int, serverName string) { - url := fmt.Sprintf("http://localhost:%d", listenPort) embeddedFS := _escFS(false) embeddedServ := mimeFixingHandler{http.FileServer(embeddedFS)} rootMux.NotFoundHandler = embeddedServ - rootMux.HandleFunc("/config.json", func(w http.ResponseWriter, _ *http.Request) { + rootMux.HandleFunc("/config.json", func(w http.ResponseWriter, r *http.Request) { + url := fmt.Sprintf("http://%s:%d", r.Header("Host"), listenPort) configFile, err := embeddedFS.Open("/config.sample.json") if err != nil { w.WriteHeader(500) diff --git a/cmd/dendrite-demo-pinecone/main.go b/cmd/dendrite-demo-pinecone/main.go index 09529f790..9d7e9e811 100644 --- a/cmd/dendrite-demo-pinecone/main.go +++ b/cmd/dendrite-demo-pinecone/main.go @@ -32,6 +32,7 @@ import ( "github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/conn" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/embed" + "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/rooms" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" "github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/eduserver/cache" @@ -168,12 +169,13 @@ func main() { FedClient: federation, KeyRing: keyRing, - AppserviceAPI: asAPI, - EDUInternalAPI: eduInputAPI, - FederationSenderAPI: fsAPI, - RoomserverAPI: rsAPI, - UserAPI: userAPI, - KeyAPI: keyAPI, + AppserviceAPI: asAPI, + EDUInternalAPI: eduInputAPI, + FederationSenderAPI: fsAPI, + RoomserverAPI: rsAPI, + UserAPI: userAPI, + KeyAPI: keyAPI, + ExtPublicRoomsProvider: rooms.NewPineconeRoomProvider(pSwitch, pRouter, pQUIC, fsAPI, federation), } monolith.AddAllPublicRoutes( base.PublicClientAPIMux, diff --git a/cmd/dendrite-demo-pinecone/rooms/rooms.go b/cmd/dendrite-demo-pinecone/rooms/rooms.go new file mode 100644 index 000000000..a4a898439 --- /dev/null +++ b/cmd/dendrite-demo-pinecone/rooms/rooms.go @@ -0,0 +1,148 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rooms + +import ( + "context" + "crypto/ed25519" + "encoding/hex" + "sync" + "time" + + "github.com/matrix-org/dendrite/federationsender/api" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" + + pineconeSwitch "github.com/matrix-org/pinecone/packetswitch" + pineconeRouter "github.com/matrix-org/pinecone/router" + pineconeSessions "github.com/matrix-org/pinecone/sessions" +) + +type PineconeRoomProvider struct { + s *pineconeSwitch.Switch + r *pineconeRouter.Router + q *pineconeSessions.QUIC + fedSender api.FederationSenderInternalAPI + fedClient *gomatrixserverlib.FederationClient +} + +func NewPineconeRoomProvider( + s *pineconeSwitch.Switch, + r *pineconeRouter.Router, + q *pineconeSessions.QUIC, + fedSender api.FederationSenderInternalAPI, + fedClient *gomatrixserverlib.FederationClient, +) *PineconeRoomProvider { + p := &PineconeRoomProvider{ + s: s, + r: r, + q: q, + fedSender: fedSender, + fedClient: fedClient, + } + return p +} + +func (p *PineconeRoomProvider) Rooms() []gomatrixserverlib.PublicRoom { + known := []ed25519.PublicKey{} + known = append(known, p.s.KnownNodes()...) + if successor := p.r.DHTSuccessor(); successor != nil { + known = append(known, successor) + } + if predecessor := p.r.DHTPredecessor(); predecessor != nil { + known = append(known, predecessor) + } + known = append(known, p.q.Sessions()...) + list := []gomatrixserverlib.ServerName{} + for _, k := range known { + if len(k) == ed25519.PublicKeySize { + list = append(list, gomatrixserverlib.ServerName(hex.EncodeToString(k))) + } + } + return bulkFetchPublicRoomsFromServers(context.Background(), p.fedClient, list) +} + +// bulkFetchPublicRoomsFromServers fetches public rooms from the list of homeservers. +// Returns a list of public rooms. +func bulkFetchPublicRoomsFromServers( + ctx context.Context, fedClient *gomatrixserverlib.FederationClient, + homeservers []gomatrixserverlib.ServerName, +) (publicRooms []gomatrixserverlib.PublicRoom) { + limit := 200 + // follow pipeline semantics, see https://blog.golang.org/pipelines for more info. + // goroutines send rooms to this channel + roomCh := make(chan gomatrixserverlib.PublicRoom, int(limit)) + // signalling channel to tell goroutines to stop sending rooms and quit + done := make(chan bool) + // signalling to say when we can close the room channel + var wg sync.WaitGroup + wg.Add(len(homeservers)) + // concurrently query for public rooms + for _, hs := range homeservers { + go func(homeserverDomain gomatrixserverlib.ServerName) { + defer wg.Done() + util.GetLogger(ctx).WithField("hs", homeserverDomain).Info("Querying HS for public rooms") + fres, err := fedClient.GetPublicRooms(ctx, homeserverDomain, int(limit), "", false, "") + if err != nil { + util.GetLogger(ctx).WithError(err).WithField("hs", homeserverDomain).Warn( + "bulkFetchPublicRoomsFromServers: failed to query hs", + ) + return + } + for _, room := range fres.Chunk { + // atomically send a room or stop + select { + case roomCh <- room: + case <-done: + util.GetLogger(ctx).WithError(err).WithField("hs", homeserverDomain).Info("Interrupted whilst sending rooms") + return + } + } + }(hs) + } + + // Close the room channel when the goroutines have quit so we don't leak, but don't let it stop the in-flight request. + // This also allows the request to fail fast if all HSes experience errors as it will cause the room channel to be + // closed. + go func() { + wg.Wait() + util.GetLogger(ctx).Info("Cleaning up resources") + close(roomCh) + }() + + // fan-in results with timeout. We stop when we reach the limit. +FanIn: + for len(publicRooms) < int(limit) || limit == 0 { + // add a room or timeout + select { + case room, ok := <-roomCh: + if !ok { + util.GetLogger(ctx).Info("All homeservers have been queried, returning results.") + break FanIn + } + publicRooms = append(publicRooms, room) + case <-time.After(5 * time.Second): // we've waited long enough, let's tell the client what we got. + util.GetLogger(ctx).Info("Waited 5s for federated public rooms, returning early") + break FanIn + case <-ctx.Done(): // the client hung up on us, let's stop. + util.GetLogger(ctx).Info("Client hung up, returning early") + break FanIn + } + } + // tell goroutines to stop + close(done) + + return publicRooms +}