Finish s&f rename refactor

This commit is contained in:
Devon Hudson 2023-01-12 08:21:26 -07:00
parent 40563741da
commit 07d2976854
No known key found for this signature in database
GPG key ID: CD06B18E77F6A628
12 changed files with 83 additions and 100 deletions

View file

@ -250,20 +250,3 @@ type P2PQueryRelayServersRequest struct {
type P2PQueryRelayServersResponse struct { type P2PQueryRelayServersResponse struct {
RelayServers []gomatrixserverlib.ServerName RelayServers []gomatrixserverlib.ServerName
} }
type P2PPerformStoreAsyncRequest struct {
Transaction gomatrixserverlib.Transaction `json:"transaction"`
UserID gomatrixserverlib.UserID `json:"user_id"`
}
type P2PPerformStoreAsyncResponse struct {
}
type P2PQueryAsyncTransactionsRequest struct {
UserID gomatrixserverlib.UserID `json:"user_id"`
}
type P2PQueryAsyncTransactionsResponse struct {
Transaction gomatrixserverlib.Transaction `json:"transaction"`
RemainingCount uint32 `json:"remaining"`
}

View file

@ -119,7 +119,7 @@ func NewInternalAPI(
stats := statistics.NewStatistics( stats := statistics.NewStatistics(
federationDB, federationDB,
cfg.FederationMaxRetries+1, cfg.FederationMaxRetries+1,
cfg.FederationRetriesUntilAssumedOffline+1) cfg.P2PFederationRetriesUntilAssumedOffline+1)
js, nats := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) js, nats := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)

View file

