Add pinecone demo toggle for dis/enabling relaying for other nodes

This commit is contained in:
Devon Hudson 2023-01-29 12:26:16 -07:00
parent 63df85db6d
commit 0f998e3af3
No known key found for this signature in database
GPG key ID: CD06B18E77F6A628
13 changed files with 119 additions and 25 deletions

View file

@ -90,6 +90,7 @@ type DendriteMonolith struct {
httpServer *http.Server httpServer *http.Server
userAPI userapiAPI.UserInternalAPI userAPI userapiAPI.UserInternalAPI
federationAPI api.FederationInternalAPI federationAPI api.FederationInternalAPI
relayAPI relayServerAPI.RelayInternalAPI
relayRetriever RelayServerRetriever relayRetriever RelayServerRetriever
} }
@ -313,6 +314,14 @@ func (m *DendriteMonolith) GetRelayServers(nodeID string) string {
return relaysString return relaysString
} }
func (m *DendriteMonolith) RelayingEnabled() bool {
return m.relayAPI.RelayingEnabled()
}
func (m *DendriteMonolith) SetRelayingEnabled(enabled bool) {
m.relayAPI.SetRelayingEnabled(enabled)
}
func (m *DendriteMonolith) DisconnectType(peertype int) { func (m *DendriteMonolith) DisconnectType(peertype int) {
for _, p := range m.PineconeRouter.Peers() { for _, p := range m.PineconeRouter.Peers() {
if int(peertype) == p.PeerType { if int(peertype) == p.PeerType {
@ -528,7 +537,7 @@ func (m *DendriteMonolith) Start() {
Config: &base.Cfg.FederationAPI, Config: &base.Cfg.FederationAPI,
UserAPI: m.userAPI, UserAPI: m.userAPI,
} }
relayAPI := relayapi.NewRelayInternalAPI(base, federation, rsAPI, keyRing, producer) m.relayAPI = relayapi.NewRelayInternalAPI(base, federation, rsAPI, keyRing, producer, false)
monolith := setup.Monolith{ monolith := setup.Monolith{
Config: base.Cfg, Config: base.Cfg,
@ -541,7 +550,7 @@ func (m *DendriteMonolith) Start() {
RoomserverAPI: rsAPI, RoomserverAPI: rsAPI,
UserAPI: m.userAPI, UserAPI: m.userAPI,
KeyAPI: keyAPI, KeyAPI: keyAPI,
RelayAPI: relayAPI, RelayAPI: m.relayAPI,
ExtPublicRoomsProvider: roomProvider, ExtPublicRoomsProvider: roomProvider,
ExtUserDirectoryProvider: userProvider, ExtUserDirectoryProvider: userProvider,
} }

View file

@ -244,7 +244,7 @@ func main() {
Config: &base.Cfg.FederationAPI, Config: &base.Cfg.FederationAPI,
UserAPI: userAPI, UserAPI: userAPI,
} }
relayAPI := relayapi.NewRelayInternalAPI(base, federation, rsAPI, keyRing, producer) relayAPI := relayapi.NewRelayInternalAPI(base, federation, rsAPI, keyRing, producer, true)
monolith := setup.Monolith{ monolith := setup.Monolith{
Config: base.Cfg, Config: base.Cfg,

View file

@ -30,6 +30,12 @@ type RelayInternalAPI interface {
userID gomatrixserverlib.UserID, userID gomatrixserverlib.UserID,
relayServer gomatrixserverlib.ServerName, relayServer gomatrixserverlib.ServerName,
) error ) error
// Tells the relayapi whether or not it should act as a relay server for external servers.
SetRelayingEnabled(bool)
// Obtain whether the relayapi is currently configured to act as a relay server for external servers.
RelayingEnabled() bool
} }
// RelayServerAPI exposes the store & query transaction functionality of a relay server. // RelayServerAPI exposes the store & query transaction functionality of a relay server.

View file

@ -15,6 +15,8 @@
package internal package internal
import ( import (
"sync"
fedAPI "github.com/matrix-org/dendrite/federationapi/api" fedAPI "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/federationapi/producers" "github.com/matrix-org/dendrite/federationapi/producers"
"github.com/matrix-org/dendrite/relayapi/storage" "github.com/matrix-org/dendrite/relayapi/storage"
@ -30,6 +32,8 @@ type RelayInternalAPI struct {
producer *producers.SyncAPIProducer producer *producers.SyncAPIProducer
presenceEnabledInbound bool presenceEnabledInbound bool
serverName gomatrixserverlib.ServerName serverName gomatrixserverlib.ServerName
relayingEnabledMutex sync.Mutex
relayingEnabled bool
} }
func NewRelayInternalAPI( func NewRelayInternalAPI(
@ -40,6 +44,7 @@ func NewRelayInternalAPI(
producer *producers.SyncAPIProducer, producer *producers.SyncAPIProducer,
presenceEnabledInbound bool, presenceEnabledInbound bool,
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
relayingEnabled bool,
) *RelayInternalAPI { ) *RelayInternalAPI {
return &RelayInternalAPI{ return &RelayInternalAPI{
db: db, db: db,
@ -49,5 +54,6 @@ func NewRelayInternalAPI(
producer: producer, producer: producer,
presenceEnabledInbound: presenceEnabledInbound, presenceEnabledInbound: presenceEnabledInbound,
serverName: serverName, serverName: serverName,
relayingEnabled: relayingEnabled,
} }
} }

View file

@ -24,6 +24,20 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
// SetRelayingEnabled implements api.RelayInternalAPI
func (r *RelayInternalAPI) SetRelayingEnabled(enabled bool) {
r.relayingEnabledMutex.Lock()
defer r.relayingEnabledMutex.Unlock()
r.relayingEnabled = enabled
}
// RelayingEnabled implements api.RelayInternalAPI
func (r *RelayInternalAPI) RelayingEnabled() bool {
r.relayingEnabledMutex.Lock()
defer r.relayingEnabledMutex.Unlock()
return r.relayingEnabled
}
// PerformRelayServerSync implements api.RelayInternalAPI // PerformRelayServerSync implements api.RelayInternalAPI
func (r *RelayInternalAPI) PerformRelayServerSync( func (r *RelayInternalAPI) PerformRelayServerSync(
ctx context.Context, ctx context.Context,

View file

@ -72,7 +72,7 @@ func TestPerformRelayServerSync(t *testing.T) {
fedClient := &testFedClient{} fedClient := &testFedClient{}
relayAPI := NewRelayInternalAPI( relayAPI := NewRelayInternalAPI(
&db, fedClient, nil, nil, nil, false, "", &db, fedClient, nil, nil, nil, false, "", true,
) )
err = relayAPI.PerformRelayServerSync(context.Background(), *userID, gomatrixserverlib.ServerName("relay")) err = relayAPI.PerformRelayServerSync(context.Background(), *userID, gomatrixserverlib.ServerName("relay"))
@ -92,7 +92,7 @@ func TestPerformRelayServerSyncFedError(t *testing.T) {
fedClient := &testFedClient{shouldFail: true} fedClient := &testFedClient{shouldFail: true}
relayAPI := NewRelayInternalAPI( relayAPI := NewRelayInternalAPI(
&db, fedClient, nil, nil, nil, false, "", &db, fedClient, nil, nil, nil, false, "", true,
) )
err = relayAPI.PerformRelayServerSync(context.Background(), *userID, gomatrixserverlib.ServerName("relay")) err = relayAPI.PerformRelayServerSync(context.Background(), *userID, gomatrixserverlib.ServerName("relay"))
@ -112,7 +112,7 @@ func TestPerformRelayServerSyncRunsUntilQueueEmpty(t *testing.T) {
fedClient := &testFedClient{queueDepth: 2} fedClient := &testFedClient{queueDepth: 2}
relayAPI := NewRelayInternalAPI( relayAPI := NewRelayInternalAPI(
&db, fedClient, nil, nil, nil, false, "", &db, fedClient, nil, nil, nil, false, "", true,
) )
err = relayAPI.PerformRelayServerSync(context.Background(), *userID, gomatrixserverlib.ServerName("relay")) err = relayAPI.PerformRelayServerSync(context.Background(), *userID, gomatrixserverlib.ServerName("relay"))

View file

@ -54,6 +54,7 @@ func NewRelayInternalAPI(
rsAPI rsAPI.RoomserverInternalAPI, rsAPI rsAPI.RoomserverInternalAPI,
keyRing *gomatrixserverlib.KeyRing, keyRing *gomatrixserverlib.KeyRing,
producer *producers.SyncAPIProducer, producer *producers.SyncAPIProducer,
relayingEnabled bool,
) api.RelayInternalAPI { ) api.RelayInternalAPI {
cfg := &base.Cfg.RelayAPI cfg := &base.Cfg.RelayAPI
@ -70,5 +71,6 @@ func NewRelayInternalAPI(
producer, producer,
base.Cfg.Global.Presence.EnableInbound, base.Cfg.Global.Presence.EnableInbound,
base.Cfg.Global.ServerName, base.Cfg.Global.ServerName,
relayingEnabled,
) )
} }

View file

@ -36,7 +36,7 @@ func TestCreateNewRelayInternalAPI(t *testing.T) {
base, close := testrig.CreateBaseDendrite(t, dbType) base, close := testrig.CreateBaseDendrite(t, dbType)
defer close() defer close()
relayAPI := relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil) relayAPI := relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil, true)
assert.NotNil(t, relayAPI) assert.NotNil(t, relayAPI)
}) })
} }
@ -52,7 +52,7 @@ func TestCreateRelayInternalInvalidDatabasePanics(t *testing.T) {
defer close() defer close()
assert.Panics(t, func() { assert.Panics(t, func() {
relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil) relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil, true)
}) })
}) })
} }
@ -108,7 +108,7 @@ func TestCreateRelayPublicRoutes(t *testing.T) {
base, close := testrig.CreateBaseDendrite(t, dbType) base, close := testrig.CreateBaseDendrite(t, dbType)
defer close() defer close()
relayAPI := relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil) relayAPI := relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil, true)
assert.NotNil(t, relayAPI) assert.NotNil(t, relayAPI)
serverKeyAPI := &signing.YggdrasilKeys{} serverKeyAPI := &signing.YggdrasilKeys{}
@ -116,10 +116,9 @@ func TestCreateRelayPublicRoutes(t *testing.T) {
relayapi.AddPublicRoutes(base, keyRing, relayAPI) relayapi.AddPublicRoutes(base, keyRing, relayAPI)
testCases := []struct { testCases := []struct {
name string name string
req *http.Request req *http.Request
wantCode int wantCode int
wantJoinedRooms []string
}{ }{
{ {
name: "relay_txn invalid user id", name: "relay_txn invalid user id",
@ -152,3 +151,42 @@ func TestCreateRelayPublicRoutes(t *testing.T) {
} }
}) })
} }
func TestDisableRelayPublicRoutes(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
base, close := testrig.CreateBaseDendrite(t, dbType)
defer close()
relayAPI := relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil, false)
assert.NotNil(t, relayAPI)
serverKeyAPI := &signing.YggdrasilKeys{}
keyRing := serverKeyAPI.KeyRing()
relayapi.AddPublicRoutes(base, keyRing, relayAPI)
testCases := []struct {
name string
req *http.Request
wantCode int
}{
{
name: "relay_txn valid user id",
req: createGetRelayTxnHTTPRequest(base.Cfg.Global.ServerName, "@user:local"),
wantCode: 404,
},
{
name: "send_relay valid user id",
req: createSendRelayTxnHTTPRequest(base.Cfg.Global.ServerName, "123", "@user:local"),
wantCode: 404,
},
}
for _, tc := range testCases {
w := httptest.NewRecorder()
base.PublicFederationAPIMux.ServeHTTP(w, tc.req)
if w.Code != tc.wantCode {
t.Fatalf("%s: got HTTP %d want %d", tc.name, w.Code, tc.wantCode)
}
}
})
}

