From b60cc7283d4659014eae6d55886dda8f168a56af Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 10 Feb 2020 14:05:23 +0000 Subject: [PATCH] Some updates, sorta works now --- federationsender/queue/destinationqueue.go | 10 +++- federationsender/queue/queue.go | 56 ++++++++++++------- .../storage/postgres/retries_table.go | 16 ++++-- 3 files changed, 57 insertions(+), 25 deletions(-) diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index ce85cd004..764813008 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -79,7 +79,6 @@ func (oq *destinationQueue) backgroundSend() { return } - // TODO: handle retries. // TODO: blacklist uncooperative servers. _, err := oq.client.SendTransaction(context.TODO(), *t) @@ -88,6 +87,15 @@ func (oq *destinationQueue) backgroundSend() { "destination": oq.destination, log.ErrorKey: err, }).Info("problem sending transaction") + + for _, pdu := range (*t).PDUs { + if err := oq.parent.QueueEvent((*t).Destination, pdu); err != nil { + log.WithFields(log.Fields{ + "destination": (*t).Destination, + log.ErrorKey: err, + }).Warn("Error queuing PDU") + } + } } } } diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go index e2475e9db..09e8299b5 100644 --- a/federationsender/queue/queue.go +++ b/federationsender/queue/queue.go @@ -16,7 +16,6 @@ package queue import ( "context" - "errors" "fmt" "sync" "time" @@ -27,6 +26,8 @@ import ( log "github.com/sirupsen/logrus" ) +const retryInterval = time.Second * 5 + // OutgoingQueues is a collection of queues for sending transactions to other // matrix servers type OutgoingQueues struct { @@ -57,19 +58,14 @@ func NewOutgoingQueues( func (oqs *OutgoingQueues) QueueEvent( destination gomatrixserverlib.ServerName, event gomatrixserverlib.Event, - retryAt time.Time, ) error { - if time.Until(retryAt) < time.Second*5 { - return errors.New("can't queue for less than 5 seconds") - } - return oqs.db.QueueEventForRetry( - context.Background(), // context - string(oqs.origin), // origin servername - string(destination), // destination servername - event, // event - 0, // attempts - retryAt, // retry at time + context.Background(), // context + string(oqs.origin), // origin servername + string(destination), // destination servername + event, // event + 0, // attempts + time.Now().Add(retryInterval*2), // retry at time ) } @@ -149,6 +145,7 @@ func (oqs *OutgoingQueues) SendEDU( oq := oqs.queues[destination] if oq == nil { oq = &destinationQueue{ + parent: oqs, origin: oqs.origin, destination: destination, client: oqs.client, @@ -167,22 +164,41 @@ func (oqs *OutgoingQueues) SendEDU( func (oqs *OutgoingQueues) processRetries() { ctx := context.Background() for { - time.Sleep(time.Second * 5) - fmt.Println("trying to process retries") + time.Sleep(retryInterval) + if err := oqs.db.DeleteRetryExpiredEvents(ctx); err != nil { + log.WithFields(log.Fields{ + log.ErrorKey: err, + }).Warn("Error cleaning expired retry events") + } retries, err := oqs.db.SelectRetryEventsPending(ctx) if err != nil { - fmt.Println("failed:", err) + log.WithFields(log.Fields{ + log.ErrorKey: err, + }).Warn("Error selecting pending retry events") continue } - fmt.Println("there are", len(retries), "PDUs to retry sending") - - for _, retry := range retries { - fmt.Println("retrying:", retry) + if len(retries) == 0 { + continue } - oqs.db.DeleteRetryExpiredEvents(ctx) + log.WithFields(log.Fields{ + "pending": len(retries), + }).Info("Retrying failed PDU sends") + + for _, retry := range retries { + if err := oqs.SendEvent( + retry.PDU, + retry.Origin, + []gomatrixserverlib.ServerName{retry.Destination}, + ); err != nil { + log.WithFields(log.Fields{ + "destination": retry.Destination, + log.ErrorKey: err, + }).Warn("Error resending retry event") + } + } } } diff --git a/federationsender/storage/postgres/retries_table.go b/federationsender/storage/postgres/retries_table.go index 8867a8685..f9edeb1e1 100644 --- a/federationsender/storage/postgres/retries_table.go +++ b/federationsender/storage/postgres/retries_table.go @@ -18,6 +18,7 @@ package postgres import ( "context" "database/sql" + "encoding/json" "time" "github.com/matrix-org/dendrite/common" @@ -40,7 +41,9 @@ INSERT INTO federationsender_retry (origin_server_name, destination_server_name, event_json, attempts, retry_at) VALUES ($1, $2, $3, $4, $5) ON CONFLICT ON CONSTRAINT federationsender_retry_unique - DO UPDATE SET attempts = $4, retry_at = $5 + DO UPDATE SET + attempts = federationsender_retry.attempts+1, + retry_at = $5 ` const deleteEventSQL = ` @@ -49,10 +52,11 @@ const deleteEventSQL = ` const selectEventsForRetry = ` SELECT * FROM federationsender_retry WHERE retry_at >= $1 AND attempts < 5 + ORDER BY retry_at ` const deleteExpiredEvents = ` - DELETE FROM federationsender_retry WHERE attempts > 5 + DELETE FROM federationsender_retry WHERE attempts >= 5 OR retry_at < $1 ` type retryStatements struct { @@ -116,12 +120,16 @@ func (s *retryStatements) selectRetryEventsPending( defer rows.Close() for rows.Next() { var entry types.PendingPDU + var rawEvent []byte if err = rows.Scan( &entry.RetryNID, &entry.Origin, &entry.Destination, - &entry.PDU, &entry.Attempts, &entry.Attempts, + &rawEvent, &entry.Attempts, &entry.Attempts, ); err != nil { return nil, err } + if err := json.Unmarshal(rawEvent, &entry.PDU); err != nil { + return nil, err + } pending = append(pending, &entry) } return pending, err @@ -131,6 +139,6 @@ func (s *retryStatements) deleteRetryExpiredEvents( ctx context.Context, txn *sql.Tx, ) error { stmt := common.TxStmt(txn, s.deleteExpiredEventsStmt) - _, err := stmt.ExecContext(ctx) + _, err := stmt.ExecContext(ctx, time.Now().UTC().Unix()) return err }