Initial work for retrying failed federation requests

This commit is contained in:
Neil Alexander 2020-02-07 17:47:37 +00:00
parent b72d7eb0cf
commit be819a022e
7 changed files with 294 additions and 26 deletions

View file

@ -40,7 +40,7 @@ func SetupFederationSenderComponent(
logrus.WithError(err).Panic("failed to connect to federation sender db")
}
queues := queue.NewOutgoingQueues(base.Cfg.Matrix.ServerName, federation)
queues := queue.NewOutgoingQueues(federationSenderDB, base.Cfg.Matrix.ServerName, federation)
rsConsumer := consumers.NewOutputRoomEventConsumer(
base.Cfg, base.KafkaConsumer, queues,

View file

@ -20,6 +20,7 @@ import (
"sync"
"time"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
)
@ -29,23 +30,22 @@ import (
// ensures that only one request is in flight to a given destination
// at a time.
type destinationQueue struct {
client *gomatrixserverlib.FederationClient
origin gomatrixserverlib.ServerName
destination gomatrixserverlib.ServerName
// The running mutex protects running, sentCounter, lastTransactionIDs and
// pendingEvents, pendingEDUs.
parent *OutgoingQueues
client *gomatrixserverlib.FederationClient
origin gomatrixserverlib.ServerName
destination gomatrixserverlib.ServerName
runningMutex sync.Mutex
running bool
sentCounter int
lastTransactionIDs []gomatrixserverlib.TransactionID
pendingEvents []*gomatrixserverlib.Event
pendingEDUs []*gomatrixserverlib.EDU
running bool // protected by runningMutex
sentCounter int // protected by runningMutex
lastTransactionIDs []gomatrixserverlib.TransactionID // protected by runningMutex
pendingEvents []*types.PendingPDU // protected by runningMutex
pendingEDUs []*types.PendingEDU // protected by runningMutex
}
// Send event adds the event to the pending queue for the destination.
// If the queue is empty then it starts a background goroutine to
// start sending events to that destination.
func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.Event) {
func (oq *destinationQueue) sendEvent(ev *types.PendingPDU) {
oq.runningMutex.Lock()
defer oq.runningMutex.Unlock()
oq.pendingEvents = append(oq.pendingEvents, ev)
@ -58,7 +58,7 @@ func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.Event) {
// sendEDU adds the EDU event to the pending queue for the destination.
// If the queue is empty then it starts a background goroutine to
// start sending event to that destination.
func (oq *destinationQueue) sendEDU(e *gomatrixserverlib.EDU) {
func (oq *destinationQueue) sendEDU(e *types.PendingEDU) {
oq.runningMutex.Lock()
defer oq.runningMutex.Unlock()
oq.pendingEDUs = append(oq.pendingEDUs, e)
@ -73,7 +73,9 @@ func (oq *destinationQueue) backgroundSend() {
t := oq.next()
if t == nil {
// If the queue is empty then stop processing for this destination.
// TODO: Remove this destination from the queue map.
oq.parent.queuesMutex.Lock()
delete(oq.parent.queues, oq.destination)
oq.parent.queuesMutex.Unlock()
return
}
@ -116,13 +118,13 @@ func (oq *destinationQueue) next() *gomatrixserverlib.Transaction {
oq.lastTransactionIDs = []gomatrixserverlib.TransactionID{t.TransactionID}
for _, pdu := range oq.pendingEvents {
t.PDUs = append(t.PDUs, *pdu)
t.PDUs = append(t.PDUs, *pdu.PDU)
}
oq.pendingEvents = nil
oq.sentCounter += len(t.PDUs)
for _, edu := range oq.pendingEDUs {
t.EDUs = append(t.EDUs, *edu)
t.EDUs = append(t.EDUs, *edu.EDU)
}
oq.pendingEDUs = nil
oq.sentCounter += len(t.EDUs)

View file

@ -15,9 +15,14 @@
package queue
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
)
@ -25,20 +30,53 @@ import (
// OutgoingQueues is a collection of queues for sending transactions to other
// matrix servers
type OutgoingQueues struct {
origin gomatrixserverlib.ServerName
client *gomatrixserverlib.FederationClient
// The queuesMutex protects queues
db storage.Database
origin gomatrixserverlib.ServerName
client *gomatrixserverlib.FederationClient
queuesMutex sync.Mutex
queues map[gomatrixserverlib.ServerName]*destinationQueue
queues map[gomatrixserverlib.ServerName]*destinationQueue // protected by queuesMutex
}
// NewOutgoingQueues makes a new OutgoingQueues
func NewOutgoingQueues(origin gomatrixserverlib.ServerName, client *gomatrixserverlib.FederationClient) *OutgoingQueues {
return &OutgoingQueues{
func NewOutgoingQueues(
db storage.Database,
origin gomatrixserverlib.ServerName,
client *gomatrixserverlib.FederationClient,
) *OutgoingQueues {
queues := OutgoingQueues{
db: db,
origin: origin,
client: client,
queues: map[gomatrixserverlib.ServerName]*destinationQueue{},
}
go queues.processRetries()
return &queues
}
func (oqs *OutgoingQueues) QueueEvent(
destination gomatrixserverlib.ServerName,
event gomatrixserverlib.Event,
retryAt time.Time,
) error {
if time.Until(retryAt) < time.Second*5 {
return errors.New("can't queue for less than 5 seconds")
}
return oqs.db.QueueEventForRetry(
context.Background(), // context
string(oqs.origin), // origin servername
string(destination), // destination servername
event, // event
0, // attempts
retryAt, // retry at time
)
}
func (oqs *OutgoingQueues) RemoveQueue(name gomatrixserverlib.ServerName) {
oqs.queuesMutex.Lock()
defer oqs.queuesMutex.Unlock()
delete(oqs.queues, name)
}
// SendEvent sends an event to the destinations
@ -64,9 +102,10 @@ func (oqs *OutgoingQueues) SendEvent(
oqs.queuesMutex.Lock()
defer oqs.queuesMutex.Unlock()
for _, destination := range destinations {
oq := oqs.queues[destination]
if oq == nil {
oq, ok := oqs.queues[destination]
if !ok {
oq = &destinationQueue{
parent: oqs,
origin: oqs.origin,
destination: destination,
client: oqs.client,
@ -74,7 +113,9 @@ func (oqs *OutgoingQueues) SendEvent(
oqs.queues[destination] = oq
}
oq.sendEvent(ev)
oq.sendEvent(&types.PendingPDU{
PDU: ev,
})
}
return nil
@ -115,12 +156,36 @@ func (oqs *OutgoingQueues) SendEDU(
oqs.queues[destination] = oq
}
oq.sendEDU(e)
oq.sendEDU(&types.PendingEDU{
EDU: e,
})
}
return nil
}
func (oqs *OutgoingQueues) processRetries() {
ctx := context.Background()
for {
time.Sleep(time.Second * 5)
fmt.Println("trying to process retries")
retries, err := oqs.db.SelectRetryEventsPending(ctx)
if err != nil {
fmt.Println("failed:", err)
continue
}
fmt.Println("there are", len(retries), "PDUs to retry sending")
for _, retry := range retries {
fmt.Println("retrying:", retry)
}
oqs.db.DeleteRetryExpiredEvents(ctx)
}
}
// filterDestinations removes our own server from the list of destinations.
// Otherwise we could end up trying to talk to ourselves.
func filterDestinations(origin gomatrixserverlib.ServerName, destinations []gomatrixserverlib.ServerName) []gomatrixserverlib.ServerName {

View file

@ -0,0 +1,136 @@
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package postgres
import (
"context"
"database/sql"
"time"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/federationsender/types"
)
const retrySchema = `
CREATE TABLE IF NOT EXISTS federationsender_retry (
retry_nid BIGSERIAL PRIMARY KEY,
origin_server_name TEXT NOT NULL,
destination_server_name TEXT NOT NULL,
event_json BYTEA NOT NULL,
attempts BIGINT DEFAULT 0,
retry_at BIGINT NOT NULL,
CONSTRAINT federationsender_retry_unique UNIQUE (origin_server_name, destination_server_name, event_json)
);`
const upsertEventSQL = `
INSERT INTO federationsender_retry
(origin_server_name, destination_server_name, event_json, attempts, retry_at)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT ON CONSTRAINT federationsender_retry_unique
DO UPDATE SET attempts = $4, retry_at = $5
`
const deleteEventSQL = `
DELETE FROM federationsender_retry WHERE retry_nid = $1
`
const selectEventsForRetry = `
SELECT * FROM federationsender_retry WHERE retry_at >= $1 AND attempts < 5
`
const deleteExpiredEvents = `
DELETE FROM federationsender_retry WHERE attempts > 5
`
type retryStatements struct {
upsertEventStmt *sql.Stmt
deleteEventStmt *sql.Stmt
selectEventsForRetryStmt *sql.Stmt
deleteExpiredEventsStmt *sql.Stmt
}
func (s *retryStatements) prepare(db *sql.DB) (err error) {
_, err = db.Exec(retrySchema)
if err != nil {
return
}
if s.upsertEventStmt, err = db.Prepare(upsertEventSQL); err != nil {
return
}
if s.deleteEventStmt, err = db.Prepare(deleteEventSQL); err != nil {
return
}
if s.selectEventsForRetryStmt, err = db.Prepare(selectEventsForRetry); err != nil {
return
}
if s.deleteExpiredEventsStmt, err = db.Prepare(deleteExpiredEvents); err != nil {
return
}
return
}
func (s *retryStatements) upsertRetryEvent(
ctx context.Context, txn *sql.Tx,
originServer string, destinationServer string, eventJSON []byte,
attempts int, retryAt int64,
) error {
_, err := common.TxStmt(txn, s.upsertEventStmt).ExecContext(
ctx, originServer, destinationServer,
eventJSON, attempts, retryAt,
)
return err
}
func (s *retryStatements) deleteRetryEvent(
ctx context.Context, txn *sql.Tx, retryNID int64,
) error {
_, err := common.TxStmt(txn, s.deleteEventStmt).ExecContext(
ctx, retryNID,
)
return err
}
func (s *retryStatements) selectRetryEventsPending(
ctx context.Context, txn *sql.Tx,
) ([]*types.PendingPDU, error) {
var pending []*types.PendingPDU
stmt := common.TxStmt(txn, s.selectEventsForRetryStmt)
rows, err := stmt.QueryContext(ctx, time.Now().UTC().Unix())
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
var entry types.PendingPDU
if err = rows.Scan(
&entry.RetryNID, &entry.Origin, &entry.Destination,
&entry.PDU, &entry.Attempts, &entry.Attempts,
); err != nil {
return nil, err
}
pending = append(pending, &entry)
}
return pending, err
}
func (s *retryStatements) deleteRetryExpiredEvents(
ctx context.Context, txn *sql.Tx,
) error {
stmt := common.TxStmt(txn, s.deleteExpiredEventsStmt)
_, err := stmt.ExecContext(ctx)
return err
}

View file

@ -18,15 +18,19 @@ package postgres
import (
"context"
"database/sql"
"encoding/json"
"time"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/gomatrixserverlib"
)
// Database stores information needed by the federation sender
type Database struct {
joinedHostsStatements
roomStatements
retryStatements
common.PartitionOffsetStatements
db *sql.DB
}
@ -55,6 +59,10 @@ func (d *Database) prepare() error {
return err
}
if err = d.retryStatements.prepare(d.db); err != nil {
return err
}
return d.PartitionOffsetStatements.Prepare(d.db, "federationsender")
}
@ -120,3 +128,36 @@ func (d *Database) GetJoinedHosts(
) ([]types.JoinedHost, error) {
return d.selectJoinedHosts(ctx, roomID)
}
func (d *Database) QueueEventForRetry(
ctx context.Context,
originServer, destinationServer string, event gomatrixserverlib.Event,
attempts int, retryAt time.Time,
) error {
eventJSON, err := json.Marshal(event)
if err != nil {
return err
}
if err := d.upsertRetryEvent(
ctx, nil,
originServer, destinationServer,
eventJSON, attempts, retryAt.UTC().Unix(),
); err != nil {
return err
}
return nil
}
func (d *Database) DeleteRetryEvent(ctx context.Context, retryNID int64) error {
return d.deleteRetryEvent(ctx, nil, retryNID)
}
func (d *Database) SelectRetryEventsPending(ctx context.Context) (
[]*types.PendingPDU, error,
) {
return d.selectRetryEventsPending(ctx, nil)
}
func (d *Database) DeleteRetryExpiredEvents(ctx context.Context) error {
return d.deleteRetryExpiredEvents(ctx, nil)
}

View file

@ -17,16 +17,21 @@ package storage
import (
"context"
"net/url"
"time"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/federationsender/storage/postgres"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/gomatrixserverlib"
)
type Database interface {
common.PartitionStorer
UpdateRoom(ctx context.Context, roomID, oldEventID, newEventID string, addHosts []types.JoinedHost, removeHosts []string) (joinedHosts []types.JoinedHost, err error)
GetJoinedHosts(ctx context.Context, roomID string) ([]types.JoinedHost, error)
QueueEventForRetry(ctx context.Context, originServer, destinationServer string, event gomatrixserverlib.Event, attempts int, retryAt time.Time) error
SelectRetryEventsPending(ctx context.Context) ([]*types.PendingPDU, error)
DeleteRetryExpiredEvents(ctx context.Context) error
}
// NewDatabase opens a new database

View file

@ -16,6 +16,7 @@ package types
import (
"fmt"
"time"
"github.com/matrix-org/gomatrixserverlib"
)
@ -43,3 +44,21 @@ func (e EventIDMismatchError) Error() string {
e.DatabaseID, e.RoomServerID,
)
}
type Queueable struct {
RetryNID int64
Origin gomatrixserverlib.ServerName
Destination gomatrixserverlib.ServerName
Attempts int
LastAttempt time.Time
}
type PendingPDU struct {
Queueable
PDU *gomatrixserverlib.Event
}
type PendingEDU struct {
Queueable
EDU *gomatrixserverlib.EDU
}