Maybe make libp2p demo work again

This commit is contained in:
Kegan Dougal 2020-07-03 12:19:40 +01:00
parent 31ee8c9201
commit 9adff8a42d
4 changed files with 244 additions and 83 deletions

View file

@ -144,71 +144,6 @@ func fillPublicRoomsReq(httpReq *http.Request, request *PublicRoomReq) *util.JSO
return nil return nil
} }
// due to lots of switches
// nolint:gocyclo
func fillInRooms(ctx context.Context, roomIDs []string, stateAPI currentstateAPI.CurrentStateInternalAPI) ([]gomatrixserverlib.PublicRoom, error) {
avatarTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.avatar", StateKey: ""}
nameTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.name", StateKey: ""}
canonicalTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomCanonicalAlias, StateKey: ""}
topicTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.topic", StateKey: ""}
guestTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.guest_access", StateKey: ""}
visibilityTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomHistoryVisibility, StateKey: ""}
joinRuleTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomJoinRules, StateKey: ""}
var stateRes currentstateAPI.QueryBulkStateContentResponse
err := stateAPI.QueryBulkStateContent(ctx, &currentstateAPI.QueryBulkStateContentRequest{
RoomIDs: roomIDs,
AllowWildcards: true,
StateTuples: []gomatrixserverlib.StateKeyTuple{
nameTuple, canonicalTuple, topicTuple, guestTuple, visibilityTuple, joinRuleTuple, avatarTuple,
{EventType: gomatrixserverlib.MRoomMember, StateKey: "*"},
},
}, &stateRes)
if err != nil {
util.GetLogger(ctx).WithError(err).Error("QueryBulkStateContent failed")
return nil, err
}
chunk := make([]gomatrixserverlib.PublicRoom, len(roomIDs))
i := 0
for roomID, data := range stateRes.Rooms {
pub := gomatrixserverlib.PublicRoom{
RoomID: roomID,
}
joinCount := 0
var joinRule, guestAccess string
for tuple, contentVal := range data {
if tuple.EventType == gomatrixserverlib.MRoomMember && contentVal == "join" {
joinCount++
continue
}
switch tuple {
case avatarTuple:
pub.AvatarURL = contentVal
case nameTuple:
pub.Name = contentVal
case topicTuple:
pub.Topic = contentVal
case canonicalTuple:
pub.CanonicalAlias = contentVal
case visibilityTuple:
pub.WorldReadable = contentVal == "world_readable"
// need both of these to determine whether guests can join
case joinRuleTuple:
joinRule = contentVal
case guestTuple:
guestAccess = contentVal
}
}
if joinRule == gomatrixserverlib.Public && guestAccess == "can_join" {
pub.GuestCanJoin = true
}
pub.JoinedMembersCount = joinCount
chunk[i] = pub
i++
}
return chunk, nil
}
// sliceInto returns a subslice of `slice` which honours the since/limit values given. // sliceInto returns a subslice of `slice` which honours the since/limit values given.
// //
// 0 1 2 3 4 5 6 index // 0 1 2 3 4 5 6 index
@ -260,9 +195,9 @@ func refreshPublicRoomCache(
util.GetLogger(ctx).WithError(err).Error("QueryPublishedRooms failed") util.GetLogger(ctx).WithError(err).Error("QueryPublishedRooms failed")
return publicRoomsCache return publicRoomsCache
} }
pubRooms, err := fillInRooms(ctx, queryRes.RoomIDs, stateAPI) pubRooms, err := currentstateAPI.PopulatePublicRooms(ctx, queryRes.RoomIDs, stateAPI)
if err != nil { if err != nil {
util.GetLogger(ctx).WithError(err).Error("fillInRooms failed") util.GetLogger(ctx).WithError(err).Error("PopulatePublicRooms failed")
return publicRoomsCache return publicRoomsCache
} }
publicRoomsCache = []gomatrixserverlib.PublicRoom{} publicRoomsCache = []gomatrixserverlib.PublicRoom{}

View file

@ -1,6 +1,4 @@
// Copyright 2017 Vector Creations Ltd // Copyright 2020 The Matrix.org Foundation C.I.C.
// Copyright 2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
@ -162,13 +160,9 @@ func main() {
&base.Base, federation, rsAPI, keyRing, &base.Base, federation, rsAPI, keyRing,
) )
rsAPI.SetFederationSenderAPI(fsAPI) rsAPI.SetFederationSenderAPI(fsAPI)
/* TODO:
publicRoomsDB, err := storage.NewPublicRoomsServerDatabaseWithPubSub(string(base.Base.Cfg.Database.PublicRoomsAPI), base.LibP2PPubsub, cfg.Matrix.ServerName)
if err != nil {
logrus.WithError(err).Panicf("failed to connect to public rooms db")
}
*/
stateAPI := currentstateserver.NewInternalAPI(base.Base.Cfg, base.Base.KafkaConsumer) stateAPI := currentstateserver.NewInternalAPI(base.Base.Cfg, base.Base.KafkaConsumer)
provider := newPublicRoomsProvider(base.LibP2PPubsub, rsAPI, stateAPI)
provider.Start()
monolith := setup.Monolith{ monolith := setup.Monolith{
Config: base.Base.Cfg, Config: base.Base.Cfg,
@ -180,13 +174,14 @@ func main() {
KafkaConsumer: base.Base.KafkaConsumer, KafkaConsumer: base.Base.KafkaConsumer,
KafkaProducer: base.Base.KafkaProducer, KafkaProducer: base.Base.KafkaProducer,
AppserviceAPI: asAPI, AppserviceAPI: asAPI,
EDUInternalAPI: eduInputAPI, EDUInternalAPI: eduInputAPI,
FederationSenderAPI: fsAPI, FederationSenderAPI: fsAPI,
RoomserverAPI: rsAPI, RoomserverAPI: rsAPI,
ServerKeyAPI: serverKeyAPI, ServerKeyAPI: serverKeyAPI,
StateAPI: stateAPI, StateAPI: stateAPI,
UserAPI: userAPI, UserAPI: userAPI,
ExtPublicRoomsProvider: provider,
} }
monolith.AddAllPublicRoutes(base.Base.PublicAPIMux) monolith.AddAllPublicRoutes(base.Base.PublicAPIMux)

View file

@ -0,0 +1,156 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"encoding/json"
"fmt"
"sync"
"sync/atomic"
"time"
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
const MaintenanceInterval = time.Second * 10
type discoveredRoom struct {
time time.Time
room gomatrixserverlib.PublicRoom
}
type publicRoomsProvider struct {
pubsub *pubsub.PubSub
topic *pubsub.Topic
subscription *pubsub.Subscription
foundRooms map[string]discoveredRoom // additional rooms we have learned about from the DHT
foundRoomsMutex sync.RWMutex // protects foundRooms
maintenanceTimer *time.Timer //
roomsAdvertised atomic.Value // stores int
rsAPI roomserverAPI.RoomserverInternalAPI
stateAPI currentstateAPI.CurrentStateInternalAPI
}
func newPublicRoomsProvider(ps *pubsub.PubSub, rsAPI roomserverAPI.RoomserverInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI) *publicRoomsProvider {
return &publicRoomsProvider{
foundRooms: make(map[string]discoveredRoom),
pubsub: ps,
rsAPI: rsAPI,
stateAPI: stateAPI,
}
}
func (p *publicRoomsProvider) Start() error {
if topic, err := p.pubsub.Join("/matrix/publicRooms"); err != nil {
return err
} else if sub, err := topic.Subscribe(); err == nil {
p.topic = topic
p.subscription = sub
go p.MaintenanceTimer()
go p.FindRooms()
p.roomsAdvertised.Store(0)
} else {
return err
}
return nil
}
func (p *publicRoomsProvider) MaintenanceTimer() {
if p.maintenanceTimer != nil && !p.maintenanceTimer.Stop() {
<-p.maintenanceTimer.C
}
p.Interval()
}
func (p *publicRoomsProvider) Interval() {
p.foundRoomsMutex.Lock()
for k, v := range p.foundRooms {
if time.Since(v.time) > time.Minute {
delete(p.foundRooms, k)
}
}
p.foundRoomsMutex.Unlock()
if err := p.AdvertiseRooms(); err != nil {
fmt.Println("Failed to advertise room in DHT:", err)
}
p.foundRoomsMutex.RLock()
defer p.foundRoomsMutex.RUnlock()
fmt.Println("Found", len(p.foundRooms), "room(s), advertised", p.roomsAdvertised.Load(), "room(s)")
p.maintenanceTimer = time.AfterFunc(MaintenanceInterval, p.Interval)
}
func (p *publicRoomsProvider) AdvertiseRooms() error {
ctx := context.Background()
var queryRes roomserverAPI.QueryPublishedRoomsResponse
// Query published rooms on our server. This will not invoke clientapi.ExtraPublicRoomsProvider
err := p.rsAPI.QueryPublishedRooms(ctx, &roomserverAPI.QueryPublishedRoomsRequest{}, &queryRes)
if err != nil {
util.GetLogger(ctx).WithError(err).Error("QueryPublishedRooms failed")
return err
}
ourRooms, err := currentstateAPI.PopulatePublicRooms(ctx, queryRes.RoomIDs, p.stateAPI)
if err != nil {
util.GetLogger(ctx).WithError(err).Error("PopulatePublicRooms failed")
return err
}
advertised := 0
for _, room := range ourRooms {
if j, err := json.Marshal(room); err == nil {
if err := p.topic.Publish(context.TODO(), j); err != nil {
fmt.Println("Failed to publish public room:", err)
} else {
advertised++
}
}
}
p.roomsAdvertised.Store(advertised)
return nil
}
func (p *publicRoomsProvider) FindRooms() {
for {
msg, err := p.subscription.Next(context.Background())
if err != nil {
continue
}
received := discoveredRoom{
time: time.Now(),
}
if err := json.Unmarshal(msg.Data, &received.room); err != nil {
fmt.Println("Unmarshal error:", err)
continue
}
fmt.Printf("received %+v \n", received)
p.foundRoomsMutex.Lock()
p.foundRooms[received.room.RoomID] = received
p.foundRoomsMutex.Unlock()
}
}
func (p *publicRoomsProvider) Rooms() (rooms []gomatrixserverlib.PublicRoom) {
p.foundRoomsMutex.RLock()
defer p.foundRoomsMutex.RUnlock()
for _, dr := range p.foundRooms {
rooms = append(rooms, dr.room)
}
return
}

View file

@ -0,0 +1,75 @@
package api
import (
"context"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
// PopulatePublicRooms extracts PublicRoom information for all the provided room IDs. The IDs are not checked to see if they are visible in the
// published room directory.
// due to lots of switches
// nolint:gocyclo
func PopulatePublicRooms(ctx context.Context, roomIDs []string, stateAPI CurrentStateInternalAPI) ([]gomatrixserverlib.PublicRoom, error) {
avatarTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.avatar", StateKey: ""}
nameTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.name", StateKey: ""}
canonicalTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomCanonicalAlias, StateKey: ""}
topicTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.topic", StateKey: ""}
guestTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.guest_access", StateKey: ""}
visibilityTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomHistoryVisibility, StateKey: ""}
joinRuleTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomJoinRules, StateKey: ""}
var stateRes QueryBulkStateContentResponse
err := stateAPI.QueryBulkStateContent(ctx, &QueryBulkStateContentRequest{
RoomIDs: roomIDs,
AllowWildcards: true,
StateTuples: []gomatrixserverlib.StateKeyTuple{
nameTuple, canonicalTuple, topicTuple, guestTuple, visibilityTuple, joinRuleTuple, avatarTuple,
{EventType: gomatrixserverlib.MRoomMember, StateKey: "*"},
},
}, &stateRes)
if err != nil {
util.GetLogger(ctx).WithError(err).Error("QueryBulkStateContent failed")
return nil, err
}
chunk := make([]gomatrixserverlib.PublicRoom, len(roomIDs))
i := 0
for roomID, data := range stateRes.Rooms {
pub := gomatrixserverlib.PublicRoom{
RoomID: roomID,
}
joinCount := 0
var joinRule, guestAccess string
for tuple, contentVal := range data {
if tuple.EventType == gomatrixserverlib.MRoomMember && contentVal == "join" {
joinCount++
continue
}
switch tuple {
case avatarTuple:
pub.AvatarURL = contentVal
case nameTuple:
pub.Name = contentVal
case topicTuple:
pub.Topic = contentVal
case canonicalTuple:
pub.CanonicalAlias = contentVal
case visibilityTuple:
pub.WorldReadable = contentVal == "world_readable"
// need both of these to determine whether guests can join
case joinRuleTuple:
joinRule = contentVal
case guestTuple:
guestAccess = contentVal
}
}
if joinRule == gomatrixserverlib.Public && guestAccess == "can_join" {
pub.GuestCanJoin = true
}
pub.JoinedMembersCount = joinCount
chunk[i] = pub
i++
}
return chunk, nil
}