Initial work on persistent queues

This commit is contained in:
Neil Alexander 2020-06-29 15:29:28 +01:00
parent 1ad7219e4b
commit e7997c1e97
9 changed files with 503 additions and 28 deletions

View file

@ -50,7 +50,8 @@ func NewInternalAPI(
statistics := &types.Statistics{} statistics := &types.Statistics{}
queues := queue.NewOutgoingQueues( queues := queue.NewOutgoingQueues(
base.Cfg.Matrix.ServerName, federation, rsAPI, statistics, &queue.SigningInfo{ federationSenderDB, base.Cfg.Matrix.ServerName, federation, rsAPI, statistics,
&queue.SigningInfo{
KeyID: base.Cfg.Matrix.KeyID, KeyID: base.Cfg.Matrix.KeyID,
PrivateKey: base.Cfg.Matrix.PrivateKey, PrivateKey: base.Cfg.Matrix.PrivateKey,
ServerName: base.Cfg.Matrix.ServerName, ServerName: base.Cfg.Matrix.ServerName,

View file

@ -20,6 +20,7 @@ import (
"fmt" "fmt"
"time" "time"
"github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrix" "github.com/matrix-org/gomatrix"
@ -34,6 +35,7 @@ import (
// ensures that only one request is in flight to a given destination // ensures that only one request is in flight to a given destination
// at a time. // at a time.
type destinationQueue struct { type destinationQueue struct {
db storage.Database
signing *SigningInfo signing *SigningInfo
rsAPI api.RoomserverInternalAPI rsAPI api.RoomserverInternalAPI
client *gomatrixserverlib.FederationClient // federation client client *gomatrixserverlib.FederationClient // federation client
@ -45,7 +47,7 @@ type destinationQueue struct {
incomingPDUs chan *gomatrixserverlib.HeaderedEvent // PDUs to send incomingPDUs chan *gomatrixserverlib.HeaderedEvent // PDUs to send
incomingEDUs chan *gomatrixserverlib.EDU // EDUs to send incomingEDUs chan *gomatrixserverlib.EDU // EDUs to send
incomingInvites chan *gomatrixserverlib.InviteV2Request // invites to send incomingInvites chan *gomatrixserverlib.InviteV2Request // invites to send
lastTransactionIDs []gomatrixserverlib.TransactionID // last transaction ID transactionID gomatrixserverlib.TransactionID // last transaction ID
pendingPDUs []*gomatrixserverlib.HeaderedEvent // owned by backgroundSend pendingPDUs []*gomatrixserverlib.HeaderedEvent // owned by backgroundSend
pendingEDUs []*gomatrixserverlib.EDU // owned by backgroundSend pendingEDUs []*gomatrixserverlib.EDU // owned by backgroundSend
pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend
@ -200,19 +202,49 @@ func (oq *destinationQueue) backgroundSend() {
// If we have pending PDUs or EDUs then construct a transaction. // If we have pending PDUs or EDUs then construct a transaction.
if numPDUs > 0 || numEDUs > 0 { if numPDUs > 0 || numEDUs > 0 {
// Generate a transaction ID.
if oq.transactionID == "" {
now := gomatrixserverlib.AsTimestamp(time.Now())
oq.transactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, oq.statistics.SuccessCount()))
}
// Try sending the next transaction and see what happens. // Try sending the next transaction and see what happens.
transaction, terr := oq.nextTransaction(oq.pendingPDUs, oq.pendingEDUs, oq.statistics.SuccessCount()) transaction, terr := oq.nextTransaction(oq.transactionID, oq.pendingPDUs, oq.pendingEDUs, oq.statistics.SuccessCount())
if terr != nil { if terr != nil {
// We failed to send the transaction. // We failed to send the transaction.
if giveUp := oq.statistics.Failure(); giveUp { giveUp := oq.statistics.Failure()
// TODO: commit the transaction to the database
if terr = oq.db.StoreFailedPDUs(
context.TODO(),
oq.transactionID,
oq.destination,
oq.pendingPDUs,
); terr != nil {
// We failed to persist the events to the database for some
// reason, so we'll keep them in memory for now. Hopefully
// it's a temporary condition but log it.
logrus.WithError(terr).Errorf("Failed to persist failed sends for server %q to database", oq.destination)
} else {
// Reallocate so that the underlying arrays can be GC'd, as
// opposed to growing forever.
for i := 0; i < numPDUs; i++ {
oq.pendingPDUs[i] = nil
}
oq.pendingPDUs = append(
[]*gomatrixserverlib.HeaderedEvent{},
oq.pendingPDUs[numPDUs:]...,
)
}
if giveUp {
// It's been suggested that we should give up because // It's been suggested that we should give up because
// the backoff has exceeded a maximum allowable value. // the backoff has exceeded a maximum allowable value.
return return
} }
} else if transaction { } else if transaction {
// If we successfully sent the transaction then clear out // If we successfully sent the transaction then clear out
// the pending events and EDUs. // the pending events and EDUs, and wipe our transaction ID.
oq.statistics.Success() oq.statistics.Success()
oq.transactionID = ""
// Reallocate so that the underlying arrays can be GC'd, as // Reallocate so that the underlying arrays can be GC'd, as
// opposed to growing forever. // opposed to growing forever.
for i := 0; i < numPDUs; i++ { for i := 0; i < numPDUs; i++ {
@ -262,6 +294,7 @@ func (oq *destinationQueue) backgroundSend() {
// queue and sends it. Returns true if a transaction was sent or // queue and sends it. Returns true if a transaction was sent or
// false otherwise. // false otherwise.
func (oq *destinationQueue) nextTransaction( func (oq *destinationQueue) nextTransaction(
transactionID gomatrixserverlib.TransactionID,
pendingPDUs []*gomatrixserverlib.HeaderedEvent, pendingPDUs []*gomatrixserverlib.HeaderedEvent,
pendingEDUs []*gomatrixserverlib.EDU, pendingEDUs []*gomatrixserverlib.EDU,
sentCounter uint32, sentCounter uint32,
@ -270,17 +303,12 @@ func (oq *destinationQueue) nextTransaction(
PDUs: []json.RawMessage{}, PDUs: []json.RawMessage{},
EDUs: []gomatrixserverlib.EDU{}, EDUs: []gomatrixserverlib.EDU{},
} }
now := gomatrixserverlib.AsTimestamp(time.Now()) t.TransactionID = transactionID
t.TransactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, sentCounter))
t.Origin = oq.origin t.Origin = oq.origin
t.Destination = oq.destination t.Destination = oq.destination
t.OriginServerTS = now t.OriginServerTS = gomatrixserverlib.AsTimestamp(time.Now())
t.PreviousIDs = oq.lastTransactionIDs
if t.PreviousIDs == nil {
t.PreviousIDs = []gomatrixserverlib.TransactionID{}
}
oq.lastTransactionIDs = []gomatrixserverlib.TransactionID{t.TransactionID} oq.transactionID = t.TransactionID
for _, pdu := range pendingPDUs { for _, pdu := range pendingPDUs {
// Append the JSON of the event, since this is a json.RawMessage type in the // Append the JSON of the event, since this is a json.RawMessage type in the

View file

@ -19,6 +19,7 @@ import (
"fmt" "fmt"
"sync" "sync"
"github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -29,6 +30,7 @@ import (
// OutgoingQueues is a collection of queues for sending transactions to other // OutgoingQueues is a collection of queues for sending transactions to other
// matrix servers // matrix servers
type OutgoingQueues struct { type OutgoingQueues struct {
db storage.Database
rsAPI api.RoomserverInternalAPI rsAPI api.RoomserverInternalAPI
origin gomatrixserverlib.ServerName origin gomatrixserverlib.ServerName
client *gomatrixserverlib.FederationClient client *gomatrixserverlib.FederationClient
@ -40,6 +42,7 @@ type OutgoingQueues struct {
// NewOutgoingQueues makes a new OutgoingQueues // NewOutgoingQueues makes a new OutgoingQueues
func NewOutgoingQueues( func NewOutgoingQueues(
db storage.Database,
origin gomatrixserverlib.ServerName, origin gomatrixserverlib.ServerName,
client *gomatrixserverlib.FederationClient, client *gomatrixserverlib.FederationClient,
rsAPI api.RoomserverInternalAPI, rsAPI api.RoomserverInternalAPI,
@ -47,6 +50,7 @@ func NewOutgoingQueues(
signing *SigningInfo, signing *SigningInfo,
) *OutgoingQueues { ) *OutgoingQueues {
return &OutgoingQueues{ return &OutgoingQueues{
db: db,
rsAPI: rsAPI, rsAPI: rsAPI,
origin: origin, origin: origin,
client: client, client: client,
@ -76,6 +80,7 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d
oq := oqs.queues[destination] oq := oqs.queues[destination]
if oq == nil { if oq == nil {
oq = &destinationQueue{ oq = &destinationQueue{
db: oqs.db,
rsAPI: oqs.rsAPI, rsAPI: oqs.rsAPI,
origin: oqs.origin, origin: oqs.origin,
destination: destination, destination: destination,

View file

@ -19,10 +19,13 @@ import (
"github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/gomatrixserverlib"
) )
type Database interface { type Database interface {
internal.PartitionStorer internal.PartitionStorer
UpdateRoom(ctx context.Context, roomID, oldEventID, newEventID string, addHosts []types.JoinedHost, removeHosts []string) (joinedHosts []types.JoinedHost, err error) UpdateRoom(ctx context.Context, roomID, oldEventID, newEventID string, addHosts []types.JoinedHost, removeHosts []string) (joinedHosts []types.JoinedHost, err error)
GetJoinedHosts(ctx context.Context, roomID string) ([]types.JoinedHost, error) GetJoinedHosts(ctx context.Context, roomID string) ([]types.JoinedHost, error)
GetFailedPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName) ([]*gomatrixserverlib.HeaderedEvent, error)
StoreFailedPDUs(ctx context.Context, transactionID gomatrixserverlib.TransactionID, serverName gomatrixserverlib.ServerName, pdus []*gomatrixserverlib.HeaderedEvent) error
} }

View file

@ -0,0 +1,167 @@
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package postgres
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"github.com/lib/pq"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib"
)
const queueRetrySchema = `
-- The queue_retry table contains events that we failed to
-- send to a destination host, such that we can try them again
-- later.
CREATE TABLE IF NOT EXISTS federationsender_queue_retry (
-- The string ID of the room.
transaction_id TEXT NOT NULL,
-- The event type: "pdu", "invite", "send_to_device".
send_type TEXT NOT NULL,
-- The event ID of the m.room.member join event.
event_id TEXT NOT NULL,
-- The origin server TS of the event.
origin_server_ts BIGINT NOT NULL,
-- The domain part of the user ID the m.room.member event is for.
server_name TEXT NOT NULL,
-- The JSON body.
json_body BYTEA NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS federationsender_queue_retry_event_id_idx
ON federationsender_queue_retry (event_id);
`
const insertRetrySQL = "" +
"INSERT INTO federationsender_queue_retry (transaction_id, send_type, event_id, origin_server_ts, server_name, json_body)" +
" VALUES ($1, $2, $3, $4, $5, $6)"
const deleteRetrySQL = "" +
"DELETE FROM federationsender_queue_retry WHERE event_id = ANY($1)"
const selectRetryNextTransactionIDSQL = "" +
"SELECT transaction_id FROM federationsender_queue_retry" +
" WHERE server_name = $1 AND send_type = $2" +
" ORDER BY transaction_id ASC" +
" LIMIT 1"
const selectRetryPDUsByTransactionSQL = "" +
"SELECT event_id, server_name, origin_server_ts, json_body FROM federationsender_queue_retry" +
" WHERE server_name = $1 AND send_type = $2 AND transaction_id = $3" +
" LIMIT 50"
type queueRetryStatements struct {
insertRetryStmt *sql.Stmt
deleteRetryStmt *sql.Stmt
selectRetryNextTransactionIDStmt *sql.Stmt
selectRetryPDUsByTransactionStmt *sql.Stmt
}
func (s *queueRetryStatements) prepare(db *sql.DB) (err error) {
_, err = db.Exec(queueRetrySchema)
if err != nil {
return
}
if s.insertRetryStmt, err = db.Prepare(insertRetrySQL); err != nil {
return
}
if s.deleteRetryStmt, err = db.Prepare(deleteRetrySQL); err != nil {
return
}
if s.selectRetryNextTransactionIDStmt, err = db.Prepare(selectRetryNextTransactionIDSQL); err != nil {
return
}
if s.selectRetryPDUsByTransactionStmt, err = db.Prepare(selectRetryPDUsByTransactionSQL); err != nil {
return
}
return
}
func (s *queueRetryStatements) insertQueueRetry(
ctx context.Context,
txn *sql.Tx,
transactionID string,
sendtype string,
event gomatrixserverlib.Event,
serverName gomatrixserverlib.ServerName,
) error {
stmt := sqlutil.TxStmt(txn, s.insertRetryStmt)
_, err := stmt.ExecContext(
ctx,
transactionID, // the transaction ID that we initially attempted
sendtype, // either "pdu", "invite", "send_to_device"
event.EventID(), // the event ID
event.OriginServerTS(), // the event origin server TS
serverName, // destination server name
event.JSON(), // JSON body
)
return err
}
func (s *queueRetryStatements) deleteQueueRetry(
ctx context.Context, txn *sql.Tx, eventIDs []string,
) error {
stmt := sqlutil.TxStmt(txn, s.deleteRetryStmt)
_, err := stmt.ExecContext(ctx, pq.StringArray(eventIDs))
return err
}
func (s *queueRetryStatements) selectRetryNextTransactionID(
ctx context.Context, txn *sql.Tx, serverName, sendType string,
) (string, error) {
var transactionID string
stmt := sqlutil.TxStmt(txn, s.selectRetryNextTransactionIDStmt)
err := stmt.QueryRowContext(ctx, serverName, types.FailedEventTypePDU).Scan(&transactionID)
return transactionID, err
}
func (s *queueRetryStatements) selectQueueRetryPDUs(
ctx context.Context, txn *sql.Tx, serverName string, transactionID string,
) ([]*gomatrixserverlib.HeaderedEvent, error) {
stmt := sqlutil.TxStmt(txn, s.selectRetryPDUsByTransactionStmt)
rows, err := stmt.QueryContext(ctx, serverName, types.FailedEventTypePDU, transactionID)
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "queueRetryFromStmt: rows.close() failed")
var result []*gomatrixserverlib.HeaderedEvent
for rows.Next() {
var transactionID, eventID string
var originServerTS int64
var jsonBody []byte
if err = rows.Scan(&transactionID, &eventID, &originServerTS, &jsonBody); err != nil {
return nil, err
}
var event gomatrixserverlib.HeaderedEvent
if err = json.Unmarshal(jsonBody, &event); err != nil {
return nil, fmt.Errorf("json.Unmarshal: %w", err)
}
if event.EventID() != eventID {
return nil, fmt.Errorf("event ID %q doesn't match expected %q", event.EventID(), eventID)
}
result = append(result, &event)
}
return result, rows.Err()
}

View file

@ -18,15 +18,18 @@ package postgres
import ( import (
"context" "context"
"database/sql" "database/sql"
"fmt"
"github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib"
) )
// Database stores information needed by the federation sender // Database stores information needed by the federation sender
type Database struct { type Database struct {
joinedHostsStatements joinedHostsStatements
roomStatements roomStatements
queueRetryStatements
sqlutil.PartitionOffsetStatements sqlutil.PartitionOffsetStatements
db *sql.DB db *sql.DB
} }
@ -55,6 +58,10 @@ func (d *Database) prepare() error {
return err return err
} }
if err = d.queueRetryStatements.prepare(d.db); err != nil {
return err
}
return d.PartitionOffsetStatements.Prepare(d.db, "federationsender") return d.PartitionOffsetStatements.Prepare(d.db, "federationsender")
} }
@ -120,3 +127,45 @@ func (d *Database) GetJoinedHosts(
) ([]types.JoinedHost, error) { ) ([]types.JoinedHost, error) {
return d.selectJoinedHosts(ctx, roomID) return d.selectJoinedHosts(ctx, roomID)
} }
// GetFailedPDUs retrieves PDUs that we have failed to send on
// a specific destination queue.
func (d *Database) GetFailedPDUs(
ctx context.Context,
serverName gomatrixserverlib.ServerName,
) ([]*gomatrixserverlib.HeaderedEvent, error) {
transactionID, err := d.selectRetryNextTransactionID(ctx, nil, string(serverName), types.FailedEventTypePDU)
if err != nil {
return nil, fmt.Errorf("d.selectRetryNextTransactionID: %w", err)
}
events, err := d.selectQueueRetryPDUs(ctx, nil, string(serverName), transactionID)
if err != nil {
return nil, fmt.Errorf("d.selectQueueRetryPDUs: %w", err)
}
return events, nil
}
// StoreFailedPDUs stores PDUs that we have failed to send on
// a specific destination queue.
func (d *Database) StoreFailedPDUs(
ctx context.Context,
transactionID gomatrixserverlib.TransactionID,
serverName gomatrixserverlib.ServerName,
pdus []*gomatrixserverlib.HeaderedEvent,
) error {
for _, pdu := range pdus {
if _, err := d.insertRetryStmt.ExecContext(
ctx,
string(transactionID), // transaction ID
types.FailedEventTypePDU, // type of event that was queued
pdu.EventID(), // event ID
pdu.OriginServerTS(), // event origin server TS
string(serverName), // destination server name
pdu.JSON(), // JSON body
); err != nil {
return fmt.Errorf("d.insertQueueRetryStmt.ExecContext: %w", err)
}
}
return nil
}

View file

@ -0,0 +1,167 @@
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sqlite3
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"github.com/lib/pq"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib"
)
const queueRetrySchema = `
-- The queue_retry table contains events that we failed to
-- send to a destination host, such that we can try them again
-- later.
CREATE TABLE IF NOT EXISTS federationsender_queue_retry (
-- The string ID of the room.
transaction_id TEXT NOT NULL,
-- The event type: "pdu", "invite", "send_to_device".
send_type TEXT NOT NULL,
-- The event ID of the m.room.member join event.
event_id TEXT NOT NULL,
-- The origin server TS of the event.
origin_server_ts BIGINT NOT NULL,
-- The domain part of the user ID the m.room.member event is for.
server_name TEXT NOT NULL,
-- The JSON body.
json_body BYTEA NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS federationsender_queue_retry_event_id_idx
ON federationsender_queue_retry (event_id);
`
const insertRetrySQL = "" +
"INSERT INTO federationsender_queue_retry (transaction_id, send_type, event_id, origin_server_ts, server_name, json_body)" +
" VALUES ($1, $2, $3, $4, $5, $6)"
const deleteRetrySQL = "" +
"DELETE FROM federationsender_queue_retry WHERE event_id = ANY($1)"
const selectRetryNextTransactionIDSQL = "" +
"SELECT transaction_id FROM federationsender_queue_retry" +
" WHERE server_name = $1 AND send_type = $2" +
" ORDER BY transaction_id ASC" +
" LIMIT 1"
const selectRetryPDUsByTransactionSQL = "" +
"SELECT event_id, server_name, origin_server_ts, json_body FROM federationsender_queue_retry" +
" WHERE server_name = $1 AND send_type = $2 AND transaction_id = $3" +
" LIMIT 50"
type queueRetryStatements struct {
insertRetryStmt *sql.Stmt
deleteRetryStmt *sql.Stmt
selectRetryNextTransactionIDStmt *sql.Stmt
selectRetryPDUsByTransactionStmt *sql.Stmt
}
func (s *queueRetryStatements) prepare(db *sql.DB) (err error) {
_, err = db.Exec(queueRetrySchema)
if err != nil {
return
}
if s.insertRetryStmt, err = db.Prepare(insertRetrySQL); err != nil {
return
}
if s.deleteRetryStmt, err = db.Prepare(deleteRetrySQL); err != nil {
return
}
if s.selectRetryNextTransactionIDStmt, err = db.Prepare(selectRetryNextTransactionIDSQL); err != nil {
return
}
if s.selectRetryPDUsByTransactionStmt, err = db.Prepare(selectRetryPDUsByTransactionSQL); err != nil {
return
}
return
}
func (s *queueRetryStatements) insertQueueRetry(
ctx context.Context,
txn *sql.Tx,
transactionID string,
sendtype string,
event gomatrixserverlib.Event,
serverName gomatrixserverlib.ServerName,
) error {
stmt := sqlutil.TxStmt(txn, s.insertRetryStmt)
_, err := stmt.ExecContext(
ctx,
transactionID, // the transaction ID that we initially attempted
sendtype, // either "pdu", "invite", "send_to_device"
event.EventID(), // the event ID
event.OriginServerTS(), // the event origin server TS
serverName, // destination server name
event.JSON(), // JSON body
)
return err
}
func (s *queueRetryStatements) deleteQueueRetry(
ctx context.Context, txn *sql.Tx, eventIDs []string,
) error {
stmt := sqlutil.TxStmt(txn, s.deleteRetryStmt)
_, err := stmt.ExecContext(ctx, pq.StringArray(eventIDs))
return err
}
func (s *queueRetryStatements) selectRetryNextTransactionID(
ctx context.Context, txn *sql.Tx, serverName, sendType string,
) (string, error) {
var transactionID string
stmt := sqlutil.TxStmt(txn, s.selectRetryNextTransactionIDStmt)
err := stmt.QueryRowContext(ctx, serverName, types.FailedEventTypePDU).Scan(&transactionID)
return transactionID, err
}
func (s *queueRetryStatements) selectQueueRetryPDUs(
ctx context.Context, txn *sql.Tx, serverName string, transactionID string,
) ([]*gomatrixserverlib.HeaderedEvent, error) {
stmt := sqlutil.TxStmt(txn, s.selectRetryPDUsByTransactionStmt)
rows, err := stmt.QueryContext(ctx, serverName, types.FailedEventTypePDU, transactionID)
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "queueRetryFromStmt: rows.close() failed")
var result []*gomatrixserverlib.HeaderedEvent
for rows.Next() {
var transactionID, eventID string
var originServerTS int64
var jsonBody []byte
if err = rows.Scan(&transactionID, &eventID, &originServerTS, &jsonBody); err != nil {
return nil, err
}
var event gomatrixserverlib.HeaderedEvent
if err = json.Unmarshal(jsonBody, &event); err != nil {
return nil, fmt.Errorf("json.Unmarshal: %w", err)
}
if event.EventID() != eventID {
return nil, fmt.Errorf("event ID %q doesn't match expected %q", event.EventID(), eventID)
}
result = append(result, &event)
}
return result, rows.Err()
}

View file

@ -18,17 +18,20 @@ package sqlite3
import ( import (
"context" "context"
"database/sql" "database/sql"
"fmt"
_ "github.com/mattn/go-sqlite3" _ "github.com/mattn/go-sqlite3"
"github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib"
) )
// Database stores information needed by the federation sender // Database stores information needed by the federation sender
type Database struct { type Database struct {
joinedHostsStatements joinedHostsStatements
roomStatements roomStatements
queueRetryStatements
sqlutil.PartitionOffsetStatements sqlutil.PartitionOffsetStatements
db *sql.DB db *sql.DB
} }
@ -61,6 +64,10 @@ func (d *Database) prepare() error {
return err return err
} }
if err = d.queueRetryStatements.prepare(d.db); err != nil {
return err
}
return d.PartitionOffsetStatements.Prepare(d.db, "federationsender") return d.PartitionOffsetStatements.Prepare(d.db, "federationsender")
} }
@ -126,3 +133,45 @@ func (d *Database) GetJoinedHosts(
) ([]types.JoinedHost, error) { ) ([]types.JoinedHost, error) {
return d.selectJoinedHosts(ctx, roomID) return d.selectJoinedHosts(ctx, roomID)
} }
// GetFailedPDUs retrieves PDUs that we have failed to send on
// a specific destination queue.
func (d *Database) GetFailedPDUs(
ctx context.Context,
serverName gomatrixserverlib.ServerName,
) ([]*gomatrixserverlib.HeaderedEvent, error) {
transactionID, err := d.selectRetryNextTransactionID(ctx, nil, string(serverName), types.FailedEventTypePDU)
if err != nil {
return nil, fmt.Errorf("d.selectRetryNextTransactionID: %w", err)
}
events, err := d.selectQueueRetryPDUs(ctx, nil, string(serverName), transactionID)
if err != nil {
return nil, fmt.Errorf("d.selectQueueRetryPDUs: %w", err)
}
return events, nil
}
// StoreFailedPDUs stores PDUs that we have failed to send on
// a specific destination queue.
func (d *Database) StoreFailedPDUs(
ctx context.Context,
transactionID gomatrixserverlib.TransactionID,
serverName gomatrixserverlib.ServerName,
pdus []*gomatrixserverlib.HeaderedEvent,
) error {
for _, pdu := range pdus {
if _, err := d.insertRetryStmt.ExecContext(
ctx,
string(transactionID), // transaction ID
types.FailedEventTypePDU, // type of event that was queued
pdu.EventID(), // event ID
pdu.OriginServerTS(), // event origin server TS
string(serverName), // destination server name
pdu.JSON(), // JSON body
); err != nil {
return fmt.Errorf("d.insertQueueRetryStmt.ExecContext: %w", err)
}
}
return nil
}

View file

@ -20,6 +20,12 @@ import (
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
const (
FailedEventTypePDU = "pdu"
FailedEventTypeInvite = "invite"
FailedEventTypeSendToDevice = "send_to_device"
)
// A JoinedHost is a server that is joined to a matrix room. // A JoinedHost is a server that is joined to a matrix room.
type JoinedHost struct { type JoinedHost struct {
// The MemberEventID of a m.room.member join event. // The MemberEventID of a m.room.member join event.