diff --git a/federationsender/federationsender.go b/federationsender/federationsender.go
index a318d2099..fcd03d522 100644
--- a/federationsender/federationsender.go
+++ b/federationsender/federationsender.go
@@ -40,7 +40,7 @@ func SetupFederationSenderComponent(
 		logrus.WithError(err).Panic("failed to connect to federation sender db")
 	}
 
-	queues := queue.NewOutgoingQueues(base.Cfg.Matrix.ServerName, federation)
+	queues := queue.NewOutgoingQueues(federationSenderDB, base.Cfg.Matrix.ServerName, federation)
 
 	rsConsumer := consumers.NewOutputRoomEventConsumer(
 		base.Cfg, base.KafkaConsumer, queues,
diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go
index c0afe3be2..ce85cd004 100644
--- a/federationsender/queue/destinationqueue.go
+++ b/federationsender/queue/destinationqueue.go
@@ -20,6 +20,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/matrix-org/dendrite/federationsender/types"
 	"github.com/matrix-org/gomatrixserverlib"
 	log "github.com/sirupsen/logrus"
 )
@@ -29,23 +30,22 @@ import (
 // ensures that only one request is in flight to a given destination
 // at a time.
 type destinationQueue struct {
-	client      *gomatrixserverlib.FederationClient
-	origin      gomatrixserverlib.ServerName
-	destination gomatrixserverlib.ServerName
-	// The running mutex protects running, sentCounter, lastTransactionIDs and
-	// pendingEvents, pendingEDUs.
+	parent             *OutgoingQueues
+	client             *gomatrixserverlib.FederationClient
+	origin             gomatrixserverlib.ServerName
+	destination        gomatrixserverlib.ServerName
 	runningMutex       sync.Mutex
-	running            bool
-	sentCounter        int
-	lastTransactionIDs []gomatrixserverlib.TransactionID
-	pendingEvents      []*gomatrixserverlib.Event
-	pendingEDUs        []*gomatrixserverlib.EDU
+	running            bool                              // protected by runningMutex
+	sentCounter        int                               // protected by runningMutex
+	lastTransactionIDs []gomatrixserverlib.TransactionID // protected by runningMutex
+	pendingEvents      []*types.PendingPDU               // protected by runningMutex
+	pendingEDUs        []*types.PendingEDU               // protected by runningMutex
 }
 
 // Send event adds the event to the pending queue for the destination.
 // If the queue is empty then it starts a background goroutine to
 // start sending events to that destination.
-func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.Event) {
+func (oq *destinationQueue) sendEvent(ev *types.PendingPDU) {
 	oq.runningMutex.Lock()
 	defer oq.runningMutex.Unlock()
 	oq.pendingEvents = append(oq.pendingEvents, ev)
@@ -58,7 +58,7 @@ func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.Event) {
 // sendEDU adds the EDU event to the pending queue for the destination.
 // If the queue is empty then it starts a background goroutine to
 // start sending event to that destination.
-func (oq *destinationQueue) sendEDU(e *gomatrixserverlib.EDU) {
+func (oq *destinationQueue) sendEDU(e *types.PendingEDU) {
 	oq.runningMutex.Lock()
 	defer oq.runningMutex.Unlock()
 	oq.pendingEDUs = append(oq.pendingEDUs, e)
@@ -73,7 +73,11 @@ func (oq *destinationQueue) backgroundSend() {
 		t := oq.next()
 		if t == nil {
 			// If the queue is empty then stop processing for this destination.
-			// TODO: Remove this destination from the queue map.
+			// parent is nil for queues built without it; skip removal rather
+			// than dereference a nil pointer.
+			if oq.parent != nil {
+				oq.parent.RemoveQueue(oq.destination)
+			}
 			return
 		}
 
@@ -116,13 +120,13 @@ func (oq *destinationQueue) next() *gomatrixserverlib.Transaction {
 	oq.lastTransactionIDs = []gomatrixserverlib.TransactionID{t.TransactionID}
 
 	for _, pdu := range oq.pendingEvents {
-		t.PDUs = append(t.PDUs, *pdu)
+		t.PDUs = append(t.PDUs, *pdu.PDU)
 	}
 	oq.pendingEvents = nil
 	oq.sentCounter += len(t.PDUs)
 
 	for _, edu := range oq.pendingEDUs {
-		t.EDUs = append(t.EDUs, *edu)
+		t.EDUs = append(t.EDUs, *edu.EDU)
 	}
 	oq.pendingEDUs = nil
 	oq.sentCounter += len(t.EDUs)
diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go
index 6a05c5f07..e2475e9db 100644
--- a/federationsender/queue/queue.go
+++ b/federationsender/queue/queue.go
@@ -15,9 +15,14 @@
 package queue
 
 import (
+	"context"
+	"errors"
 	"fmt"
 	"sync"
+	"time"
 
+	"github.com/matrix-org/dendrite/federationsender/storage"
+	"github.com/matrix-org/dendrite/federationsender/types"
 	"github.com/matrix-org/gomatrixserverlib"
 	log "github.com/sirupsen/logrus"
 )
@@ -25,20 +30,57 @@ import (
 // OutgoingQueues is a collection of queues for sending transactions to other
 // matrix servers
 type OutgoingQueues struct {
-	origin gomatrixserverlib.ServerName
-	client *gomatrixserverlib.FederationClient
-	// The queuesMutex protects queues
+	db          storage.Database
+	origin      gomatrixserverlib.ServerName
+	client      *gomatrixserverlib.FederationClient
 	queuesMutex sync.Mutex
-	queues      map[gomatrixserverlib.ServerName]*destinationQueue
+	queues      map[gomatrixserverlib.ServerName]*destinationQueue // protected by queuesMutex
 }
 
 // NewOutgoingQueues makes a new OutgoingQueues
-func NewOutgoingQueues(origin gomatrixserverlib.ServerName, client *gomatrixserverlib.FederationClient) *OutgoingQueues {
-	return &OutgoingQueues{
+func NewOutgoingQueues(
+	db storage.Database,
+	origin gomatrixserverlib.ServerName,
+	client *gomatrixserverlib.FederationClient,
+) *OutgoingQueues {
+	queues := OutgoingQueues{
+		db:     db,
 		origin: origin,
 		client: client,
 		queues: map[gomatrixserverlib.ServerName]*destinationQueue{},
 	}
+
+	go queues.processRetries()
+	return &queues
+}
+
+// QueueEvent stores the event in the database so that sending it to the
+// destination can be retried at retryAt. It returns an error when retryAt is
+// less than five seconds away or when the database write fails.
+func (oqs *OutgoingQueues) QueueEvent(
+	destination gomatrixserverlib.ServerName,
+	event gomatrixserverlib.Event,
+	retryAt time.Time,
+) error {
+	if time.Until(retryAt) < time.Second*5 {
+		return errors.New("can't queue for less than 5 seconds")
+	}
+
+	return oqs.db.QueueEventForRetry(
+		context.Background(), // context
+		string(oqs.origin),   // origin servername
+		string(destination),  // destination servername
+		event,                // event
+		0,                    // attempts
+		retryAt,              // retry at time
+	)
+}
+
+// RemoveQueue deletes the queue for the given destination from the queue map.
+func (oqs *OutgoingQueues) RemoveQueue(name gomatrixserverlib.ServerName) {
+	oqs.queuesMutex.Lock()
+	defer oqs.queuesMutex.Unlock()
+	delete(oqs.queues, name)
 }
 
 // SendEvent sends an event to the destinations
@@ -64,9 +106,10 @@ func (oqs *OutgoingQueues) SendEvent(
 	oqs.queuesMutex.Lock()
 	defer oqs.queuesMutex.Unlock()
 	for _, destination := range destinations {
-		oq := oqs.queues[destination]
-		if oq == nil {
+		oq, ok := oqs.queues[destination]
+		if !ok {
 			oq = &destinationQueue{
+				parent:      oqs,
 				origin:      oqs.origin,
 				destination: destination,
 				client:      oqs.client,
@@ -74,7 +117,9 @@ func (oqs *OutgoingQueues) SendEvent(
 			oqs.queues[destination] = oq
 		}
 
-		oq.sendEvent(ev)
+		oq.sendEvent(&types.PendingPDU{
+			PDU: ev,
+		})
 	}
 
 	return nil
@@ -115,12 +160,44 @@ func (oqs *OutgoingQueues) SendEDU(
 			oqs.queues[destination] = oq
 		}
 
-		oq.sendEDU(e)
+		oq.sendEDU(&types.PendingEDU{
+			EDU: e,
+		})
 	}
 
 	return nil
 }
 
+// processRetries polls the database every five seconds for events that are
+// due a retry and then purges events that have run out of attempts. The loop
+// has no exit condition, so it runs until the process stops.
+// TODO: hand the pending events back to their destination queues; for now
+// they are only logged.
+func (oqs *OutgoingQueues) processRetries() {
+	ctx := context.Background()
+	for {
+		time.Sleep(time.Second * 5)
+
+		retries, err := oqs.db.SelectRetryEventsPending(ctx)
+		if err != nil {
+			log.WithError(err).Error("failed to select events pending retry")
+			continue
+		}
+		log.WithField("count", len(retries)).Debug("found PDUs to retry sending")
+
+		for _, retry := range retries {
+			log.WithFields(log.Fields{
+				"retry_nid":   retry.RetryNID,
+				"destination": retry.Destination,
+			}).Debug("retrying PDU")
+		}
+
+		if err = oqs.db.DeleteRetryExpiredEvents(ctx); err != nil {
+			log.WithError(err).Error("failed to delete expired retry events")
+		}
+	}
+}
+
 // filterDestinations removes our own server from the list of destinations.
 // Otherwise we could end up trying to talk to ourselves.
 func filterDestinations(origin gomatrixserverlib.ServerName, destinations []gomatrixserverlib.ServerName) []gomatrixserverlib.ServerName {
diff --git a/federationsender/storage/postgres/retries_table.go b/federationsender/storage/postgres/retries_table.go
new file mode 100644
index 000000000..8867a8685
--- /dev/null
+++ b/federationsender/storage/postgres/retries_table.go
@@ -0,0 +1,172 @@
+// Copyright 2017-2018 New Vector Ltd
+// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package postgres
+
+import (
+	"context"
+	"database/sql"
+	"encoding/json"
+	"time"
+
+	"github.com/matrix-org/dendrite/common"
+	"github.com/matrix-org/dendrite/federationsender/types"
+)
+
+// retrySchema creates the table that holds events awaiting a federation retry.
+// NOTE(review): the unique constraint includes event_json; confirm that large
+// events stay within PostgreSQL's index row size limit.
+const retrySchema = `
+CREATE TABLE IF NOT EXISTS federationsender_retry (
+	retry_nid BIGSERIAL PRIMARY KEY,
+	origin_server_name TEXT NOT NULL,
+	destination_server_name TEXT NOT NULL,
+	event_json BYTEA NOT NULL,
+	attempts BIGINT DEFAULT 0,
+	retry_at BIGINT NOT NULL,
+	CONSTRAINT federationsender_retry_unique UNIQUE (origin_server_name, destination_server_name, event_json)
+);`
+
+// upsertEventSQL inserts an entry, or updates attempts and retry_at when the
+// same origin, destination and event are already queued.
+const upsertEventSQL = `
+INSERT INTO federationsender_retry
+	(origin_server_name, destination_server_name, event_json, attempts, retry_at)
+	VALUES ($1, $2, $3, $4, $5)
+	ON CONFLICT ON CONSTRAINT federationsender_retry_unique
+	DO UPDATE SET attempts = $4, retry_at = $5
+`
+
+// deleteEventSQL removes a single entry by its numeric ID.
+const deleteEventSQL = `
+	DELETE FROM federationsender_retry WHERE retry_nid = $1
+`
+
+// selectEventsForRetry returns entries that are due (retry_at is not after $1)
+// and still have attempts left. Columns are named explicitly so their order
+// matches the Scan call in selectRetryEventsPending.
+const selectEventsForRetry = `
+	SELECT retry_nid, origin_server_name, destination_server_name, event_json, attempts, retry_at
+	FROM federationsender_retry WHERE retry_at <= $1 AND attempts < 5
+`
+
+// deleteExpiredEvents removes every entry that selectEventsForRetry no longer
+// returns because it has reached the attempt limit.
+const deleteExpiredEvents = `
+	DELETE FROM federationsender_retry WHERE attempts >= 5
+`
+
+// retryStatements holds the prepared statements for the retry table.
+type retryStatements struct {
+	upsertEventStmt          *sql.Stmt
+	deleteEventStmt          *sql.Stmt
+	selectEventsForRetryStmt *sql.Stmt
+	deleteExpiredEventsStmt  *sql.Stmt
+}
+
+// prepare creates the retry table if needed and prepares each statement,
+// returning the first error encountered.
+func (s *retryStatements) prepare(db *sql.DB) (err error) {
+	_, err = db.Exec(retrySchema)
+	if err != nil {
+		return
+	}
+
+	if s.upsertEventStmt, err = db.Prepare(upsertEventSQL); err != nil {
+		return
+	}
+	if s.deleteEventStmt, err = db.Prepare(deleteEventSQL); err != nil {
+		return
+	}
+	if s.selectEventsForRetryStmt, err = db.Prepare(selectEventsForRetry); err != nil {
+		return
+	}
+	if s.deleteExpiredEventsStmt, err = db.Prepare(deleteExpiredEvents); err != nil {
+		return
+	}
+	return
+}
+
+// upsertRetryEvent queues eventJSON for the origin/destination pair, or updates
+// attempts and retryAt (Unix seconds) if that event is already queued.
+func (s *retryStatements) upsertRetryEvent(
+	ctx context.Context, txn *sql.Tx,
+	originServer string, destinationServer string, eventJSON []byte,
+	attempts int, retryAt int64,
+) error {
+	_, err := common.TxStmt(txn, s.upsertEventStmt).ExecContext(
+		ctx, originServer, destinationServer,
+		eventJSON, attempts, retryAt,
+	)
+	return err
+}
+
+// deleteRetryEvent removes the entry with the given retry NID.
+func (s *retryStatements) deleteRetryEvent(
+	ctx context.Context, txn *sql.Tx, retryNID int64,
+) error {
+	_, err := common.TxStmt(txn, s.deleteEventStmt).ExecContext(
+		ctx, retryNID,
+	)
+	return err
+}
+
+// selectRetryEventsPending returns the queued events whose retry_at is at or
+// before the current time and which have fewer than five attempts.
+func (s *retryStatements) selectRetryEventsPending(
+	ctx context.Context, txn *sql.Tx,
+) ([]*types.PendingPDU, error) {
+	stmt := common.TxStmt(txn, s.selectEventsForRetryStmt)
+	rows, err := stmt.QueryContext(ctx, time.Now().UTC().Unix())
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close() // nolint: errcheck
+
+	var pending []*types.PendingPDU
+	for rows.Next() {
+		var (
+			entry     types.PendingPDU
+			eventJSON []byte
+			retryAt   int64
+		)
+		if err = rows.Scan(
+			&entry.RetryNID, &entry.Origin, &entry.Destination,
+			&eventJSON, &entry.Attempts, &retryAt,
+		); err != nil {
+			return nil, err
+		}
+		// event_json is written with json.Marshal by QueueEventForRetry, so
+		// decode it the same way.
+		// NOTE(review): assumes gomatrixserverlib.Event round-trips through
+		// encoding/json - confirm.
+		if err = json.Unmarshal(eventJSON, &entry.PDU); err != nil {
+			return nil, err
+		}
+		entry.RetryAt = time.Unix(retryAt, 0).UTC()
+		pending = append(pending, &entry)
+	}
+	// Next also returns false when iteration fails, so surface that error.
+	return pending, rows.Err()
+}
+
+// deleteRetryExpiredEvents removes entries that have reached the attempt limit.
+func (s *retryStatements) deleteRetryExpiredEvents(
+	ctx context.Context, txn *sql.Tx,
+) error {
+	stmt := common.TxStmt(txn, s.deleteExpiredEventsStmt)
+	_, err := stmt.ExecContext(ctx)
+	return err
+}
diff --git a/federationsender/storage/postgres/storage.go b/federationsender/storage/postgres/storage.go
index c60f6dc5c..e2365b5f6 100644
--- a/federationsender/storage/postgres/storage.go
+++ b/federationsender/storage/postgres/storage.go
@@ -18,15 +18,19 @@ package postgres
 import (
 	"context"
 	"database/sql"
+	"encoding/json"
+	"time"
 
 	"github.com/matrix-org/dendrite/common"
 	"github.com/matrix-org/dendrite/federationsender/types"
+	"github.com/matrix-org/gomatrixserverlib"
 )
 
 // Database stores information needed by the federation sender
 type Database struct {
 	joinedHostsStatements
 	roomStatements
+	retryStatements
 	common.PartitionOffsetStatements
 	db *sql.DB
 }
@@ -55,6 +59,10 @@ func (d *Database) prepare() error {
 		return err
 	}
 
+	if err = d.retryStatements.prepare(d.db); err != nil {
+		return err
+	}
+
 	return d.PartitionOffsetStatements.Prepare(d.db, "federationsender")
 }
 
@@ -120,3 +128,38 @@ func (d *Database) GetJoinedHosts(
 ) ([]types.JoinedHost, error) {
 	return d.selectJoinedHosts(ctx, roomID)
 }
+
+// QueueEventForRetry stores the event as JSON in the retry table together
+// with its attempt count and the time at which it should next be retried.
+func (d *Database) QueueEventForRetry(
+	ctx context.Context,
+	originServer, destinationServer string, event gomatrixserverlib.Event,
+	attempts int, retryAt time.Time,
+) error {
+	eventJSON, err := json.Marshal(event)
+	if err != nil {
+		return err
+	}
+	return d.upsertRetryEvent(
+		ctx, nil,
+		originServer, destinationServer,
+		eventJSON, attempts, retryAt.UTC().Unix(),
+	)
+}
+
+// DeleteRetryEvent removes the retry entry with the given numeric ID.
+func (d *Database) DeleteRetryEvent(ctx context.Context, retryNID int64) error {
+	return d.deleteRetryEvent(ctx, nil, retryNID)
+}
+
+// SelectRetryEventsPending returns the events whose retry time has been reached.
+func (d *Database) SelectRetryEventsPending(ctx context.Context) (
+	[]*types.PendingPDU, error,
+) {
+	return d.selectRetryEventsPending(ctx, nil)
+}
+
+// DeleteRetryExpiredEvents removes events that have used up their retry attempts.
+func (d *Database) DeleteRetryExpiredEvents(ctx context.Context) error {
+	return d.deleteRetryExpiredEvents(ctx, nil)
+}
diff --git a/federationsender/storage/storage.go b/federationsender/storage/storage.go
index 4ce151c7a..2d2fcdb18 100644
--- a/federationsender/storage/storage.go
+++ b/federationsender/storage/storage.go
@@ -17,16 +17,21 @@ package storage
 import (
 	"context"
 	"net/url"
+	"time"
 
 	"github.com/matrix-org/dendrite/common"
 	"github.com/matrix-org/dendrite/federationsender/storage/postgres"
 	"github.com/matrix-org/dendrite/federationsender/types"
+	"github.com/matrix-org/gomatrixserverlib"
 )
 
 type Database interface {
 	common.PartitionStorer
 	UpdateRoom(ctx context.Context, roomID, oldEventID, newEventID string, addHosts []types.JoinedHost, removeHosts []string) (joinedHosts []types.JoinedHost, err error)
 	GetJoinedHosts(ctx context.Context, roomID string) ([]types.JoinedHost, error)
+	QueueEventForRetry(ctx context.Context, originServer, destinationServer string, event gomatrixserverlib.Event, attempts int, retryAt time.Time) error
+	SelectRetryEventsPending(ctx context.Context) ([]*types.PendingPDU, error)
+	DeleteRetryExpiredEvents(ctx context.Context) error
 }
 
 // NewDatabase opens a new database
diff --git a/federationsender/types/types.go b/federationsender/types/types.go
index 05ba92f77..55b1b9beb 100644
--- a/federationsender/types/types.go
+++ b/federationsender/types/types.go
@@ -16,6 +16,7 @@ package types
 
 import (
 	"fmt"
+	"time"
 
 	"github.com/matrix-org/gomatrixserverlib"
 )
@@ -43,3 +44,25 @@ func (e EventIDMismatchError) Error() string {
 		e.DatabaseID, e.RoomServerID,
 	)
 }
+
+// Queueable holds the retry bookkeeping shared by queued PDUs and EDUs.
+type Queueable struct {
+	RetryNID    int64
+	Origin      gomatrixserverlib.ServerName
+	Destination gomatrixserverlib.ServerName
+	Attempts    int
+	LastAttempt time.Time
+	RetryAt     time.Time // filled from the retry_at column when loaded from the database
+}
+
+// PendingPDU is a PDU waiting to be sent, with its retry bookkeeping.
+type PendingPDU struct {
+	Queueable
+	PDU *gomatrixserverlib.Event
+}
+
+// PendingEDU is an EDU waiting to be sent, with its retry bookkeeping.
+type PendingEDU struct {
+	Queueable
+	EDU *gomatrixserverlib.EDU
+}