Handle async send success/fail cases

Devon Hudson 2022-12-01 14:17:13 -07:00
parent e26d07c779
commit 9dbfcd9e06
6 changed files with 210 additions and 51 deletions

View file

@@ -164,7 +164,7 @@ func (a *FederationInternalAPI) doRequestIfNotBackingOffOrBlacklisted(
 RetryAfter: retryAfter,
 }
 }
-stats.Success()
+stats.Success(false)
 return res, nil
 }

View file

@@ -40,7 +40,7 @@ func (r *FederationInternalAPI) PerformDirectoryLookup(
 }
 response.RoomID = dir.RoomID
 response.ServerNames = dir.Servers
-r.statistics.ForServer(request.ServerName).Success()
+r.statistics.ForServer(request.ServerName).Success(false)
 return nil
 }
@@ -167,7 +167,7 @@ func (r *FederationInternalAPI) performJoinUsingServer(
 r.statistics.ForServer(serverName).Failure()
 return fmt.Errorf("r.federation.MakeJoin: %w", err)
 }
-r.statistics.ForServer(serverName).Success()
+r.statistics.ForServer(serverName).Success(false)
 // Set all the fields to be what they should be, this should be a no-op
 // but it's possible that the remote server returned us something "odd"
@@ -221,7 +221,7 @@ func (r *FederationInternalAPI) performJoinUsingServer(
 r.statistics.ForServer(serverName).Failure()
 return fmt.Errorf("r.federation.SendJoin: %w", err)
 }
-r.statistics.ForServer(serverName).Success()
+r.statistics.ForServer(serverName).Success(false)
 // If the remote server returned an event in the "event" key of
 // the send_join request then we should use that instead. It may
@@ -451,7 +451,7 @@ func (r *FederationInternalAPI) performOutboundPeekUsingServer(
 r.statistics.ForServer(serverName).Failure()
 return fmt.Errorf("r.federation.Peek: %w", err)
 }
-r.statistics.ForServer(serverName).Success()
+r.statistics.ForServer(serverName).Success(false)
 // Work out if we support the room version that has been supplied in
 // the peek response.
@@ -588,7 +588,7 @@ func (r *FederationInternalAPI) PerformLeave(
 continue
 }
-r.statistics.ForServer(serverName).Success()
+r.statistics.ForServer(serverName).Success(false)
 return nil
 }

View file

@@ -350,7 +350,7 @@ func (oq *destinationQueue) backgroundSend() {
 // If we have pending PDUs or EDUs then construct a transaction.
 // Try sending the next transaction and see what happens.
-terr := oq.nextTransaction(toSendPDUs, toSendEDUs)
+terr, asyncSuccess := oq.nextTransaction(toSendPDUs, toSendEDUs)
 if terr != nil {
 // We failed to send the transaction. Mark it as a failure.
 _, blacklisted := oq.statistics.Failure()
@@ -367,18 +367,19 @@ func (oq *destinationQueue) backgroundSend() {
 return
 }
 } else {
-oq.handleTransactionSuccess(pduCount, eduCount)
+oq.handleTransactionSuccess(pduCount, eduCount, asyncSuccess)
 }
 }
 }
 // nextTransaction creates a new transaction from the pending event
 // queue and sends it.
-// Returns an error if the transaction wasn't sent.
+// Returns an error if the transaction wasn't sent. And whether the success
+// was to an async mailserver or not.
 func (oq *destinationQueue) nextTransaction(
 pdus []*queuedPDU,
 edus []*queuedEDU,
-) error {
+) (err error, asyncSuccess bool) {
 // Create the transaction.
 t, pduReceipts, eduReceipts := oq.createTransaction(pdus, edus)
 logrus.WithField("server_name", oq.destination).Debugf("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs))
@@ -387,21 +388,22 @@ func (oq *destinationQueue) nextTransaction(
 ctx, cancel := context.WithTimeout(oq.process.Context(), time.Minute*5)
 defer cancel()
-var err error
 mailservers := oq.statistics.KnownMailservers()
 if oq.statistics.AssumedOffline() && len(mailservers) > 0 {
 // TODO : how to pass through actual userID here?!?!?!?!
-userID, _ := gomatrixserverlib.NewUserID("@:"+string(oq.origin), false)
-anySuccess := false
+userID, err := gomatrixserverlib.NewUserID("@user:"+string(oq.origin), false)
+if err != nil {
+return err, false
+}
 for _, mailserver := range mailservers {
 _, asyncErr := oq.client.SendAsyncTransaction(ctx, *userID, t, mailserver)
 if asyncErr != nil {
 err = asyncErr
 } else {
-anySuccess = true
+asyncSuccess = true
 }
 }
-if anySuccess {
+if asyncSuccess {
 err = nil
 }
 } else {
@@ -426,7 +428,7 @@ func (oq *destinationQueue) nextTransaction(
 oq.transactionIDMutex.Lock()
 oq.transactionID = ""
 oq.transactionIDMutex.Unlock()
-return nil
+return nil, asyncSuccess
 case gomatrix.HTTPError:
 // Report that we failed to send the transaction and we
 // will retry again, subject to backoff.
@@ -436,13 +438,13 @@ func (oq *destinationQueue) nextTransaction(
 // to a 400-ish error
 code := errResponse.Code
 logrus.Debug("Transaction failed with HTTP", code)
-return err
+return err, false
 default:
 logrus.WithFields(logrus.Fields{
 "destination": oq.destination,
 logrus.ErrorKey: err,
 }).Debugf("Failed to send transaction %q", t.TransactionID)
-return err
+return err, false
 }
 }
@@ -529,10 +531,10 @@ func (oq *destinationQueue) blacklistDestination() {
 // handleTransactionSuccess updates the cached event queues as well as the success and
 // backoff information for this server.
-func (oq *destinationQueue) handleTransactionSuccess(pduCount int, eduCount int) {
+func (oq *destinationQueue) handleTransactionSuccess(pduCount int, eduCount int, asyncSuccess bool) {
 // If we successfully sent the transaction then clear out
 // the pending events and EDUs, and wipe our transaction ID.
-oq.statistics.Success()
+oq.statistics.Success(asyncSuccess)
 oq.pendingMutex.Lock()
 defer oq.pendingMutex.Unlock()

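The queue changes above are spread across several hunks, so here is a condensed, self-contained sketch of the new decision logic in nextTransaction. Everything in it (sendWithFallback, sendFn, the server strings) is invented for illustration; only the error-first return pair and the rule that reaching any one mailserver counts as an async success are taken from the diff.

package main

import (
	"errors"
	"fmt"
)

// sendFn models one transaction send attempt to a single server.
type sendFn func(server string) error

// sendWithFallback mirrors the shape of the new nextTransaction logic:
// if the destination is assumed offline and mailservers are known, try
// each mailserver and treat the attempt as an "async success" when at
// least one accepts the transaction; otherwise send directly.
func sendWithFallback(assumedOffline bool, mailservers []string, sendDirect, sendAsync sendFn) (err error, asyncSuccess bool) {
	if assumedOffline && len(mailservers) > 0 {
		for _, ms := range mailservers {
			if sendErr := sendAsync(ms); sendErr != nil {
				err = sendErr
			} else {
				asyncSuccess = true
			}
		}
		if asyncSuccess {
			// One reachable mailserver is enough to call the attempt a success.
			err = nil
		}
		return err, asyncSuccess
	}
	// Normal path: a single direct federation send, never an async success.
	return sendDirect("destination"), false
}

func main() {
	err, async := sendWithFallback(true, []string{"ms1", "ms2"},
		func(string) error { return errors.New("unreachable") },
		func(s string) error {
			if s == "ms1" {
				return errors.New("mailserver down")
			}
			return nil
		})
	fmt.Println(err, async) // <nil> true
}

Running the sketch prints "<nil> true": the direct path is skipped and the successful second mailserver attempt clears the error from the first.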
View file

@@ -75,6 +75,7 @@ func createDatabase() storage.Database {
 pendingEDUs: make(map[*shared.Receipt]*gomatrixserverlib.EDU),
 associatedPDUs: make(map[gomatrixserverlib.ServerName]map[*shared.Receipt]struct{}),
 associatedEDUs: make(map[gomatrixserverlib.ServerName]map[*shared.Receipt]struct{}),
+mailservers: make(map[gomatrixserverlib.ServerName][]gomatrixserverlib.ServerName),
 }
 }
@@ -89,6 +90,7 @@ type fakeDatabase struct {
 pendingEDUs map[*shared.Receipt]*gomatrixserverlib.EDU
 associatedPDUs map[gomatrixserverlib.ServerName]map[*shared.Receipt]struct{}
 associatedEDUs map[gomatrixserverlib.ServerName]map[*shared.Receipt]struct{}
+mailservers map[gomatrixserverlib.ServerName][]gomatrixserverlib.ServerName
 }
 var nidMutex sync.Mutex
@@ -340,7 +342,36 @@ func (d *fakeDatabase) IsServerAssumedOffline(serverName gomatrixserverlib.Serve
 }
 func (d *fakeDatabase) GetMailserversForServer(serverName gomatrixserverlib.ServerName) ([]gomatrixserverlib.ServerName, error) {
-return []gomatrixserverlib.ServerName{}, nil
+d.dbMutex.Lock()
+defer d.dbMutex.Unlock()
+knownMailservers := []gomatrixserverlib.ServerName{}
+if mailservers, ok := d.mailservers[serverName]; ok {
+knownMailservers = mailservers
+}
+return knownMailservers, nil
 }
+func (d *fakeDatabase) AddMailserversForServer(serverName gomatrixserverlib.ServerName, mailservers []gomatrixserverlib.ServerName) error {
+d.dbMutex.Lock()
+defer d.dbMutex.Unlock()
+if knownMailservers, ok := d.mailservers[serverName]; ok {
+for _, mailserver := range mailservers {
+alreadyKnown := false
+for _, knownMailserver := range knownMailservers {
+if mailserver == knownMailserver {
+alreadyKnown = true
+}
+}
+if !alreadyKnown {
+d.mailservers[serverName] = append(d.mailservers[serverName], mailserver)
+}
+}
+}
+return nil
+}
 type stubFederationRoomServerAPI struct {
@@ -354,8 +385,10 @@ func (r *stubFederationRoomServerAPI) QueryServerBannedFromRoom(ctx context.Cont
 type stubFederationClient struct {
 api.FederationClient
-shouldTxSucceed bool
-txCount atomic.Uint32
+shouldTxSucceed bool
+shouldTxAsyncSucceed bool
+txCount atomic.Uint32
+txAsyncCount atomic.Uint32
 }
 func (f *stubFederationClient) SendTransaction(ctx context.Context, t gomatrixserverlib.Transaction) (res gomatrixserverlib.RespSend, err error) {
@@ -368,6 +401,16 @@ func (f *stubFederationClient) SendTransaction(ctx context.Context, t gomatrixse
 return gomatrixserverlib.RespSend{}, result
 }
+func (f *stubFederationClient) SendAsyncTransaction(ctx context.Context, u gomatrixserverlib.UserID, t gomatrixserverlib.Transaction, forwardingServer gomatrixserverlib.ServerName) (res gomatrixserverlib.EmptyResp, err error) {
+var result error
+if !f.shouldTxAsyncSucceed {
+result = fmt.Errorf("async transaction failed")
+}
+f.txAsyncCount.Add(1)
+return gomatrixserverlib.EmptyResp{}, result
+}
 func mustCreatePDU(t *testing.T) *gomatrixserverlib.HeaderedEvent {
 t.Helper()
 content := `{"type":"m.room.message"}`
@@ -383,12 +426,14 @@ func mustCreateEDU(t *testing.T) *gomatrixserverlib.EDU {
 return &gomatrixserverlib.EDU{Type: gomatrixserverlib.MTyping}
 }
-func testSetup(failuresUntilBlacklist uint32, failuresUntilAssumedOffline uint32, shouldTxSucceed bool, t *testing.T, dbType test.DBType, realDatabase bool) (storage.Database, *stubFederationClient, *OutgoingQueues, *process.ProcessContext, func()) {
+func testSetup(failuresUntilBlacklist uint32, failuresUntilAssumedOffline uint32, shouldTxSucceed bool, shouldTxAsyncSucceed bool, t *testing.T, dbType test.DBType, realDatabase bool) (storage.Database, *stubFederationClient, *OutgoingQueues, *process.ProcessContext, func()) {
 db, processContext, close := mustCreateFederationDatabase(t, dbType, realDatabase)
 fc := &stubFederationClient{
-shouldTxSucceed: shouldTxSucceed,
-txCount: *atomic.NewUint32(0),
+shouldTxSucceed: shouldTxSucceed,
+shouldTxAsyncSucceed: shouldTxAsyncSucceed,
+txCount: *atomic.NewUint32(0),
+txAsyncCount: *atomic.NewUint32(0),
 }
 rs := &stubFederationRoomServerAPI{}
@@ -408,7 +453,7 @@ func TestSendPDUOnSuccessRemovedFromDB(t *testing.T) {
 t.Parallel()
 failuresUntilBlacklist := uint32(16)
 destination := gomatrixserverlib.ServerName("remotehost")
-db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, true, t, test.DBTypeSQLite, false)
+db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, true, false, t, test.DBTypeSQLite, false)
 defer close()
 defer func() {
 pc.ShutdownDendrite()
@@ -437,7 +482,7 @@ func TestSendEDUOnSuccessRemovedFromDB(t *testing.T) {
 t.Parallel()
 failuresUntilBlacklist := uint32(16)
 destination := gomatrixserverlib.ServerName("remotehost")
-db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, true, t, test.DBTypeSQLite, false)
+db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, true, false, t, test.DBTypeSQLite, false)
 defer close()
 defer func() {
 pc.ShutdownDendrite()
@@ -466,7 +511,7 @@ func TestSendPDUOnFailStoredInDB(t *testing.T) {
 t.Parallel()
 failuresUntilBlacklist := uint32(16)
 destination := gomatrixserverlib.ServerName("remotehost")
-db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, t, test.DBTypeSQLite, false)
+db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, test.DBTypeSQLite, false)
 defer close()
 defer func() {
 pc.ShutdownDendrite()
@@ -496,7 +541,7 @@ func TestSendEDUOnFailStoredInDB(t *testing.T) {
 t.Parallel()
 failuresUntilBlacklist := uint32(16)
 destination := gomatrixserverlib.ServerName("remotehost")
-db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, t, test.DBTypeSQLite, false)
+db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, test.DBTypeSQLite, false)
 defer close()
 defer func() {
 pc.ShutdownDendrite()
@@ -526,7 +571,7 @@ func TestSendPDUAgainDoesntInterruptBackoff(t *testing.T) {
 t.Parallel()
 failuresUntilBlacklist := uint32(16)
 destination := gomatrixserverlib.ServerName("remotehost")
-db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, t, test.DBTypeSQLite, false)
+db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, test.DBTypeSQLite, false)
 defer close()
 defer func() {
 pc.ShutdownDendrite()
@@ -577,7 +622,7 @@ func TestSendEDUAgainDoesntInterruptBackoff(t *testing.T) {
 t.Parallel()
 failuresUntilBlacklist := uint32(16)
 destination := gomatrixserverlib.ServerName("remotehost")
-db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, t, test.DBTypeSQLite, false)
+db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, test.DBTypeSQLite, false)
 defer close()
 defer func() {
 pc.ShutdownDendrite()
@@ -628,7 +673,7 @@ func TestSendPDUMultipleFailuresBlacklisted(t *testing.T) {
 t.Parallel()
 failuresUntilBlacklist := uint32(2)
 destination := gomatrixserverlib.ServerName("remotehost")
-db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, t, test.DBTypeSQLite, false)
+db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, test.DBTypeSQLite, false)
 defer close()
 defer func() {
 pc.ShutdownDendrite()
@@ -660,7 +705,7 @@ func TestSendEDUMultipleFailuresBlacklisted(t *testing.T) {
 t.Parallel()
 failuresUntilBlacklist := uint32(2)
 destination := gomatrixserverlib.ServerName("remotehost")
-db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, t, test.DBTypeSQLite, false)
+db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, test.DBTypeSQLite, false)
 defer close()
 defer func() {
 pc.ShutdownDendrite()
@@ -692,7 +737,7 @@ func TestSendPDUBlacklistedWithPriorExternalFailure(t *testing.T) {
 t.Parallel()
 failuresUntilBlacklist := uint32(2)
 destination := gomatrixserverlib.ServerName("remotehost")
-db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, t, test.DBTypeSQLite, false)
+db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, test.DBTypeSQLite, false)
 defer close()
 defer func() {
 pc.ShutdownDendrite()
@@ -726,7 +771,7 @@ func TestSendEDUBlacklistedWithPriorExternalFailure(t *testing.T) {
 t.Parallel()
 failuresUntilBlacklist := uint32(2)
 destination := gomatrixserverlib.ServerName("remotehost")
-db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, t, test.DBTypeSQLite, false)
+db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, test.DBTypeSQLite, false)
 defer close()
 defer func() {
 pc.ShutdownDendrite()
@@ -760,7 +805,7 @@ func TestRetryServerSendsPDUSuccessfully(t *testing.T) {
 t.Parallel()
 failuresUntilBlacklist := uint32(1)
 destination := gomatrixserverlib.ServerName("remotehost")
-db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, t, test.DBTypeSQLite, false)
+db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, test.DBTypeSQLite, false)
 defer close()
 defer func() {
 pc.ShutdownDendrite()
@@ -811,7 +856,7 @@ func TestRetryServerSendsEDUSuccessfully(t *testing.T) {
 t.Parallel()
 failuresUntilBlacklist := uint32(1)
 destination := gomatrixserverlib.ServerName("remotehost")
-db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, t, test.DBTypeSQLite, false)
+db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, test.DBTypeSQLite, false)
 defer close()
 defer func() {
 pc.ShutdownDendrite()
@@ -865,7 +910,7 @@ func TestSendPDUBatches(t *testing.T) {
 // test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
 // db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, true, t, dbType, true)
-db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, true, t, test.DBTypeSQLite, false)
+db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, true, false, t, test.DBTypeSQLite, false)
 defer close()
 defer func() {
 pc.ShutdownDendrite()
@@ -909,7 +954,7 @@ func TestSendEDUBatches(t *testing.T) {
 // test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
 // db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, true, t, dbType, true)
-db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, true, t, test.DBTypeSQLite, false)
+db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, true, false, t, test.DBTypeSQLite, false)
 defer close()
 defer func() {
 pc.ShutdownDendrite()
@@ -953,7 +998,7 @@ func TestSendPDUAndEDUBatches(t *testing.T) {
 // test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
 // db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, true, t, dbType, true)
-db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, true, t, test.DBTypeSQLite, false)
+db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, true, false, t, test.DBTypeSQLite, false)
 defer close()
 defer func() {
 pc.ShutdownDendrite()
@@ -1004,7 +1049,7 @@ func TestExternalFailureBackoffDoesntStartQueue(t *testing.T) {
 t.Parallel()
 failuresUntilBlacklist := uint32(16)
 destination := gomatrixserverlib.ServerName("remotehost")
-db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, true, t, test.DBTypeSQLite, false)
+db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, true, false, t, test.DBTypeSQLite, false)
 defer close()
 defer func() {
 pc.ShutdownDendrite()
@@ -1042,7 +1087,7 @@ func TestQueueInteractsWithRealDatabasePDUAndEDU(t *testing.T) {
 destination := gomatrixserverlib.ServerName("remotehost")
 destinations := map[gomatrixserverlib.ServerName]struct{}{destination: {}}
 test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
-db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, t, dbType, true)
+db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, dbType, true)
 // NOTE : These defers aren't called if go test is killed so the dbs may not get cleaned up.
 defer close()
 defer func() {
@@ -1108,7 +1153,7 @@ func TestSendPDUMultipleFailuresAssumedOffline(t *testing.T) {
 failuresUntilBlacklist := uint32(7)
 failuresUntilAssumedOffline := uint32(2)
 destination := gomatrixserverlib.ServerName("remotehost")
-db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilAssumedOffline, false, t, test.DBTypeSQLite, false)
+db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilAssumedOffline, false, false, t, test.DBTypeSQLite, false)
 defer close()
 defer func() {
 pc.ShutdownDendrite()
@@ -1141,7 +1186,7 @@ func TestSendEDUMultipleFailuresAssumedOffline(t *testing.T) {
 failuresUntilBlacklist := uint32(7)
 failuresUntilAssumedOffline := uint32(2)
 destination := gomatrixserverlib.ServerName("remotehost")
-db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilAssumedOffline, false, t, test.DBTypeSQLite, false)
+db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilAssumedOffline, false, false, t, test.DBTypeSQLite, false)
 defer close()
 defer func() {
 pc.ShutdownDendrite()
@@ -1168,3 +1213,81 @@ func TestSendEDUMultipleFailuresAssumedOffline(t *testing.T) {
 }
 poll.WaitOn(t, check, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond))
 }
+func TestSendPDUOnAsyncSuccessRemovedFromDB(t *testing.T) {
+t.Parallel()
+failuresUntilBlacklist := uint32(16)
+failuresUntilAssumedOffline := uint32(1)
+destination := gomatrixserverlib.ServerName("remotehost")
+db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilAssumedOffline, false, true, t, test.DBTypeSQLite, false)
+defer close()
+defer func() {
+pc.ShutdownDendrite()
+<-pc.WaitForShutdown()
+}()
+mailservers := []gomatrixserverlib.ServerName{"mailserver"}
+queues.statistics.ForServer(destination).AddMailservers(mailservers)
+ev := mustCreatePDU(t)
+err := queues.SendEvent(ev, "localhost", []gomatrixserverlib.ServerName{destination})
+assert.NoError(t, err)
+check := func(log poll.LogT) poll.Result {
+if fc.txCount.Load() == 1 {
+if fc.txAsyncCount.Load() == 1 {
+data, dbErr := db.GetPendingPDUs(pc.Context(), destination, 100)
+assert.NoError(t, dbErr)
+if len(data) == 0 {
+return poll.Success()
+}
+return poll.Continue("waiting for event to be removed from database. Currently present PDU: %d", len(data))
+}
+return poll.Continue("waiting for more async send attempts before checking database. Currently %d", fc.txAsyncCount.Load())
+}
+return poll.Continue("waiting for more send attempts before checking database. Currently %d", fc.txCount.Load())
+}
+poll.WaitOn(t, check, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond))
+assumedOffline, _ := db.IsServerAssumedOffline(destination)
+assert.Equal(t, assumedOffline, true)
+}
+func TestSendEDUOnAsyncSuccessRemovedFromDB(t *testing.T) {
+t.Parallel()
+failuresUntilBlacklist := uint32(16)
+failuresUntilAssumedOffline := uint32(1)
+destination := gomatrixserverlib.ServerName("remotehost")
+db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilAssumedOffline, false, true, t, test.DBTypeSQLite, false)
+defer close()
+defer func() {
+pc.ShutdownDendrite()
+<-pc.WaitForShutdown()
+}()
+mailservers := []gomatrixserverlib.ServerName{"mailserver"}
+queues.statistics.ForServer(destination).AddMailservers(mailservers)
+ev := mustCreateEDU(t)
+err := queues.SendEDU(ev, "localhost", []gomatrixserverlib.ServerName{destination})
+assert.NoError(t, err)
+check := func(log poll.LogT) poll.Result {
+if fc.txCount.Load() == 1 {
+if fc.txAsyncCount.Load() == 1 {
+data, dbErr := db.GetPendingEDUs(pc.Context(), destination, 100)
+assert.NoError(t, dbErr)
+if len(data) == 0 {
+return poll.Success()
+}
+return poll.Continue("waiting for event to be removed from database. Currently present EDU: %d", len(data))
+}
+return poll.Continue("waiting for more async send attempts before checking database. Currently %d", fc.txAsyncCount.Load())
+}
+return poll.Continue("waiting for more send attempts before checking database. Currently %d", fc.txCount.Load())
+}
+poll.WaitOn(t, check, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond))
+assumedOffline, _ := db.IsServerAssumedOffline(destination)
+assert.Equal(t, assumedOffline, true)
+}

View file

@@ -138,13 +138,19 @@ func (s *ServerStatistics) AssignBackoffNotifier(notifier func()) {
 // attempt, which increases the sent counter and resets the idle and
 // failure counters. If a host was blacklisted at this point then
 // we will unblacklist it.
-func (s *ServerStatistics) Success() {
+// `async` specifies whether the success was to the actual destination
+// or one of their mailservers.
+func (s *ServerStatistics) Success(async bool) {
 s.cancel()
 s.backoffCount.Store(0)
-s.successCounter.Inc()
-if s.statistics.DB != nil {
-if err := s.statistics.DB.RemoveServerFromBlacklist(s.serverName); err != nil {
-logrus.WithError(err).Errorf("Failed to remove %q from blacklist", s.serverName)
+// NOTE : Sending to the final destination vs. a mailserver has
+// slightly different semantics.
+if !async {
+s.successCounter.Inc()
+if s.statistics.DB != nil {
+if err := s.statistics.DB.RemoveServerFromBlacklist(s.serverName); err != nil {
+logrus.WithError(err).Errorf("Failed to remove %q from blacklist", s.serverName)
+}
+}
 }
 }
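To make the new Success semantics concrete, here is a minimal toy model. The serverStats type and its fields are invented for the example; the gating mirrors the hunk above: every success cancels backoff, but only a direct (non-async) success increments the success counter and removes the blacklist entry.

package main

import "fmt"

// serverStats is a toy stand-in for ServerStatistics, used only to show
// the intended difference between Success(false) and Success(true).
type serverStats struct {
	backoffCount   int
	successCounter int
	blacklisted    bool
}

// Success mirrors the new behaviour: every success clears the backoff,
// but only a direct (non-async) success bumps the success counter and
// removes the host from the blacklist.
func (s *serverStats) Success(async bool) {
	s.backoffCount = 0
	if !async {
		s.successCounter++
		s.blacklisted = false
	}
}

func main() {
	s := &serverStats{backoffCount: 3, blacklisted: true}
	s.Success(true) // only reached a mailserver
	fmt.Println(s.backoffCount, s.successCounter, s.blacklisted) // 0 0 true
	s.Success(false) // reached the destination itself
	fmt.Println(s.backoffCount, s.successCounter, s.blacklisted) // 0 1 false
}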
@@ -265,3 +271,31 @@ func (s *ServerStatistics) SuccessCount() uint32 {
 func (s *ServerStatistics) KnownMailservers() []gomatrixserverlib.ServerName {
 return s.knownMailservers
 }
+func (s *ServerStatistics) AddMailservers(mailservers []gomatrixserverlib.ServerName) {
+seenSet := make(map[gomatrixserverlib.ServerName]bool)
+uniqueList := []gomatrixserverlib.ServerName{}
+for _, srv := range mailservers {
+if seenSet[srv] {
+continue
+}
+seenSet[srv] = true
+uniqueList = append(uniqueList, srv)
+}
+err := s.statistics.DB.AddMailserversForServer(s.serverName, uniqueList)
+if err == nil {
+for _, newServer := range uniqueList {
+alreadyKnown := false
+for _, srv := range s.knownMailservers {
+if srv == newServer {
+alreadyKnown = true
+}
+}
+if !alreadyKnown {
+s.knownMailservers = append(s.knownMailservers, newServer)
+}
+}
+}
+}
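The net effect of AddMailservers is easier to see as a stripped-down sketch: de-duplicate the input, then append only entries that are not already known. The helper below is illustrative only (names are made up); the real method also persists the list through AddMailserversForServer and only updates its in-memory cache when that call succeeds.

package main

import "fmt"

// addMailservers mimics the merge behaviour of AddMailservers: the input
// is de-duplicated, then only entries not already known are appended.
func addMailservers(known, incoming []string) []string {
	seen := make(map[string]bool, len(known)+len(incoming))
	for _, s := range known {
		seen[s] = true
	}
	for _, s := range incoming {
		if !seen[s] {
			seen[s] = true
			known = append(known, s)
		}
	}
	return known
}

func main() {
	known := []string{"mailserver"}
	known = addMailservers(known, []string{"mailserver", "backup", "backup"})
	fmt.Println(known) // [mailserver backup]
}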

View file

@@ -14,7 +14,7 @@ func TestBackoff(t *testing.T) {
 }
 // Start by checking that counting successes works.
-server.Success()
+server.Success(false)
 if successes := server.SuccessCount(); successes != 1 {
 t.Fatalf("Expected success count 1, got %d", successes)
 }