diff --git a/build/gobind-pinecone/monolith.go b/build/gobind-pinecone/monolith.go index dc14b3ef7..d0b465ce9 100644 --- a/build/gobind-pinecone/monolith.go +++ b/build/gobind-pinecone/monolith.go @@ -581,12 +581,7 @@ func (m *RelayServerRetriever) queryRelayServers(relayServers []gomatrixserverli if err != nil { return } - request := relayServerAPI.PerformRelayServerSyncRequest{ - UserID: *userID, - RelayServer: server, - } - response := relayServerAPI.PerformRelayServerSyncResponse{} - err = m.RelayAPI.PerformRelayServerSync(context.Background(), &request, &response) + err = m.RelayAPI.PerformRelayServerSync(context.Background(), *userID, server) if err == nil { func() { m.queriedServersMutex.Lock() diff --git a/build/gobind-pinecone/monolith_test.go b/build/gobind-pinecone/monolith_test.go index f2796d2a1..c7805fa6a 100644 --- a/build/gobind-pinecone/monolith_test.go +++ b/build/gobind-pinecone/monolith_test.go @@ -142,7 +142,7 @@ type FakeRelayAPI struct { relayServerAPI.RelayInternalAPI } -func (r *FakeRelayAPI) PerformRelayServerSync(ctx context.Context, req *relayServerAPI.PerformRelayServerSyncRequest, res *relayServerAPI.PerformRelayServerSyncResponse) error { +func (r *FakeRelayAPI) PerformRelayServerSync(ctx context.Context, userID gomatrixserverlib.UserID, relayServer gomatrixserverlib.ServerName) error { return nil } diff --git a/cmd/dendrite-demo-pinecone/main.go b/cmd/dendrite-demo-pinecone/main.go index e9b0f7360..a813c37a2 100644 --- a/cmd/dendrite-demo-pinecone/main.go +++ b/cmd/dendrite-demo-pinecone/main.go @@ -428,12 +428,7 @@ func (m *RelayServerRetriever) queryRelayServers(relayServers []gomatrixserverli if err != nil { return } - request := relayServerAPI.PerformRelayServerSyncRequest{ - UserID: *userID, - RelayServer: server, - } - response := relayServerAPI.PerformRelayServerSyncResponse{} - err = m.RelayAPI.PerformRelayServerSync(context.Background(), &request, &response) + err = m.RelayAPI.PerformRelayServerSync(context.Background(), *userID, server) if err == nil { m.RelayServersQueried[server] = true // TODO : What happens if your relay receives new messages after this point? diff --git a/relayapi/api/api.go b/relayapi/api/api.go index e58c78026..9db393225 100644 --- a/relayapi/api/api.go +++ b/relayapi/api/api.go @@ -24,56 +24,29 @@ import ( type RelayInternalAPI interface { RelayServerAPI + // Retrieve from external relay server all transactions stored for us and process them. PerformRelayServerSync( ctx context.Context, - request *PerformRelayServerSyncRequest, - response *PerformRelayServerSyncResponse, + userID gomatrixserverlib.UserID, + relayServer gomatrixserverlib.ServerName, ) error } +// RelayServerAPI exposes the store & query transaction functionality of a relay server. type RelayServerAPI interface { // Store transactions for forwarding to the destination at a later time. PerformStoreTransaction( ctx context.Context, - request *PerformStoreTransactionRequest, - response *PerformStoreTransactionResponse, + transaction gomatrixserverlib.Transaction, + userID gomatrixserverlib.UserID, ) error // Obtain the oldest stored transaction for the specified userID. QueryTransactions( ctx context.Context, - request *QueryRelayTransactionsRequest, - response *QueryRelayTransactionsResponse, - ) error -} - -type PerformRelayServerSyncRequest struct { - UserID gomatrixserverlib.UserID `json:"user_id"` - RelayServer gomatrixserverlib.ServerName `json:"relay_server"` -} - -type PerformRelayServerSyncResponse struct { -} - -type QueryRelayServersRequest struct { - Server gomatrixserverlib.ServerName -} - -type QueryRelayServersResponse struct { - RelayServers []gomatrixserverlib.ServerName -} - -type PerformStoreTransactionRequest struct { - Txn gomatrixserverlib.Transaction `json:"transaction"` - UserID gomatrixserverlib.UserID `json:"user_id"` -} - -type PerformStoreTransactionResponse struct { -} - -type QueryRelayTransactionsRequest struct { - UserID gomatrixserverlib.UserID `json:"user_id"` - PreviousEntry gomatrixserverlib.RelayEntry `json:"prev_entry,omitempty"` + userID gomatrixserverlib.UserID, + previousEntry gomatrixserverlib.RelayEntry, + ) (QueryRelayTransactionsResponse, error) } type QueryRelayTransactionsResponse struct { diff --git a/relayapi/internal/perform.go b/relayapi/internal/perform.go index d5999abea..f526f0e90 100644 --- a/relayapi/internal/perform.go +++ b/relayapi/internal/perform.go @@ -27,11 +27,11 @@ import ( // PerformRelayServerSync implements api.RelayInternalAPI func (r *RelayInternalAPI) PerformRelayServerSync( ctx context.Context, - request *api.PerformRelayServerSyncRequest, - response *api.PerformRelayServerSyncResponse, + userID gomatrixserverlib.UserID, + relayServer gomatrixserverlib.ServerName, ) error { prevEntry := gomatrixserverlib.RelayEntry{EntryID: -1} - asyncResponse, err := r.fedClient.P2PGetTransactionFromRelay(ctx, request.UserID, prevEntry, request.RelayServer) + asyncResponse, err := r.fedClient.P2PGetTransactionFromRelay(ctx, userID, prevEntry, relayServer) if err != nil { logrus.Errorf("P2PGetTransactionFromRelay: %s", err.Error()) return err @@ -40,7 +40,7 @@ func (r *RelayInternalAPI) PerformRelayServerSync( for asyncResponse.EntriesQueued { logrus.Infof("Retrieving next entry from relay, previous: %v", prevEntry) - asyncResponse, err = r.fedClient.P2PGetTransactionFromRelay(ctx, request.UserID, prevEntry, request.RelayServer) + asyncResponse, err = r.fedClient.P2PGetTransactionFromRelay(ctx, userID, prevEntry, relayServer) prevEntry = gomatrixserverlib.RelayEntry{EntryID: asyncResponse.EntryID} if err != nil { logrus.Errorf("P2PGetTransactionFromRelay: %s", err.Error()) @@ -55,11 +55,11 @@ func (r *RelayInternalAPI) PerformRelayServerSync( // PerformStoreTransaction implements api.RelayInternalAPI func (r *RelayInternalAPI) PerformStoreTransaction( ctx context.Context, - request *api.PerformStoreTransactionRequest, - response *api.PerformStoreTransactionResponse, + transaction gomatrixserverlib.Transaction, + userID gomatrixserverlib.UserID, ) error { - logrus.Warnf("Storing transaction for %v", request.UserID) - receipt, err := r.db.StoreTransaction(ctx, request.Txn) + logrus.Warnf("Storing transaction for %v", userID) + receipt, err := r.db.StoreTransaction(ctx, transaction) if err != nil { logrus.Errorf("db.StoreTransaction: %s", err.Error()) return err @@ -67,9 +67,9 @@ func (r *RelayInternalAPI) PerformStoreTransaction( err = r.db.AssociateTransactionWithDestinations( ctx, map[gomatrixserverlib.UserID]struct{}{ - request.UserID: {}, + userID: {}, }, - request.Txn.TransactionID, + transaction.TransactionID, receipt) return err @@ -78,41 +78,42 @@ func (r *RelayInternalAPI) PerformStoreTransaction( // QueryTransactions implements api.RelayInternalAPI func (r *RelayInternalAPI) QueryTransactions( ctx context.Context, - request *api.QueryRelayTransactionsRequest, - response *api.QueryRelayTransactionsResponse, -) error { - logrus.Infof("QueryTransactions for %s", request.UserID.Raw()) - if request.PreviousEntry.EntryID >= 0 { + userID gomatrixserverlib.UserID, + previousEntry gomatrixserverlib.RelayEntry, +) (api.QueryRelayTransactionsResponse, error) { + logrus.Infof("QueryTransactions for %s", userID.Raw()) + if previousEntry.EntryID >= 0 { logrus.Infof("Cleaning previous entry (%v) from db for %s", - request.PreviousEntry.EntryID, - request.UserID.Raw(), + previousEntry.EntryID, + userID.Raw(), ) - prevReceipt := receipt.NewReceipt(request.PreviousEntry.EntryID) - err := r.db.CleanTransactions(ctx, request.UserID, []*receipt.Receipt{&prevReceipt}) + prevReceipt := receipt.NewReceipt(previousEntry.EntryID) + err := r.db.CleanTransactions(ctx, userID, []*receipt.Receipt{&prevReceipt}) if err != nil { logrus.Errorf("db.CleanTransactions: %s", err.Error()) - return err + return api.QueryRelayTransactionsResponse{}, err } } - transaction, receipt, err := r.db.GetTransaction(ctx, request.UserID) + transaction, receipt, err := r.db.GetTransaction(ctx, userID) if err != nil { logrus.Errorf("db.GetTransaction: %s", err.Error()) - return err + return api.QueryRelayTransactionsResponse{}, err } + response := api.QueryRelayTransactionsResponse{} if transaction != nil && receipt != nil { - logrus.Infof("Obtained transaction (%v) for %s", transaction.TransactionID, request.UserID.Raw()) + logrus.Infof("Obtained transaction (%v) for %s", transaction.TransactionID, userID.Raw()) response.Transaction = *transaction response.EntryID = receipt.GetNID() response.EntriesQueued = true } else { - logrus.Infof("No more entries in the queue for %s", request.UserID.Raw()) + logrus.Infof("No more entries in the queue for %s", userID.Raw()) response.EntryID = -1 response.EntriesQueued = false } - return nil + return response, nil } func (r *RelayInternalAPI) processTransaction(txn *gomatrixserverlib.Transaction) { diff --git a/relayapi/internal/perform_test.go b/relayapi/internal/perform_test.go index c2f9dd1c7..84bf143b9 100644 --- a/relayapi/internal/perform_test.go +++ b/relayapi/internal/perform_test.go @@ -21,7 +21,6 @@ import ( fedAPI "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/internal/sqlutil" - "github.com/matrix-org/dendrite/relayapi/api" "github.com/matrix-org/dendrite/relayapi/storage/shared" "github.com/matrix-org/dendrite/test" "github.com/matrix-org/gomatrixserverlib" @@ -76,12 +75,7 @@ func TestPerformRelayServerSync(t *testing.T) { &db, fedClient, nil, nil, nil, false, "", ) - req := api.PerformRelayServerSyncRequest{ - UserID: *userID, - RelayServer: gomatrixserverlib.ServerName("relay"), - } - res := api.PerformRelayServerSyncResponse{} - err = relayAPI.PerformRelayServerSync(context.Background(), &req, &res) + err = relayAPI.PerformRelayServerSync(context.Background(), *userID, gomatrixserverlib.ServerName("relay")) assert.NoError(t, err) } @@ -101,12 +95,7 @@ func TestPerformRelayServerSyncFedError(t *testing.T) { &db, fedClient, nil, nil, nil, false, "", ) - req := api.PerformRelayServerSyncRequest{ - UserID: *userID, - RelayServer: gomatrixserverlib.ServerName("relay"), - } - res := api.PerformRelayServerSyncResponse{} - err = relayAPI.PerformRelayServerSync(context.Background(), &req, &res) + err = relayAPI.PerformRelayServerSync(context.Background(), *userID, gomatrixserverlib.ServerName("relay")) assert.Error(t, err) } @@ -126,12 +115,7 @@ func TestPerformRelayServerSyncRunsUntilQueueEmpty(t *testing.T) { &db, fedClient, nil, nil, nil, false, "", ) - req := api.PerformRelayServerSyncRequest{ - UserID: *userID, - RelayServer: gomatrixserverlib.ServerName("relay"), - } - res := api.PerformRelayServerSyncResponse{} - err = relayAPI.PerformRelayServerSync(context.Background(), &req, &res) + err = relayAPI.PerformRelayServerSync(context.Background(), *userID, gomatrixserverlib.ServerName("relay")) assert.NoError(t, err) assert.Equal(t, uint(3), fedClient.queryCount) } diff --git a/relayapi/inthttp/client.go b/relayapi/inthttp/client.go deleted file mode 100644 index 6866686cf..000000000 --- a/relayapi/inthttp/client.go +++ /dev/null @@ -1,88 +0,0 @@ -// Copyright 2022 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package inthttp - -import ( - "context" - "errors" - "net/http" - - "github.com/matrix-org/dendrite/internal/caching" - "github.com/matrix-org/dendrite/internal/httputil" - "github.com/matrix-org/dendrite/relayapi/api" -) - -// HTTP paths for the internal HTTP API -const ( - RelayAPIPerformRelayServerSyncPath = "/relayapi/performRelayServerSync" - RelayAPIPerformStoreTransactionPath = "/relayapi/performStoreTransaction" - RelayAPIQueryTransactionsPath = "/relayapi/queryTransactions" -) - -// NewRelayAPIClient creates a RelayInternalAPI implemented by talking to a HTTP POST API. -// If httpClient is nil an error is returned -func NewRelayAPIClient( - relayapiURL string, - httpClient *http.Client, - cache caching.ServerKeyCache, -) (api.RelayInternalAPI, error) { - if httpClient == nil { - return nil, errors.New("NewRelayInternalAPIHTTP: httpClient is ") - } - return &httpRelayInternalAPI{ - relayAPIURL: relayapiURL, - httpClient: httpClient, - cache: cache, - }, nil -} - -type httpRelayInternalAPI struct { - relayAPIURL string - httpClient *http.Client - cache caching.ServerKeyCache -} - -func (h *httpRelayInternalAPI) PerformRelayServerSync( - ctx context.Context, - request *api.PerformRelayServerSyncRequest, - response *api.PerformRelayServerSyncResponse, -) error { - return httputil.CallInternalRPCAPI( - "PerformRelayServerSync", h.relayAPIURL+RelayAPIPerformRelayServerSyncPath, - h.httpClient, ctx, request, response, - ) -} - -func (h *httpRelayInternalAPI) PerformStoreTransaction( - ctx context.Context, - request *api.PerformStoreTransactionRequest, - response *api.PerformStoreTransactionResponse, -) error { - return httputil.CallInternalRPCAPI( - "PerformStoreTransaction", h.relayAPIURL+RelayAPIPerformStoreTransactionPath, - h.httpClient, ctx, request, response, - ) -} - -func (h *httpRelayInternalAPI) QueryTransactions( - ctx context.Context, - request *api.QueryRelayTransactionsRequest, - response *api.QueryRelayTransactionsResponse, -) error { - return httputil.CallInternalRPCAPI( - "QueryTransactions", h.relayAPIURL+RelayAPIQueryTransactionsPath, - h.httpClient, ctx, request, response, - ) -} diff --git a/relayapi/inthttp/client_test.go b/relayapi/inthttp/client_test.go deleted file mode 100644 index cea792a3c..000000000 --- a/relayapi/inthttp/client_test.go +++ /dev/null @@ -1,65 +0,0 @@ -package inthttp - -import ( - "context" - "net/http" - "net/http/httptest" - "testing" - "time" - - "github.com/matrix-org/dendrite/relayapi/api" - "github.com/stretchr/testify/assert" -) - -func TestRelayAPIClientNil(t *testing.T) { - _, err := NewRelayAPIClient("", nil, nil) - assert.Error(t, err) -} - -func TestRelayAPIClientPerformSync(t *testing.T) { - // Start a local HTTP server - server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { - assert.Equal(t, "/api"+RelayAPIPerformRelayServerSyncPath, req.URL.String()) - })) - defer server.Close() - - cl, err := NewRelayAPIClient(server.URL, server.Client(), nil) - assert.NoError(t, err) - assert.NotNil(t, cl) - - ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) - defer cancel() - cl.PerformRelayServerSync(ctx, &api.PerformRelayServerSyncRequest{}, &api.PerformRelayServerSyncResponse{}) -} - -func TestRelayAPIClientStore(t *testing.T) { - // Start a local HTTP server - server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { - assert.Equal(t, "/api"+RelayAPIPerformStoreTransactionPath, req.URL.String()) - })) - defer server.Close() - - cl, err := NewRelayAPIClient(server.URL, server.Client(), nil) - assert.NoError(t, err) - assert.NotNil(t, cl) - - ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) - defer cancel() - cl.PerformStoreTransaction(ctx, &api.PerformStoreTransactionRequest{}, &api.PerformStoreTransactionResponse{}) -} - -func TestRelayAPIClientQuery(t *testing.T) { - // Start a local HTTP server - server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { - assert.Equal(t, "/api"+RelayAPIQueryTransactionsPath, req.URL.String()) - })) - defer server.Close() - - cl, err := NewRelayAPIClient(server.URL, server.Client(), nil) - assert.NoError(t, err) - assert.NotNil(t, cl) - - ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) - defer cancel() - cl.QueryTransactions(ctx, &api.QueryRelayTransactionsRequest{}, &api.QueryRelayTransactionsResponse{}) -} diff --git a/relayapi/inthttp/server.go b/relayapi/inthttp/server.go deleted file mode 100644 index 0385fa51b..000000000 --- a/relayapi/inthttp/server.go +++ /dev/null @@ -1,53 +0,0 @@ -// Copyright 2022 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package inthttp - -import ( - "github.com/gorilla/mux" - - "github.com/matrix-org/dendrite/internal/httputil" - "github.com/matrix-org/dendrite/relayapi/api" -) - -// AddRoutes adds the RelayInternalAPI handlers to the http.ServeMux. -// nolint:gocyclo -func AddRoutes(intAPI api.RelayInternalAPI, internalAPIMux *mux.Router, enableMetrics bool) { - internalAPIMux.Handle( - RelayAPIPerformRelayServerSyncPath, - httputil.MakeInternalRPCAPI( - "RelayAPIPerformRelayServerSync", - enableMetrics, - intAPI.PerformRelayServerSync, - ), - ) - - internalAPIMux.Handle( - RelayAPIPerformStoreTransactionPath, - httputil.MakeInternalRPCAPI( - "RelayAPIPerformStoreTransaction", - enableMetrics, - intAPI.PerformStoreTransaction, - ), - ) - - internalAPIMux.Handle( - RelayAPIQueryTransactionsPath, - httputil.MakeInternalRPCAPI( - "RelayAPIQueryTransactions", - enableMetrics, - intAPI.QueryTransactions, - ), - ) -} diff --git a/relayapi/relayapi.go b/relayapi/relayapi.go index df282294f..f9f9d4ff9 100644 --- a/relayapi/relayapi.go +++ b/relayapi/relayapi.go @@ -15,11 +15,9 @@ package relayapi import ( - "github.com/gorilla/mux" "github.com/matrix-org/dendrite/federationapi/producers" "github.com/matrix-org/dendrite/relayapi/api" "github.com/matrix-org/dendrite/relayapi/internal" - "github.com/matrix-org/dendrite/relayapi/inthttp" "github.com/matrix-org/dendrite/relayapi/routing" "github.com/matrix-org/dendrite/relayapi/storage" rsAPI "github.com/matrix-org/dendrite/roomserver/api" @@ -28,12 +26,6 @@ import ( "github.com/sirupsen/logrus" ) -// AddInternalRoutes registers HTTP handlers for the internal API. Invokes functions -// on the given input API. -func AddInternalRoutes(router *mux.Router, intAPI api.RelayInternalAPI, enableMetrics bool) { - inthttp.AddRoutes(intAPI, router, enableMetrics) -} - // AddPublicRoutes sets up and registers HTTP handlers on the base API muxes for the FederationAPI component. func AddPublicRoutes( base *base.BaseDendrite, diff --git a/relayapi/relayapi_test.go b/relayapi/relayapi_test.go index 85966f441..e43c4b4a8 100644 --- a/relayapi/relayapi_test.go +++ b/relayapi/relayapi_test.go @@ -18,7 +18,6 @@ import ( "testing" "github.com/matrix-org/dendrite/relayapi" - "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/test" "github.com/matrix-org/dendrite/test/testrig" "github.com/stretchr/testify/assert" @@ -50,17 +49,6 @@ func TestCreateRelayInternalInvalidDatabasePanics(t *testing.T) { }) } -func TestCreateRelayInternalRoutes(t *testing.T) { - base, close := testrig.CreateBaseDendrite(t, test.DBTypeSQLite) - base.Cfg.RelayAPI.InternalAPI.Connect = config.HTTPAddress("http://localhost:8008") - defer close() - - relayAPI := relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil) - assert.NotNil(t, relayAPI) - - relayapi.AddInternalRoutes(base.InternalAPIMux, relayAPI, false) -} - func TestCreateInvalidRelayPublicRoutesPanics(t *testing.T) { test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { base, close := testrig.CreateBaseDendrite(t, dbType) diff --git a/relayapi/routing/relaytxn.go b/relayapi/routing/relaytxn.go index bf8afbf4e..bdbd87d74 100644 --- a/relayapi/routing/relaytxn.go +++ b/relayapi/routing/relaytxn.go @@ -40,25 +40,12 @@ func GetTransactionFromRelay( ) util.JSONResponse { logrus.Infof("Handling relay_txn for %s", userID.Raw()) - entryProvided := false - var previousEntry gomatrixserverlib.RelayEntry + previousEntry := gomatrixserverlib.RelayEntry{EntryID: -1} if err := json.Unmarshal(fedReq.Content(), &previousEntry); err == nil { logrus.Infof("Previous entry provided: %v", previousEntry.EntryID) - entryProvided = true } - request := api.QueryRelayTransactionsRequest{ - UserID: userID, - PreviousEntry: gomatrixserverlib.RelayEntry{EntryID: -1}, - } - if entryProvided { - request.PreviousEntry = previousEntry - } - var response api.QueryRelayTransactionsResponse - err := relayAPI.QueryTransactions( - httpReq.Context(), - &request, - &response) + response, err := relayAPI.QueryTransactions(httpReq.Context(), userID, previousEntry) if err != nil { return util.JSONResponse{ Code: http.StatusInternalServerError, diff --git a/relayapi/routing/sendrelay.go b/relayapi/routing/sendrelay.go index 2ea004f27..a7027f293 100644 --- a/relayapi/routing/sendrelay.go +++ b/relayapi/routing/sendrelay.go @@ -65,12 +65,7 @@ func SendTransactionToRelay( util.GetLogger(httpReq.Context()).Warnf("Received transaction %q from %q containing %d PDUs, %d EDUs", txnID, fedReq.Origin(), len(t.PDUs), len(t.EDUs)) - req := api.PerformStoreTransactionRequest{ - Txn: t, - UserID: userID, - } - res := api.PerformStoreTransactionResponse{} - err := relayAPI.PerformStoreTransaction(httpReq.Context(), &req, &res) + err := relayAPI.PerformStoreTransaction(httpReq.Context(), t, userID) if err != nil { return util.JSONResponse{ Code: http.StatusInternalServerError,