mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-21 05:43:09 -06:00
Make dendritejs work again
This commit is contained in:
parent
cd3da772de
commit
31ee8c9201
|
|
@ -5,7 +5,7 @@
|
||||||
# $ docker build -t dendritejs -f DendriteJS.Dockerfile .
|
# $ docker build -t dendritejs -f DendriteJS.Dockerfile .
|
||||||
# $ docker run --rm -p 8888:80 dendritejs
|
# $ docker run --rm -p 8888:80 dendritejs
|
||||||
# Then visit http://localhost:8888
|
# Then visit http://localhost:8888
|
||||||
FROM golang:1.13.7-alpine3.11 AS gobuild
|
FROM golang:1.14-alpine AS gobuild
|
||||||
|
|
||||||
# Download and build dendrite
|
# Download and build dendrite
|
||||||
WORKDIR /build
|
WORKDIR /build
|
||||||
|
|
|
||||||
|
|
@ -67,146 +67,6 @@ func GetPostPublicRooms(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetPostPublicRoomsWithExternal is the same as GetPostPublicRooms but also mixes in public rooms from the provider supplied.
|
|
||||||
func GetPostPublicRoomsWithExternal(
|
|
||||||
req *http.Request, rsAPI roomserverAPI.RoomserverInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI,
|
|
||||||
fedClient *gomatrixserverlib.FederationClient, extRoomsProvider api.ExtraPublicRoomsProvider,
|
|
||||||
) util.JSONResponse {
|
|
||||||
var request PublicRoomReq
|
|
||||||
if fillErr := fillPublicRoomsReq(req, &request); fillErr != nil {
|
|
||||||
return *fillErr
|
|
||||||
}
|
|
||||||
response, err := publicRooms(req.Context(), request, rsAPI, stateAPI, extRoomsProvider)
|
|
||||||
if err != nil {
|
|
||||||
return jsonerror.InternalServerError()
|
|
||||||
}
|
|
||||||
|
|
||||||
if request.Since != "" {
|
|
||||||
// TODO: handle pagination tokens sensibly rather than ignoring them.
|
|
||||||
// ignore paginated requests since we don't handle them yet over federation.
|
|
||||||
// Only the initial request will contain federated rooms.
|
|
||||||
return util.JSONResponse{
|
|
||||||
Code: http.StatusOK,
|
|
||||||
JSON: response,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// If we have already hit the limit on the number of rooms, bail.
|
|
||||||
var limit int
|
|
||||||
if request.Limit > 0 {
|
|
||||||
limit = int(request.Limit) - len(response.Chunk)
|
|
||||||
if limit <= 0 {
|
|
||||||
return util.JSONResponse{
|
|
||||||
Code: http.StatusOK,
|
|
||||||
JSON: response,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// downcasting `limit` is safe as we know it isn't bigger than request.Limit which is int16
|
|
||||||
//fedRooms := bulkFetchPublicRoomsFromServers(req.Context(), fedClient, extRoomsProvider.Homeservers(), int16(limit))
|
|
||||||
//response.Chunk = append(response.Chunk, fedRooms...)
|
|
||||||
|
|
||||||
// de-duplicate rooms with the same room ID. We can join the room via any of these aliases as we know these servers
|
|
||||||
// are alive and well, so we arbitrarily pick one (purposefully shuffling them to spread the load a bit)
|
|
||||||
var publicRooms []gomatrixserverlib.PublicRoom
|
|
||||||
haveRoomIDs := make(map[string]bool)
|
|
||||||
rand.Shuffle(len(response.Chunk), func(i, j int) {
|
|
||||||
response.Chunk[i], response.Chunk[j] = response.Chunk[j], response.Chunk[i]
|
|
||||||
})
|
|
||||||
for _, r := range response.Chunk {
|
|
||||||
if haveRoomIDs[r.RoomID] {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
haveRoomIDs[r.RoomID] = true
|
|
||||||
publicRooms = append(publicRooms, r)
|
|
||||||
}
|
|
||||||
// sort by member count
|
|
||||||
sort.SliceStable(publicRooms, func(i, j int) bool {
|
|
||||||
return publicRooms[i].JoinedMembersCount > publicRooms[j].JoinedMembersCount
|
|
||||||
})
|
|
||||||
|
|
||||||
response.Chunk = publicRooms
|
|
||||||
|
|
||||||
return util.JSONResponse{
|
|
||||||
Code: http.StatusOK,
|
|
||||||
JSON: response,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
// bulkFetchPublicRoomsFromServers fetches public rooms from the list of homeservers.
|
|
||||||
// Returns a list of public rooms up to the limit specified.
|
|
||||||
func bulkFetchPublicRoomsFromServers(
|
|
||||||
ctx context.Context, fedClient *gomatrixserverlib.FederationClient, homeservers []string, limit int16,
|
|
||||||
) (publicRooms []gomatrixserverlib.PublicRoom) {
|
|
||||||
// follow pipeline semantics, see https://blog.golang.org/pipelines for more info.
|
|
||||||
// goroutines send rooms to this channel
|
|
||||||
roomCh := make(chan gomatrixserverlib.PublicRoom, int(limit))
|
|
||||||
// signalling channel to tell goroutines to stop sending rooms and quit
|
|
||||||
done := make(chan bool)
|
|
||||||
// signalling to say when we can close the room channel
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
wg.Add(len(homeservers))
|
|
||||||
// concurrently query for public rooms
|
|
||||||
for _, hs := range homeservers {
|
|
||||||
go func(homeserverDomain string) {
|
|
||||||
defer wg.Done()
|
|
||||||
util.GetLogger(ctx).WithField("hs", homeserverDomain).Info("Querying HS for public rooms")
|
|
||||||
fres, err := fedClient.GetPublicRooms(ctx, gomatrixserverlib.ServerName(homeserverDomain), int(limit), "", false, "")
|
|
||||||
if err != nil {
|
|
||||||
util.GetLogger(ctx).WithError(err).WithField("hs", homeserverDomain).Warn(
|
|
||||||
"bulkFetchPublicRoomsFromServers: failed to query hs",
|
|
||||||
)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
for _, room := range fres.Chunk {
|
|
||||||
// atomically send a room or stop
|
|
||||||
select {
|
|
||||||
case roomCh <- room:
|
|
||||||
case <-done:
|
|
||||||
util.GetLogger(ctx).WithError(err).WithField("hs", homeserverDomain).Info("Interrupted whilst sending rooms")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}(hs)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close the room channel when the goroutines have quit so we don't leak, but don't let it stop the in-flight request.
|
|
||||||
// This also allows the request to fail fast if all HSes experience errors as it will cause the room channel to be
|
|
||||||
// closed.
|
|
||||||
go func() {
|
|
||||||
wg.Wait()
|
|
||||||
util.GetLogger(ctx).Info("Cleaning up resources")
|
|
||||||
close(roomCh)
|
|
||||||
}()
|
|
||||||
|
|
||||||
// fan-in results with timeout. We stop when we reach the limit.
|
|
||||||
FanIn:
|
|
||||||
for len(publicRooms) < int(limit) || limit == 0 {
|
|
||||||
// add a room or timeout
|
|
||||||
select {
|
|
||||||
case room, ok := <-roomCh:
|
|
||||||
if !ok {
|
|
||||||
util.GetLogger(ctx).Info("All homeservers have been queried, returning results.")
|
|
||||||
break FanIn
|
|
||||||
}
|
|
||||||
publicRooms = append(publicRooms, room)
|
|
||||||
case <-time.After(15 * time.Second): // we've waited long enough, let's tell the client what we got.
|
|
||||||
util.GetLogger(ctx).Info("Waited 15s for federated public rooms, returning early")
|
|
||||||
break FanIn
|
|
||||||
case <-ctx.Done(): // the client hung up on us, let's stop.
|
|
||||||
util.GetLogger(ctx).Info("Client hung up, returning early")
|
|
||||||
break FanIn
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// tell goroutines to stop
|
|
||||||
close(done)
|
|
||||||
|
|
||||||
return publicRooms
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
func publicRooms(ctx context.Context, request PublicRoomReq, rsAPI roomserverAPI.RoomserverInternalAPI,
|
func publicRooms(ctx context.Context, request PublicRoomReq, rsAPI roomserverAPI.RoomserverInternalAPI,
|
||||||
stateAPI currentstateAPI.CurrentStateInternalAPI, extRoomsProvider api.ExtraPublicRoomsProvider,
|
stateAPI currentstateAPI.CurrentStateInternalAPI, extRoomsProvider api.ExtraPublicRoomsProvider,
|
||||||
) (*gomatrixserverlib.RespPublicRooms, error) {
|
) (*gomatrixserverlib.RespPublicRooms, error) {
|
||||||
|
|
@ -408,6 +268,8 @@ func refreshPublicRoomCache(
|
||||||
publicRoomsCache = []gomatrixserverlib.PublicRoom{}
|
publicRoomsCache = []gomatrixserverlib.PublicRoom{}
|
||||||
publicRoomsCache = append(publicRoomsCache, pubRooms...)
|
publicRoomsCache = append(publicRoomsCache, pubRooms...)
|
||||||
publicRoomsCache = append(publicRoomsCache, extraRooms...)
|
publicRoomsCache = append(publicRoomsCache, extraRooms...)
|
||||||
|
publicRoomsCache = dedupeAndShuffle(publicRoomsCache)
|
||||||
|
|
||||||
// sort by total joined member count (big to small)
|
// sort by total joined member count (big to small)
|
||||||
sort.SliceStable(publicRoomsCache, func(i, j int) bool {
|
sort.SliceStable(publicRoomsCache, func(i, j int) bool {
|
||||||
return publicRoomsCache[i].JoinedMembersCount > publicRoomsCache[j].JoinedMembersCount
|
return publicRoomsCache[i].JoinedMembersCount > publicRoomsCache[j].JoinedMembersCount
|
||||||
|
|
@ -420,3 +282,21 @@ func getPublicRoomsFromCache() []gomatrixserverlib.PublicRoom {
|
||||||
defer cacheMu.Unlock()
|
defer cacheMu.Unlock()
|
||||||
return publicRoomsCache
|
return publicRoomsCache
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func dedupeAndShuffle(in []gomatrixserverlib.PublicRoom) []gomatrixserverlib.PublicRoom {
|
||||||
|
// de-duplicate rooms with the same room ID. We can join the room via any of these aliases as we know these servers
|
||||||
|
// are alive and well, so we arbitrarily pick one (purposefully shuffling them to spread the load a bit)
|
||||||
|
var publicRooms []gomatrixserverlib.PublicRoom
|
||||||
|
haveRoomIDs := make(map[string]bool)
|
||||||
|
rand.Shuffle(len(in), func(i, j int) {
|
||||||
|
in[i], in[j] = in[j], in[i]
|
||||||
|
})
|
||||||
|
for _, r := range in {
|
||||||
|
if haveRoomIDs[r.RoomID] {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
haveRoomIDs[r.RoomID] = true
|
||||||
|
publicRooms = append(publicRooms, r)
|
||||||
|
}
|
||||||
|
return publicRooms
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -211,7 +211,7 @@ func main() {
|
||||||
)
|
)
|
||||||
fedSenderAPI := federationsender.NewInternalAPI(base, federation, rsAPI, &keyRing)
|
fedSenderAPI := federationsender.NewInternalAPI(base, federation, rsAPI, &keyRing)
|
||||||
rsAPI.SetFederationSenderAPI(fedSenderAPI)
|
rsAPI.SetFederationSenderAPI(fedSenderAPI)
|
||||||
p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI)
|
p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI, federation)
|
||||||
|
|
||||||
stateAPI := currentstateserver.NewInternalAPI(base.Cfg, base.KafkaConsumer)
|
stateAPI := currentstateserver.NewInternalAPI(base.Cfg, base.KafkaConsumer)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -18,22 +18,29 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/federationsender/api"
|
"github.com/matrix-org/dendrite/federationsender/api"
|
||||||
go_http_js_libp2p "github.com/matrix-org/go-http-js-libp2p"
|
go_http_js_libp2p "github.com/matrix-org/go-http-js-libp2p"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
"github.com/matrix-org/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
type libp2pPublicRoomsProvider struct {
|
type libp2pPublicRoomsProvider struct {
|
||||||
node *go_http_js_libp2p.P2pLocalNode
|
node *go_http_js_libp2p.P2pLocalNode
|
||||||
providers []go_http_js_libp2p.PeerInfo
|
providers []go_http_js_libp2p.PeerInfo
|
||||||
fedSender api.FederationSenderInternalAPI
|
fedSender api.FederationSenderInternalAPI
|
||||||
|
fedClient *gomatrixserverlib.FederationClient
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewLibP2PPublicRoomsProvider(node *go_http_js_libp2p.P2pLocalNode, fedSender api.FederationSenderInternalAPI) *libp2pPublicRoomsProvider {
|
func NewLibP2PPublicRoomsProvider(
|
||||||
|
node *go_http_js_libp2p.P2pLocalNode, fedSender api.FederationSenderInternalAPI, fedClient *gomatrixserverlib.FederationClient,
|
||||||
|
) *libp2pPublicRoomsProvider {
|
||||||
p := &libp2pPublicRoomsProvider{
|
p := &libp2pPublicRoomsProvider{
|
||||||
node: node,
|
node: node,
|
||||||
fedSender: fedSender,
|
fedSender: fedSender,
|
||||||
|
fedClient: fedClient,
|
||||||
}
|
}
|
||||||
node.RegisterFoundProviders(p.foundProviders)
|
node.RegisterFoundProviders(p.foundProviders)
|
||||||
return p
|
return p
|
||||||
|
|
@ -63,13 +70,85 @@ func (p *libp2pPublicRoomsProvider) foundProviders(peerInfos []go_http_js_libp2p
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *libp2pPublicRoomsProvider) Rooms() []gomatrixserverlib.PublicRoom {
|
func (p *libp2pPublicRoomsProvider) Rooms() []gomatrixserverlib.PublicRoom {
|
||||||
return nil
|
return bulkFetchPublicRoomsFromServers(context.Background(), p.fedClient, p.homeservers())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *libp2pPublicRoomsProvider) Homeservers() []string {
|
func (p *libp2pPublicRoomsProvider) homeservers() []string {
|
||||||
result := make([]string, len(p.providers))
|
result := make([]string, len(p.providers))
|
||||||
for i := range p.providers {
|
for i := range p.providers {
|
||||||
result[i] = p.providers[i].Id
|
result[i] = p.providers[i].Id
|
||||||
}
|
}
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// bulkFetchPublicRoomsFromServers fetches public rooms from the list of homeservers.
|
||||||
|
// Returns a list of public rooms.
|
||||||
|
func bulkFetchPublicRoomsFromServers(
|
||||||
|
ctx context.Context, fedClient *gomatrixserverlib.FederationClient, homeservers []string,
|
||||||
|
) (publicRooms []gomatrixserverlib.PublicRoom) {
|
||||||
|
limit := 200
|
||||||
|
// follow pipeline semantics, see https://blog.golang.org/pipelines for more info.
|
||||||
|
// goroutines send rooms to this channel
|
||||||
|
roomCh := make(chan gomatrixserverlib.PublicRoom, int(limit))
|
||||||
|
// signalling channel to tell goroutines to stop sending rooms and quit
|
||||||
|
done := make(chan bool)
|
||||||
|
// signalling to say when we can close the room channel
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(len(homeservers))
|
||||||
|
// concurrently query for public rooms
|
||||||
|
for _, hs := range homeservers {
|
||||||
|
go func(homeserverDomain string) {
|
||||||
|
defer wg.Done()
|
||||||
|
util.GetLogger(ctx).WithField("hs", homeserverDomain).Info("Querying HS for public rooms")
|
||||||
|
fres, err := fedClient.GetPublicRooms(ctx, gomatrixserverlib.ServerName(homeserverDomain), int(limit), "", false, "")
|
||||||
|
if err != nil {
|
||||||
|
util.GetLogger(ctx).WithError(err).WithField("hs", homeserverDomain).Warn(
|
||||||
|
"bulkFetchPublicRoomsFromServers: failed to query hs",
|
||||||
|
)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for _, room := range fres.Chunk {
|
||||||
|
// atomically send a room or stop
|
||||||
|
select {
|
||||||
|
case roomCh <- room:
|
||||||
|
case <-done:
|
||||||
|
util.GetLogger(ctx).WithError(err).WithField("hs", homeserverDomain).Info("Interrupted whilst sending rooms")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}(hs)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close the room channel when the goroutines have quit so we don't leak, but don't let it stop the in-flight request.
|
||||||
|
// This also allows the request to fail fast if all HSes experience errors as it will cause the room channel to be
|
||||||
|
// closed.
|
||||||
|
go func() {
|
||||||
|
wg.Wait()
|
||||||
|
util.GetLogger(ctx).Info("Cleaning up resources")
|
||||||
|
close(roomCh)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// fan-in results with timeout. We stop when we reach the limit.
|
||||||
|
FanIn:
|
||||||
|
for len(publicRooms) < int(limit) || limit == 0 {
|
||||||
|
// add a room or timeout
|
||||||
|
select {
|
||||||
|
case room, ok := <-roomCh:
|
||||||
|
if !ok {
|
||||||
|
util.GetLogger(ctx).Info("All homeservers have been queried, returning results.")
|
||||||
|
break FanIn
|
||||||
|
}
|
||||||
|
publicRooms = append(publicRooms, room)
|
||||||
|
case <-time.After(15 * time.Second): // we've waited long enough, let's tell the client what we got.
|
||||||
|
util.GetLogger(ctx).Info("Waited 15s for federated public rooms, returning early")
|
||||||
|
break FanIn
|
||||||
|
case <-ctx.Done(): // the client hung up on us, let's stop.
|
||||||
|
util.GetLogger(ctx).Info("Client hung up, returning early")
|
||||||
|
break FanIn
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// tell goroutines to stop
|
||||||
|
close(done)
|
||||||
|
|
||||||
|
return publicRooms
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue