From 9adff8a42dd8122a699cc3530eed74825d12d704 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 3 Jul 2020 12:19:40 +0100 Subject: [PATCH] Maybe make libp2p demo work again --- clientapi/routing/directory_public.go | 69 +---------- cmd/dendrite-demo-libp2p/main.go | 27 ++-- cmd/dendrite-demo-libp2p/publicrooms.go | 156 ++++++++++++++++++++++++ currentstateserver/api/wrapper.go | 75 ++++++++++++ 4 files changed, 244 insertions(+), 83 deletions(-) create mode 100644 cmd/dendrite-demo-libp2p/publicrooms.go create mode 100644 currentstateserver/api/wrapper.go diff --git a/clientapi/routing/directory_public.go b/clientapi/routing/directory_public.go index de980792e..925c1b8aa 100644 --- a/clientapi/routing/directory_public.go +++ b/clientapi/routing/directory_public.go @@ -144,71 +144,6 @@ func fillPublicRoomsReq(httpReq *http.Request, request *PublicRoomReq) *util.JSO return nil } -// due to lots of switches -// nolint:gocyclo -func fillInRooms(ctx context.Context, roomIDs []string, stateAPI currentstateAPI.CurrentStateInternalAPI) ([]gomatrixserverlib.PublicRoom, error) { - avatarTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.avatar", StateKey: ""} - nameTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.name", StateKey: ""} - canonicalTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomCanonicalAlias, StateKey: ""} - topicTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.topic", StateKey: ""} - guestTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.guest_access", StateKey: ""} - visibilityTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomHistoryVisibility, StateKey: ""} - joinRuleTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomJoinRules, StateKey: ""} - - var stateRes currentstateAPI.QueryBulkStateContentResponse - err := stateAPI.QueryBulkStateContent(ctx, ¤tstateAPI.QueryBulkStateContentRequest{ - RoomIDs: roomIDs, - AllowWildcards: true, - StateTuples: []gomatrixserverlib.StateKeyTuple{ - nameTuple, canonicalTuple, topicTuple, guestTuple, visibilityTuple, joinRuleTuple, avatarTuple, - {EventType: gomatrixserverlib.MRoomMember, StateKey: "*"}, - }, - }, &stateRes) - if err != nil { - util.GetLogger(ctx).WithError(err).Error("QueryBulkStateContent failed") - return nil, err - } - chunk := make([]gomatrixserverlib.PublicRoom, len(roomIDs)) - i := 0 - for roomID, data := range stateRes.Rooms { - pub := gomatrixserverlib.PublicRoom{ - RoomID: roomID, - } - joinCount := 0 - var joinRule, guestAccess string - for tuple, contentVal := range data { - if tuple.EventType == gomatrixserverlib.MRoomMember && contentVal == "join" { - joinCount++ - continue - } - switch tuple { - case avatarTuple: - pub.AvatarURL = contentVal - case nameTuple: - pub.Name = contentVal - case topicTuple: - pub.Topic = contentVal - case canonicalTuple: - pub.CanonicalAlias = contentVal - case visibilityTuple: - pub.WorldReadable = contentVal == "world_readable" - // need both of these to determine whether guests can join - case joinRuleTuple: - joinRule = contentVal - case guestTuple: - guestAccess = contentVal - } - } - if joinRule == gomatrixserverlib.Public && guestAccess == "can_join" { - pub.GuestCanJoin = true - } - pub.JoinedMembersCount = joinCount - chunk[i] = pub - i++ - } - return chunk, nil -} - // sliceInto returns a subslice of `slice` which honours the since/limit values given. // // 0 1 2 3 4 5 6 index @@ -260,9 +195,9 @@ func refreshPublicRoomCache( util.GetLogger(ctx).WithError(err).Error("QueryPublishedRooms failed") return publicRoomsCache } - pubRooms, err := fillInRooms(ctx, queryRes.RoomIDs, stateAPI) + pubRooms, err := currentstateAPI.PopulatePublicRooms(ctx, queryRes.RoomIDs, stateAPI) if err != nil { - util.GetLogger(ctx).WithError(err).Error("fillInRooms failed") + util.GetLogger(ctx).WithError(err).Error("PopulatePublicRooms failed") return publicRoomsCache } publicRoomsCache = []gomatrixserverlib.PublicRoom{} diff --git a/cmd/dendrite-demo-libp2p/main.go b/cmd/dendrite-demo-libp2p/main.go index 988f4aa7f..a6a968996 100644 --- a/cmd/dendrite-demo-libp2p/main.go +++ b/cmd/dendrite-demo-libp2p/main.go @@ -1,6 +1,4 @@ -// Copyright 2017 Vector Creations Ltd -// Copyright 2018 New Vector Ltd -// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// Copyright 2020 The Matrix.org Foundation C.I.C. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -162,13 +160,9 @@ func main() { &base.Base, federation, rsAPI, keyRing, ) rsAPI.SetFederationSenderAPI(fsAPI) - /* TODO: - publicRoomsDB, err := storage.NewPublicRoomsServerDatabaseWithPubSub(string(base.Base.Cfg.Database.PublicRoomsAPI), base.LibP2PPubsub, cfg.Matrix.ServerName) - if err != nil { - logrus.WithError(err).Panicf("failed to connect to public rooms db") - } - */ stateAPI := currentstateserver.NewInternalAPI(base.Base.Cfg, base.Base.KafkaConsumer) + provider := newPublicRoomsProvider(base.LibP2PPubsub, rsAPI, stateAPI) + provider.Start() monolith := setup.Monolith{ Config: base.Base.Cfg, @@ -180,13 +174,14 @@ func main() { KafkaConsumer: base.Base.KafkaConsumer, KafkaProducer: base.Base.KafkaProducer, - AppserviceAPI: asAPI, - EDUInternalAPI: eduInputAPI, - FederationSenderAPI: fsAPI, - RoomserverAPI: rsAPI, - ServerKeyAPI: serverKeyAPI, - StateAPI: stateAPI, - UserAPI: userAPI, + AppserviceAPI: asAPI, + EDUInternalAPI: eduInputAPI, + FederationSenderAPI: fsAPI, + RoomserverAPI: rsAPI, + ServerKeyAPI: serverKeyAPI, + StateAPI: stateAPI, + UserAPI: userAPI, + ExtPublicRoomsProvider: provider, } monolith.AddAllPublicRoutes(base.Base.PublicAPIMux) diff --git a/cmd/dendrite-demo-libp2p/publicrooms.go b/cmd/dendrite-demo-libp2p/publicrooms.go new file mode 100644 index 000000000..2160ddefd --- /dev/null +++ b/cmd/dendrite-demo-libp2p/publicrooms.go @@ -0,0 +1,156 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "encoding/json" + "fmt" + "sync" + "sync/atomic" + "time" + + currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api" + roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" + + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" +) + +const MaintenanceInterval = time.Second * 10 + +type discoveredRoom struct { + time time.Time + room gomatrixserverlib.PublicRoom +} + +type publicRoomsProvider struct { + pubsub *pubsub.PubSub + topic *pubsub.Topic + subscription *pubsub.Subscription + foundRooms map[string]discoveredRoom // additional rooms we have learned about from the DHT + foundRoomsMutex sync.RWMutex // protects foundRooms + maintenanceTimer *time.Timer // + roomsAdvertised atomic.Value // stores int + rsAPI roomserverAPI.RoomserverInternalAPI + stateAPI currentstateAPI.CurrentStateInternalAPI +} + +func newPublicRoomsProvider(ps *pubsub.PubSub, rsAPI roomserverAPI.RoomserverInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI) *publicRoomsProvider { + return &publicRoomsProvider{ + foundRooms: make(map[string]discoveredRoom), + pubsub: ps, + rsAPI: rsAPI, + stateAPI: stateAPI, + } +} + +func (p *publicRoomsProvider) Start() error { + if topic, err := p.pubsub.Join("/matrix/publicRooms"); err != nil { + return err + } else if sub, err := topic.Subscribe(); err == nil { + p.topic = topic + p.subscription = sub + go p.MaintenanceTimer() + go p.FindRooms() + p.roomsAdvertised.Store(0) + } else { + return err + } + return nil +} + +func (p *publicRoomsProvider) MaintenanceTimer() { + if p.maintenanceTimer != nil && !p.maintenanceTimer.Stop() { + <-p.maintenanceTimer.C + } + p.Interval() +} + +func (p *publicRoomsProvider) Interval() { + p.foundRoomsMutex.Lock() + for k, v := range p.foundRooms { + if time.Since(v.time) > time.Minute { + delete(p.foundRooms, k) + } + } + p.foundRoomsMutex.Unlock() + if err := p.AdvertiseRooms(); err != nil { + fmt.Println("Failed to advertise room in DHT:", err) + } + p.foundRoomsMutex.RLock() + defer p.foundRoomsMutex.RUnlock() + fmt.Println("Found", len(p.foundRooms), "room(s), advertised", p.roomsAdvertised.Load(), "room(s)") + p.maintenanceTimer = time.AfterFunc(MaintenanceInterval, p.Interval) +} + +func (p *publicRoomsProvider) AdvertiseRooms() error { + ctx := context.Background() + var queryRes roomserverAPI.QueryPublishedRoomsResponse + // Query published rooms on our server. This will not invoke clientapi.ExtraPublicRoomsProvider + err := p.rsAPI.QueryPublishedRooms(ctx, &roomserverAPI.QueryPublishedRoomsRequest{}, &queryRes) + if err != nil { + util.GetLogger(ctx).WithError(err).Error("QueryPublishedRooms failed") + return err + } + ourRooms, err := currentstateAPI.PopulatePublicRooms(ctx, queryRes.RoomIDs, p.stateAPI) + if err != nil { + util.GetLogger(ctx).WithError(err).Error("PopulatePublicRooms failed") + return err + } + advertised := 0 + for _, room := range ourRooms { + if j, err := json.Marshal(room); err == nil { + if err := p.topic.Publish(context.TODO(), j); err != nil { + fmt.Println("Failed to publish public room:", err) + } else { + advertised++ + } + } + } + + p.roomsAdvertised.Store(advertised) + return nil +} + +func (p *publicRoomsProvider) FindRooms() { + for { + msg, err := p.subscription.Next(context.Background()) + if err != nil { + continue + } + received := discoveredRoom{ + time: time.Now(), + } + if err := json.Unmarshal(msg.Data, &received.room); err != nil { + fmt.Println("Unmarshal error:", err) + continue + } + fmt.Printf("received %+v \n", received) + p.foundRoomsMutex.Lock() + p.foundRooms[received.room.RoomID] = received + p.foundRoomsMutex.Unlock() + } +} + +func (p *publicRoomsProvider) Rooms() (rooms []gomatrixserverlib.PublicRoom) { + p.foundRoomsMutex.RLock() + defer p.foundRoomsMutex.RUnlock() + for _, dr := range p.foundRooms { + rooms = append(rooms, dr.room) + } + return +} diff --git a/currentstateserver/api/wrapper.go b/currentstateserver/api/wrapper.go new file mode 100644 index 000000000..9f7486a02 --- /dev/null +++ b/currentstateserver/api/wrapper.go @@ -0,0 +1,75 @@ +package api + +import ( + "context" + + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" +) + +// PopulatePublicRooms extracts PublicRoom information for all the provided room IDs. The IDs are not checked to see if they are visible in the +// published room directory. +// due to lots of switches +// nolint:gocyclo +func PopulatePublicRooms(ctx context.Context, roomIDs []string, stateAPI CurrentStateInternalAPI) ([]gomatrixserverlib.PublicRoom, error) { + avatarTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.avatar", StateKey: ""} + nameTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.name", StateKey: ""} + canonicalTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomCanonicalAlias, StateKey: ""} + topicTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.topic", StateKey: ""} + guestTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.guest_access", StateKey: ""} + visibilityTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomHistoryVisibility, StateKey: ""} + joinRuleTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomJoinRules, StateKey: ""} + + var stateRes QueryBulkStateContentResponse + err := stateAPI.QueryBulkStateContent(ctx, &QueryBulkStateContentRequest{ + RoomIDs: roomIDs, + AllowWildcards: true, + StateTuples: []gomatrixserverlib.StateKeyTuple{ + nameTuple, canonicalTuple, topicTuple, guestTuple, visibilityTuple, joinRuleTuple, avatarTuple, + {EventType: gomatrixserverlib.MRoomMember, StateKey: "*"}, + }, + }, &stateRes) + if err != nil { + util.GetLogger(ctx).WithError(err).Error("QueryBulkStateContent failed") + return nil, err + } + chunk := make([]gomatrixserverlib.PublicRoom, len(roomIDs)) + i := 0 + for roomID, data := range stateRes.Rooms { + pub := gomatrixserverlib.PublicRoom{ + RoomID: roomID, + } + joinCount := 0 + var joinRule, guestAccess string + for tuple, contentVal := range data { + if tuple.EventType == gomatrixserverlib.MRoomMember && contentVal == "join" { + joinCount++ + continue + } + switch tuple { + case avatarTuple: + pub.AvatarURL = contentVal + case nameTuple: + pub.Name = contentVal + case topicTuple: + pub.Topic = contentVal + case canonicalTuple: + pub.CanonicalAlias = contentVal + case visibilityTuple: + pub.WorldReadable = contentVal == "world_readable" + // need both of these to determine whether guests can join + case joinRuleTuple: + joinRule = contentVal + case guestTuple: + guestAccess = contentVal + } + } + if joinRule == gomatrixserverlib.Public && guestAccess == "can_join" { + pub.GuestCanJoin = true + } + pub.JoinedMembersCount = joinCount + chunk[i] = pub + i++ + } + return chunk, nil +}