View file

@ -31,7 +31,7 @@ type RelayTransactionResponse struct {
EntriesQueued bool `json:"entries_queued"` EntriesQueued bool `json:"entries_queued"`
} }
// GetTransactionFromRelay implements /_matrix/federation/v1/relay_txn/{userID} // GetTransactionFromRelay implements GET /_matrix/federation/v1/relay_txn/{userID}
// This endpoint can be extracted into a separate relay server service. // This endpoint can be extracted into a separate relay server service.
func GetTransactionFromRelay( func GetTransactionFromRelay(
httpReq *http.Request, httpReq *http.Request,
@ -39,7 +39,7 @@ func GetTransactionFromRelay(
relayAPI api.RelayInternalAPI, relayAPI api.RelayInternalAPI,
userID gomatrixserverlib.UserID, userID gomatrixserverlib.UserID,
) util.JSONResponse { ) util.JSONResponse {
logrus.Infof("Handling relay_txn for %s", userID.Raw()) logrus.Infof("Processing relay_txn for %s", userID.Raw())
previousEntry := gomatrixserverlib.RelayEntry{} previousEntry := gomatrixserverlib.RelayEntry{}
if err := json.Unmarshal(fedReq.Content(), &previousEntry); err != nil { if err := json.Unmarshal(fedReq.Content(), &previousEntry); err != nil {

View file

@ -57,7 +57,7 @@ func TestGetEmptyDatabaseReturnsNothing(t *testing.T) {
assert.NoError(t, err, "Failed to store transaction") assert.NoError(t, err, "Failed to store transaction")
relayAPI := internal.NewRelayInternalAPI( relayAPI := internal.NewRelayInternalAPI(
&db, nil, nil, nil, nil, false, "", &db, nil, nil, nil, nil, false, "", true,
) )
request := createQuery(*userID, gomatrixserverlib.RelayEntry{}) request := createQuery(*userID, gomatrixserverlib.RelayEntry{})
@ -90,7 +90,7 @@ func TestGetInvalidPrevEntryFails(t *testing.T) {
assert.NoError(t, err, "Failed to store transaction") assert.NoError(t, err, "Failed to store transaction")
relayAPI := internal.NewRelayInternalAPI( relayAPI := internal.NewRelayInternalAPI(
&db, nil, nil, nil, nil, false, "", &db, nil, nil, nil, nil, false, "", true,
) )
request := createQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: -1}) request := createQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: -1})
@ -123,7 +123,7 @@ func TestGetReturnsSavedTransaction(t *testing.T) {
assert.NoError(t, err, "Failed to associate transaction with user") assert.NoError(t, err, "Failed to associate transaction with user")
relayAPI := internal.NewRelayInternalAPI( relayAPI := internal.NewRelayInternalAPI(
&db, nil, nil, nil, nil, false, "", &db, nil, nil, nil, nil, false, "", true,
) )
request := createQuery(*userID, gomatrixserverlib.RelayEntry{}) request := createQuery(*userID, gomatrixserverlib.RelayEntry{})
@ -186,7 +186,7 @@ func TestGetReturnsMultipleSavedTransactions(t *testing.T) {
assert.NoError(t, err, "Failed to associate transaction with user") assert.NoError(t, err, "Failed to associate transaction with user")
relayAPI := internal.NewRelayInternalAPI( relayAPI := internal.NewRelayInternalAPI(
&db, nil, nil, nil, nil, false, "", &db, nil, nil, nil, nil, false, "", true,
) )
request := createQuery(*userID, gomatrixserverlib.RelayEntry{}) request := createQuery(*userID, gomatrixserverlib.RelayEntry{})

View file

@ -27,6 +27,7 @@ import (
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
"github.com/sirupsen/logrus"
) )
// Setup registers HTTP handlers with the given ServeMux. // Setup registers HTTP handlers with the given ServeMux.
@ -48,6 +49,14 @@ func Setup(
v1fedmux.Handle("/send_relay/{txnID}/{userID}", MakeRelayAPI( v1fedmux.Handle("/send_relay/{txnID}/{userID}", MakeRelayAPI(
"send_relay_transaction", "", cfg.Matrix.IsLocalServerName, keys, "send_relay_transaction", "", cfg.Matrix.IsLocalServerName, keys,
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse { func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse {
logrus.Infof("Handling send_relay from: %s", request.Origin())
if !relayAPI.RelayingEnabled() {
logrus.Warnf("Dropping send_relay from: %s", request.Origin())
return util.JSONResponse{
Code: http.StatusNotFound,
}
}
userID, err := gomatrixserverlib.NewUserID(vars["userID"], false) userID, err := gomatrixserverlib.NewUserID(vars["userID"], false)
if err != nil { if err != nil {
return util.JSONResponse{ return util.JSONResponse{
@ -65,6 +74,14 @@ func Setup(
v1fedmux.Handle("/relay_txn/{userID}", MakeRelayAPI( v1fedmux.Handle("/relay_txn/{userID}", MakeRelayAPI(
"get_relay_transaction", "", cfg.Matrix.IsLocalServerName, keys, "get_relay_transaction", "", cfg.Matrix.IsLocalServerName, keys,
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse { func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse {
logrus.Infof("Handling relay_txn from: %s", request.Origin())
if !relayAPI.RelayingEnabled() {
logrus.Warnf("Dropping relay_txn from: %s", request.Origin())
return util.JSONResponse{
Code: http.StatusNotFound,
}
}
userID, err := gomatrixserverlib.NewUserID(vars["userID"], false) userID, err := gomatrixserverlib.NewUserID(vars["userID"], false)
if err != nil { if err != nil {
return util.JSONResponse{ return util.JSONResponse{

View file

@ -25,7 +25,7 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
// SendTransactionToRelay implements PUT /_matrix/federation/v1/relay_txn/{txnID}/{userID} // SendTransactionToRelay implements PUT /_matrix/federation/v1/send_relay/{txnID}/{userID}
// This endpoint can be extracted into a separate relay server service. // This endpoint can be extracted into a separate relay server service.
func SendTransactionToRelay( func SendTransactionToRelay(
httpReq *http.Request, httpReq *http.Request,
@ -34,6 +34,8 @@ func SendTransactionToRelay(
txnID gomatrixserverlib.TransactionID, txnID gomatrixserverlib.TransactionID,
userID gomatrixserverlib.UserID, userID gomatrixserverlib.UserID,
) util.JSONResponse { ) util.JSONResponse {
logrus.Infof("Processing send_relay for %s", userID.Raw())
var txnEvents struct { var txnEvents struct {
PDUs []json.RawMessage `json:"pdus"` PDUs []json.RawMessage `json:"pdus"`
EDUs []gomatrixserverlib.EDU `json:"edus"` EDUs []gomatrixserverlib.EDU `json:"edus"`

View file

@ -72,7 +72,7 @@ func TestForwardEmptyReturnsOk(t *testing.T) {
request := createFederationRequest(*userID, txn.TransactionID, txn.Origin, txn.Destination, txn) request := createFederationRequest(*userID, txn.TransactionID, txn.Origin, txn.Destination, txn)
relayAPI := internal.NewRelayInternalAPI( relayAPI := internal.NewRelayInternalAPI(
&db, nil, nil, nil, nil, false, "", &db, nil, nil, nil, nil, false, "", true,
) )
response := routing.SendTransactionToRelay(httpReq, &request, relayAPI, "1", *userID) response := routing.SendTransactionToRelay(httpReq, &request, relayAPI, "1", *userID)
@ -101,7 +101,7 @@ func TestForwardBadJSONReturnsError(t *testing.T) {
request := createFederationRequest(*userID, txn.TransactionID, txn.Origin, txn.Destination, content) request := createFederationRequest(*userID, txn.TransactionID, txn.Origin, txn.Destination, content)
relayAPI := internal.NewRelayInternalAPI( relayAPI := internal.NewRelayInternalAPI(
&db, nil, nil, nil, nil, false, "", &db, nil, nil, nil, nil, false, "", true,
) )
response := routing.SendTransactionToRelay(httpReq, &request, relayAPI, "1", *userID) response := routing.SendTransactionToRelay(httpReq, &request, relayAPI, "1", *userID)
@ -135,7 +135,7 @@ func TestForwardTooManyPDUsReturnsError(t *testing.T) {
request := createFederationRequest(*userID, txn.TransactionID, txn.Origin, txn.Destination, content) request := createFederationRequest(*userID, txn.TransactionID, txn.Origin, txn.Destination, content)
relayAPI := internal.NewRelayInternalAPI( relayAPI := internal.NewRelayInternalAPI(
&db, nil, nil, nil, nil, false, "", &db, nil, nil, nil, nil, false, "", true,
) )
response := routing.SendTransactionToRelay(httpReq, &request, relayAPI, "1", *userID) response := routing.SendTransactionToRelay(httpReq, &request, relayAPI, "1", *userID)
@ -169,7 +169,7 @@ func TestForwardTooManyEDUsReturnsError(t *testing.T) {
request := createFederationRequest(*userID, txn.TransactionID, txn.Origin, txn.Destination, content) request := createFederationRequest(*userID, txn.TransactionID, txn.Origin, txn.Destination, content)
relayAPI := internal.NewRelayInternalAPI( relayAPI := internal.NewRelayInternalAPI(
&db, nil, nil, nil, nil, false, "", &db, nil, nil, nil, nil, false, "", true,
) )
response := routing.SendTransactionToRelay(httpReq, &request, relayAPI, "1", *userID) response := routing.SendTransactionToRelay(httpReq, &request, relayAPI, "1", *userID)
@ -192,7 +192,7 @@ func TestUniqueTransactionStoredInDatabase(t *testing.T) {
request := createFederationRequest(*userID, txn.TransactionID, txn.Origin, txn.Destination, txn) request := createFederationRequest(*userID, txn.TransactionID, txn.Origin, txn.Destination, txn)
relayAPI := internal.NewRelayInternalAPI( relayAPI := internal.NewRelayInternalAPI(
&db, nil, nil, nil, nil, false, "", &db, nil, nil, nil, nil, false, "", true,
) )
response := routing.SendTransactionToRelay( response := routing.SendTransactionToRelay(