From 07d29768542c7d67e9e57efa0d1bb5442ed323b0 Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Thu, 12 Jan 2023 08:21:26 -0700 Subject: [PATCH] Finish s&f rename refactor --- federationapi/api/api.go | 17 -------- federationapi/federationapi.go | 2 +- federationapi/queue/destinationqueue.go | 24 ++++++------ federationapi/queue/queue_test.go | 28 ++++++------- relayapi/internal/perform.go | 18 ++++----- relayapi/routing/relaytxn.go | 4 +- relayapi/routing/relaytxn_test.go | 52 ++++++++++++------------- relayapi/routing/routing.go | 2 +- relayapi/routing/sendrelay_test.go | 4 +- relayapi/storage/interface.go | 10 ++--- relayapi/storage/shared/storage.go | 10 ++--- setup/config/config_federationapi.go | 12 +++--- 12 files changed, 83 insertions(+), 100 deletions(-) diff --git a/federationapi/api/api.go b/federationapi/api/api.go index 6c1e9812c..dc465a1cf 100644 --- a/federationapi/api/api.go +++ b/federationapi/api/api.go @@ -250,20 +250,3 @@ type P2PQueryRelayServersRequest struct { type P2PQueryRelayServersResponse struct { RelayServers []gomatrixserverlib.ServerName } - -type P2PPerformStoreAsyncRequest struct { - Transaction gomatrixserverlib.Transaction `json:"transaction"` - UserID gomatrixserverlib.UserID `json:"user_id"` -} - -type P2PPerformStoreAsyncResponse struct { -} - -type P2PQueryAsyncTransactionsRequest struct { - UserID gomatrixserverlib.UserID `json:"user_id"` -} - -type P2PQueryAsyncTransactionsResponse struct { - Transaction gomatrixserverlib.Transaction `json:"transaction"` - RemainingCount uint32 `json:"remaining"` -} diff --git a/federationapi/federationapi.go b/federationapi/federationapi.go index f40a8b76a..d0a59d55d 100644 --- a/federationapi/federationapi.go +++ b/federationapi/federationapi.go @@ -119,7 +119,7 @@ func NewInternalAPI( stats := statistics.NewStatistics( federationDB, cfg.FederationMaxRetries+1, - cfg.FederationRetriesUntilAssumedOffline+1) + cfg.P2PFederationRetriesUntilAssumedOffline+1) js, nats := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) diff --git a/federationapi/queue/destinationqueue.go b/federationapi/queue/destinationqueue.go index 2bd60d4f2..c4f938577 100644 --- a/federationapi/queue/destinationqueue.go +++ b/federationapi/queue/destinationqueue.go @@ -371,7 +371,7 @@ func (oq *destinationQueue) backgroundSend() { // If we have pending PDUs or EDUs then construct a transaction. // Try sending the next transaction and see what happens. - terr, asyncSuccess := oq.nextTransaction(toSendPDUs, toSendEDUs) + terr, relaySuccess := oq.nextTransaction(toSendPDUs, toSendEDUs) if terr != nil { // We failed to send the transaction. Mark it as a failure. _, blacklisted := oq.statistics.Failure() @@ -388,7 +388,7 @@ func (oq *destinationQueue) backgroundSend() { return } } else { - oq.handleTransactionSuccess(pduCount, eduCount, asyncSuccess) + oq.handleTransactionSuccess(pduCount, eduCount, relaySuccess) } } } @@ -396,11 +396,11 @@ func (oq *destinationQueue) backgroundSend() { // nextTransaction creates a new transaction from the pending event // queue and sends it. // Returns an error if the transaction wasn't sent. And whether the success -// was to an async relay server or not. +// was to a relay server or not. func (oq *destinationQueue) nextTransaction( pdus []*queuedPDU, edus []*queuedEDU, -) (err error, asyncSuccess bool) { +) (err error, relaySuccess bool) { // Create the transaction. t, pduReceipts, eduReceipts := oq.createTransaction(pdus, edus) logrus.WithField("server_name", oq.destination).Debugf("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs)) @@ -418,14 +418,14 @@ func (oq *destinationQueue) nextTransaction( return userErr, false } for _, relayServer := range relayServers { - _, asyncErr := oq.client.P2PSendTransactionToRelay(ctx, *userID, t, relayServer) - if asyncErr != nil { - err = asyncErr + _, relayErr := oq.client.P2PSendTransactionToRelay(ctx, *userID, t, relayServer) + if relayErr != nil { + err = relayErr } else { - asyncSuccess = true + relaySuccess = true } } - if asyncSuccess { + if relaySuccess { err = nil } } else { @@ -450,7 +450,7 @@ func (oq *destinationQueue) nextTransaction( oq.transactionIDMutex.Lock() oq.transactionID = "" oq.transactionIDMutex.Unlock() - return nil, asyncSuccess + return nil, relaySuccess case gomatrix.HTTPError: // Report that we failed to send the transaction and we // will retry again, subject to backoff. @@ -553,10 +553,10 @@ func (oq *destinationQueue) blacklistDestination() { // handleTransactionSuccess updates the cached event queues as well as the success and // backoff information for this server. -func (oq *destinationQueue) handleTransactionSuccess(pduCount int, eduCount int, asyncSuccess bool) { +func (oq *destinationQueue) handleTransactionSuccess(pduCount int, eduCount int, relaySuccess bool) { // If we successfully sent the transaction then clear out // the pending events and EDUs, and wipe our transaction ID. - oq.statistics.Success(asyncSuccess) + oq.statistics.Success(relaySuccess) oq.pendingMutex.Lock() defer oq.pendingMutex.Unlock() diff --git a/federationapi/queue/queue_test.go b/federationapi/queue/queue_test.go index 0fc5737c8..d7a87cb42 100644 --- a/federationapi/queue/queue_test.go +++ b/federationapi/queue/queue_test.go @@ -74,9 +74,9 @@ func (r *stubFederationRoomServerAPI) QueryServerBannedFromRoom(ctx context.Cont type stubFederationClient struct { api.FederationClient shouldTxSucceed bool - shouldTxAsyncSucceed bool + shouldTxRelaySucceed bool txCount atomic.Uint32 - txAsyncCount atomic.Uint32 + txRelayCount atomic.Uint32 } func (f *stubFederationClient) SendTransaction(ctx context.Context, t gomatrixserverlib.Transaction) (res gomatrixserverlib.RespSend, err error) { @@ -91,11 +91,11 @@ func (f *stubFederationClient) SendTransaction(ctx context.Context, t gomatrixse func (f *stubFederationClient) P2PSendTransactionToRelay(ctx context.Context, u gomatrixserverlib.UserID, t gomatrixserverlib.Transaction, forwardingServer gomatrixserverlib.ServerName) (res gomatrixserverlib.EmptyResp, err error) { var result error - if !f.shouldTxAsyncSucceed { - result = fmt.Errorf("async transaction failed") + if !f.shouldTxRelaySucceed { + result = fmt.Errorf("relay transaction failed") } - f.txAsyncCount.Add(1) + f.txRelayCount.Add(1) return gomatrixserverlib.EmptyResp{}, result } @@ -114,14 +114,14 @@ func mustCreateEDU(t *testing.T) *gomatrixserverlib.EDU { return &gomatrixserverlib.EDU{Type: gomatrixserverlib.MTyping} } -func testSetup(failuresUntilBlacklist uint32, failuresUntilAssumedOffline uint32, shouldTxSucceed bool, shouldTxAsyncSucceed bool, t *testing.T, dbType test.DBType, realDatabase bool) (storage.Database, *stubFederationClient, *OutgoingQueues, *process.ProcessContext, func()) { +func testSetup(failuresUntilBlacklist uint32, failuresUntilAssumedOffline uint32, shouldTxSucceed bool, shouldTxRelaySucceed bool, t *testing.T, dbType test.DBType, realDatabase bool) (storage.Database, *stubFederationClient, *OutgoingQueues, *process.ProcessContext, func()) { db, processContext, close := mustCreateFederationDatabase(t, dbType, realDatabase) fc := &stubFederationClient{ shouldTxSucceed: shouldTxSucceed, - shouldTxAsyncSucceed: shouldTxAsyncSucceed, + shouldTxRelaySucceed: shouldTxRelaySucceed, txCount: *atomic.NewUint32(0), - txAsyncCount: *atomic.NewUint32(0), + txRelayCount: *atomic.NewUint32(0), } rs := &stubFederationRoomServerAPI{} @@ -903,7 +903,7 @@ func TestSendEDUMultipleFailuresAssumedOffline(t *testing.T) { poll.WaitOn(t, check, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond)) } -func TestSendPDUOnAsyncSuccessRemovedFromDB(t *testing.T) { +func TestSendPDUOnRelaySuccessRemovedFromDB(t *testing.T) { t.Parallel() failuresUntilBlacklist := uint32(16) failuresUntilAssumedOffline := uint32(1) @@ -924,7 +924,7 @@ func TestSendPDUOnAsyncSuccessRemovedFromDB(t *testing.T) { check := func(log poll.LogT) poll.Result { if fc.txCount.Load() == 1 { - if fc.txAsyncCount.Load() == 1 { + if fc.txRelayCount.Load() == 1 { data, dbErr := db.GetPendingPDUs(pc.Context(), destination, 100) assert.NoError(t, dbErr) if len(data) == 0 { @@ -932,7 +932,7 @@ func TestSendPDUOnAsyncSuccessRemovedFromDB(t *testing.T) { } return poll.Continue("waiting for event to be removed from database. Currently present PDU: %d", len(data)) } - return poll.Continue("waiting for more async send attempts before checking database. Currently %d", fc.txAsyncCount.Load()) + return poll.Continue("waiting for more relay send attempts before checking database. Currently %d", fc.txRelayCount.Load()) } return poll.Continue("waiting for more send attempts before checking database. Currently %d", fc.txCount.Load()) } @@ -942,7 +942,7 @@ func TestSendPDUOnAsyncSuccessRemovedFromDB(t *testing.T) { assert.Equal(t, true, assumedOffline) } -func TestSendEDUOnAsyncSuccessRemovedFromDB(t *testing.T) { +func TestSendEDUOnRelaySuccessRemovedFromDB(t *testing.T) { t.Parallel() failuresUntilBlacklist := uint32(16) failuresUntilAssumedOffline := uint32(1) @@ -963,7 +963,7 @@ func TestSendEDUOnAsyncSuccessRemovedFromDB(t *testing.T) { check := func(log poll.LogT) poll.Result { if fc.txCount.Load() == 1 { - if fc.txAsyncCount.Load() == 1 { + if fc.txRelayCount.Load() == 1 { data, dbErr := db.GetPendingEDUs(pc.Context(), destination, 100) assert.NoError(t, dbErr) if len(data) == 0 { @@ -971,7 +971,7 @@ func TestSendEDUOnAsyncSuccessRemovedFromDB(t *testing.T) { } return poll.Continue("waiting for event to be removed from database. Currently present EDU: %d", len(data)) } - return poll.Continue("waiting for more async send attempts before checking database. Currently %d", fc.txAsyncCount.Load()) + return poll.Continue("waiting for more relay send attempts before checking database. Currently %d", fc.txRelayCount.Load()) } return poll.Continue("waiting for more send attempts before checking database. Currently %d", fc.txCount.Load()) } diff --git a/relayapi/internal/perform.go b/relayapi/internal/perform.go index d214f0827..2e99e0da1 100644 --- a/relayapi/internal/perform.go +++ b/relayapi/internal/perform.go @@ -52,19 +52,19 @@ func (r *RelayInternalAPI) PerformRelayServerSync( return nil } -// PerformStoreAsync implements api.RelayInternalAPI +// PerformStoreTransaction implements api.RelayInternalAPI func (r *RelayInternalAPI) PerformStoreTransaction( ctx context.Context, request *api.PerformStoreTransactionRequest, response *api.PerformStoreTransactionResponse, ) error { logrus.Warnf("Storing transaction for %v", request.UserID) - receipt, err := r.db.StoreAsyncTransaction(ctx, request.Txn) + receipt, err := r.db.StoreTransaction(ctx, request.Txn) if err != nil { - logrus.Errorf("db.StoreAsyncTransaction: %s", err.Error()) + logrus.Errorf("db.StoreTransaction: %s", err.Error()) return err } - err = r.db.AssociateAsyncTransactionWithDestinations( + err = r.db.AssociateTransactionWithDestinations( ctx, map[gomatrixserverlib.UserID]struct{}{ request.UserID: {}, @@ -75,7 +75,7 @@ func (r *RelayInternalAPI) PerformStoreTransaction( return err } -// QueryAsyncTransactions implements api.RelayInternalAPI +// QueryTransactions implements api.RelayInternalAPI func (r *RelayInternalAPI) QueryTransactions( ctx context.Context, request *api.QueryRelayTransactionsRequest, @@ -88,16 +88,16 @@ func (r *RelayInternalAPI) QueryTransactions( request.UserID.Raw(), ) prevReceipt := shared.NewReceipt(request.PreviousEntry.EntryID) - err := r.db.CleanAsyncTransactions(ctx, request.UserID, []*shared.Receipt{&prevReceipt}) + err := r.db.CleanTransactions(ctx, request.UserID, []*shared.Receipt{&prevReceipt}) if err != nil { - logrus.Errorf("db.CleanAsyncTransactions: %s", err.Error()) + logrus.Errorf("db.CleanTransactions: %s", err.Error()) return err } } - transaction, receipt, err := r.db.GetAsyncTransaction(ctx, request.UserID) + transaction, receipt, err := r.db.GetTransaction(ctx, request.UserID) if err != nil { - logrus.Errorf("db.GetAsyncTransaction: %s", err.Error()) + logrus.Errorf("db.GetTransaction: %s", err.Error()) return err } diff --git a/relayapi/routing/relaytxn.go b/relayapi/routing/relaytxn.go index 75bb2d272..bf8afbf4e 100644 --- a/relayapi/routing/relaytxn.go +++ b/relayapi/routing/relaytxn.go @@ -30,9 +30,9 @@ type RelayTransactionResponse struct { EntriesQueued bool `json:"entries_queued"` } -// GetTxnFromRelay implements /_matrix/federation/v1/relay_txn/{userID} +// GetTransactionFromRelay implements /_matrix/federation/v1/relay_txn/{userID} // This endpoint can be extracted into a separate relay server service. -func GetTxnFromRelay( +func GetTransactionFromRelay( httpReq *http.Request, fedReq *gomatrixserverlib.FederationRequest, relayAPI api.RelayInternalAPI, diff --git a/relayapi/routing/relaytxn_test.go b/relayapi/routing/relaytxn_test.go index 4d154a08f..c2a95e517 100644 --- a/relayapi/routing/relaytxn_test.go +++ b/relayapi/routing/relaytxn_test.go @@ -28,7 +28,7 @@ import ( "github.com/stretchr/testify/assert" ) -func createAsyncQuery( +func createQuery( userID gomatrixserverlib.UserID, prevEntry gomatrixserverlib.RelayEntry, ) gomatrixserverlib.FederationRequest { @@ -40,7 +40,7 @@ func createAsyncQuery( return request } -func TestGetAsyncEmptyDatabaseReturnsNothing(t *testing.T) { +func TestGetEmptyDatabaseReturnsNothing(t *testing.T) { testDB := storage.NewFakeRelayDatabase() db := shared.Database{ Writer: sqlutil.NewDummyWriter(), @@ -53,27 +53,27 @@ func TestGetAsyncEmptyDatabaseReturnsNothing(t *testing.T) { transaction := createTransaction() - _, err = db.StoreAsyncTransaction(context.Background(), transaction) + _, err = db.StoreTransaction(context.Background(), transaction) assert.NoError(t, err, "Failed to store transaction") relayAPI := internal.NewRelayInternalAPI( &db, nil, nil, nil, nil, false, "", ) - request := createAsyncQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: -1}) - response := routing.GetTxnFromRelay(httpReq, &request, relayAPI, *userID) + request := createQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: -1}) + response := routing.GetTransactionFromRelay(httpReq, &request, relayAPI, *userID) assert.Equal(t, http.StatusOK, response.Code) jsonResponse := response.JSON.(routing.RelayTransactionResponse) assert.Equal(t, false, jsonResponse.EntriesQueued) assert.Equal(t, gomatrixserverlib.Transaction{}, jsonResponse.Transaction) - count, err := db.GetAsyncTransactionCount(context.Background(), *userID) + count, err := db.GetTransactionCount(context.Background(), *userID) assert.NoError(t, err) assert.Zero(t, count) } -func TestGetAsyncReturnsSavedTransaction(t *testing.T) { +func TestGetReturnsSavedTransaction(t *testing.T) { testDB := storage.NewFakeRelayDatabase() db := shared.Database{ Writer: sqlutil.NewDummyWriter(), @@ -85,10 +85,10 @@ func TestGetAsyncReturnsSavedTransaction(t *testing.T) { assert.NoError(t, err, "Invalid userID") transaction := createTransaction() - receipt, err := db.StoreAsyncTransaction(context.Background(), transaction) + receipt, err := db.StoreTransaction(context.Background(), transaction) assert.NoError(t, err, "Failed to store transaction") - err = db.AssociateAsyncTransactionWithDestinations( + err = db.AssociateTransactionWithDestinations( context.Background(), map[gomatrixserverlib.UserID]struct{}{ *userID: {}, @@ -101,8 +101,8 @@ func TestGetAsyncReturnsSavedTransaction(t *testing.T) { &db, nil, nil, nil, nil, false, "", ) - request := createAsyncQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: -1}) - response := routing.GetTxnFromRelay(httpReq, &request, relayAPI, *userID) + request := createQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: -1}) + response := routing.GetTransactionFromRelay(httpReq, &request, relayAPI, *userID) assert.Equal(t, http.StatusOK, response.Code) jsonResponse := response.JSON.(routing.RelayTransactionResponse) @@ -110,20 +110,20 @@ func TestGetAsyncReturnsSavedTransaction(t *testing.T) { assert.Equal(t, transaction, jsonResponse.Transaction) // And once more to clear the queue - request = createAsyncQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: jsonResponse.EntryID}) - response = routing.GetTxnFromRelay(httpReq, &request, relayAPI, *userID) + request = createQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: jsonResponse.EntryID}) + response = routing.GetTransactionFromRelay(httpReq, &request, relayAPI, *userID) assert.Equal(t, http.StatusOK, response.Code) jsonResponse = response.JSON.(routing.RelayTransactionResponse) assert.False(t, jsonResponse.EntriesQueued) assert.Equal(t, gomatrixserverlib.Transaction{}, jsonResponse.Transaction) - count, err := db.GetAsyncTransactionCount(context.Background(), *userID) + count, err := db.GetTransactionCount(context.Background(), *userID) assert.NoError(t, err) assert.Zero(t, count) } -func TestGetAsyncReturnsMultipleSavedTransactions(t *testing.T) { +func TestGetReturnsMultipleSavedTransactions(t *testing.T) { testDB := storage.NewFakeRelayDatabase() db := shared.Database{ Writer: sqlutil.NewDummyWriter(), @@ -135,10 +135,10 @@ func TestGetAsyncReturnsMultipleSavedTransactions(t *testing.T) { assert.NoError(t, err, "Invalid userID") transaction := createTransaction() - receipt, err := db.StoreAsyncTransaction(context.Background(), transaction) + receipt, err := db.StoreTransaction(context.Background(), transaction) assert.NoError(t, err, "Failed to store transaction") - err = db.AssociateAsyncTransactionWithDestinations( + err = db.AssociateTransactionWithDestinations( context.Background(), map[gomatrixserverlib.UserID]struct{}{ *userID: {}, @@ -148,10 +148,10 @@ func TestGetAsyncReturnsMultipleSavedTransactions(t *testing.T) { assert.NoError(t, err, "Failed to associate transaction with user") transaction2 := createTransaction() - receipt2, err := db.StoreAsyncTransaction(context.Background(), transaction2) + receipt2, err := db.StoreTransaction(context.Background(), transaction2) assert.NoError(t, err, "Failed to store transaction") - err = db.AssociateAsyncTransactionWithDestinations( + err = db.AssociateTransactionWithDestinations( context.Background(), map[gomatrixserverlib.UserID]struct{}{ *userID: {}, @@ -164,16 +164,16 @@ func TestGetAsyncReturnsMultipleSavedTransactions(t *testing.T) { &db, nil, nil, nil, nil, false, "", ) - request := createAsyncQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: -1}) - response := routing.GetTxnFromRelay(httpReq, &request, relayAPI, *userID) + request := createQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: -1}) + response := routing.GetTransactionFromRelay(httpReq, &request, relayAPI, *userID) assert.Equal(t, http.StatusOK, response.Code) jsonResponse := response.JSON.(routing.RelayTransactionResponse) assert.True(t, jsonResponse.EntriesQueued) assert.Equal(t, transaction, jsonResponse.Transaction) - request = createAsyncQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: jsonResponse.EntryID}) - response = routing.GetTxnFromRelay(httpReq, &request, relayAPI, *userID) + request = createQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: jsonResponse.EntryID}) + response = routing.GetTransactionFromRelay(httpReq, &request, relayAPI, *userID) assert.Equal(t, http.StatusOK, response.Code) jsonResponse = response.JSON.(routing.RelayTransactionResponse) @@ -181,15 +181,15 @@ func TestGetAsyncReturnsMultipleSavedTransactions(t *testing.T) { assert.Equal(t, transaction2, jsonResponse.Transaction) // And once more to clear the queue - request = createAsyncQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: jsonResponse.EntryID}) - response = routing.GetTxnFromRelay(httpReq, &request, relayAPI, *userID) + request = createQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: jsonResponse.EntryID}) + response = routing.GetTransactionFromRelay(httpReq, &request, relayAPI, *userID) assert.Equal(t, http.StatusOK, response.Code) jsonResponse = response.JSON.(routing.RelayTransactionResponse) assert.False(t, jsonResponse.EntriesQueued) assert.Equal(t, gomatrixserverlib.Transaction{}, jsonResponse.Transaction) - count, err := db.GetAsyncTransactionCount(context.Background(), *userID) + count, err := db.GetTransactionCount(context.Background(), *userID) assert.NoError(t, err) assert.Zero(t, count) } diff --git a/relayapi/routing/routing.go b/relayapi/routing/routing.go index ffe69e68c..16fea7742 100644 --- a/relayapi/routing/routing.go +++ b/relayapi/routing/routing.go @@ -77,7 +77,7 @@ func Setup( JSON: jsonerror.InvalidUsername("Username was invalid"), } } - return GetTxnFromRelay(httpReq, request, relayAPI, *userID) + return GetTransactionFromRelay(httpReq, request, relayAPI, *userID) }, )).Methods(http.MethodGet, http.MethodOptions).Name(GetRelayTransactionRouteName) } diff --git a/relayapi/routing/sendrelay_test.go b/relayapi/routing/sendrelay_test.go index 8961e4256..b5f2ac6dd 100644 --- a/relayapi/routing/sendrelay_test.go +++ b/relayapi/routing/sendrelay_test.go @@ -197,10 +197,10 @@ func TestUniqueTransactionStoredInDatabase(t *testing.T) { response := routing.SendTransactionToRelay( httpReq, &request, relayAPI, txn.TransactionID, *userID) - transaction, _, err := db.GetAsyncTransaction(context.TODO(), *userID) + transaction, _, err := db.GetTransaction(context.TODO(), *userID) assert.NoError(t, err, "Failed retrieving transaction") - transactionCount, err := db.GetAsyncTransactionCount(context.TODO(), *userID) + transactionCount, err := db.GetTransactionCount(context.TODO(), *userID) assert.NoError(t, err, "Failed retrieving transaction count") assert.Equal(t, 200, response.Code) diff --git a/relayapi/storage/interface.go b/relayapi/storage/interface.go index a1218f804..3fe57b9f6 100644 --- a/relayapi/storage/interface.go +++ b/relayapi/storage/interface.go @@ -22,9 +22,9 @@ import ( ) type Database interface { - StoreAsyncTransaction(ctx context.Context, txn gomatrixserverlib.Transaction) (*shared.Receipt, error) - AssociateAsyncTransactionWithDestinations(ctx context.Context, destinations map[gomatrixserverlib.UserID]struct{}, transactionID gomatrixserverlib.TransactionID, receipt *shared.Receipt) error - CleanAsyncTransactions(ctx context.Context, userID gomatrixserverlib.UserID, receipts []*shared.Receipt) error - GetAsyncTransaction(ctx context.Context, userID gomatrixserverlib.UserID) (*gomatrixserverlib.Transaction, *shared.Receipt, error) - GetAsyncTransactionCount(ctx context.Context, userID gomatrixserverlib.UserID) (int64, error) + StoreTransaction(ctx context.Context, txn gomatrixserverlib.Transaction) (*shared.Receipt, error) + AssociateTransactionWithDestinations(ctx context.Context, destinations map[gomatrixserverlib.UserID]struct{}, transactionID gomatrixserverlib.TransactionID, receipt *shared.Receipt) error + CleanTransactions(ctx context.Context, userID gomatrixserverlib.UserID, receipts []*shared.Receipt) error + GetTransaction(ctx context.Context, userID gomatrixserverlib.UserID) (*gomatrixserverlib.Transaction, *shared.Receipt, error) + GetTransactionCount(ctx context.Context, userID gomatrixserverlib.UserID) (int64, error) } diff --git a/relayapi/storage/shared/storage.go b/relayapi/storage/shared/storage.go index 0d856fbe0..5c33d5c8f 100644 --- a/relayapi/storage/shared/storage.go +++ b/relayapi/storage/shared/storage.go @@ -36,7 +36,7 @@ type Database struct { RelayQueueJSON tables.RelayQueueJSON } -func (d *Database) StoreAsyncTransaction( +func (d *Database) StoreTransaction( ctx context.Context, txn gomatrixserverlib.Transaction, ) (*shared.Receipt, error) { var err error @@ -58,7 +58,7 @@ func (d *Database) StoreAsyncTransaction( return &receipt, nil } -func (d *Database) AssociateAsyncTransactionWithDestinations( +func (d *Database) AssociateTransactionWithDestinations( ctx context.Context, destinations map[gomatrixserverlib.UserID]struct{}, transactionID gomatrixserverlib.TransactionID, @@ -78,7 +78,7 @@ func (d *Database) AssociateAsyncTransactionWithDestinations( return nil } -func (d *Database) CleanAsyncTransactions( +func (d *Database) CleanTransactions( ctx context.Context, userID gomatrixserverlib.UserID, receipts []*shared.Receipt, @@ -108,7 +108,7 @@ func (d *Database) CleanAsyncTransactions( return nil } -func (d *Database) GetAsyncTransaction( +func (d *Database) GetTransaction( ctx context.Context, userID gomatrixserverlib.UserID, ) (*gomatrixserverlib.Transaction, *shared.Receipt, error) { @@ -139,7 +139,7 @@ func (d *Database) GetAsyncTransaction( return transaction, &receipt, nil } -func (d *Database) GetAsyncTransactionCount( +func (d *Database) GetTransactionCount( ctx context.Context, userID gomatrixserverlib.UserID, ) (int64, error) { diff --git a/setup/config/config_federationapi.go b/setup/config/config_federationapi.go index 3e7a9ece5..6c198018d 100644 --- a/setup/config/config_federationapi.go +++ b/setup/config/config_federationapi.go @@ -18,11 +18,11 @@ type FederationAPI struct { // The default value is 16 if not specified, which is circa 18 hours. FederationMaxRetries uint32 `yaml:"send_max_retries"` - // How many consecutive failures that we should tolerate when sending federation - // requests to a specific server until we should assume they are offline. If we - // assume they are offline then we will attempt to send messages to their async - // relay server if we know of one that is appropriate. - FederationRetriesUntilAssumedOffline uint32 `yaml:"retries_until_assumed_offline"` + // P2P Feature: How many consecutive failures that we should tolerate when + // sending federation requests to a specific server until we should assume they + // are offline. If we assume they are offline then we will attempt to send + // messages to their relay server if we know of one that is appropriate. + P2PFederationRetriesUntilAssumedOffline uint32 `yaml:"p2p_retries_until_assumed_offline"` // FederationDisableTLSValidation disables the validation of X.509 TLS certs // on remote federation endpoints. This is not recommended in production! @@ -49,7 +49,7 @@ func (c *FederationAPI) Defaults(opts DefaultOpts) { c.Database.Defaults(10) } c.FederationMaxRetries = 16 - c.FederationRetriesUntilAssumedOffline = 2 + c.P2PFederationRetriesUntilAssumedOffline = 2 c.DisableTLSValidation = false c.DisableHTTPKeepalives = false if opts.Generate {