diff --git a/build/gobind-pinecone/monolith.go b/build/gobind-pinecone/monolith.go index 5e8e5875c..a219621b1 100644 --- a/build/gobind-pinecone/monolith.go +++ b/build/gobind-pinecone/monolith.go @@ -90,6 +90,7 @@ type DendriteMonolith struct { httpServer *http.Server userAPI userapiAPI.UserInternalAPI federationAPI api.FederationInternalAPI + relayAPI relayServerAPI.RelayInternalAPI relayRetriever RelayServerRetriever } @@ -313,6 +314,14 @@ func (m *DendriteMonolith) GetRelayServers(nodeID string) string { return relaysString } +func (m *DendriteMonolith) RelayingEnabled() bool { + return m.relayAPI.RelayingEnabled() +} + +func (m *DendriteMonolith) SetRelayingEnabled(enabled bool) { + m.relayAPI.SetRelayingEnabled(enabled) +} + func (m *DendriteMonolith) DisconnectType(peertype int) { for _, p := range m.PineconeRouter.Peers() { if int(peertype) == p.PeerType { @@ -528,7 +537,7 @@ func (m *DendriteMonolith) Start() { Config: &base.Cfg.FederationAPI, UserAPI: m.userAPI, } - relayAPI := relayapi.NewRelayInternalAPI(base, federation, rsAPI, keyRing, producer) + m.relayAPI = relayapi.NewRelayInternalAPI(base, federation, rsAPI, keyRing, producer, false) monolith := setup.Monolith{ Config: base.Cfg, @@ -541,7 +550,7 @@ func (m *DendriteMonolith) Start() { RoomserverAPI: rsAPI, UserAPI: m.userAPI, KeyAPI: keyAPI, - RelayAPI: relayAPI, + RelayAPI: m.relayAPI, ExtPublicRoomsProvider: roomProvider, ExtUserDirectoryProvider: userProvider, } diff --git a/cmd/dendrite-demo-pinecone/main.go b/cmd/dendrite-demo-pinecone/main.go index a813c37a2..83655b69c 100644 --- a/cmd/dendrite-demo-pinecone/main.go +++ b/cmd/dendrite-demo-pinecone/main.go @@ -244,7 +244,7 @@ func main() { Config: &base.Cfg.FederationAPI, UserAPI: userAPI, } - relayAPI := relayapi.NewRelayInternalAPI(base, federation, rsAPI, keyRing, producer) + relayAPI := relayapi.NewRelayInternalAPI(base, federation, rsAPI, keyRing, producer, true) monolith := setup.Monolith{ Config: base.Cfg, diff --git a/relayapi/api/api.go b/relayapi/api/api.go index 9db393225..9b4b62e58 100644 --- a/relayapi/api/api.go +++ b/relayapi/api/api.go @@ -30,6 +30,12 @@ type RelayInternalAPI interface { userID gomatrixserverlib.UserID, relayServer gomatrixserverlib.ServerName, ) error + + // Tells the relayapi whether or not it should act as a relay server for external servers. + SetRelayingEnabled(bool) + + // Obtain whether the relayapi is currently configured to act as a relay server for external servers. + RelayingEnabled() bool } // RelayServerAPI exposes the store & query transaction functionality of a relay server. diff --git a/relayapi/internal/api.go b/relayapi/internal/api.go index 3ff8c2add..3a5762fb8 100644 --- a/relayapi/internal/api.go +++ b/relayapi/internal/api.go @@ -15,6 +15,8 @@ package internal import ( + "sync" + fedAPI "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/federationapi/producers" "github.com/matrix-org/dendrite/relayapi/storage" @@ -30,6 +32,8 @@ type RelayInternalAPI struct { producer *producers.SyncAPIProducer presenceEnabledInbound bool serverName gomatrixserverlib.ServerName + relayingEnabledMutex sync.Mutex + relayingEnabled bool } func NewRelayInternalAPI( @@ -40,6 +44,7 @@ func NewRelayInternalAPI( producer *producers.SyncAPIProducer, presenceEnabledInbound bool, serverName gomatrixserverlib.ServerName, + relayingEnabled bool, ) *RelayInternalAPI { return &RelayInternalAPI{ db: db, @@ -49,5 +54,6 @@ func NewRelayInternalAPI( producer: producer, presenceEnabledInbound: presenceEnabledInbound, serverName: serverName, + relayingEnabled: relayingEnabled, } } diff --git a/relayapi/internal/perform.go b/relayapi/internal/perform.go index 594299334..c00d0d0ed 100644 --- a/relayapi/internal/perform.go +++ b/relayapi/internal/perform.go @@ -24,6 +24,20 @@ import ( "github.com/sirupsen/logrus" ) +// SetRelayingEnabled implements api.RelayInternalAPI +func (r *RelayInternalAPI) SetRelayingEnabled(enabled bool) { + r.relayingEnabledMutex.Lock() + defer r.relayingEnabledMutex.Unlock() + r.relayingEnabled = enabled +} + +// RelayingEnabled implements api.RelayInternalAPI +func (r *RelayInternalAPI) RelayingEnabled() bool { + r.relayingEnabledMutex.Lock() + defer r.relayingEnabledMutex.Unlock() + return r.relayingEnabled +} + // PerformRelayServerSync implements api.RelayInternalAPI func (r *RelayInternalAPI) PerformRelayServerSync( ctx context.Context, diff --git a/relayapi/internal/perform_test.go b/relayapi/internal/perform_test.go index fb71b7d0e..5673b1994 100644 --- a/relayapi/internal/perform_test.go +++ b/relayapi/internal/perform_test.go @@ -72,7 +72,7 @@ func TestPerformRelayServerSync(t *testing.T) { fedClient := &testFedClient{} relayAPI := NewRelayInternalAPI( - &db, fedClient, nil, nil, nil, false, "", + &db, fedClient, nil, nil, nil, false, "", true, ) err = relayAPI.PerformRelayServerSync(context.Background(), *userID, gomatrixserverlib.ServerName("relay")) @@ -92,7 +92,7 @@ func TestPerformRelayServerSyncFedError(t *testing.T) { fedClient := &testFedClient{shouldFail: true} relayAPI := NewRelayInternalAPI( - &db, fedClient, nil, nil, nil, false, "", + &db, fedClient, nil, nil, nil, false, "", true, ) err = relayAPI.PerformRelayServerSync(context.Background(), *userID, gomatrixserverlib.ServerName("relay")) @@ -112,7 +112,7 @@ func TestPerformRelayServerSyncRunsUntilQueueEmpty(t *testing.T) { fedClient := &testFedClient{queueDepth: 2} relayAPI := NewRelayInternalAPI( - &db, fedClient, nil, nil, nil, false, "", + &db, fedClient, nil, nil, nil, false, "", true, ) err = relayAPI.PerformRelayServerSync(context.Background(), *userID, gomatrixserverlib.ServerName("relay")) diff --git a/relayapi/relayapi.go b/relayapi/relayapi.go index f9f9d4ff9..200a1814a 100644 --- a/relayapi/relayapi.go +++ b/relayapi/relayapi.go @@ -54,6 +54,7 @@ func NewRelayInternalAPI( rsAPI rsAPI.RoomserverInternalAPI, keyRing *gomatrixserverlib.KeyRing, producer *producers.SyncAPIProducer, + relayingEnabled bool, ) api.RelayInternalAPI { cfg := &base.Cfg.RelayAPI @@ -70,5 +71,6 @@ func NewRelayInternalAPI( producer, base.Cfg.Global.Presence.EnableInbound, base.Cfg.Global.ServerName, + relayingEnabled, ) } diff --git a/relayapi/relayapi_test.go b/relayapi/relayapi_test.go index dfa06811d..f1b3262aa 100644 --- a/relayapi/relayapi_test.go +++ b/relayapi/relayapi_test.go @@ -36,7 +36,7 @@ func TestCreateNewRelayInternalAPI(t *testing.T) { base, close := testrig.CreateBaseDendrite(t, dbType) defer close() - relayAPI := relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil) + relayAPI := relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil, true) assert.NotNil(t, relayAPI) }) } @@ -52,7 +52,7 @@ func TestCreateRelayInternalInvalidDatabasePanics(t *testing.T) { defer close() assert.Panics(t, func() { - relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil) + relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil, true) }) }) } @@ -108,7 +108,7 @@ func TestCreateRelayPublicRoutes(t *testing.T) { base, close := testrig.CreateBaseDendrite(t, dbType) defer close() - relayAPI := relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil) + relayAPI := relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil, true) assert.NotNil(t, relayAPI) serverKeyAPI := &signing.YggdrasilKeys{} @@ -116,10 +116,9 @@ func TestCreateRelayPublicRoutes(t *testing.T) { relayapi.AddPublicRoutes(base, keyRing, relayAPI) testCases := []struct { - name string - req *http.Request - wantCode int - wantJoinedRooms []string + name string + req *http.Request + wantCode int }{ { name: "relay_txn invalid user id", @@ -152,3 +151,42 @@ func TestCreateRelayPublicRoutes(t *testing.T) { } }) } + +func TestDisableRelayPublicRoutes(t *testing.T) { + test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { + base, close := testrig.CreateBaseDendrite(t, dbType) + defer close() + + relayAPI := relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil, false) + assert.NotNil(t, relayAPI) + + serverKeyAPI := &signing.YggdrasilKeys{} + keyRing := serverKeyAPI.KeyRing() + relayapi.AddPublicRoutes(base, keyRing, relayAPI) + + testCases := []struct { + name string + req *http.Request + wantCode int + }{ + { + name: "relay_txn valid user id", + req: createGetRelayTxnHTTPRequest(base.Cfg.Global.ServerName, "@user:local"), + wantCode: 404, + }, + { + name: "send_relay valid user id", + req: createSendRelayTxnHTTPRequest(base.Cfg.Global.ServerName, "123", "@user:local"), + wantCode: 404, + }, + } + + for _, tc := range testCases { + w := httptest.NewRecorder() + base.PublicFederationAPIMux.ServeHTTP(w, tc.req) + if w.Code != tc.wantCode { + t.Fatalf("%s: got HTTP %d want %d", tc.name, w.Code, tc.wantCode) + } + } + }) +} diff --git a/relayapi/routing/relaytxn.go b/relayapi/routing/relaytxn.go index 1b11b0ecd..76241a33b 100644 --- a/relayapi/routing/relaytxn.go +++ b/relayapi/routing/relaytxn.go @@ -31,7 +31,7 @@ type RelayTransactionResponse struct { EntriesQueued bool `json:"entries_queued"` } -// GetTransactionFromRelay implements /_matrix/federation/v1/relay_txn/{userID} +// GetTransactionFromRelay implements GET /_matrix/federation/v1/relay_txn/{userID} // This endpoint can be extracted into a separate relay server service. func GetTransactionFromRelay( httpReq *http.Request, @@ -39,7 +39,7 @@ func GetTransactionFromRelay( relayAPI api.RelayInternalAPI, userID gomatrixserverlib.UserID, ) util.JSONResponse { - logrus.Infof("Handling relay_txn for %s", userID.Raw()) + logrus.Infof("Processing relay_txn for %s", userID.Raw()) previousEntry := gomatrixserverlib.RelayEntry{} if err := json.Unmarshal(fedReq.Content(), &previousEntry); err != nil { diff --git a/relayapi/routing/relaytxn_test.go b/relayapi/routing/relaytxn_test.go index a47fdb198..3cb4c8adc 100644 --- a/relayapi/routing/relaytxn_test.go +++ b/relayapi/routing/relaytxn_test.go @@ -57,7 +57,7 @@ func TestGetEmptyDatabaseReturnsNothing(t *testing.T) { assert.NoError(t, err, "Failed to store transaction") relayAPI := internal.NewRelayInternalAPI( - &db, nil, nil, nil, nil, false, "", + &db, nil, nil, nil, nil, false, "", true, ) request := createQuery(*userID, gomatrixserverlib.RelayEntry{}) @@ -90,7 +90,7 @@ func TestGetInvalidPrevEntryFails(t *testing.T) { assert.NoError(t, err, "Failed to store transaction") relayAPI := internal.NewRelayInternalAPI( - &db, nil, nil, nil, nil, false, "", + &db, nil, nil, nil, nil, false, "", true, ) request := createQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: -1}) @@ -123,7 +123,7 @@ func TestGetReturnsSavedTransaction(t *testing.T) { assert.NoError(t, err, "Failed to associate transaction with user") relayAPI := internal.NewRelayInternalAPI( - &db, nil, nil, nil, nil, false, "", + &db, nil, nil, nil, nil, false, "", true, ) request := createQuery(*userID, gomatrixserverlib.RelayEntry{}) @@ -186,7 +186,7 @@ func TestGetReturnsMultipleSavedTransactions(t *testing.T) { assert.NoError(t, err, "Failed to associate transaction with user") relayAPI := internal.NewRelayInternalAPI( - &db, nil, nil, nil, nil, false, "", + &db, nil, nil, nil, nil, false, "", true, ) request := createQuery(*userID, gomatrixserverlib.RelayEntry{}) diff --git a/relayapi/routing/routing.go b/relayapi/routing/routing.go index 6df0cdc5f..c908e950a 100644 --- a/relayapi/routing/routing.go +++ b/relayapi/routing/routing.go @@ -27,6 +27,7 @@ import ( "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" + "github.com/sirupsen/logrus" ) // Setup registers HTTP handlers with the given ServeMux. @@ -48,6 +49,14 @@ func Setup( v1fedmux.Handle("/send_relay/{txnID}/{userID}", MakeRelayAPI( "send_relay_transaction", "", cfg.Matrix.IsLocalServerName, keys, func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse { + logrus.Infof("Handling send_relay from: %s", request.Origin()) + if !relayAPI.RelayingEnabled() { + logrus.Warnf("Dropping send_relay from: %s", request.Origin()) + return util.JSONResponse{ + Code: http.StatusNotFound, + } + } + userID, err := gomatrixserverlib.NewUserID(vars["userID"], false) if err != nil { return util.JSONResponse{ @@ -65,6 +74,14 @@ func Setup( v1fedmux.Handle("/relay_txn/{userID}", MakeRelayAPI( "get_relay_transaction", "", cfg.Matrix.IsLocalServerName, keys, func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse { + logrus.Infof("Handling relay_txn from: %s", request.Origin()) + if !relayAPI.RelayingEnabled() { + logrus.Warnf("Dropping relay_txn from: %s", request.Origin()) + return util.JSONResponse{ + Code: http.StatusNotFound, + } + } + userID, err := gomatrixserverlib.NewUserID(vars["userID"], false) if err != nil { return util.JSONResponse{ diff --git a/relayapi/routing/sendrelay.go b/relayapi/routing/sendrelay.go index a7027f293..85a722c8a 100644 --- a/relayapi/routing/sendrelay.go +++ b/relayapi/routing/sendrelay.go @@ -25,7 +25,7 @@ import ( "github.com/sirupsen/logrus" ) -// SendTransactionToRelay implements PUT /_matrix/federation/v1/relay_txn/{txnID}/{userID} +// SendTransactionToRelay implements PUT /_matrix/federation/v1/send_relay/{txnID}/{userID} // This endpoint can be extracted into a separate relay server service. func SendTransactionToRelay( httpReq *http.Request, @@ -34,6 +34,8 @@ func SendTransactionToRelay( txnID gomatrixserverlib.TransactionID, userID gomatrixserverlib.UserID, ) util.JSONResponse { + logrus.Infof("Processing send_relay for %s", userID.Raw()) + var txnEvents struct { PDUs []json.RawMessage `json:"pdus"` EDUs []gomatrixserverlib.EDU `json:"edus"` diff --git a/relayapi/routing/sendrelay_test.go b/relayapi/routing/sendrelay_test.go index d9ed75002..66594c47c 100644 --- a/relayapi/routing/sendrelay_test.go +++ b/relayapi/routing/sendrelay_test.go @@ -72,7 +72,7 @@ func TestForwardEmptyReturnsOk(t *testing.T) { request := createFederationRequest(*userID, txn.TransactionID, txn.Origin, txn.Destination, txn) relayAPI := internal.NewRelayInternalAPI( - &db, nil, nil, nil, nil, false, "", + &db, nil, nil, nil, nil, false, "", true, ) response := routing.SendTransactionToRelay(httpReq, &request, relayAPI, "1", *userID) @@ -101,7 +101,7 @@ func TestForwardBadJSONReturnsError(t *testing.T) { request := createFederationRequest(*userID, txn.TransactionID, txn.Origin, txn.Destination, content) relayAPI := internal.NewRelayInternalAPI( - &db, nil, nil, nil, nil, false, "", + &db, nil, nil, nil, nil, false, "", true, ) response := routing.SendTransactionToRelay(httpReq, &request, relayAPI, "1", *userID) @@ -135,7 +135,7 @@ func TestForwardTooManyPDUsReturnsError(t *testing.T) { request := createFederationRequest(*userID, txn.TransactionID, txn.Origin, txn.Destination, content) relayAPI := internal.NewRelayInternalAPI( - &db, nil, nil, nil, nil, false, "", + &db, nil, nil, nil, nil, false, "", true, ) response := routing.SendTransactionToRelay(httpReq, &request, relayAPI, "1", *userID) @@ -169,7 +169,7 @@ func TestForwardTooManyEDUsReturnsError(t *testing.T) { request := createFederationRequest(*userID, txn.TransactionID, txn.Origin, txn.Destination, content) relayAPI := internal.NewRelayInternalAPI( - &db, nil, nil, nil, nil, false, "", + &db, nil, nil, nil, nil, false, "", true, ) response := routing.SendTransactionToRelay(httpReq, &request, relayAPI, "1", *userID) @@ -192,7 +192,7 @@ func TestUniqueTransactionStoredInDatabase(t *testing.T) { request := createFederationRequest(*userID, txn.TransactionID, txn.Origin, txn.Destination, txn) relayAPI := internal.NewRelayInternalAPI( - &db, nil, nil, nil, nil, false, "", + &db, nil, nil, nil, nil, false, "", true, ) response := routing.SendTransactionToRelay(