Incremental renaming refactor of relay server

This commit is contained in:
Devon Hudson 2023-01-11 14:14:57 -07:00
parent 1b2a043993
commit 7b2cbc7133
No known key found for this signature in database
GPG key ID: CD06B18E77F6A628
13 changed files with 63 additions and 63 deletions

View file

@ -575,7 +575,7 @@ func (m *RelayServerRetriever) GetQueriedServerStatus() map[gomatrixserverlib.Se
}
func (m *RelayServerRetriever) queryRelayServers(relayServers []gomatrixserverlib.ServerName) {
logrus.Info("querying relay servers for async_events")
logrus.Info("querying relay servers for any available transactions")
for _, server := range relayServers {
userID, err := gomatrixserverlib.NewUserID("@user:"+string(m.ServerName), false)
if err != nil {

View file

@ -422,7 +422,7 @@ func (m *RelayServerRetriever) syncRelayServers(stop <-chan bool, running atomic
}
func (m *RelayServerRetriever) queryRelayServers(relayServers []gomatrixserverlib.ServerName) {
logrus.Info("querying relay servers for async_events")
logrus.Info("querying relay servers for any available transactions")
for _, server := range relayServers {
userID, err := gomatrixserverlib.NewUserID("@user:"+string(m.ServerName), false)
if err != nil {

View file

@ -91,8 +91,8 @@ type FederationClient interface {
gomatrixserverlib.KeyClient
SendTransaction(ctx context.Context, t gomatrixserverlib.Transaction) (res gomatrixserverlib.RespSend, err error)
SendAsyncTransaction(ctx context.Context, u gomatrixserverlib.UserID, t gomatrixserverlib.Transaction, forwardingServer gomatrixserverlib.ServerName) (res gomatrixserverlib.EmptyResp, err error)
GetAsyncEvents(ctx context.Context, u gomatrixserverlib.UserID, prev gomatrixserverlib.RelayEntry, relayServer gomatrixserverlib.ServerName) (res gomatrixserverlib.RespGetAsyncEvents, err error)
P2PSendTransactionToRelay(ctx context.Context, u gomatrixserverlib.UserID, t gomatrixserverlib.Transaction, forwardingServer gomatrixserverlib.ServerName) (res gomatrixserverlib.EmptyResp, err error)
P2PGetTransactionFromRelay(ctx context.Context, u gomatrixserverlib.UserID, prev gomatrixserverlib.RelayEntry, relayServer gomatrixserverlib.ServerName) (res gomatrixserverlib.RespGetRelayTxn, err error)
// Perform operations
LookupRoomAlias(ctx context.Context, origin, s gomatrixserverlib.ServerName, roomAlias string) (res gomatrixserverlib.RespDirectory, err error)

View file

@ -418,7 +418,7 @@ func (oq *destinationQueue) nextTransaction(
return userErr, false
}
for _, relayServer := range relayServers {
_, asyncErr := oq.client.SendAsyncTransaction(ctx, *userID, t, relayServer)
_, asyncErr := oq.client.P2PSendTransactionToRelay(ctx, *userID, t, relayServer)
if asyncErr != nil {
err = asyncErr
} else {

View file

@ -89,7 +89,7 @@ func (f *stubFederationClient) SendTransaction(ctx context.Context, t gomatrixse
return gomatrixserverlib.RespSend{}, result
}
func (f *stubFederationClient) SendAsyncTransaction(ctx context.Context, u gomatrixserverlib.UserID, t gomatrixserverlib.Transaction, forwardingServer gomatrixserverlib.ServerName) (res gomatrixserverlib.EmptyResp, err error) {
func (f *stubFederationClient) P2PSendTransactionToRelay(ctx context.Context, u gomatrixserverlib.UserID, t gomatrixserverlib.Transaction, forwardingServer gomatrixserverlib.ServerName) (res gomatrixserverlib.EmptyResp, err error) {
var result error
if !f.shouldTxAsyncSucceed {
result = fmt.Errorf("async transaction failed")

View file

@ -31,19 +31,19 @@ func (r *RelayInternalAPI) PerformRelayServerSync(
response *api.PerformRelayServerSyncResponse,
) error {
prevEntry := gomatrixserverlib.RelayEntry{EntryID: -1}
asyncResponse, err := r.fedClient.GetAsyncEvents(ctx, request.UserID, prevEntry, request.RelayServer)
asyncResponse, err := r.fedClient.P2PGetTransactionFromRelay(ctx, request.UserID, prevEntry, request.RelayServer)
if err != nil {
logrus.Errorf("GetAsyncEvents: %s", err.Error())
logrus.Errorf("P2PGetTransactionFromRelay: %s", err.Error())
return err
}
r.processTransaction(&asyncResponse.Txn)
for asyncResponse.EntriesQueued {
logrus.Infof("Retrieving next entry from relay, previous: %v", prevEntry)
asyncResponse, err = r.fedClient.GetAsyncEvents(ctx, request.UserID, prevEntry, request.RelayServer)
asyncResponse, err = r.fedClient.P2PGetTransactionFromRelay(ctx, request.UserID, prevEntry, request.RelayServer)
prevEntry = gomatrixserverlib.RelayEntry{EntryID: asyncResponse.EntryID}
if err != nil {
logrus.Errorf("GetAsyncEvents: %s", err.Error())
logrus.Errorf("P2PGetTransactionFromRelay: %s", err.Error())
return err
}
r.processTransaction(&asyncResponse.Txn)

View file

@ -35,10 +35,10 @@ type testFedClient struct {
queueDepth uint
}
func (f *testFedClient) GetAsyncEvents(ctx context.Context, u gomatrixserverlib.UserID, prev gomatrixserverlib.RelayEntry, relayServer gomatrixserverlib.ServerName) (res gomatrixserverlib.RespGetAsyncEvents, err error) {
func (f *testFedClient) P2PGetTransactionFromRelay(ctx context.Context, u gomatrixserverlib.UserID, prev gomatrixserverlib.RelayEntry, relayServer gomatrixserverlib.ServerName) (res gomatrixserverlib.RespGetRelayTxn, err error) {
f.queryCount++
if !f.shouldFail {
res = gomatrixserverlib.RespGetAsyncEvents{
res = gomatrixserverlib.RespGetRelayTxn{
Txn: gomatrixserverlib.Transaction{},
EntryID: 0,
}

View file

@ -24,21 +24,21 @@ import (
"github.com/sirupsen/logrus"
)
type AsyncEventsResponse struct {
type RelayTxnResponse struct {
Txn gomatrixserverlib.Transaction `json:"transaction"`
EntryID int64 `json:"entry_id,omitempty"`
EntriesQueued bool `json:"entries_queued"`
}
// GetAsyncEvents implements /_matrix/federation/v1/async_events/{userID}
// GetTxnFromRelay implements /_matrix/federation/v1/relay_txn/{userID}
// This endpoint can be extracted into a separate relay server service.
func GetAsyncEvents(
func GetTxnFromRelay(
httpReq *http.Request,
fedReq *gomatrixserverlib.FederationRequest,
relayAPI api.RelayInternalAPI,
userID gomatrixserverlib.UserID,
) util.JSONResponse {
logrus.Infof("Handling async_events for %s", userID.Raw())
logrus.Infof("Handling relay_txn for %s", userID.Raw())
entryProvided := false
var previousEntry gomatrixserverlib.RelayEntry
@ -67,7 +67,7 @@ func GetAsyncEvents(
return util.JSONResponse{
Code: http.StatusOK,
JSON: AsyncEventsResponse{
JSON: RelayTxnResponse{
Txn: response.Txn,
EntryID: response.EntryID,
EntriesQueued: response.EntriesQueued,

View file

@ -33,7 +33,7 @@ func createAsyncQuery(
prevEntry gomatrixserverlib.RelayEntry,
) gomatrixserverlib.FederationRequest {
var federationPathPrefixV1 = "/_matrix/federation/v1"
path := federationPathPrefixV1 + "/async_events/" + userID.Raw()
path := federationPathPrefixV1 + "/relay_txn/" + userID.Raw()
request := gomatrixserverlib.NewFederationRequest("GET", userID.Domain(), "relay", path)
request.SetContent(prevEntry)
@ -61,10 +61,10 @@ func TestGetAsyncEmptyDatabaseReturnsNothing(t *testing.T) {
)
request := createAsyncQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: -1})
response := routing.GetAsyncEvents(httpReq, &request, relayAPI, *userID)
response := routing.GetTxnFromRelay(httpReq, &request, relayAPI, *userID)
assert.Equal(t, http.StatusOK, response.Code)
jsonResponse := response.JSON.(routing.AsyncEventsResponse)
jsonResponse := response.JSON.(routing.RelayTxnResponse)
assert.Equal(t, false, jsonResponse.EntriesQueued)
assert.Equal(t, gomatrixserverlib.Transaction{}, jsonResponse.Txn)
@ -102,19 +102,19 @@ func TestGetAsyncReturnsSavedTransaction(t *testing.T) {
)
request := createAsyncQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: -1})
response := routing.GetAsyncEvents(httpReq, &request, relayAPI, *userID)
response := routing.GetTxnFromRelay(httpReq, &request, relayAPI, *userID)
assert.Equal(t, http.StatusOK, response.Code)
jsonResponse := response.JSON.(routing.AsyncEventsResponse)
jsonResponse := response.JSON.(routing.RelayTxnResponse)
assert.True(t, jsonResponse.EntriesQueued)
assert.Equal(t, transaction, jsonResponse.Txn)
// And once more to clear the queue
request = createAsyncQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: jsonResponse.EntryID})
response = routing.GetAsyncEvents(httpReq, &request, relayAPI, *userID)
response = routing.GetTxnFromRelay(httpReq, &request, relayAPI, *userID)
assert.Equal(t, http.StatusOK, response.Code)
jsonResponse = response.JSON.(routing.AsyncEventsResponse)
jsonResponse = response.JSON.(routing.RelayTxnResponse)
assert.False(t, jsonResponse.EntriesQueued)
assert.Equal(t, gomatrixserverlib.Transaction{}, jsonResponse.Txn)
@ -165,27 +165,27 @@ func TestGetAsyncReturnsMultipleSavedTransactions(t *testing.T) {
)
request := createAsyncQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: -1})
response := routing.GetAsyncEvents(httpReq, &request, relayAPI, *userID)
response := routing.GetTxnFromRelay(httpReq, &request, relayAPI, *userID)
assert.Equal(t, http.StatusOK, response.Code)
jsonResponse := response.JSON.(routing.AsyncEventsResponse)
jsonResponse := response.JSON.(routing.RelayTxnResponse)
assert.True(t, jsonResponse.EntriesQueued)
assert.Equal(t, transaction, jsonResponse.Txn)
request = createAsyncQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: jsonResponse.EntryID})
response = routing.GetAsyncEvents(httpReq, &request, relayAPI, *userID)
response = routing.GetTxnFromRelay(httpReq, &request, relayAPI, *userID)
assert.Equal(t, http.StatusOK, response.Code)
jsonResponse = response.JSON.(routing.AsyncEventsResponse)
jsonResponse = response.JSON.(routing.RelayTxnResponse)
assert.True(t, jsonResponse.EntriesQueued)
assert.Equal(t, transaction2, jsonResponse.Txn)
// And once more to clear the queue
request = createAsyncQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: jsonResponse.EntryID})
response = routing.GetAsyncEvents(httpReq, &request, relayAPI, *userID)
response = routing.GetTxnFromRelay(httpReq, &request, relayAPI, *userID)
assert.Equal(t, http.StatusOK, response.Code)
jsonResponse = response.JSON.(routing.AsyncEventsResponse)
jsonResponse = response.JSON.(routing.RelayTxnResponse)
assert.False(t, jsonResponse.EntriesQueued)
assert.Equal(t, gomatrixserverlib.Transaction{}, jsonResponse.Txn)

View file

@ -30,8 +30,8 @@ import (
)
const (
ForwardAsyncRouteName = "ForwardAsync"
AsyncEventsRouteName = "AsyncEvents"
SendRelayTransactionRouteName = "SendRelayTxn"
GetRelayTransactionRouteName = "GetRelayTxn"
)
// Setup registers HTTP handlers with the given ServeMux.
@ -50,8 +50,8 @@ func Setup(
) {
v1fedmux := fedMux.PathPrefix("/v1").Subrouter()
v1fedmux.Handle("/forward_async/{txnID}/{userID}", MakeRelayAPI(
"relay_forward_async", "", cfg.Matrix.IsLocalServerName, keys,
v1fedmux.Handle("/send_relay/{txnID}/{userID}", MakeRelayAPI(
"send_relay_transaction", "", cfg.Matrix.IsLocalServerName, keys,
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse {
userID, err := gomatrixserverlib.NewUserID(vars["userID"], false)
if err != nil {
@ -60,15 +60,15 @@ func Setup(
JSON: jsonerror.InvalidUsername("Username was invalid"),
}
}
return ForwardAsync(
return SendTxnToRelay(
httpReq, request, relayAPI, gomatrixserverlib.TransactionID(vars["txnID"]),
*userID,
)
},
)).Methods(http.MethodPut, http.MethodOptions).Name(ForwardAsyncRouteName)
)).Methods(http.MethodPut, http.MethodOptions).Name(SendRelayTransactionRouteName)
v1fedmux.Handle("/async_events/{userID}", MakeRelayAPI(
"relay_async_events", "", cfg.Matrix.IsLocalServerName, keys,
v1fedmux.Handle("/relay_txn/{userID}", MakeRelayAPI(
"get_relay_transaction", "", cfg.Matrix.IsLocalServerName, keys,
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse {
userID, err := gomatrixserverlib.NewUserID(vars["userID"], false)
if err != nil {
@ -77,9 +77,9 @@ func Setup(
JSON: jsonerror.InvalidUsername("Username was invalid"),
}
}
return GetAsyncEvents(httpReq, request, relayAPI, *userID)
return GetTxnFromRelay(httpReq, request, relayAPI, *userID)
},
)).Methods(http.MethodGet, http.MethodOptions).Name(AsyncEventsRouteName)
)).Methods(http.MethodGet, http.MethodOptions).Name(GetRelayTransactionRouteName)
}
// MakeRelayAPI makes an http.Handler that checks matrix relay authentication.

View file

@ -34,12 +34,12 @@ import (
"github.com/stretchr/testify/assert"
)
type forwardAsyncContent struct {
type sendRelayContent struct {
PDUs []json.RawMessage `json:"pdus"`
EDUs []gomatrixserverlib.EDU `json:"edus"`
}
func TestHandleForwardAsync(t *testing.T) {
func TestHandleSendRelay(t *testing.T) {
base, close := testrig.CreateBaseDendrite(t, test.DBTypeSQLite)
defer close()
@ -60,13 +60,13 @@ func TestHandleForwardAsync(t *testing.T) {
}
routing.Setup(fedMux, &cfg, r, keyRing)
handler := fedMux.Get(routing.ForwardAsyncRouteName).GetHandler().ServeHTTP
handler := fedMux.Get(routing.SendRelayTransactionRouteName).GetHandler().ServeHTTP
_, sk, _ := ed25519.GenerateKey(nil)
keyID := signing.KeyID
pk := sk.Public().(ed25519.PublicKey)
serverName := gomatrixserverlib.ServerName(hex.EncodeToString(pk))
req := gomatrixserverlib.NewFederationRequest("PUT", serverName, "remote", "/forward_async/1234/@user:local")
content := forwardAsyncContent{}
req := gomatrixserverlib.NewFederationRequest("PUT", serverName, "remote", "/send_relay/1234/@user:local")
content := sendRelayContent{}
err := req.SetContent(content)
if err != nil {
t.Fatalf("Error: %s", err.Error())
@ -85,7 +85,7 @@ func TestHandleForwardAsync(t *testing.T) {
assert.Equal(t, 200, res.StatusCode)
}
func TestHandleForwardAsyncBadUserID(t *testing.T) {
func TestHandleSendRelayBadUserID(t *testing.T) {
base, close := testrig.CreateBaseDendrite(t, test.DBTypeSQLite)
defer close()
@ -106,13 +106,13 @@ func TestHandleForwardAsyncBadUserID(t *testing.T) {
}
routing.Setup(fedMux, &cfg, r, keyRing)
handler := fedMux.Get(routing.ForwardAsyncRouteName).GetHandler().ServeHTTP
handler := fedMux.Get(routing.SendRelayTransactionRouteName).GetHandler().ServeHTTP
_, sk, _ := ed25519.GenerateKey(nil)
keyID := signing.KeyID
pk := sk.Public().(ed25519.PublicKey)
serverName := gomatrixserverlib.ServerName(hex.EncodeToString(pk))
req := gomatrixserverlib.NewFederationRequest("PUT", serverName, "remote", "/forward_async/1234/user")
content := forwardAsyncContent{}
req := gomatrixserverlib.NewFederationRequest("PUT", serverName, "remote", "/send_relay/1234/user")
content := sendRelayContent{}
err := req.SetContent(content)
if err != nil {
t.Fatalf("Error: %s", err.Error())
@ -131,7 +131,7 @@ func TestHandleForwardAsyncBadUserID(t *testing.T) {
assert.NotEqual(t, 200, res.StatusCode)
}
func TestHandleAsyncEvents(t *testing.T) {
func TestHandleRelayTxn(t *testing.T) {
base, close := testrig.CreateBaseDendrite(t, test.DBTypeSQLite)
defer close()
@ -152,12 +152,12 @@ func TestHandleAsyncEvents(t *testing.T) {
}
routing.Setup(fedMux, &cfg, r, keyRing)
handler := fedMux.Get(routing.AsyncEventsRouteName).GetHandler().ServeHTTP
handler := fedMux.Get(routing.GetRelayTransactionRouteName).GetHandler().ServeHTTP
_, sk, _ := ed25519.GenerateKey(nil)
keyID := signing.KeyID
pk := sk.Public().(ed25519.PublicKey)
serverName := gomatrixserverlib.ServerName(hex.EncodeToString(pk))
req := gomatrixserverlib.NewFederationRequest("GET", serverName, "remote", "/async_events/@user:local")
req := gomatrixserverlib.NewFederationRequest("GET", serverName, "remote", "/relay_txn/@user:local")
content := gomatrixserverlib.RelayEntry{EntryID: 0}
err := req.SetContent(content)
if err != nil {
@ -177,7 +177,7 @@ func TestHandleAsyncEvents(t *testing.T) {
assert.Equal(t, 200, res.StatusCode)
}
func TestHandleAsyncEventsBadUserID(t *testing.T) {
func TestHandleRelayTxnBadUserID(t *testing.T) {
base, close := testrig.CreateBaseDendrite(t, test.DBTypeSQLite)
defer close()
@ -198,12 +198,12 @@ func TestHandleAsyncEventsBadUserID(t *testing.T) {
}
routing.Setup(fedMux, &cfg, r, keyRing)
handler := fedMux.Get(routing.AsyncEventsRouteName).GetHandler().ServeHTTP
handler := fedMux.Get(routing.GetRelayTransactionRouteName).GetHandler().ServeHTTP
_, sk, _ := ed25519.GenerateKey(nil)
keyID := signing.KeyID
pk := sk.Public().(ed25519.PublicKey)
serverName := gomatrixserverlib.ServerName(hex.EncodeToString(pk))
req := gomatrixserverlib.NewFederationRequest("GET", serverName, "remote", "/async_events/user")
req := gomatrixserverlib.NewFederationRequest("GET", serverName, "remote", "/relay_txn/user")
content := gomatrixserverlib.RelayEntry{EntryID: 0}
err := req.SetContent(content)
if err != nil {

View file

@ -25,9 +25,9 @@ import (
"github.com/sirupsen/logrus"
)
// ForwardAsync implements /_matrix/federation/v1/forward_async/{txnID}/{userID}
// SendTxnToRelay implements PUT /_matrix/federation/v1/relay_txn/{txnID}/{userID}
// This endpoint can be extracted into a separate relay server service.
func ForwardAsync(
func SendTxnToRelay(
httpReq *http.Request,
fedReq *gomatrixserverlib.FederationRequest,
relayAPI api.RelayInternalAPI,

View file

@ -50,7 +50,7 @@ func createFederationRequest(
content interface{},
) gomatrixserverlib.FederationRequest {
var federationPathPrefixV1 = "/_matrix/federation/v1"
path := federationPathPrefixV1 + "/forward_async/" + string(txnID) + "/" + userID.Raw()
path := federationPathPrefixV1 + "/send_relay/" + string(txnID) + "/" + userID.Raw()
request := gomatrixserverlib.NewFederationRequest("PUT", origin, destination, path)
request.SetContent(content)
@ -75,7 +75,7 @@ func TestForwardEmptyReturnsOk(t *testing.T) {
&db, nil, nil, nil, nil, false, "",
)
response := routing.ForwardAsync(httpReq, &request, relayAPI, "1", *userID)
response := routing.SendTxnToRelay(httpReq, &request, relayAPI, "1", *userID)
assert.Equal(t, 200, response.Code)
}
@ -104,7 +104,7 @@ func TestForwardBadJSONReturnsError(t *testing.T) {
&db, nil, nil, nil, nil, false, "",
)
response := routing.ForwardAsync(httpReq, &request, relayAPI, "1", *userID)
response := routing.SendTxnToRelay(httpReq, &request, relayAPI, "1", *userID)
assert.NotEqual(t, 200, response.Code)
}
@ -138,7 +138,7 @@ func TestForwardTooManyPDUsReturnsError(t *testing.T) {
&db, nil, nil, nil, nil, false, "",
)
response := routing.ForwardAsync(httpReq, &request, relayAPI, "1", *userID)
response := routing.SendTxnToRelay(httpReq, &request, relayAPI, "1", *userID)
assert.NotEqual(t, 200, response.Code)
}
@ -172,7 +172,7 @@ func TestForwardTooManyEDUsReturnsError(t *testing.T) {
&db, nil, nil, nil, nil, false, "",
)
response := routing.ForwardAsync(httpReq, &request, relayAPI, "1", *userID)
response := routing.SendTxnToRelay(httpReq, &request, relayAPI, "1", *userID)
assert.NotEqual(t, 200, response.Code)
}
@ -195,7 +195,7 @@ func TestUniqueTransactionStoredInDatabase(t *testing.T) {
&db, nil, nil, nil, nil, false, "",
)
response := routing.ForwardAsync(
response := routing.SendTxnToRelay(
httpReq, &request, relayAPI, txn.TransactionID, *userID)
transaction, _, err := db.GetAsyncTransaction(context.TODO(), *userID)
assert.NoError(t, err, "Failed retrieving transaction")