Pinecone room directory support (early)

This commit is contained in:
Neil Alexander 2020-11-26 15:31:07 +00:00
parent 21c62ff489
commit aafd9418a6
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
4 changed files with 160 additions and 9 deletions

View file

@ -18,6 +18,7 @@ import (
"github.com/hjson/hjson-go" "github.com/hjson/hjson-go"
"github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/appservice"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/conn" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/conn"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/rooms"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
"github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/eduserver/cache"
@ -223,7 +224,7 @@ func (m *DendriteMonolith) Start() {
RoomserverAPI: rsAPI, RoomserverAPI: rsAPI,
UserAPI: userAPI, UserAPI: userAPI,
KeyAPI: keyAPI, KeyAPI: keyAPI,
ExtPublicRoomsProvider: nil, ExtPublicRoomsProvider: rooms.NewPineconeRoomProvider(pSwitch, pRouter, pQUIC, fsAPI, federation),
} }
monolith.AddAllPublicRoutes( monolith.AddAllPublicRoutes(
base.PublicClientAPIMux, base.PublicClientAPIMux,

View file

@ -36,12 +36,12 @@ func (h mimeFixingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
func Embed(rootMux *mux.Router, listenPort int, serverName string) { func Embed(rootMux *mux.Router, listenPort int, serverName string) {
url := fmt.Sprintf("http://localhost:%d", listenPort)
embeddedFS := _escFS(false) embeddedFS := _escFS(false)
embeddedServ := mimeFixingHandler{http.FileServer(embeddedFS)} embeddedServ := mimeFixingHandler{http.FileServer(embeddedFS)}
rootMux.NotFoundHandler = embeddedServ rootMux.NotFoundHandler = embeddedServ
rootMux.HandleFunc("/config.json", func(w http.ResponseWriter, _ *http.Request) { rootMux.HandleFunc("/config.json", func(w http.ResponseWriter, r *http.Request) {
url := fmt.Sprintf("http://%s:%d", r.Header("Host"), listenPort)
configFile, err := embeddedFS.Open("/config.sample.json") configFile, err := embeddedFS.Open("/config.sample.json")
if err != nil { if err != nil {
w.WriteHeader(500) w.WriteHeader(500)

View file

@ -32,6 +32,7 @@ import (
"github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/appservice"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/conn" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/conn"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/embed" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/embed"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/rooms"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
"github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/eduserver/cache"
@ -174,6 +175,7 @@ func main() {
RoomserverAPI: rsAPI, RoomserverAPI: rsAPI,
UserAPI: userAPI, UserAPI: userAPI,
KeyAPI: keyAPI, KeyAPI: keyAPI,
ExtPublicRoomsProvider: rooms.NewPineconeRoomProvider(pSwitch, pRouter, pQUIC, fsAPI, federation),
} }
monolith.AddAllPublicRoutes( monolith.AddAllPublicRoutes(
base.PublicClientAPIMux, base.PublicClientAPIMux,

View file

@ -0,0 +1,148 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rooms
import (
"context"
"crypto/ed25519"
"encoding/hex"
"sync"
"time"
"github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
pineconeSwitch "github.com/matrix-org/pinecone/packetswitch"
pineconeRouter "github.com/matrix-org/pinecone/router"
pineconeSessions "github.com/matrix-org/pinecone/sessions"
)
type PineconeRoomProvider struct {
s *pineconeSwitch.Switch
r *pineconeRouter.Router
q *pineconeSessions.QUIC
fedSender api.FederationSenderInternalAPI
fedClient *gomatrixserverlib.FederationClient
}
func NewPineconeRoomProvider(
s *pineconeSwitch.Switch,
r *pineconeRouter.Router,
q *pineconeSessions.QUIC,
fedSender api.FederationSenderInternalAPI,
fedClient *gomatrixserverlib.FederationClient,
) *PineconeRoomProvider {
p := &PineconeRoomProvider{
s: s,
r: r,
q: q,
fedSender: fedSender,
fedClient: fedClient,
}
return p
}
func (p *PineconeRoomProvider) Rooms() []gomatrixserverlib.PublicRoom {
known := []ed25519.PublicKey{}
known = append(known, p.s.KnownNodes()...)
if successor := p.r.DHTSuccessor(); successor != nil {
known = append(known, successor)
}
if predecessor := p.r.DHTPredecessor(); predecessor != nil {
known = append(known, predecessor)
}
known = append(known, p.q.Sessions()...)
list := []gomatrixserverlib.ServerName{}
for _, k := range known {
if len(k) == ed25519.PublicKeySize {
list = append(list, gomatrixserverlib.ServerName(hex.EncodeToString(k)))
}
}
return bulkFetchPublicRoomsFromServers(context.Background(), p.fedClient, list)
}
// bulkFetchPublicRoomsFromServers fetches public rooms from the list of homeservers.
// Returns a list of public rooms.
func bulkFetchPublicRoomsFromServers(
ctx context.Context, fedClient *gomatrixserverlib.FederationClient,
homeservers []gomatrixserverlib.ServerName,
) (publicRooms []gomatrixserverlib.PublicRoom) {
limit := 200
// follow pipeline semantics, see https://blog.golang.org/pipelines for more info.
// goroutines send rooms to this channel
roomCh := make(chan gomatrixserverlib.PublicRoom, int(limit))
// signalling channel to tell goroutines to stop sending rooms and quit
done := make(chan bool)
// signalling to say when we can close the room channel
var wg sync.WaitGroup
wg.Add(len(homeservers))
// concurrently query for public rooms
for _, hs := range homeservers {
go func(homeserverDomain gomatrixserverlib.ServerName) {
defer wg.Done()
util.GetLogger(ctx).WithField("hs", homeserverDomain).Info("Querying HS for public rooms")
fres, err := fedClient.GetPublicRooms(ctx, homeserverDomain, int(limit), "", false, "")
if err != nil {
util.GetLogger(ctx).WithError(err).WithField("hs", homeserverDomain).Warn(
"bulkFetchPublicRoomsFromServers: failed to query hs",
)
return
}
for _, room := range fres.Chunk {
// atomically send a room or stop
select {
case roomCh <- room:
case <-done:
util.GetLogger(ctx).WithError(err).WithField("hs", homeserverDomain).Info("Interrupted whilst sending rooms")
return
}
}
}(hs)
}
// Close the room channel when the goroutines have quit so we don't leak, but don't let it stop the in-flight request.
// This also allows the request to fail fast if all HSes experience errors as it will cause the room channel to be
// closed.
go func() {
wg.Wait()
util.GetLogger(ctx).Info("Cleaning up resources")
close(roomCh)
}()
// fan-in results with timeout. We stop when we reach the limit.
FanIn:
for len(publicRooms) < int(limit) || limit == 0 {
// add a room or timeout
select {
case room, ok := <-roomCh:
if !ok {
util.GetLogger(ctx).Info("All homeservers have been queried, returning results.")
break FanIn
}
publicRooms = append(publicRooms, room)
case <-time.After(5 * time.Second): // we've waited long enough, let's tell the client what we got.
util.GetLogger(ctx).Info("Waited 5s for federated public rooms, returning early")
break FanIn
case <-ctx.Done(): // the client hung up on us, let's stop.
util.GetLogger(ctx).Info("Client hung up, returning early")
break FanIn
}
}
// tell goroutines to stop
close(done)
return publicRooms
}