Handle SetRelayServer api call properly for own relays

This commit is contained in:
Devon Hudson 2023-01-24 13:59:50 -07:00
parent 5b73592f5a
commit f7fb7b0cba
No known key found for this signature in database
GPG key ID: CD06B18E77F6A628

View file

@ -78,19 +78,19 @@ const (
)
type DendriteMonolith struct {
logger logrus.Logger
baseDendrite *base.BaseDendrite
PineconeRouter *pineconeRouter.Router
PineconeMulticast *pineconeMulticast.Multicast
PineconeQUIC *pineconeSessions.Sessions
PineconeManager *pineconeConnections.ConnectionManager
StorageDirectory string
CacheDirectory string
listener net.Listener
httpServer *http.Server
userAPI userapiAPI.UserInternalAPI
federationAPI api.FederationInternalAPI
relayServersQueried map[gomatrixserverlib.ServerName]bool
logger logrus.Logger
baseDendrite *base.BaseDendrite
PineconeRouter *pineconeRouter.Router
PineconeMulticast *pineconeMulticast.Multicast
PineconeQUIC *pineconeSessions.Sessions
PineconeManager *pineconeConnections.ConnectionManager
StorageDirectory string
CacheDirectory string
listener net.Listener
httpServer *http.Server
userAPI userapiAPI.UserInternalAPI
federationAPI api.FederationInternalAPI
relayRetriever RelayServerRetriever
}
func (m *DendriteMonolith) PublicKey() string {
@ -167,6 +167,29 @@ func (m *DendriteMonolith) SetStaticPeer(uri string) {
}
}
func (m *DendriteMonolith) SetRelayServer(nodeKey string, uri string) {
relays := []gomatrixserverlib.ServerName{}
for _, uri := range strings.Split(uri, ",") {
uri = strings.TrimSpace(uri)
if len(uri) == 0 {
continue
}
if userID, err := gomatrixserverlib.NewUserID(uri, false); err == nil {
relays = append(relays, userID.Domain())
} else {
relays = append(relays, gomatrixserverlib.ServerName(uri))
}
}
if nodeKey == m.PublicKey() {
logrus.Infof("Setting relay servers to: %v", relays)
m.relayRetriever.SetRelayServers(relays)
} else {
// TODO: add relay/s for other node
}
}
func (m *DendriteMonolith) DisconnectType(peertype int) {
for _, p := range m.PineconeRouter.Peers() {
if int(peertype) == p.PeerType {
@ -458,24 +481,23 @@ func (m *DendriteMonolith) Start() {
eLog := logrus.WithField("pinecone", "events")
stopRelayServerSync := make(chan bool)
relayRetriever := RelayServerRetriever{
m.relayRetriever = RelayServerRetriever{
Context: context.Background(),
ServerName: gomatrixserverlib.ServerName(m.PineconeRouter.PublicKey().String()),
FederationAPI: m.federationAPI,
relayServersQueried: make(map[gomatrixserverlib.ServerName]bool),
RelayAPI: monolith.RelayAPI,
running: *atomic.NewBool(false),
quit: stopRelayServerSync,
}
relayRetriever.InitializeRelayServers(eLog)
m.relayRetriever.InitializeRelayServers(eLog)
for event := range ch {
switch e := event.(type) {
case pineconeEvents.PeerAdded:
if !relayRetriever.running.Load() {
go relayRetriever.SyncRelayServers(stopRelayServerSync)
}
m.relayRetriever.StartSync()
case pineconeEvents.PeerRemoved:
if relayRetriever.running.Load() && m.PineconeRouter.TotalPeerCount() == 0 {
if m.relayRetriever.running.Load() && m.PineconeRouter.TotalPeerCount() == 0 {
stopRelayServerSync <- true
}
case pineconeEvents.BroadcastReceived:
@ -511,32 +533,55 @@ type RelayServerRetriever struct {
relayServersQueried map[gomatrixserverlib.ServerName]bool
queriedServersMutex sync.Mutex
running atomic.Bool
quit <-chan bool
}
func (m *RelayServerRetriever) InitializeRelayServers(eLog *logrus.Entry) {
request := api.P2PQueryRelayServersRequest{Server: gomatrixserverlib.ServerName(m.ServerName)}
func (r *RelayServerRetriever) InitializeRelayServers(eLog *logrus.Entry) {
request := api.P2PQueryRelayServersRequest{Server: gomatrixserverlib.ServerName(r.ServerName)}
response := api.P2PQueryRelayServersResponse{}
err := m.FederationAPI.P2PQueryRelayServers(m.Context, &request, &response)
err := r.FederationAPI.P2PQueryRelayServers(r.Context, &request, &response)
if err != nil {
eLog.Warnf("Failed obtaining list of this node's relay servers: %s", err.Error())
}
r.queriedServersMutex.Lock()
defer r.queriedServersMutex.Unlock()
for _, server := range response.RelayServers {
m.relayServersQueried[server] = false
r.relayServersQueried[server] = false
}
eLog.Infof("Registered relay servers: %v", response.RelayServers)
}
func (m *RelayServerRetriever) SyncRelayServers(stop <-chan bool) {
defer m.running.Store(false)
func (r *RelayServerRetriever) SetRelayServers(servers []gomatrixserverlib.ServerName) {
r.queriedServersMutex.Lock()
defer r.queriedServersMutex.Unlock()
r.relayServersQueried = make(map[gomatrixserverlib.ServerName]bool)
for _, server := range servers {
// TODO : add servers to dendrite database
r.relayServersQueried[server] = false
}
r.StartSync()
}
func (r *RelayServerRetriever) StartSync() {
if !r.running.Load() {
logrus.Info("Starting relay server sync")
go r.SyncRelayServers(r.quit)
}
}
func (r *RelayServerRetriever) SyncRelayServers(stop <-chan bool) {
defer r.running.Store(false)
t := time.NewTimer(relayServerRetryInterval)
for {
relayServersToQuery := []gomatrixserverlib.ServerName{}
func() {
m.queriedServersMutex.Lock()
defer m.queriedServersMutex.Unlock()
for server, complete := range m.relayServersQueried {
r.queriedServersMutex.Lock()
defer r.queriedServersMutex.Unlock()
for server, complete := range r.relayServersQueried {
if !complete {
relayServersToQuery = append(relayServersToQuery, server)
}
@ -544,9 +589,10 @@ func (m *RelayServerRetriever) SyncRelayServers(stop <-chan bool) {
}()
if len(relayServersToQuery) == 0 {
// All relay servers have been synced.
logrus.Info("Finished syncing with all known relays")
return
}
m.queryRelayServers(relayServersToQuery)
r.queryRelayServers(relayServersToQuery)
t.Reset(relayServerRetryInterval)
select {
@ -560,30 +606,32 @@ func (m *RelayServerRetriever) SyncRelayServers(stop <-chan bool) {
}
}
func (m *RelayServerRetriever) GetQueriedServerStatus() map[gomatrixserverlib.ServerName]bool {
m.queriedServersMutex.Lock()
defer m.queriedServersMutex.Unlock()
func (r *RelayServerRetriever) GetQueriedServerStatus() map[gomatrixserverlib.ServerName]bool {
r.queriedServersMutex.Lock()
defer r.queriedServersMutex.Unlock()
result := map[gomatrixserverlib.ServerName]bool{}
for server, queried := range m.relayServersQueried {
for server, queried := range r.relayServersQueried {
result[server] = queried
}
return result
}
func (m *RelayServerRetriever) queryRelayServers(relayServers []gomatrixserverlib.ServerName) {
logrus.Info("querying relay servers for any available transactions")
func (r *RelayServerRetriever) queryRelayServers(relayServers []gomatrixserverlib.ServerName) {
logrus.Info("Querying relay servers for any available transactions")
for _, server := range relayServers {
userID, err := gomatrixserverlib.NewUserID("@user:"+string(m.ServerName), false)
userID, err := gomatrixserverlib.NewUserID("@user:"+string(r.ServerName), false)
if err != nil {
return
}
err = m.RelayAPI.PerformRelayServerSync(context.Background(), *userID, server)
logrus.Infof("Syncing with relay: %s", string(server))
err = r.RelayAPI.PerformRelayServerSync(context.Background(), *userID, server)
if err == nil {
func() {
m.queriedServersMutex.Lock()
defer m.queriedServersMutex.Unlock()
m.relayServersQueried[server] = true
r.queriedServersMutex.Lock()
defer r.queriedServersMutex.Unlock()
r.relayServersQueried[server] = true
}()
// TODO : What happens if your relay receives new messages after this point?
// Should you continue to check with them, or should they try and contact you?