Refactor relayapi to remove unnecessary internal http scaffolding

This commit is contained in:
Devon Hudson 2023-01-17 12:41:58 -07:00
parent 8090f20c20
commit 4037ee3b21
No known key found for this signature in database
GPG key ID: CD06B18E77F6A628
13 changed files with 44 additions and 340 deletions

View file

@ -581,12 +581,7 @@ func (m *RelayServerRetriever) queryRelayServers(relayServers []gomatrixserverli
if err != nil {
return
}
request := relayServerAPI.PerformRelayServerSyncRequest{
UserID: *userID,
RelayServer: server,
}
response := relayServerAPI.PerformRelayServerSyncResponse{}
err = m.RelayAPI.PerformRelayServerSync(context.Background(), &request, &response)
err = m.RelayAPI.PerformRelayServerSync(context.Background(), *userID, server)
if err == nil {
func() {
m.queriedServersMutex.Lock()

View file

@ -142,7 +142,7 @@ type FakeRelayAPI struct {
relayServerAPI.RelayInternalAPI
}
func (r *FakeRelayAPI) PerformRelayServerSync(ctx context.Context, req *relayServerAPI.PerformRelayServerSyncRequest, res *relayServerAPI.PerformRelayServerSyncResponse) error {
func (r *FakeRelayAPI) PerformRelayServerSync(ctx context.Context, userID gomatrixserverlib.UserID, relayServer gomatrixserverlib.ServerName) error {
return nil
}

View file

@ -428,12 +428,7 @@ func (m *RelayServerRetriever) queryRelayServers(relayServers []gomatrixserverli
if err != nil {
return
}
request := relayServerAPI.PerformRelayServerSyncRequest{
UserID: *userID,
RelayServer: server,
}
response := relayServerAPI.PerformRelayServerSyncResponse{}
err = m.RelayAPI.PerformRelayServerSync(context.Background(), &request, &response)
err = m.RelayAPI.PerformRelayServerSync(context.Background(), *userID, server)
if err == nil {
m.RelayServersQueried[server] = true
// TODO : What happens if your relay receives new messages after this point?

View file

@ -24,56 +24,29 @@ import (
type RelayInternalAPI interface {
RelayServerAPI
// Retrieve from external relay server all transactions stored for us and process them.
PerformRelayServerSync(
ctx context.Context,
request *PerformRelayServerSyncRequest,
response *PerformRelayServerSyncResponse,
userID gomatrixserverlib.UserID,
relayServer gomatrixserverlib.ServerName,
) error
}
// RelayServerAPI exposes the store & query transaction functionality of a relay server.
type RelayServerAPI interface {
// Store transactions for forwarding to the destination at a later time.
PerformStoreTransaction(
ctx context.Context,
request *PerformStoreTransactionRequest,
response *PerformStoreTransactionResponse,
transaction gomatrixserverlib.Transaction,
userID gomatrixserverlib.UserID,
) error
// Obtain the oldest stored transaction for the specified userID.
QueryTransactions(
ctx context.Context,
request *QueryRelayTransactionsRequest,
response *QueryRelayTransactionsResponse,
) error
}
type PerformRelayServerSyncRequest struct {
UserID gomatrixserverlib.UserID `json:"user_id"`
RelayServer gomatrixserverlib.ServerName `json:"relay_server"`
}
type PerformRelayServerSyncResponse struct {
}
type QueryRelayServersRequest struct {
Server gomatrixserverlib.ServerName
}
type QueryRelayServersResponse struct {
RelayServers []gomatrixserverlib.ServerName
}
type PerformStoreTransactionRequest struct {
Txn gomatrixserverlib.Transaction `json:"transaction"`
UserID gomatrixserverlib.UserID `json:"user_id"`
}
type PerformStoreTransactionResponse struct {
}
type QueryRelayTransactionsRequest struct {
UserID gomatrixserverlib.UserID `json:"user_id"`
PreviousEntry gomatrixserverlib.RelayEntry `json:"prev_entry,omitempty"`
userID gomatrixserverlib.UserID,
previousEntry gomatrixserverlib.RelayEntry,
) (QueryRelayTransactionsResponse, error)
}
type QueryRelayTransactionsResponse struct {

View file

@ -27,11 +27,11 @@ import (
// PerformRelayServerSync implements api.RelayInternalAPI
func (r *RelayInternalAPI) PerformRelayServerSync(
ctx context.Context,
request *api.PerformRelayServerSyncRequest,
response *api.PerformRelayServerSyncResponse,
userID gomatrixserverlib.UserID,
relayServer gomatrixserverlib.ServerName,
) error {
prevEntry := gomatrixserverlib.RelayEntry{EntryID: -1}
asyncResponse, err := r.fedClient.P2PGetTransactionFromRelay(ctx, request.UserID, prevEntry, request.RelayServer)
asyncResponse, err := r.fedClient.P2PGetTransactionFromRelay(ctx, userID, prevEntry, relayServer)
if err != nil {
logrus.Errorf("P2PGetTransactionFromRelay: %s", err.Error())
return err
@ -40,7 +40,7 @@ func (r *RelayInternalAPI) PerformRelayServerSync(
for asyncResponse.EntriesQueued {
logrus.Infof("Retrieving next entry from relay, previous: %v", prevEntry)
asyncResponse, err = r.fedClient.P2PGetTransactionFromRelay(ctx, request.UserID, prevEntry, request.RelayServer)
asyncResponse, err = r.fedClient.P2PGetTransactionFromRelay(ctx, userID, prevEntry, relayServer)
prevEntry = gomatrixserverlib.RelayEntry{EntryID: asyncResponse.EntryID}
if err != nil {
logrus.Errorf("P2PGetTransactionFromRelay: %s", err.Error())
@ -55,11 +55,11 @@ func (r *RelayInternalAPI) PerformRelayServerSync(
// PerformStoreTransaction implements api.RelayInternalAPI
func (r *RelayInternalAPI) PerformStoreTransaction(
ctx context.Context,
request *api.PerformStoreTransactionRequest,
response *api.PerformStoreTransactionResponse,
transaction gomatrixserverlib.Transaction,
userID gomatrixserverlib.UserID,
) error {
logrus.Warnf("Storing transaction for %v", request.UserID)
receipt, err := r.db.StoreTransaction(ctx, request.Txn)
logrus.Warnf("Storing transaction for %v", userID)
receipt, err := r.db.StoreTransaction(ctx, transaction)
if err != nil {
logrus.Errorf("db.StoreTransaction: %s", err.Error())
return err
@ -67,9 +67,9 @@ func (r *RelayInternalAPI) PerformStoreTransaction(
err = r.db.AssociateTransactionWithDestinations(
ctx,
map[gomatrixserverlib.UserID]struct{}{
request.UserID: {},
userID: {},
},
request.Txn.TransactionID,
transaction.TransactionID,
receipt)
return err
@ -78,41 +78,42 @@ func (r *RelayInternalAPI) PerformStoreTransaction(
// QueryTransactions implements api.RelayInternalAPI
func (r *RelayInternalAPI) QueryTransactions(
ctx context.Context,
request *api.QueryRelayTransactionsRequest,
response *api.QueryRelayTransactionsResponse,
) error {
logrus.Infof("QueryTransactions for %s", request.UserID.Raw())
if request.PreviousEntry.EntryID >= 0 {
userID gomatrixserverlib.UserID,
previousEntry gomatrixserverlib.RelayEntry,
) (api.QueryRelayTransactionsResponse, error) {
logrus.Infof("QueryTransactions for %s", userID.Raw())
if previousEntry.EntryID >= 0 {
logrus.Infof("Cleaning previous entry (%v) from db for %s",
request.PreviousEntry.EntryID,
request.UserID.Raw(),
previousEntry.EntryID,
userID.Raw(),
)
prevReceipt := receipt.NewReceipt(request.PreviousEntry.EntryID)
err := r.db.CleanTransactions(ctx, request.UserID, []*receipt.Receipt{&prevReceipt})
prevReceipt := receipt.NewReceipt(previousEntry.EntryID)
err := r.db.CleanTransactions(ctx, userID, []*receipt.Receipt{&prevReceipt})
if err != nil {
logrus.Errorf("db.CleanTransactions: %s", err.Error())
return err
return api.QueryRelayTransactionsResponse{}, err
}
}
transaction, receipt, err := r.db.GetTransaction(ctx, request.UserID)
transaction, receipt, err := r.db.GetTransaction(ctx, userID)
if err != nil {
logrus.Errorf("db.GetTransaction: %s", err.Error())
return err
return api.QueryRelayTransactionsResponse{}, err
}
response := api.QueryRelayTransactionsResponse{}
if transaction != nil && receipt != nil {
logrus.Infof("Obtained transaction (%v) for %s", transaction.TransactionID, request.UserID.Raw())
logrus.Infof("Obtained transaction (%v) for %s", transaction.TransactionID, userID.Raw())
response.Transaction = *transaction
response.EntryID = receipt.GetNID()
response.EntriesQueued = true
} else {
logrus.Infof("No more entries in the queue for %s", request.UserID.Raw())
logrus.Infof("No more entries in the queue for %s", userID.Raw())
response.EntryID = -1
response.EntriesQueued = false
}
return nil
return response, nil
}
func (r *RelayInternalAPI) processTransaction(txn *gomatrixserverlib.Transaction) {

View file

@ -21,7 +21,6 @@ import (
fedAPI "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/relayapi/api"
"github.com/matrix-org/dendrite/relayapi/storage/shared"
"github.com/matrix-org/dendrite/test"
"github.com/matrix-org/gomatrixserverlib"
@ -76,12 +75,7 @@ func TestPerformRelayServerSync(t *testing.T) {
&db, fedClient, nil, nil, nil, false, "",
)
req := api.PerformRelayServerSyncRequest{
UserID: *userID,
RelayServer: gomatrixserverlib.ServerName("relay"),
}
res := api.PerformRelayServerSyncResponse{}
err = relayAPI.PerformRelayServerSync(context.Background(), &req, &res)
err = relayAPI.PerformRelayServerSync(context.Background(), *userID, gomatrixserverlib.ServerName("relay"))
assert.NoError(t, err)
}
@ -101,12 +95,7 @@ func TestPerformRelayServerSyncFedError(t *testing.T) {
&db, fedClient, nil, nil, nil, false, "",
)
req := api.PerformRelayServerSyncRequest{
UserID: *userID,
RelayServer: gomatrixserverlib.ServerName("relay"),
}
res := api.PerformRelayServerSyncResponse{}
err = relayAPI.PerformRelayServerSync(context.Background(), &req, &res)
err = relayAPI.PerformRelayServerSync(context.Background(), *userID, gomatrixserverlib.ServerName("relay"))
assert.Error(t, err)
}
@ -126,12 +115,7 @@ func TestPerformRelayServerSyncRunsUntilQueueEmpty(t *testing.T) {
&db, fedClient, nil, nil, nil, false, "",
)
req := api.PerformRelayServerSyncRequest{
UserID: *userID,
RelayServer: gomatrixserverlib.ServerName("relay"),
}
res := api.PerformRelayServerSyncResponse{}
err = relayAPI.PerformRelayServerSync(context.Background(), &req, &res)
err = relayAPI.PerformRelayServerSync(context.Background(), *userID, gomatrixserverlib.ServerName("relay"))
assert.NoError(t, err)
assert.Equal(t, uint(3), fedClient.queryCount)
}

View file

@ -1,88 +0,0 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package inthttp
import (
"context"
"errors"
"net/http"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/relayapi/api"
)
// HTTP paths for the internal HTTP API
const (
RelayAPIPerformRelayServerSyncPath = "/relayapi/performRelayServerSync"
RelayAPIPerformStoreTransactionPath = "/relayapi/performStoreTransaction"
RelayAPIQueryTransactionsPath = "/relayapi/queryTransactions"
)
// NewRelayAPIClient creates a RelayInternalAPI implemented by talking to a HTTP POST API.
// If httpClient is nil an error is returned
func NewRelayAPIClient(
relayapiURL string,
httpClient *http.Client,
cache caching.ServerKeyCache,
) (api.RelayInternalAPI, error) {
if httpClient == nil {
return nil, errors.New("NewRelayInternalAPIHTTP: httpClient is <nil>")
}
return &httpRelayInternalAPI{
relayAPIURL: relayapiURL,
httpClient: httpClient,
cache: cache,
}, nil
}
type httpRelayInternalAPI struct {
relayAPIURL string
httpClient *http.Client
cache caching.ServerKeyCache
}
func (h *httpRelayInternalAPI) PerformRelayServerSync(
ctx context.Context,
request *api.PerformRelayServerSyncRequest,
response *api.PerformRelayServerSyncResponse,
) error {
return httputil.CallInternalRPCAPI(
"PerformRelayServerSync", h.relayAPIURL+RelayAPIPerformRelayServerSyncPath,
h.httpClient, ctx, request, response,
)
}
func (h *httpRelayInternalAPI) PerformStoreTransaction(
ctx context.Context,
request *api.PerformStoreTransactionRequest,
response *api.PerformStoreTransactionResponse,
) error {
return httputil.CallInternalRPCAPI(
"PerformStoreTransaction", h.relayAPIURL+RelayAPIPerformStoreTransactionPath,
h.httpClient, ctx, request, response,
)
}
func (h *httpRelayInternalAPI) QueryTransactions(
ctx context.Context,
request *api.QueryRelayTransactionsRequest,
response *api.QueryRelayTransactionsResponse,
) error {
return httputil.CallInternalRPCAPI(
"QueryTransactions", h.relayAPIURL+RelayAPIQueryTransactionsPath,
h.httpClient, ctx, request, response,
)
}

View file

@ -1,65 +0,0 @@
package inthttp
import (
"context"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/matrix-org/dendrite/relayapi/api"
"github.com/stretchr/testify/assert"
)
func TestRelayAPIClientNil(t *testing.T) {
_, err := NewRelayAPIClient("", nil, nil)
assert.Error(t, err)
}
func TestRelayAPIClientPerformSync(t *testing.T) {
// Start a local HTTP server
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
assert.Equal(t, "/api"+RelayAPIPerformRelayServerSyncPath, req.URL.String())
}))
defer server.Close()
cl, err := NewRelayAPIClient(server.URL, server.Client(), nil)
assert.NoError(t, err)
assert.NotNil(t, cl)
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
cl.PerformRelayServerSync(ctx, &api.PerformRelayServerSyncRequest{}, &api.PerformRelayServerSyncResponse{})
}
func TestRelayAPIClientStore(t *testing.T) {
// Start a local HTTP server
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
assert.Equal(t, "/api"+RelayAPIPerformStoreTransactionPath, req.URL.String())
}))
defer server.Close()
cl, err := NewRelayAPIClient(server.URL, server.Client(), nil)
assert.NoError(t, err)
assert.NotNil(t, cl)
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
cl.PerformStoreTransaction(ctx, &api.PerformStoreTransactionRequest{}, &api.PerformStoreTransactionResponse{})
}
func TestRelayAPIClientQuery(t *testing.T) {
// Start a local HTTP server
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
assert.Equal(t, "/api"+RelayAPIQueryTransactionsPath, req.URL.String())
}))
defer server.Close()
cl, err := NewRelayAPIClient(server.URL, server.Client(), nil)
assert.NoError(t, err)
assert.NotNil(t, cl)
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
cl.QueryTransactions(ctx, &api.QueryRelayTransactionsRequest{}, &api.QueryRelayTransactionsResponse{})
}

View file

@ -1,53 +0,0 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package inthttp
import (
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/relayapi/api"
)
// AddRoutes adds the RelayInternalAPI handlers to the http.ServeMux.
// nolint:gocyclo
func AddRoutes(intAPI api.RelayInternalAPI, internalAPIMux *mux.Router, enableMetrics bool) {
internalAPIMux.Handle(
RelayAPIPerformRelayServerSyncPath,
httputil.MakeInternalRPCAPI(
"RelayAPIPerformRelayServerSync",
enableMetrics,
intAPI.PerformRelayServerSync,
),
)
internalAPIMux.Handle(
RelayAPIPerformStoreTransactionPath,
httputil.MakeInternalRPCAPI(
"RelayAPIPerformStoreTransaction",
enableMetrics,
intAPI.PerformStoreTransaction,
),
)
internalAPIMux.Handle(
RelayAPIQueryTransactionsPath,
httputil.MakeInternalRPCAPI(
"RelayAPIQueryTransactions",
enableMetrics,
intAPI.QueryTransactions,
),
)
}

View file

@ -15,11 +15,9 @@
package relayapi
import (
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/federationapi/producers"
"github.com/matrix-org/dendrite/relayapi/api"
"github.com/matrix-org/dendrite/relayapi/internal"
"github.com/matrix-org/dendrite/relayapi/inthttp"
"github.com/matrix-org/dendrite/relayapi/routing"
"github.com/matrix-org/dendrite/relayapi/storage"
rsAPI "github.com/matrix-org/dendrite/roomserver/api"
@ -28,12 +26,6 @@ import (
"github.com/sirupsen/logrus"
)
// AddInternalRoutes registers HTTP handlers for the internal API. Invokes functions
// on the given input API.
func AddInternalRoutes(router *mux.Router, intAPI api.RelayInternalAPI, enableMetrics bool) {
inthttp.AddRoutes(intAPI, router, enableMetrics)
}
// AddPublicRoutes sets up and registers HTTP handlers on the base API muxes for the FederationAPI component.
func AddPublicRoutes(
base *base.BaseDendrite,

View file

@ -18,7 +18,6 @@ import (
"testing"
"github.com/matrix-org/dendrite/relayapi"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/test"
"github.com/matrix-org/dendrite/test/testrig"
"github.com/stretchr/testify/assert"
@ -50,17 +49,6 @@ func TestCreateRelayInternalInvalidDatabasePanics(t *testing.T) {
})
}
func TestCreateRelayInternalRoutes(t *testing.T) {
base, close := testrig.CreateBaseDendrite(t, test.DBTypeSQLite)
base.Cfg.RelayAPI.InternalAPI.Connect = config.HTTPAddress("http://localhost:8008")
defer close()
relayAPI := relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil)
assert.NotNil(t, relayAPI)
relayapi.AddInternalRoutes(base.InternalAPIMux, relayAPI, false)
}
func TestCreateInvalidRelayPublicRoutesPanics(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
base, close := testrig.CreateBaseDendrite(t, dbType)

View file

@ -40,25 +40,12 @@ func GetTransactionFromRelay(
) util.JSONResponse {
logrus.Infof("Handling relay_txn for %s", userID.Raw())
entryProvided := false
var previousEntry gomatrixserverlib.RelayEntry
previousEntry := gomatrixserverlib.RelayEntry{EntryID: -1}
if err := json.Unmarshal(fedReq.Content(), &previousEntry); err == nil {
logrus.Infof("Previous entry provided: %v", previousEntry.EntryID)
entryProvided = true
}
request := api.QueryRelayTransactionsRequest{
UserID: userID,
PreviousEntry: gomatrixserverlib.RelayEntry{EntryID: -1},
}
if entryProvided {
request.PreviousEntry = previousEntry
}
var response api.QueryRelayTransactionsResponse
err := relayAPI.QueryTransactions(
httpReq.Context(),
&request,
&response)
response, err := relayAPI.QueryTransactions(httpReq.Context(), userID, previousEntry)
if err != nil {
return util.JSONResponse{
Code: http.StatusInternalServerError,

View file

@ -65,12 +65,7 @@ func SendTransactionToRelay(
util.GetLogger(httpReq.Context()).Warnf("Received transaction %q from %q containing %d PDUs, %d EDUs", txnID, fedReq.Origin(), len(t.PDUs), len(t.EDUs))
req := api.PerformStoreTransactionRequest{
Txn: t,
UserID: userID,
}
res := api.PerformStoreTransactionResponse{}
err := relayAPI.PerformStoreTransaction(httpReq.Context(), &req, &res)
err := relayAPI.PerformStoreTransaction(httpReq.Context(), t, userID)
if err != nil {
return util.JSONResponse{
Code: http.StatusInternalServerError,