@ -371,7 +371,7 @@ func (oq *destinationQueue) backgroundSend() {
// If we have pending PDUs or EDUs then construct a transaction. // If we have pending PDUs or EDUs then construct a transaction.
// Try sending the next transaction and see what happens. // Try sending the next transaction and see what happens.
terr, asyncSuccess := oq.nextTransaction(toSendPDUs, toSendEDUs) terr, relaySuccess := oq.nextTransaction(toSendPDUs, toSendEDUs)
if terr != nil { if terr != nil {
// We failed to send the transaction. Mark it as a failure. // We failed to send the transaction. Mark it as a failure.
_, blacklisted := oq.statistics.Failure() _, blacklisted := oq.statistics.Failure()
@ -388,7 +388,7 @@ func (oq *destinationQueue) backgroundSend() {
return return
} }
} else { } else {
oq.handleTransactionSuccess(pduCount, eduCount, asyncSuccess) oq.handleTransactionSuccess(pduCount, eduCount, relaySuccess)
} }
} }
} }
@ -396,11 +396,11 @@ func (oq *destinationQueue) backgroundSend() {
// nextTransaction creates a new transaction from the pending event // nextTransaction creates a new transaction from the pending event
// queue and sends it. // queue and sends it.
// Returns an error if the transaction wasn't sent. And whether the success // Returns an error if the transaction wasn't sent. And whether the success
// was to an async relay server or not. // was to a relay server or not.
func (oq *destinationQueue) nextTransaction( func (oq *destinationQueue) nextTransaction(
pdus []*queuedPDU, pdus []*queuedPDU,
edus []*queuedEDU, edus []*queuedEDU,
) (err error, asyncSuccess bool) { ) (err error, relaySuccess bool) {
// Create the transaction. // Create the transaction.
t, pduReceipts, eduReceipts := oq.createTransaction(pdus, edus) t, pduReceipts, eduReceipts := oq.createTransaction(pdus, edus)
logrus.WithField("server_name", oq.destination).Debugf("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs)) logrus.WithField("server_name", oq.destination).Debugf("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs))
@ -418,14 +418,14 @@ func (oq *destinationQueue) nextTransaction(
return userErr, false return userErr, false
} }
for _, relayServer := range relayServers { for _, relayServer := range relayServers {
_, asyncErr := oq.client.P2PSendTransactionToRelay(ctx, *userID, t, relayServer) _, relayErr := oq.client.P2PSendTransactionToRelay(ctx, *userID, t, relayServer)
if asyncErr != nil { if relayErr != nil {
err = asyncErr err = relayErr
} else { } else {
asyncSuccess = true relaySuccess = true
} }
} }
if asyncSuccess { if relaySuccess {
err = nil err = nil
} }
} else { } else {
@ -450,7 +450,7 @@ func (oq *destinationQueue) nextTransaction(
oq.transactionIDMutex.Lock() oq.transactionIDMutex.Lock()
oq.transactionID = "" oq.transactionID = ""
oq.transactionIDMutex.Unlock() oq.transactionIDMutex.Unlock()
return nil, asyncSuccess return nil, relaySuccess
case gomatrix.HTTPError: case gomatrix.HTTPError:
// Report that we failed to send the transaction and we // Report that we failed to send the transaction and we
// will retry again, subject to backoff. // will retry again, subject to backoff.
@ -553,10 +553,10 @@ func (oq *destinationQueue) blacklistDestination() {
// handleTransactionSuccess updates the cached event queues as well as the success and // handleTransactionSuccess updates the cached event queues as well as the success and
// backoff information for this server. // backoff information for this server.
func (oq *destinationQueue) handleTransactionSuccess(pduCount int, eduCount int, asyncSuccess bool) { func (oq *destinationQueue) handleTransactionSuccess(pduCount int, eduCount int, relaySuccess bool) {
// If we successfully sent the transaction then clear out // If we successfully sent the transaction then clear out
// the pending events and EDUs, and wipe our transaction ID. // the pending events and EDUs, and wipe our transaction ID.
oq.statistics.Success(asyncSuccess) oq.statistics.Success(relaySuccess)
oq.pendingMutex.Lock() oq.pendingMutex.Lock()
defer oq.pendingMutex.Unlock() defer oq.pendingMutex.Unlock()

View file

@ -74,9 +74,9 @@ func (r *stubFederationRoomServerAPI) QueryServerBannedFromRoom(ctx context.Cont
type stubFederationClient struct { type stubFederationClient struct {
api.FederationClient api.FederationClient
shouldTxSucceed bool shouldTxSucceed bool
shouldTxAsyncSucceed bool shouldTxRelaySucceed bool
txCount atomic.Uint32 txCount atomic.Uint32
txAsyncCount atomic.Uint32 txRelayCount atomic.Uint32
} }
func (f *stubFederationClient) SendTransaction(ctx context.Context, t gomatrixserverlib.Transaction) (res gomatrixserverlib.RespSend, err error) { func (f *stubFederationClient) SendTransaction(ctx context.Context, t gomatrixserverlib.Transaction) (res gomatrixserverlib.RespSend, err error) {
@ -91,11 +91,11 @@ func (f *stubFederationClient) SendTransaction(ctx context.Context, t gomatrixse
func (f *stubFederationClient) P2PSendTransactionToRelay(ctx context.Context, u gomatrixserverlib.UserID, t gomatrixserverlib.Transaction, forwardingServer gomatrixserverlib.ServerName) (res gomatrixserverlib.EmptyResp, err error) { func (f *stubFederationClient) P2PSendTransactionToRelay(ctx context.Context, u gomatrixserverlib.UserID, t gomatrixserverlib.Transaction, forwardingServer gomatrixserverlib.ServerName) (res gomatrixserverlib.EmptyResp, err error) {
var result error var result error
if !f.shouldTxAsyncSucceed { if !f.shouldTxRelaySucceed {
result = fmt.Errorf("async transaction failed") result = fmt.Errorf("relay transaction failed")
} }
f.txAsyncCount.Add(1) f.txRelayCount.Add(1)
return gomatrixserverlib.EmptyResp{}, result return gomatrixserverlib.EmptyResp{}, result
} }
@ -114,14 +114,14 @@ func mustCreateEDU(t *testing.T) *gomatrixserverlib.EDU {
return &gomatrixserverlib.EDU{Type: gomatrixserverlib.MTyping} return &gomatrixserverlib.EDU{Type: gomatrixserverlib.MTyping}
} }
func testSetup(failuresUntilBlacklist uint32, failuresUntilAssumedOffline uint32, shouldTxSucceed bool, shouldTxAsyncSucceed bool, t *testing.T, dbType test.DBType, realDatabase bool) (storage.Database, *stubFederationClient, *OutgoingQueues, *process.ProcessContext, func()) { func testSetup(failuresUntilBlacklist uint32, failuresUntilAssumedOffline uint32, shouldTxSucceed bool, shouldTxRelaySucceed bool, t *testing.T, dbType test.DBType, realDatabase bool) (storage.Database, *stubFederationClient, *OutgoingQueues, *process.ProcessContext, func()) {
db, processContext, close := mustCreateFederationDatabase(t, dbType, realDatabase) db, processContext, close := mustCreateFederationDatabase(t, dbType, realDatabase)
fc := &stubFederationClient{ fc := &stubFederationClient{
shouldTxSucceed: shouldTxSucceed, shouldTxSucceed: shouldTxSucceed,
shouldTxAsyncSucceed: shouldTxAsyncSucceed, shouldTxRelaySucceed: shouldTxRelaySucceed,
txCount: *atomic.NewUint32(0), txCount: *atomic.NewUint32(0),
txAsyncCount: *atomic.NewUint32(0), txRelayCount: *atomic.NewUint32(0),
} }
rs := &stubFederationRoomServerAPI{} rs := &stubFederationRoomServerAPI{}
@ -903,7 +903,7 @@ func TestSendEDUMultipleFailuresAssumedOffline(t *testing.T) {
poll.WaitOn(t, check, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond)) poll.WaitOn(t, check, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond))
} }
func TestSendPDUOnAsyncSuccessRemovedFromDB(t *testing.T) { func TestSendPDUOnRelaySuccessRemovedFromDB(t *testing.T) {
t.Parallel() t.Parallel()
failuresUntilBlacklist := uint32(16) failuresUntilBlacklist := uint32(16)
failuresUntilAssumedOffline := uint32(1) failuresUntilAssumedOffline := uint32(1)
@ -924,7 +924,7 @@ func TestSendPDUOnAsyncSuccessRemovedFromDB(t *testing.T) {
check := func(log poll.LogT) poll.Result { check := func(log poll.LogT) poll.Result {
if fc.txCount.Load() == 1 { if fc.txCount.Load() == 1 {
if fc.txAsyncCount.Load() == 1 { if fc.txRelayCount.Load() == 1 {
data, dbErr := db.GetPendingPDUs(pc.Context(), destination, 100) data, dbErr := db.GetPendingPDUs(pc.Context(), destination, 100)
assert.NoError(t, dbErr) assert.NoError(t, dbErr)
if len(data) == 0 { if len(data) == 0 {
@ -932,7 +932,7 @@ func TestSendPDUOnAsyncSuccessRemovedFromDB(t *testing.T) {
} }
return poll.Continue("waiting for event to be removed from database. Currently present PDU: %d", len(data)) return poll.Continue("waiting for event to be removed from database. Currently present PDU: %d", len(data))
} }
return poll.Continue("waiting for more async send attempts before checking database. Currently %d", fc.txAsyncCount.Load()) return poll.Continue("waiting for more relay send attempts before checking database. Currently %d", fc.txRelayCount.Load())
} }
return poll.Continue("waiting for more send attempts before checking database. Currently %d", fc.txCount.Load()) return poll.Continue("waiting for more send attempts before checking database. Currently %d", fc.txCount.Load())
} }
@ -942,7 +942,7 @@ func TestSendPDUOnAsyncSuccessRemovedFromDB(t *testing.T) {
assert.Equal(t, true, assumedOffline) assert.Equal(t, true, assumedOffline)
} }
func TestSendEDUOnAsyncSuccessRemovedFromDB(t *testing.T) { func TestSendEDUOnRelaySuccessRemovedFromDB(t *testing.T) {
t.Parallel() t.Parallel()
failuresUntilBlacklist := uint32(16) failuresUntilBlacklist := uint32(16)
failuresUntilAssumedOffline := uint32(1) failuresUntilAssumedOffline := uint32(1)
@ -963,7 +963,7 @@ func TestSendEDUOnAsyncSuccessRemovedFromDB(t *testing.T) {
check := func(log poll.LogT) poll.Result { check := func(log poll.LogT) poll.Result {
if fc.txCount.Load() == 1 { if fc.txCount.Load() == 1 {
if fc.txAsyncCount.Load() == 1 { if fc.txRelayCount.Load() == 1 {
data, dbErr := db.GetPendingEDUs(pc.Context(), destination, 100) data, dbErr := db.GetPendingEDUs(pc.Context(), destination, 100)
assert.NoError(t, dbErr) assert.NoError(t, dbErr)
if len(data) == 0 { if len(data) == 0 {
@ -971,7 +971,7 @@ func TestSendEDUOnAsyncSuccessRemovedFromDB(t *testing.T) {
} }
return poll.Continue("waiting for event to be removed from database. Currently present EDU: %d", len(data)) return poll.Continue("waiting for event to be removed from database. Currently present EDU: %d", len(data))
} }
return poll.Continue("waiting for more async send attempts before checking database. Currently %d", fc.txAsyncCount.Load()) return poll.Continue("waiting for more relay send attempts before checking database. Currently %d", fc.txRelayCount.Load())
} }
return poll.Continue("waiting for more send attempts before checking database. Currently %d", fc.txCount.Load()) return poll.Continue("waiting for more send attempts before checking database. Currently %d", fc.txCount.Load())
} }

View file

@ -52,19 +52,19 @@ func (r *RelayInternalAPI) PerformRelayServerSync(
return nil return nil
} }
// PerformStoreAsync implements api.RelayInternalAPI // PerformStoreTransaction implements api.RelayInternalAPI
func (r *RelayInternalAPI) PerformStoreTransaction( func (r *RelayInternalAPI) PerformStoreTransaction(
ctx context.Context, ctx context.Context,
request *api.PerformStoreTransactionRequest, request *api.PerformStoreTransactionRequest,
response *api.PerformStoreTransactionResponse, response *api.PerformStoreTransactionResponse,
) error { ) error {
logrus.Warnf("Storing transaction for %v", request.UserID) logrus.Warnf("Storing transaction for %v", request.UserID)
receipt, err := r.db.StoreAsyncTransaction(ctx, request.Txn) receipt, err := r.db.StoreTransaction(ctx, request.Txn)
if err != nil { if err != nil {
logrus.Errorf("db.StoreAsyncTransaction: %s", err.Error()) logrus.Errorf("db.StoreTransaction: %s", err.Error())
return err return err
} }
err = r.db.AssociateAsyncTransactionWithDestinations( err = r.db.AssociateTransactionWithDestinations(
ctx, ctx,
map[gomatrixserverlib.UserID]struct{}{ map[gomatrixserverlib.UserID]struct{}{
request.UserID: {}, request.UserID: {},
@ -75,7 +75,7 @@ func (r *RelayInternalAPI) PerformStoreTransaction(
return err return err
} }
// QueryAsyncTransactions implements api.RelayInternalAPI // QueryTransactions implements api.RelayInternalAPI
func (r *RelayInternalAPI) QueryTransactions( func (r *RelayInternalAPI) QueryTransactions(
ctx context.Context, ctx context.Context,
request *api.QueryRelayTransactionsRequest, request *api.QueryRelayTransactionsRequest,
@ -88,16 +88,16 @@ func (r *RelayInternalAPI) QueryTransactions(
request.UserID.Raw(), request.UserID.Raw(),
) )
prevReceipt := shared.NewReceipt(request.PreviousEntry.EntryID) prevReceipt := shared.NewReceipt(request.PreviousEntry.EntryID)
err := r.db.CleanAsyncTransactions(ctx, request.UserID, []*shared.Receipt{&prevReceipt}) err := r.db.CleanTransactions(ctx, request.UserID, []*shared.Receipt{&prevReceipt})
if err != nil { if err != nil {
logrus.Errorf("db.CleanAsyncTransactions: %s", err.Error()) logrus.Errorf("db.CleanTransactions: %s", err.Error())
return err return err
} }
} }
transaction, receipt, err := r.db.GetAsyncTransaction(ctx, request.UserID) transaction, receipt, err := r.db.GetTransaction(ctx, request.UserID)
if err != nil { if err != nil {
logrus.Errorf("db.GetAsyncTransaction: %s", err.Error()) logrus.Errorf("db.GetTransaction: %s", err.Error())
return err return err
} }

View file

@ -30,9 +30,9 @@ type RelayTransactionResponse struct {
EntriesQueued bool `json:"entries_queued"` EntriesQueued bool `json:"entries_queued"`
} }
// GetTxnFromRelay implements /_matrix/federation/v1/relay_txn/{userID} // GetTransactionFromRelay implements /_matrix/federation/v1/relay_txn/{userID}
// This endpoint can be extracted into a separate relay server service. // This endpoint can be extracted into a separate relay server service.
func GetTxnFromRelay( func GetTransactionFromRelay(
httpReq *http.Request, httpReq *http.Request,
fedReq *gomatrixserverlib.FederationRequest, fedReq *gomatrixserverlib.FederationRequest,
relayAPI api.RelayInternalAPI, relayAPI api.RelayInternalAPI,

View file

@ -28,7 +28,7 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
func createAsyncQuery( func createQuery(
userID gomatrixserverlib.UserID, userID gomatrixserverlib.UserID,
prevEntry gomatrixserverlib.RelayEntry, prevEntry gomatrixserverlib.RelayEntry,
) gomatrixserverlib.FederationRequest { ) gomatrixserverlib.FederationRequest {
@ -40,7 +40,7 @@ func createAsyncQuery(
return request return request
} }
func TestGetAsyncEmptyDatabaseReturnsNothing(t *testing.T) { func TestGetEmptyDatabaseReturnsNothing(t *testing.T) {
testDB := storage.NewFakeRelayDatabase() testDB := storage.NewFakeRelayDatabase()
db := shared.Database{ db := shared.Database{
Writer: sqlutil.NewDummyWriter(), Writer: sqlutil.NewDummyWriter(),
@ -53,27 +53,27 @@ func TestGetAsyncEmptyDatabaseReturnsNothing(t *testing.T) {
transaction := createTransaction() transaction := createTransaction()
_, err = db.StoreAsyncTransaction(context.Background(), transaction) _, err = db.StoreTransaction(context.Background(), transaction)
assert.NoError(t, err, "Failed to store transaction") assert.NoError(t, err, "Failed to store transaction")
relayAPI := internal.NewRelayInternalAPI( relayAPI := internal.NewRelayInternalAPI(
&db, nil, nil, nil, nil, false, "", &db, nil, nil, nil, nil, false, "",
) )
request := createAsyncQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: -1}) request := createQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: -1})
response := routing.GetTxnFromRelay(httpReq, &request, relayAPI, *userID) response := routing.GetTransactionFromRelay(httpReq, &request, relayAPI, *userID)
assert.Equal(t, http.StatusOK, response.Code) assert.Equal(t, http.StatusOK, response.Code)
jsonResponse := response.JSON.(routing.RelayTransactionResponse) jsonResponse := response.JSON.(routing.RelayTransactionResponse)
assert.Equal(t, false, jsonResponse.EntriesQueued) assert.Equal(t, false, jsonResponse.EntriesQueued)
assert.Equal(t, gomatrixserverlib.Transaction{}, jsonResponse.Transaction) assert.Equal(t, gomatrixserverlib.Transaction{}, jsonResponse.Transaction)
count, err := db.GetAsyncTransactionCount(context.Background(), *userID) count, err := db.GetTransactionCount(context.Background(), *userID)
assert.NoError(t, err) assert.NoError(t, err)
assert.Zero(t, count) assert.Zero(t, count)
} }
func TestGetAsyncReturnsSavedTransaction(t *testing.T) { func TestGetReturnsSavedTransaction(t *testing.T) {
testDB := storage.NewFakeRelayDatabase() testDB := storage.NewFakeRelayDatabase()
db := shared.Database{ db := shared.Database{
Writer: sqlutil.NewDummyWriter(), Writer: sqlutil.NewDummyWriter(),
@ -85,10 +85,10 @@ func TestGetAsyncReturnsSavedTransaction(t *testing.T) {
assert.NoError(t, err, "Invalid userID") assert.NoError(t, err, "Invalid userID")
transaction := createTransaction() transaction := createTransaction()
receipt, err := db.StoreAsyncTransaction(context.Background(), transaction) receipt, err := db.StoreTransaction(context.Background(), transaction)
assert.NoError(t, err, "Failed to store transaction") assert.NoError(t, err, "Failed to store transaction")
err = db.AssociateAsyncTransactionWithDestinations( err = db.AssociateTransactionWithDestinations(
context.Background(), context.Background(),
map[gomatrixserverlib.UserID]struct{}{ map[gomatrixserverlib.UserID]struct{}{
*userID: {}, *userID: {},
@ -101,8 +101,8 @@ func TestGetAsyncReturnsSavedTransaction(t *testing.T) {
&db, nil, nil, nil, nil, false, "", &db, nil, nil, nil, nil, false, "",
) )
request := createAsyncQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: -1}) request := createQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: -1})
response := routing.GetTxnFromRelay(httpReq, &request, relayAPI, *userID) response := routing.GetTransactionFromRelay(httpReq, &request, relayAPI, *userID)
assert.Equal(t, http.StatusOK, response.Code) assert.Equal(t, http.StatusOK, response.Code)
jsonResponse := response.JSON.(routing.RelayTransactionResponse) jsonResponse := response.JSON.(routing.RelayTransactionResponse)
@ -110,20 +110,20 @@ func TestGetAsyncReturnsSavedTransaction(t *testing.T) {
assert.Equal(t, transaction, jsonResponse.Transaction) assert.Equal(t, transaction, jsonResponse.Transaction)
// And once more to clear the queue // And once more to clear the queue
request = createAsyncQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: jsonResponse.EntryID}) request = createQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: jsonResponse.EntryID})
response = routing.GetTxnFromRelay(httpReq, &request, relayAPI, *userID) response = routing.GetTransactionFromRelay(httpReq, &request, relayAPI, *userID)
assert.Equal(t, http.StatusOK, response.Code) assert.Equal(t, http.StatusOK, response.Code)
jsonResponse = response.JSON.(routing.RelayTransactionResponse) jsonResponse = response.JSON.(routing.RelayTransactionResponse)
assert.False(t, jsonResponse.EntriesQueued) assert.False(t, jsonResponse.EntriesQueued)
assert.Equal(t, gomatrixserverlib.Transaction{}, jsonResponse.Transaction) assert.Equal(t, gomatrixserverlib.Transaction{}, jsonResponse.Transaction)
count, err := db.GetAsyncTransactionCount(context.Background(), *userID) count, err := db.GetTransactionCount(context.Background(), *userID)
assert.NoError(t, err) assert.NoError(t, err)
assert.Zero(t, count) assert.Zero(t, count)
} }
func TestGetAsyncReturnsMultipleSavedTransactions(t *testing.T) { func TestGetReturnsMultipleSavedTransactions(t *testing.T) {
testDB := storage.NewFakeRelayDatabase() testDB := storage.NewFakeRelayDatabase()
db := shared.Database{ db := shared.Database{
Writer: sqlutil.NewDummyWriter(), Writer: sqlutil.NewDummyWriter(),
@ -135,10 +135,10 @@ func TestGetAsyncReturnsMultipleSavedTransactions(t *testing.T) {
assert.NoError(t, err, "Invalid userID") assert.NoError(t, err, "Invalid userID")
transaction := createTransaction() transaction := createTransaction()
receipt, err := db.StoreAsyncTransaction(context.Background(), transaction) receipt, err := db.StoreTransaction(context.Background(), transaction)
assert.NoError(t, err, "Failed to store transaction") assert.NoError(t, err, "Failed to store transaction")
err = db.AssociateAsyncTransactionWithDestinations( err = db.AssociateTransactionWithDestinations(
context.Background(), context.Background(),
map[gomatrixserverlib.UserID]struct{}{ map[gomatrixserverlib.UserID]struct{}{
*userID: {}, *userID: {},
@ -148,10 +148,10 @@ func TestGetAsyncReturnsMultipleSavedTransactions(t *testing.T) {
assert.NoError(t, err, "Failed to associate transaction with user") assert.NoError(t, err, "Failed to associate transaction with user")
transaction2 := createTransaction() transaction2 := createTransaction()
receipt2, err := db.StoreAsyncTransaction(context.Background(), transaction2) receipt2, err := db.StoreTransaction(context.Background(), transaction2)
assert.NoError(t, err, "Failed to store transaction") assert.NoError(t, err, "Failed to store transaction")
err = db.AssociateAsyncTransactionWithDestinations( err = db.AssociateTransactionWithDestinations(
context.Background(), context.Background(),
map[gomatrixserverlib.UserID]struct{}{ map[gomatrixserverlib.UserID]struct{}{
*userID: {}, *userID: {},
@ -164,16 +164,16 @@ func TestGetAsyncReturnsMultipleSavedTransactions(t *testing.T) {
&db, nil, nil, nil, nil, false, "", &db, nil, nil, nil, nil, false, "",
) )
request := createAsyncQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: -1}) request := createQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: -1})
response := routing.GetTxnFromRelay(httpReq, &request, relayAPI, *userID) response := routing.GetTransactionFromRelay(httpReq, &request, relayAPI, *userID)
assert.Equal(t, http.StatusOK, response.Code) assert.Equal(t, http.StatusOK, response.Code)
jsonResponse := response.JSON.(routing.RelayTransactionResponse) jsonResponse := response.JSON.(routing.RelayTransactionResponse)
assert.True(t, jsonResponse.EntriesQueued) assert.True(t, jsonResponse.EntriesQueued)
assert.Equal(t, transaction, jsonResponse.Transaction) assert.Equal(t, transaction, jsonResponse.Transaction)
request = createAsyncQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: jsonResponse.EntryID}) request = createQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: jsonResponse.EntryID})
response = routing.GetTxnFromRelay(httpReq, &request, relayAPI, *userID) response = routing.GetTransactionFromRelay(httpReq, &request, relayAPI, *userID)
assert.Equal(t, http.StatusOK, response.Code) assert.Equal(t, http.StatusOK, response.Code)
jsonResponse = response.JSON.(routing.RelayTransactionResponse) jsonResponse = response.JSON.(routing.RelayTransactionResponse)
@ -181,15 +181,15 @@ func TestGetAsyncReturnsMultipleSavedTransactions(t *testing.T) {
assert.Equal(t, transaction2, jsonResponse.Transaction) assert.Equal(t, transaction2, jsonResponse.Transaction)
// And once more to clear the queue // And once more to clear the queue
request = createAsyncQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: jsonResponse.EntryID}) request = createQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: jsonResponse.EntryID})
response = routing.GetTxnFromRelay(httpReq, &request, relayAPI, *userID) response = routing.GetTransactionFromRelay(httpReq, &request, relayAPI, *userID)
assert.Equal(t, http.StatusOK, response.Code) assert.Equal(t, http.StatusOK, response.Code)
jsonResponse = response.JSON.(routing.RelayTransactionResponse) jsonResponse = response.JSON.(routing.RelayTransactionResponse)
assert.False(t, jsonResponse.EntriesQueued) assert.False(t, jsonResponse.EntriesQueued)
assert.Equal(t, gomatrixserverlib.Transaction{}, jsonResponse.Transaction) assert.Equal(t, gomatrixserverlib.Transaction{}, jsonResponse.Transaction)
count, err := db.GetAsyncTransactionCount(context.Background(), *userID) count, err := db.GetTransactionCount(context.Background(), *userID)
assert.NoError(t, err) assert.NoError(t, err)
assert.Zero(t, count) assert.Zero(t, count)
} }

View file

@ -77,7 +77,7 @@ func Setup(
JSON: jsonerror.InvalidUsername("Username was invalid"), JSON: jsonerror.InvalidUsername("Username was invalid"),
} }
} }
return GetTxnFromRelay(httpReq, request, relayAPI, *userID) return GetTransactionFromRelay(httpReq, request, relayAPI, *userID)
}, },
)).Methods(http.MethodGet, http.MethodOptions).Name(GetRelayTransactionRouteName) )).Methods(http.MethodGet, http.MethodOptions).Name(GetRelayTransactionRouteName)
} }

View file

@ -197,10 +197,10 @@ func TestUniqueTransactionStoredInDatabase(t *testing.T) {
response := routing.SendTransactionToRelay( response := routing.SendTransactionToRelay(
httpReq, &request, relayAPI, txn.TransactionID, *userID) httpReq, &request, relayAPI, txn.TransactionID, *userID)
transaction, _, err := db.GetAsyncTransaction(context.TODO(), *userID) transaction, _, err := db.GetTransaction(context.TODO(), *userID)
assert.NoError(t, err, "Failed retrieving transaction") assert.NoError(t, err, "Failed retrieving transaction")
transactionCount, err := db.GetAsyncTransactionCount(context.TODO(), *userID) transactionCount, err := db.GetTransactionCount(context.TODO(), *userID)
assert.NoError(t, err, "Failed retrieving transaction count") assert.NoError(t, err, "Failed retrieving transaction count")
assert.Equal(t, 200, response.Code) assert.Equal(t, 200, response.Code)

View file

@ -22,9 +22,9 @@ import (
) )
type Database interface { type Database interface {
StoreAsyncTransaction(ctx context.Context, txn gomatrixserverlib.Transaction) (*shared.Receipt, error) StoreTransaction(ctx context.Context, txn gomatrixserverlib.Transaction) (*shared.Receipt, error)
AssociateAsyncTransactionWithDestinations(ctx context.Context, destinations map[gomatrixserverlib.UserID]struct{}, transactionID gomatrixserverlib.TransactionID, receipt *shared.Receipt) error AssociateTransactionWithDestinations(ctx context.Context, destinations map[gomatrixserverlib.UserID]struct{}, transactionID gomatrixserverlib.TransactionID, receipt *shared.Receipt) error
CleanAsyncTransactions(ctx context.Context, userID gomatrixserverlib.UserID, receipts []*shared.Receipt) error CleanTransactions(ctx context.Context, userID gomatrixserverlib.UserID, receipts []*shared.Receipt) error
GetAsyncTransaction(ctx context.Context, userID gomatrixserverlib.UserID) (*gomatrixserverlib.Transaction, *shared.Receipt, error) GetTransaction(ctx context.Context, userID gomatrixserverlib.UserID) (*gomatrixserverlib.Transaction, *shared.Receipt, error)
GetAsyncTransactionCount(ctx context.Context, userID gomatrixserverlib.UserID) (int64, error) GetTransactionCount(ctx context.Context, userID gomatrixserverlib.UserID) (int64, error)
} }

View file

@ -36,7 +36,7 @@ type Database struct {
RelayQueueJSON tables.RelayQueueJSON RelayQueueJSON tables.RelayQueueJSON
} }
func (d *Database) StoreAsyncTransaction( func (d *Database) StoreTransaction(
ctx context.Context, txn gomatrixserverlib.Transaction, ctx context.Context, txn gomatrixserverlib.Transaction,
) (*shared.Receipt, error) { ) (*shared.Receipt, error) {
var err error var err error
@ -58,7 +58,7 @@ func (d *Database) StoreAsyncTransaction(
return &receipt, nil return &receipt, nil
} }
func (d *Database) AssociateAsyncTransactionWithDestinations( func (d *Database) AssociateTransactionWithDestinations(
ctx context.Context, ctx context.Context,
destinations map[gomatrixserverlib.UserID]struct{}, destinations map[gomatrixserverlib.UserID]struct{},
transactionID gomatrixserverlib.TransactionID, transactionID gomatrixserverlib.TransactionID,
@ -78,7 +78,7 @@ func (d *Database) AssociateAsyncTransactionWithDestinations(
return nil return nil
} }
func (d *Database) CleanAsyncTransactions( func (d *Database) CleanTransactions(
ctx context.Context, ctx context.Context,
userID gomatrixserverlib.UserID, userID gomatrixserverlib.UserID,
receipts []*shared.Receipt, receipts []*shared.Receipt,
@ -108,7 +108,7 @@ func (d *Database) CleanAsyncTransactions(
return nil return nil
} }
func (d *Database) GetAsyncTransaction( func (d *Database) GetTransaction(
ctx context.Context, ctx context.Context,
userID gomatrixserverlib.UserID, userID gomatrixserverlib.UserID,
) (*gomatrixserverlib.Transaction, *shared.Receipt, error) { ) (*gomatrixserverlib.Transaction, *shared.Receipt, error) {
@ -139,7 +139,7 @@ func (d *Database) GetAsyncTransaction(
return transaction, &receipt, nil return transaction, &receipt, nil
} }
func (d *Database) GetAsyncTransactionCount( func (d *Database) GetTransactionCount(
ctx context.Context, ctx context.Context,
userID gomatrixserverlib.UserID, userID gomatrixserverlib.UserID,
) (int64, error) { ) (int64, error) {

View file

@ -18,11 +18,11 @@ type FederationAPI struct {
// The default value is 16 if not specified, which is circa 18 hours. // The default value is 16 if not specified, which is circa 18 hours.
FederationMaxRetries uint32 `yaml:"send_max_retries"` FederationMaxRetries uint32 `yaml:"send_max_retries"`
// How many consecutive failures that we should tolerate when sending federation // P2P Feature: How many consecutive failures that we should tolerate when
// requests to a specific server until we should assume they are offline. If we // sending federation requests to a specific server until we should assume they
// assume they are offline then we will attempt to send messages to their async // are offline. If we assume they are offline then we will attempt to send
// relay server if we know of one that is appropriate. // messages to their relay server if we know of one that is appropriate.
FederationRetriesUntilAssumedOffline uint32 `yaml:"retries_until_assumed_offline"` P2PFederationRetriesUntilAssumedOffline uint32 `yaml:"p2p_retries_until_assumed_offline"`
// FederationDisableTLSValidation disables the validation of X.509 TLS certs // FederationDisableTLSValidation disables the validation of X.509 TLS certs
// on remote federation endpoints. This is not recommended in production! // on remote federation endpoints. This is not recommended in production!
@ -49,7 +49,7 @@ func (c *FederationAPI) Defaults(opts DefaultOpts) {
c.Database.Defaults(10) c.Database.Defaults(10)
} }
c.FederationMaxRetries = 16 c.FederationMaxRetries = 16
c.FederationRetriesUntilAssumedOffline = 2 c.P2PFederationRetriesUntilAssumedOffline = 2
c.DisableTLSValidation = false c.DisableTLSValidation = false
c.DisableHTTPKeepalives = false c.DisableHTTPKeepalives = false
if opts.Generate { if opts.Generate {