mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-16 18:43:10 -06:00
Another incremental renaming refactor of relay server
This commit is contained in:
parent
7b2cbc7133
commit
ed71fe63ae
|
|
@ -92,7 +92,7 @@ type FederationClient interface {
|
||||||
SendTransaction(ctx context.Context, t gomatrixserverlib.Transaction) (res gomatrixserverlib.RespSend, err error)
|
SendTransaction(ctx context.Context, t gomatrixserverlib.Transaction) (res gomatrixserverlib.RespSend, err error)
|
||||||
|
|
||||||
P2PSendTransactionToRelay(ctx context.Context, u gomatrixserverlib.UserID, t gomatrixserverlib.Transaction, forwardingServer gomatrixserverlib.ServerName) (res gomatrixserverlib.EmptyResp, err error)
|
P2PSendTransactionToRelay(ctx context.Context, u gomatrixserverlib.UserID, t gomatrixserverlib.Transaction, forwardingServer gomatrixserverlib.ServerName) (res gomatrixserverlib.EmptyResp, err error)
|
||||||
P2PGetTransactionFromRelay(ctx context.Context, u gomatrixserverlib.UserID, prev gomatrixserverlib.RelayEntry, relayServer gomatrixserverlib.ServerName) (res gomatrixserverlib.RespGetRelayTxn, err error)
|
P2PGetTransactionFromRelay(ctx context.Context, u gomatrixserverlib.UserID, prev gomatrixserverlib.RelayEntry, relayServer gomatrixserverlib.ServerName) (res gomatrixserverlib.RespGetRelayTransaction, err error)
|
||||||
|
|
||||||
// Perform operations
|
// Perform operations
|
||||||
LookupRoomAlias(ctx context.Context, origin, s gomatrixserverlib.ServerName, roomAlias string) (res gomatrixserverlib.RespDirectory, err error)
|
LookupRoomAlias(ctx context.Context, origin, s gomatrixserverlib.ServerName, roomAlias string) (res gomatrixserverlib.RespDirectory, err error)
|
||||||
|
|
|
||||||
|
|
@ -32,18 +32,18 @@ type RelayInternalAPI interface {
|
||||||
}
|
}
|
||||||
|
|
||||||
type RelayServerAPI interface {
|
type RelayServerAPI interface {
|
||||||
// Store async transactions for forwarding to the destination at a later time.
|
// Store transactions for forwarding to the destination at a later time.
|
||||||
PerformStoreAsync(
|
PerformStoreTransaction(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
request *PerformStoreAsyncRequest,
|
request *PerformStoreTransactionRequest,
|
||||||
response *PerformStoreAsyncResponse,
|
response *PerformStoreTransactionResponse,
|
||||||
) error
|
) error
|
||||||
|
|
||||||
// Obtain the oldest stored transaction for the specified userID.
|
// Obtain the oldest stored transaction for the specified userID.
|
||||||
QueryAsyncTransactions(
|
QueryTransactions(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
request *QueryAsyncTransactionsRequest,
|
request *QueryRelayTransactionsRequest,
|
||||||
response *QueryAsyncTransactionsResponse,
|
response *QueryRelayTransactionsResponse,
|
||||||
) error
|
) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -63,21 +63,21 @@ type QueryRelayServersResponse struct {
|
||||||
RelayServers []gomatrixserverlib.ServerName
|
RelayServers []gomatrixserverlib.ServerName
|
||||||
}
|
}
|
||||||
|
|
||||||
type PerformStoreAsyncRequest struct {
|
type PerformStoreTransactionRequest struct {
|
||||||
Txn gomatrixserverlib.Transaction `json:"transaction"`
|
Txn gomatrixserverlib.Transaction `json:"transaction"`
|
||||||
UserID gomatrixserverlib.UserID `json:"user_id"`
|
UserID gomatrixserverlib.UserID `json:"user_id"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type PerformStoreAsyncResponse struct {
|
type PerformStoreTransactionResponse struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type QueryAsyncTransactionsRequest struct {
|
type QueryRelayTransactionsRequest struct {
|
||||||
UserID gomatrixserverlib.UserID `json:"user_id"`
|
UserID gomatrixserverlib.UserID `json:"user_id"`
|
||||||
PreviousEntry gomatrixserverlib.RelayEntry `json:"prev_entry,omitempty"`
|
PreviousEntry gomatrixserverlib.RelayEntry `json:"prev_entry,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type QueryAsyncTransactionsResponse struct {
|
type QueryRelayTransactionsResponse struct {
|
||||||
Txn gomatrixserverlib.Transaction `json:"transaction"`
|
Transaction gomatrixserverlib.Transaction `json:"transaction"`
|
||||||
EntryID int64 `json:"entry_id"`
|
EntryID int64 `json:"entry_id"`
|
||||||
EntriesQueued bool `json:"entries_queued"`
|
EntriesQueued bool `json:"entries_queued"`
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -53,10 +53,10 @@ func (r *RelayInternalAPI) PerformRelayServerSync(
|
||||||
}
|
}
|
||||||
|
|
||||||
// PerformStoreAsync implements api.RelayInternalAPI
|
// PerformStoreAsync implements api.RelayInternalAPI
|
||||||
func (r *RelayInternalAPI) PerformStoreAsync(
|
func (r *RelayInternalAPI) PerformStoreTransaction(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
request *api.PerformStoreAsyncRequest,
|
request *api.PerformStoreTransactionRequest,
|
||||||
response *api.PerformStoreAsyncResponse,
|
response *api.PerformStoreTransactionResponse,
|
||||||
) error {
|
) error {
|
||||||
logrus.Warnf("Storing transaction for %v", request.UserID)
|
logrus.Warnf("Storing transaction for %v", request.UserID)
|
||||||
receipt, err := r.db.StoreAsyncTransaction(ctx, request.Txn)
|
receipt, err := r.db.StoreAsyncTransaction(ctx, request.Txn)
|
||||||
|
|
@ -76,12 +76,12 @@ func (r *RelayInternalAPI) PerformStoreAsync(
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueryAsyncTransactions implements api.RelayInternalAPI
|
// QueryAsyncTransactions implements api.RelayInternalAPI
|
||||||
func (r *RelayInternalAPI) QueryAsyncTransactions(
|
func (r *RelayInternalAPI) QueryTransactions(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
request *api.QueryAsyncTransactionsRequest,
|
request *api.QueryRelayTransactionsRequest,
|
||||||
response *api.QueryAsyncTransactionsResponse,
|
response *api.QueryRelayTransactionsResponse,
|
||||||
) error {
|
) error {
|
||||||
logrus.Infof("QueryAsyncTransactions for %s", request.UserID.Raw())
|
logrus.Infof("QueryTransactions for %s", request.UserID.Raw())
|
||||||
if request.PreviousEntry.EntryID >= 0 {
|
if request.PreviousEntry.EntryID >= 0 {
|
||||||
logrus.Infof("Cleaning previous entry (%v) from db for %s",
|
logrus.Infof("Cleaning previous entry (%v) from db for %s",
|
||||||
request.PreviousEntry.EntryID,
|
request.PreviousEntry.EntryID,
|
||||||
|
|
@ -103,7 +103,7 @@ func (r *RelayInternalAPI) QueryAsyncTransactions(
|
||||||
|
|
||||||
if transaction != nil && receipt != nil {
|
if transaction != nil && receipt != nil {
|
||||||
logrus.Infof("Obtained transaction (%v) for %s", transaction.TransactionID, request.UserID.Raw())
|
logrus.Infof("Obtained transaction (%v) for %s", transaction.TransactionID, request.UserID.Raw())
|
||||||
response.Txn = *transaction
|
response.Transaction = *transaction
|
||||||
response.EntryID = receipt.GetNID()
|
response.EntryID = receipt.GetNID()
|
||||||
response.EntriesQueued = true
|
response.EntriesQueued = true
|
||||||
} else {
|
} else {
|
||||||
|
|
|
||||||
|
|
@ -35,10 +35,10 @@ type testFedClient struct {
|
||||||
queueDepth uint
|
queueDepth uint
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *testFedClient) P2PGetTransactionFromRelay(ctx context.Context, u gomatrixserverlib.UserID, prev gomatrixserverlib.RelayEntry, relayServer gomatrixserverlib.ServerName) (res gomatrixserverlib.RespGetRelayTxn, err error) {
|
func (f *testFedClient) P2PGetTransactionFromRelay(ctx context.Context, u gomatrixserverlib.UserID, prev gomatrixserverlib.RelayEntry, relayServer gomatrixserverlib.ServerName) (res gomatrixserverlib.RespGetRelayTransaction, err error) {
|
||||||
f.queryCount++
|
f.queryCount++
|
||||||
if !f.shouldFail {
|
if !f.shouldFail {
|
||||||
res = gomatrixserverlib.RespGetRelayTxn{
|
res = gomatrixserverlib.RespGetRelayTransaction{
|
||||||
Txn: gomatrixserverlib.Transaction{},
|
Txn: gomatrixserverlib.Transaction{},
|
||||||
EntryID: 0,
|
EntryID: 0,
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -26,9 +26,9 @@ import (
|
||||||
|
|
||||||
// HTTP paths for the internal HTTP API
|
// HTTP paths for the internal HTTP API
|
||||||
const (
|
const (
|
||||||
RelayAPIPerformRelayServerSyncPath = "/relayapi/performRelayServerSync"
|
RelayAPIPerformRelayServerSyncPath = "/relayapi/performRelayServerSync"
|
||||||
RelayAPIPerformStoreAsyncPath = "/relayapi/performStoreAsync"
|
RelayAPIPerformStoreTransactionPath = "/relayapi/performStoreTransaction"
|
||||||
RelayAPIQueryAsyncTransactionsPath = "/relayapi/queryAsyncTransactions"
|
RelayAPIQueryTransactionsPath = "/relayapi/queryTransactions"
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewRelayAPIClient creates a RelayInternalAPI implemented by talking to a HTTP POST API.
|
// NewRelayAPIClient creates a RelayInternalAPI implemented by talking to a HTTP POST API.
|
||||||
|
|
@ -61,24 +61,24 @@ func (h *httpRelayInternalAPI) PerformRelayServerSync(
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *httpRelayInternalAPI) PerformStoreAsync(
|
func (h *httpRelayInternalAPI) PerformStoreTransaction(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
request *api.PerformStoreAsyncRequest,
|
request *api.PerformStoreTransactionRequest,
|
||||||
response *api.PerformStoreAsyncResponse,
|
response *api.PerformStoreTransactionResponse,
|
||||||
) error {
|
) error {
|
||||||
return httputil.CallInternalRPCAPI(
|
return httputil.CallInternalRPCAPI(
|
||||||
"PerformStoreAsync", h.relayAPIURL+RelayAPIPerformStoreAsyncPath,
|
"PerformStoreTransaction", h.relayAPIURL+RelayAPIPerformStoreTransactionPath,
|
||||||
h.httpClient, ctx, request, response,
|
h.httpClient, ctx, request, response,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *httpRelayInternalAPI) QueryAsyncTransactions(
|
func (h *httpRelayInternalAPI) QueryTransactions(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
request *api.QueryAsyncTransactionsRequest,
|
request *api.QueryRelayTransactionsRequest,
|
||||||
response *api.QueryAsyncTransactionsResponse,
|
response *api.QueryRelayTransactionsResponse,
|
||||||
) error {
|
) error {
|
||||||
return httputil.CallInternalRPCAPI(
|
return httputil.CallInternalRPCAPI(
|
||||||
"QueryAsyncTransactions", h.relayAPIURL+RelayAPIQueryAsyncTransactionsPath,
|
"QueryTransactions", h.relayAPIURL+RelayAPIQueryTransactionsPath,
|
||||||
h.httpClient, ctx, request, response,
|
h.httpClient, ctx, request, response,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -35,7 +35,7 @@ func TestRelayAPIClientPerormSync(t *testing.T) {
|
||||||
func TestRelayAPIClientStore(t *testing.T) {
|
func TestRelayAPIClientStore(t *testing.T) {
|
||||||
// Start a local HTTP server
|
// Start a local HTTP server
|
||||||
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
||||||
assert.Equal(t, "/api"+RelayAPIPerformStoreAsyncPath, req.URL.String())
|
assert.Equal(t, "/api"+RelayAPIPerformStoreTransactionPath, req.URL.String())
|
||||||
}))
|
}))
|
||||||
defer server.Close()
|
defer server.Close()
|
||||||
|
|
||||||
|
|
@ -45,13 +45,13 @@ func TestRelayAPIClientStore(t *testing.T) {
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
|
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
cl.PerformStoreAsync(ctx, &api.PerformStoreAsyncRequest{}, &api.PerformStoreAsyncResponse{})
|
cl.PerformStoreTransaction(ctx, &api.PerformStoreTransactionRequest{}, &api.PerformStoreTransactionResponse{})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRelayAPIClientQuery(t *testing.T) {
|
func TestRelayAPIClientQuery(t *testing.T) {
|
||||||
// Start a local HTTP server
|
// Start a local HTTP server
|
||||||
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
||||||
assert.Equal(t, "/api"+RelayAPIQueryAsyncTransactionsPath, req.URL.String())
|
assert.Equal(t, "/api"+RelayAPIQueryTransactionsPath, req.URL.String())
|
||||||
}))
|
}))
|
||||||
defer server.Close()
|
defer server.Close()
|
||||||
|
|
||||||
|
|
@ -61,5 +61,5 @@ func TestRelayAPIClientQuery(t *testing.T) {
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
|
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
cl.QueryAsyncTransactions(ctx, &api.QueryAsyncTransactionsRequest{}, &api.QueryAsyncTransactionsResponse{})
|
cl.QueryTransactions(ctx, &api.QueryRelayTransactionsRequest{}, &api.QueryRelayTransactionsResponse{})
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -30,12 +30,12 @@ func AddRoutes(intAPI api.RelayInternalAPI, internalAPIMux *mux.Router, enableMe
|
||||||
)
|
)
|
||||||
|
|
||||||
internalAPIMux.Handle(
|
internalAPIMux.Handle(
|
||||||
RelayAPIPerformStoreAsyncPath,
|
RelayAPIPerformStoreTransactionPath,
|
||||||
httputil.MakeInternalRPCAPI("RelayAPIPerformStoreAsync", enableMetrics, intAPI.PerformStoreAsync),
|
httputil.MakeInternalRPCAPI("RelayAPIPerformStoreTransaction", enableMetrics, intAPI.PerformStoreTransaction),
|
||||||
)
|
)
|
||||||
|
|
||||||
internalAPIMux.Handle(
|
internalAPIMux.Handle(
|
||||||
RelayAPIQueryAsyncTransactionsPath,
|
RelayAPIQueryTransactionsPath,
|
||||||
httputil.MakeInternalRPCAPI("RelayAPIQueryAsyncTransactions", enableMetrics, intAPI.QueryAsyncTransactions),
|
httputil.MakeInternalRPCAPI("RelayAPIQueryTransactions", enableMetrics, intAPI.QueryTransactions),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -24,8 +24,8 @@ import (
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
type RelayTxnResponse struct {
|
type RelayTransactionResponse struct {
|
||||||
Txn gomatrixserverlib.Transaction `json:"transaction"`
|
Transaction gomatrixserverlib.Transaction `json:"transaction"`
|
||||||
EntryID int64 `json:"entry_id,omitempty"`
|
EntryID int64 `json:"entry_id,omitempty"`
|
||||||
EntriesQueued bool `json:"entries_queued"`
|
EntriesQueued bool `json:"entries_queued"`
|
||||||
}
|
}
|
||||||
|
|
@ -47,15 +47,15 @@ func GetTxnFromRelay(
|
||||||
entryProvided = true
|
entryProvided = true
|
||||||
}
|
}
|
||||||
|
|
||||||
request := api.QueryAsyncTransactionsRequest{
|
request := api.QueryRelayTransactionsRequest{
|
||||||
UserID: userID,
|
UserID: userID,
|
||||||
PreviousEntry: gomatrixserverlib.RelayEntry{EntryID: -1},
|
PreviousEntry: gomatrixserverlib.RelayEntry{EntryID: -1},
|
||||||
}
|
}
|
||||||
if entryProvided {
|
if entryProvided {
|
||||||
request.PreviousEntry = previousEntry
|
request.PreviousEntry = previousEntry
|
||||||
}
|
}
|
||||||
var response api.QueryAsyncTransactionsResponse
|
var response api.QueryRelayTransactionsResponse
|
||||||
err := relayAPI.QueryAsyncTransactions(
|
err := relayAPI.QueryTransactions(
|
||||||
httpReq.Context(),
|
httpReq.Context(),
|
||||||
&request,
|
&request,
|
||||||
&response)
|
&response)
|
||||||
|
|
@ -67,8 +67,8 @@ func GetTxnFromRelay(
|
||||||
|
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
Code: http.StatusOK,
|
Code: http.StatusOK,
|
||||||
JSON: RelayTxnResponse{
|
JSON: RelayTransactionResponse{
|
||||||
Txn: response.Txn,
|
Transaction: response.Transaction,
|
||||||
EntryID: response.EntryID,
|
EntryID: response.EntryID,
|
||||||
EntriesQueued: response.EntriesQueued,
|
EntriesQueued: response.EntriesQueued,
|
||||||
},
|
},
|
||||||
|
|
|
||||||
|
|
@ -64,9 +64,9 @@ func TestGetAsyncEmptyDatabaseReturnsNothing(t *testing.T) {
|
||||||
response := routing.GetTxnFromRelay(httpReq, &request, relayAPI, *userID)
|
response := routing.GetTxnFromRelay(httpReq, &request, relayAPI, *userID)
|
||||||
assert.Equal(t, http.StatusOK, response.Code)
|
assert.Equal(t, http.StatusOK, response.Code)
|
||||||
|
|
||||||
jsonResponse := response.JSON.(routing.RelayTxnResponse)
|
jsonResponse := response.JSON.(routing.RelayTransactionResponse)
|
||||||
assert.Equal(t, false, jsonResponse.EntriesQueued)
|
assert.Equal(t, false, jsonResponse.EntriesQueued)
|
||||||
assert.Equal(t, gomatrixserverlib.Transaction{}, jsonResponse.Txn)
|
assert.Equal(t, gomatrixserverlib.Transaction{}, jsonResponse.Transaction)
|
||||||
|
|
||||||
count, err := db.GetAsyncTransactionCount(context.Background(), *userID)
|
count, err := db.GetAsyncTransactionCount(context.Background(), *userID)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
@ -105,18 +105,18 @@ func TestGetAsyncReturnsSavedTransaction(t *testing.T) {
|
||||||
response := routing.GetTxnFromRelay(httpReq, &request, relayAPI, *userID)
|
response := routing.GetTxnFromRelay(httpReq, &request, relayAPI, *userID)
|
||||||
assert.Equal(t, http.StatusOK, response.Code)
|
assert.Equal(t, http.StatusOK, response.Code)
|
||||||
|
|
||||||
jsonResponse := response.JSON.(routing.RelayTxnResponse)
|
jsonResponse := response.JSON.(routing.RelayTransactionResponse)
|
||||||
assert.True(t, jsonResponse.EntriesQueued)
|
assert.True(t, jsonResponse.EntriesQueued)
|
||||||
assert.Equal(t, transaction, jsonResponse.Txn)
|
assert.Equal(t, transaction, jsonResponse.Transaction)
|
||||||
|
|
||||||
// And once more to clear the queue
|
// And once more to clear the queue
|
||||||
request = createAsyncQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: jsonResponse.EntryID})
|
request = createAsyncQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: jsonResponse.EntryID})
|
||||||
response = routing.GetTxnFromRelay(httpReq, &request, relayAPI, *userID)
|
response = routing.GetTxnFromRelay(httpReq, &request, relayAPI, *userID)
|
||||||
assert.Equal(t, http.StatusOK, response.Code)
|
assert.Equal(t, http.StatusOK, response.Code)
|
||||||
|
|
||||||
jsonResponse = response.JSON.(routing.RelayTxnResponse)
|
jsonResponse = response.JSON.(routing.RelayTransactionResponse)
|
||||||
assert.False(t, jsonResponse.EntriesQueued)
|
assert.False(t, jsonResponse.EntriesQueued)
|
||||||
assert.Equal(t, gomatrixserverlib.Transaction{}, jsonResponse.Txn)
|
assert.Equal(t, gomatrixserverlib.Transaction{}, jsonResponse.Transaction)
|
||||||
|
|
||||||
count, err := db.GetAsyncTransactionCount(context.Background(), *userID)
|
count, err := db.GetAsyncTransactionCount(context.Background(), *userID)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
@ -168,26 +168,26 @@ func TestGetAsyncReturnsMultipleSavedTransactions(t *testing.T) {
|
||||||
response := routing.GetTxnFromRelay(httpReq, &request, relayAPI, *userID)
|
response := routing.GetTxnFromRelay(httpReq, &request, relayAPI, *userID)
|
||||||
assert.Equal(t, http.StatusOK, response.Code)
|
assert.Equal(t, http.StatusOK, response.Code)
|
||||||
|
|
||||||
jsonResponse := response.JSON.(routing.RelayTxnResponse)
|
jsonResponse := response.JSON.(routing.RelayTransactionResponse)
|
||||||
assert.True(t, jsonResponse.EntriesQueued)
|
assert.True(t, jsonResponse.EntriesQueued)
|
||||||
assert.Equal(t, transaction, jsonResponse.Txn)
|
assert.Equal(t, transaction, jsonResponse.Transaction)
|
||||||
|
|
||||||
request = createAsyncQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: jsonResponse.EntryID})
|
request = createAsyncQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: jsonResponse.EntryID})
|
||||||
response = routing.GetTxnFromRelay(httpReq, &request, relayAPI, *userID)
|
response = routing.GetTxnFromRelay(httpReq, &request, relayAPI, *userID)
|
||||||
assert.Equal(t, http.StatusOK, response.Code)
|
assert.Equal(t, http.StatusOK, response.Code)
|
||||||
|
|
||||||
jsonResponse = response.JSON.(routing.RelayTxnResponse)
|
jsonResponse = response.JSON.(routing.RelayTransactionResponse)
|
||||||
assert.True(t, jsonResponse.EntriesQueued)
|
assert.True(t, jsonResponse.EntriesQueued)
|
||||||
assert.Equal(t, transaction2, jsonResponse.Txn)
|
assert.Equal(t, transaction2, jsonResponse.Transaction)
|
||||||
|
|
||||||
// And once more to clear the queue
|
// And once more to clear the queue
|
||||||
request = createAsyncQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: jsonResponse.EntryID})
|
request = createAsyncQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: jsonResponse.EntryID})
|
||||||
response = routing.GetTxnFromRelay(httpReq, &request, relayAPI, *userID)
|
response = routing.GetTxnFromRelay(httpReq, &request, relayAPI, *userID)
|
||||||
assert.Equal(t, http.StatusOK, response.Code)
|
assert.Equal(t, http.StatusOK, response.Code)
|
||||||
|
|
||||||
jsonResponse = response.JSON.(routing.RelayTxnResponse)
|
jsonResponse = response.JSON.(routing.RelayTransactionResponse)
|
||||||
assert.False(t, jsonResponse.EntriesQueued)
|
assert.False(t, jsonResponse.EntriesQueued)
|
||||||
assert.Equal(t, gomatrixserverlib.Transaction{}, jsonResponse.Txn)
|
assert.Equal(t, gomatrixserverlib.Transaction{}, jsonResponse.Transaction)
|
||||||
|
|
||||||
count, err := db.GetAsyncTransactionCount(context.Background(), *userID)
|
count, err := db.GetAsyncTransactionCount(context.Background(), *userID)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
|
||||||
|
|
@ -30,8 +30,8 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
SendRelayTransactionRouteName = "SendRelayTxn"
|
SendRelayTransactionRouteName = "SendRelayTransaction"
|
||||||
GetRelayTransactionRouteName = "GetRelayTxn"
|
GetRelayTransactionRouteName = "GetRelayTransaction"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Setup registers HTTP handlers with the given ServeMux.
|
// Setup registers HTTP handlers with the given ServeMux.
|
||||||
|
|
@ -60,7 +60,7 @@ func Setup(
|
||||||
JSON: jsonerror.InvalidUsername("Username was invalid"),
|
JSON: jsonerror.InvalidUsername("Username was invalid"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return SendTxnToRelay(
|
return SendTransactionToRelay(
|
||||||
httpReq, request, relayAPI, gomatrixserverlib.TransactionID(vars["txnID"]),
|
httpReq, request, relayAPI, gomatrixserverlib.TransactionID(vars["txnID"]),
|
||||||
*userID,
|
*userID,
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -131,7 +131,7 @@ func TestHandleSendRelayBadUserID(t *testing.T) {
|
||||||
assert.NotEqual(t, 200, res.StatusCode)
|
assert.NotEqual(t, 200, res.StatusCode)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestHandleRelayTxn(t *testing.T) {
|
func TestHandleRelayTransaction(t *testing.T) {
|
||||||
base, close := testrig.CreateBaseDendrite(t, test.DBTypeSQLite)
|
base, close := testrig.CreateBaseDendrite(t, test.DBTypeSQLite)
|
||||||
defer close()
|
defer close()
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -25,9 +25,9 @@ import (
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
// SendTxnToRelay implements PUT /_matrix/federation/v1/relay_txn/{txnID}/{userID}
|
// SendTransactionToRelay implements PUT /_matrix/federation/v1/relay_txn/{txnID}/{userID}
|
||||||
// This endpoint can be extracted into a separate relay server service.
|
// This endpoint can be extracted into a separate relay server service.
|
||||||
func SendTxnToRelay(
|
func SendTransactionToRelay(
|
||||||
httpReq *http.Request,
|
httpReq *http.Request,
|
||||||
fedReq *gomatrixserverlib.FederationRequest,
|
fedReq *gomatrixserverlib.FederationRequest,
|
||||||
relayAPI api.RelayInternalAPI,
|
relayAPI api.RelayInternalAPI,
|
||||||
|
|
@ -65,12 +65,12 @@ func SendTxnToRelay(
|
||||||
|
|
||||||
util.GetLogger(httpReq.Context()).Warnf("Received transaction %q from %q containing %d PDUs, %d EDUs", txnID, fedReq.Origin(), len(t.PDUs), len(t.EDUs))
|
util.GetLogger(httpReq.Context()).Warnf("Received transaction %q from %q containing %d PDUs, %d EDUs", txnID, fedReq.Origin(), len(t.PDUs), len(t.EDUs))
|
||||||
|
|
||||||
req := api.PerformStoreAsyncRequest{
|
req := api.PerformStoreTransactionRequest{
|
||||||
Txn: t,
|
Txn: t,
|
||||||
UserID: userID,
|
UserID: userID,
|
||||||
}
|
}
|
||||||
res := api.PerformStoreAsyncResponse{}
|
res := api.PerformStoreTransactionResponse{}
|
||||||
err := relayAPI.PerformStoreAsync(httpReq.Context(), &req, &res)
|
err := relayAPI.PerformStoreTransaction(httpReq.Context(), &req, &res)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
Code: http.StatusInternalServerError,
|
Code: http.StatusInternalServerError,
|
||||||
|
|
|
||||||
|
|
@ -75,7 +75,7 @@ func TestForwardEmptyReturnsOk(t *testing.T) {
|
||||||
&db, nil, nil, nil, nil, false, "",
|
&db, nil, nil, nil, nil, false, "",
|
||||||
)
|
)
|
||||||
|
|
||||||
response := routing.SendTxnToRelay(httpReq, &request, relayAPI, "1", *userID)
|
response := routing.SendTransactionToRelay(httpReq, &request, relayAPI, "1", *userID)
|
||||||
|
|
||||||
assert.Equal(t, 200, response.Code)
|
assert.Equal(t, 200, response.Code)
|
||||||
}
|
}
|
||||||
|
|
@ -104,7 +104,7 @@ func TestForwardBadJSONReturnsError(t *testing.T) {
|
||||||
&db, nil, nil, nil, nil, false, "",
|
&db, nil, nil, nil, nil, false, "",
|
||||||
)
|
)
|
||||||
|
|
||||||
response := routing.SendTxnToRelay(httpReq, &request, relayAPI, "1", *userID)
|
response := routing.SendTransactionToRelay(httpReq, &request, relayAPI, "1", *userID)
|
||||||
|
|
||||||
assert.NotEqual(t, 200, response.Code)
|
assert.NotEqual(t, 200, response.Code)
|
||||||
}
|
}
|
||||||
|
|
@ -138,7 +138,7 @@ func TestForwardTooManyPDUsReturnsError(t *testing.T) {
|
||||||
&db, nil, nil, nil, nil, false, "",
|
&db, nil, nil, nil, nil, false, "",
|
||||||
)
|
)
|
||||||
|
|
||||||
response := routing.SendTxnToRelay(httpReq, &request, relayAPI, "1", *userID)
|
response := routing.SendTransactionToRelay(httpReq, &request, relayAPI, "1", *userID)
|
||||||
|
|
||||||
assert.NotEqual(t, 200, response.Code)
|
assert.NotEqual(t, 200, response.Code)
|
||||||
}
|
}
|
||||||
|
|
@ -172,7 +172,7 @@ func TestForwardTooManyEDUsReturnsError(t *testing.T) {
|
||||||
&db, nil, nil, nil, nil, false, "",
|
&db, nil, nil, nil, nil, false, "",
|
||||||
)
|
)
|
||||||
|
|
||||||
response := routing.SendTxnToRelay(httpReq, &request, relayAPI, "1", *userID)
|
response := routing.SendTransactionToRelay(httpReq, &request, relayAPI, "1", *userID)
|
||||||
|
|
||||||
assert.NotEqual(t, 200, response.Code)
|
assert.NotEqual(t, 200, response.Code)
|
||||||
}
|
}
|
||||||
|
|
@ -195,7 +195,7 @@ func TestUniqueTransactionStoredInDatabase(t *testing.T) {
|
||||||
&db, nil, nil, nil, nil, false, "",
|
&db, nil, nil, nil, nil, false, "",
|
||||||
)
|
)
|
||||||
|
|
||||||
response := routing.SendTxnToRelay(
|
response := routing.SendTransactionToRelay(
|
||||||
httpReq, &request, relayAPI, txn.TransactionID, *userID)
|
httpReq, &request, relayAPI, txn.TransactionID, *userID)
|
||||||
transaction, _, err := db.GetAsyncTransaction(context.TODO(), *userID)
|
transaction, _, err := db.GetAsyncTransaction(context.TODO(), *userID)
|
||||||
assert.NoError(t, err, "Failed retrieving transaction")
|
assert.NoError(t, err, "Failed retrieving transaction")
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue