Refactor federation sender, again

This commit is contained in:
Neil Alexander 2020-12-04 16:22:01 +00:00
parent 5d65a879a5
commit bfcf1307e8
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
9 changed files with 176 additions and 221 deletions

View file

@ -55,15 +55,15 @@ type destinationQueue struct {
transactionIDMutex sync.Mutex // protects transactionID transactionIDMutex sync.Mutex // protects transactionID
transactionID gomatrixserverlib.TransactionID // last transaction ID transactionID gomatrixserverlib.TransactionID // last transaction ID
transactionCount atomic.Int32 // how many events in this transaction so far transactionCount atomic.Int32 // how many events in this transaction so far
notifyPDUs chan bool // interrupts idle wait for PDUs notifyPDUs chan *queuedPDU // interrupts idle wait for PDUs
notifyEDUs chan bool // interrupts idle wait for EDUs notifyEDUs chan *queuedEDU // interrupts idle wait for EDUs
interruptBackoff chan bool // interrupts backoff interruptBackoff chan bool // interrupts backoff
} }
// Send event adds the event to the pending queue for the destination. // Send event adds the event to the pending queue for the destination.
// If the queue is empty then it starts a background goroutine to // If the queue is empty then it starts a background goroutine to
// start sending events to that destination. // start sending events to that destination.
func (oq *destinationQueue) sendEvent(receipt *shared.Receipt) { func (oq *destinationQueue) sendEvent(event *gomatrixserverlib.HeaderedEvent, receipt *shared.Receipt) {
// Create a transaction ID. We'll either do this if we don't have // Create a transaction ID. We'll either do this if we don't have
// one made up yet, or if we've exceeded the number of maximum // one made up yet, or if we've exceeded the number of maximum
// events allowed in a single tranaction. We'll reset the counter // events allowed in a single tranaction. We'll reset the counter
@ -84,7 +84,7 @@ func (oq *destinationQueue) sendEvent(receipt *shared.Receipt) {
oq.destination, // the destination server name oq.destination, // the destination server name
receipt, // NIDs from federationsender_queue_json table receipt, // NIDs from federationsender_queue_json table
); err != nil { ); err != nil {
log.WithError(err).Errorf("failed to associate PDU receipt %q with destination %q", receipt.String(), oq.destination) log.WithError(err).Errorf("failed to associate PDU %q with destination %q", event.EventID(), oq.destination)
return return
} }
// We've successfully added a PDU to the transaction so increase // We've successfully added a PDU to the transaction so increase
@ -98,7 +98,10 @@ func (oq *destinationQueue) sendEvent(receipt *shared.Receipt) {
// If we're blocking on waiting PDUs then tell the queue that we // If we're blocking on waiting PDUs then tell the queue that we
// have work to do. // have work to do.
select { select {
case oq.notifyPDUs <- true: case oq.notifyPDUs <- &queuedPDU{
receipt: receipt,
pdu: event,
}:
default: default:
} }
} }
@ -107,7 +110,7 @@ func (oq *destinationQueue) sendEvent(receipt *shared.Receipt) {
// sendEDU adds the EDU event to the pending queue for the destination. // sendEDU adds the EDU event to the pending queue for the destination.
// If the queue is empty then it starts a background goroutine to // If the queue is empty then it starts a background goroutine to
// start sending events to that destination. // start sending events to that destination.
func (oq *destinationQueue) sendEDU(receipt *shared.Receipt) { func (oq *destinationQueue) sendEDU(event *gomatrixserverlib.EDU, receipt *shared.Receipt) {
// Create a database entry that associates the given PDU NID with // Create a database entry that associates the given PDU NID with
// this destination queue. We'll then be able to retrieve the PDU // this destination queue. We'll then be able to retrieve the PDU
// later. // later.
@ -116,7 +119,7 @@ func (oq *destinationQueue) sendEDU(receipt *shared.Receipt) {
oq.destination, // the destination server name oq.destination, // the destination server name
receipt, // NIDs from federationsender_queue_json table receipt, // NIDs from federationsender_queue_json table
); err != nil { ); err != nil {
log.WithError(err).Errorf("failed to associate EDU receipt %q with destination %q", receipt.String(), oq.destination) log.WithError(err).Errorf("failed to associate EDU with destination %q", oq.destination)
return return
} }
// We've successfully added an EDU to the transaction so increase // We've successfully added an EDU to the transaction so increase
@ -130,7 +133,10 @@ func (oq *destinationQueue) sendEDU(receipt *shared.Receipt) {
// If we're blocking on waiting EDUs then tell the queue that we // If we're blocking on waiting EDUs then tell the queue that we
// have work to do. // have work to do.
select { select {
case oq.notifyEDUs <- true: case oq.notifyEDUs <- &queuedEDU{
receipt: receipt,
edu: event,
}:
default: default:
} }
} }
@ -149,51 +155,33 @@ func (oq *destinationQueue) wakeQueueIfNeeded() {
if !oq.running.Load() { if !oq.running.Load() {
// Start the queue. // Start the queue.
go oq.backgroundSend() go oq.backgroundSend()
}
}
// waitForPDUs returns a channel for pending PDUs, which will be // Check to see if there's anything to do for this server
// used in backgroundSend select. It returns a closed channel if // in the database.
// there is something pending right now, or an open channel if ctx := context.Background()
// we're waiting for something. go func(ctx context.Context) {
func (oq *destinationQueue) waitForPDUs() chan bool { if pdus, err := oq.db.GetPendingPDUs(ctx, oq.destination, maxPDUsPerTransaction); err == nil {
pendingPDUs, err := oq.db.GetPendingPDUCount(context.TODO(), oq.destination) for receipt, pdu := range pdus {
if err != nil { select {
log.WithError(err).Errorf("Failed to get pending PDU count on queue %q", oq.destination) case oq.notifyPDUs <- &queuedPDU{receipt, pdu}:
default:
return
} }
// If there are PDUs pending right now then we'll return a closed
// channel. This will mean that the backgroundSend will not block.
if pendingPDUs > 0 {
ch := make(chan bool, 1)
close(ch)
return ch
} }
// If there are no PDUs pending right now then instead we'll return
// the notify channel, so that backgroundSend can pick up normal
// notifications from sendEvent.
return oq.notifyPDUs
} }
}(ctx)
// waitForEDUs returns a channel for pending EDUs, which will be go func(ctx context.Context) {
// used in backgroundSend select. It returns a closed channel if if edus, err := oq.db.GetPendingEDUs(ctx, oq.destination, maxEDUsPerTransaction); err == nil {
// there is something pending right now, or an open channel if for receipt, edu := range edus {
// we're waiting for something. select {
func (oq *destinationQueue) waitForEDUs() chan bool { case oq.notifyEDUs <- &queuedEDU{receipt, edu}:
pendingEDUs, err := oq.db.GetPendingEDUCount(context.TODO(), oq.destination) default:
if err != nil { return
log.WithError(err).Errorf("Failed to get pending EDU count on queue %q", oq.destination)
} }
// If there are EDUs pending right now then we'll return a closed
// channel. This will mean that the backgroundSend will not block.
if pendingEDUs > 0 {
ch := make(chan bool, 1)
close(ch)
return ch
} }
// If there are no EDUs pending right now then instead we'll return }
// the notify channel, so that backgroundSend can pick up normal }(ctx)
// notifications from sendEvent. }
return oq.notifyEDUs
} }
// backgroundSend is the worker goroutine for sending events. // backgroundSend is the worker goroutine for sending events.
@ -206,20 +194,41 @@ func (oq *destinationQueue) backgroundSend() {
} }
defer oq.running.Store(false) defer oq.running.Store(false)
for { pendingPDUs := []*queuedPDU{}
pendingPDUs, pendingEDUs := false, false pendingEDUs := []*queuedEDU{}
for {
// If we have nothing to do then wait either for incoming events, or // If we have nothing to do then wait either for incoming events, or
// until we hit an idle timeout. // until we hit an idle timeout.
select { select {
case <-oq.waitForPDUs(): case pdu := <-oq.notifyPDUs:
// We were woken up because there are new PDUs waiting in the // We were woken up because there are new PDUs waiting in the
// database. // database.
pendingPDUs = true pendingPDUs = append(pendingPDUs, pdu)
case <-oq.waitForEDUs(): pendingPDULoop:
for i := 1; i < maxPDUsPerTransaction; i++ {
select {
case edu := <-oq.notifyEDUs:
pendingEDUs = append(pendingEDUs, edu)
default:
break pendingPDULoop
}
}
case edu := <-oq.notifyEDUs:
// We were woken up because there are new PDUs waiting in the // We were woken up because there are new PDUs waiting in the
// database. // database.
pendingEDUs = true pendingEDUs = append(pendingEDUs, edu)
pendingEDULoop:
for i := 1; i < maxEDUsPerTransaction; i++ {
select {
case edu := <-oq.notifyEDUs:
pendingEDUs = append(pendingEDUs, edu)
default:
break pendingEDULoop
}
}
case <-time.After(queueIdleTimeout): case <-time.After(queueIdleTimeout):
// The worker is idle so stop the goroutine. It'll get // The worker is idle so stop the goroutine. It'll get
// restarted automatically the next time we have an event to // restarted automatically the next time we have an event to
@ -251,9 +260,8 @@ func (oq *destinationQueue) backgroundSend() {
} }
// If we have pending PDUs or EDUs then construct a transaction. // If we have pending PDUs or EDUs then construct a transaction.
if pendingPDUs || pendingEDUs {
// Try sending the next transaction and see what happens. // Try sending the next transaction and see what happens.
transaction, terr := oq.nextTransaction() transaction, terr := oq.nextTransaction(pendingPDUs, pendingEDUs)
if terr != nil { if terr != nil {
// We failed to send the transaction. Mark it as a failure. // We failed to send the transaction. Mark it as a failure.
oq.statistics.Failure() oq.statistics.Failure()
@ -261,7 +269,8 @@ func (oq *destinationQueue) backgroundSend() {
// If we successfully sent the transaction then clear out // If we successfully sent the transaction then clear out
// the pending events and EDUs, and wipe our transaction ID. // the pending events and EDUs, and wipe our transaction ID.
oq.statistics.Success() oq.statistics.Success()
} pendingPDUs = pendingPDUs[:0]
pendingEDUs = pendingEDUs[:0]
} }
} }
} }
@ -270,7 +279,10 @@ func (oq *destinationQueue) backgroundSend() {
// queue and sends it. Returns true if a transaction was sent or // queue and sends it. Returns true if a transaction was sent or
// false otherwise. // false otherwise.
// nolint:gocyclo // nolint:gocyclo
func (oq *destinationQueue) nextTransaction() (bool, error) { func (oq *destinationQueue) nextTransaction(
pdus []*queuedPDU,
edus []*queuedEDU,
) (bool, error) {
// Before we do anything, we need to roll over the transaction // Before we do anything, we need to roll over the transaction
// ID that is being used to coalesce events into the next TX. // ID that is being used to coalesce events into the next TX.
// Otherwise it's possible that we'll pick up an incomplete // Otherwise it's possible that we'll pick up an incomplete
@ -290,31 +302,6 @@ func (oq *destinationQueue) nextTransaction() (bool, error) {
t.Destination = oq.destination t.Destination = oq.destination
t.OriginServerTS = gomatrixserverlib.AsTimestamp(time.Now()) t.OriginServerTS = gomatrixserverlib.AsTimestamp(time.Now())
// Ask the database for any pending PDUs from the next transaction.
// maxPDUsPerTransaction is an upper limit but we probably won't
// actually retrieve that many events.
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
txid, pdus, pduReceipt, err := oq.db.GetNextTransactionPDUs(
ctx, // context
oq.destination, // server name
maxPDUsPerTransaction, // max events to retrieve
)
if err != nil {
log.WithError(err).Errorf("failed to get next transaction PDUs for server %q", oq.destination)
return false, fmt.Errorf("oq.db.GetNextTransactionPDUs: %w", err)
}
edus, eduReceipt, err := oq.db.GetNextTransactionEDUs(
ctx, // context
oq.destination, // server name
maxEDUsPerTransaction, // max events to retrieve
)
if err != nil {
log.WithError(err).Errorf("failed to get next transaction EDUs for server %q", oq.destination)
return false, fmt.Errorf("oq.db.GetNextTransactionEDUs: %w", err)
}
// If we didn't get anything from the database and there are no // If we didn't get anything from the database and there are no
// pending EDUs then there's nothing to do - stop here. // pending EDUs then there's nothing to do - stop here.
if len(pdus) == 0 && len(edus) == 0 { if len(pdus) == 0 && len(edus) == 0 {
@ -324,23 +311,27 @@ func (oq *destinationQueue) nextTransaction() (bool, error) {
// Pick out the transaction ID from the database. If we didn't // Pick out the transaction ID from the database. If we didn't
// get a transaction ID (i.e. because there are no PDUs but only // get a transaction ID (i.e. because there are no PDUs but only
// EDUs) then generate a transaction ID. // EDUs) then generate a transaction ID.
t.TransactionID = txid
if t.TransactionID == "" { if t.TransactionID == "" {
now := gomatrixserverlib.AsTimestamp(time.Now()) now := gomatrixserverlib.AsTimestamp(time.Now())
t.TransactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, oq.statistics.SuccessCount())) t.TransactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, oq.statistics.SuccessCount()))
} }
var pduReceipts []*shared.Receipt
var eduReceipts []*shared.Receipt
// Go through PDUs that we retrieved from the database, if any, // Go through PDUs that we retrieved from the database, if any,
// and add them into the transaction. // and add them into the transaction.
for _, pdu := range pdus { for _, pdu := range pdus {
// Append the JSON of the event, since this is a json.RawMessage type in the // Append the JSON of the event, since this is a json.RawMessage type in the
// gomatrixserverlib.Transaction struct // gomatrixserverlib.Transaction struct
t.PDUs = append(t.PDUs, (*pdu).JSON()) t.PDUs = append(t.PDUs, pdu.pdu.JSON())
pduReceipts = append(pduReceipts, pdu.receipt)
} }
// Do the same for pending EDUS in the queue. // Do the same for pending EDUS in the queue.
for _, edu := range edus { for _, edu := range edus {
t.EDUs = append(t.EDUs, *edu) t.EDUs = append(t.EDUs, *edu.edu)
eduReceipts = append(pduReceipts, edu.receipt)
} }
logrus.WithField("server_name", oq.destination).Debugf("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs)) logrus.WithField("server_name", oq.destination).Debugf("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs))
@ -349,22 +340,22 @@ func (oq *destinationQueue) nextTransaction() (bool, error) {
// TODO: we should check for 500-ish fails vs 400-ish here, // TODO: we should check for 500-ish fails vs 400-ish here,
// since we shouldn't queue things indefinitely in response // since we shouldn't queue things indefinitely in response
// to a 400-ish error // to a 400-ish error
ctx, cancel = context.WithTimeout(context.Background(), time.Minute*5) ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel() defer cancel()
_, err = oq.client.SendTransaction(ctx, t) _, err := oq.client.SendTransaction(ctx, t)
switch err.(type) { switch err.(type) {
case nil: case nil:
// Clean up the transaction in the database. // Clean up the transaction in the database.
if pduReceipt != nil { if pduReceipts != nil {
//logrus.Infof("Cleaning PDUs %q", pduReceipt.String()) //logrus.Infof("Cleaning PDUs %q", pduReceipt.String())
if err = oq.db.CleanPDUs(context.Background(), oq.destination, pduReceipt); err != nil { if err = oq.db.CleanPDUs(context.Background(), oq.destination, pduReceipts); err != nil {
log.WithError(err).Errorf("failed to clean PDUs %q for server %q", pduReceipt.String(), t.Destination) log.WithError(err).Errorf("failed to clean PDUs for server %q", t.Destination)
} }
} }
if eduReceipt != nil { if eduReceipts != nil {
//logrus.Infof("Cleaning EDUs %q", eduReceipt.String()) //logrus.Infof("Cleaning EDUs %q", eduReceipt.String())
if err = oq.db.CleanEDUs(context.Background(), oq.destination, eduReceipt); err != nil { if err = oq.db.CleanEDUs(context.Background(), oq.destination, eduReceipts); err != nil {
log.WithError(err).Errorf("failed to clean EDUs %q for server %q", eduReceipt.String(), t.Destination) log.WithError(err).Errorf("failed to clean EDUs for server %q", t.Destination)
} }
} }
return true, nil return true, nil

View file

@ -24,6 +24,7 @@ import (
"github.com/matrix-org/dendrite/federationsender/statistics" "github.com/matrix-org/dendrite/federationsender/statistics"
"github.com/matrix-org/dendrite/federationsender/storage" "github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/dendrite/federationsender/storage/shared"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -100,6 +101,16 @@ type SigningInfo struct {
PrivateKey ed25519.PrivateKey PrivateKey ed25519.PrivateKey
} }
type queuedPDU struct {
receipt *shared.Receipt
pdu *gomatrixserverlib.HeaderedEvent
}
type queuedEDU struct {
receipt *shared.Receipt
edu *gomatrixserverlib.EDU
}
func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *destinationQueue { func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *destinationQueue {
oqs.queuesMutex.Lock() oqs.queuesMutex.Lock()
defer oqs.queuesMutex.Unlock() defer oqs.queuesMutex.Unlock()
@ -112,8 +123,8 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d
destination: destination, destination: destination,
client: oqs.client, client: oqs.client,
statistics: oqs.statistics.ForServer(destination), statistics: oqs.statistics.ForServer(destination),
notifyPDUs: make(chan bool, 1), notifyPDUs: make(chan *queuedPDU, 128),
notifyEDUs: make(chan bool, 1), notifyEDUs: make(chan *queuedEDU, 128),
interruptBackoff: make(chan bool), interruptBackoff: make(chan bool),
signing: oqs.signing, signing: oqs.signing,
} }
@ -188,7 +199,7 @@ func (oqs *OutgoingQueues) SendEvent(
} }
for destination := range destmap { for destination := range destmap {
oqs.getQueue(destination).sendEvent(nid) oqs.getQueue(destination).sendEvent(ev, nid)
} }
return nil return nil
@ -258,7 +269,7 @@ func (oqs *OutgoingQueues) SendEDU(
} }
for destination := range destmap { for destination := range destmap {
oqs.getQueue(destination).sendEDU(nid) oqs.getQueue(destination).sendEDU(e, nid)
} }
return nil return nil

View file

@ -36,14 +36,14 @@ type Database interface {
StoreJSON(ctx context.Context, js string) (*shared.Receipt, error) StoreJSON(ctx context.Context, js string) (*shared.Receipt, error)
GetPendingPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, limit int) (pdus map[*shared.Receipt]*gomatrixserverlib.HeaderedEvent, err error)
GetPendingEDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, limit int) (edus map[*shared.Receipt]*gomatrixserverlib.EDU, err error)
AssociatePDUWithDestination(ctx context.Context, transactionID gomatrixserverlib.TransactionID, serverName gomatrixserverlib.ServerName, receipt *shared.Receipt) error AssociatePDUWithDestination(ctx context.Context, transactionID gomatrixserverlib.TransactionID, serverName gomatrixserverlib.ServerName, receipt *shared.Receipt) error
AssociateEDUWithDestination(ctx context.Context, serverName gomatrixserverlib.ServerName, receipt *shared.Receipt) error AssociateEDUWithDestination(ctx context.Context, serverName gomatrixserverlib.ServerName, receipt *shared.Receipt) error
GetNextTransactionPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, limit int) (gomatrixserverlib.TransactionID, []*gomatrixserverlib.HeaderedEvent, *shared.Receipt, error) CleanPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, receipts []*shared.Receipt) error
GetNextTransactionEDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, limit int) ([]*gomatrixserverlib.EDU, *shared.Receipt, error) CleanEDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, receipts []*shared.Receipt) error
CleanPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, receipt *shared.Receipt) error
CleanEDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, receipt *shared.Receipt) error
GetPendingPDUCount(ctx context.Context, serverName gomatrixserverlib.ServerName) (int64, error) GetPendingPDUCount(ctx context.Context, serverName gomatrixserverlib.ServerName) (int64, error)
GetPendingEDUCount(ctx context.Context, serverName gomatrixserverlib.ServerName) (int64, error) GetPendingEDUCount(ctx context.Context, serverName gomatrixserverlib.ServerName) (int64, error)

View file

@ -45,13 +45,7 @@ const insertQueuePDUSQL = "" +
const deleteQueuePDUSQL = "" + const deleteQueuePDUSQL = "" +
"DELETE FROM federationsender_queue_pdus WHERE server_name = $1 AND json_nid = ANY($2)" "DELETE FROM federationsender_queue_pdus WHERE server_name = $1 AND json_nid = ANY($2)"
const selectQueuePDUNextTransactionIDSQL = "" + const selectQueuePDUsSQL = "" +
"SELECT transaction_id FROM federationsender_queue_pdus" +
" WHERE server_name = $1" +
" ORDER BY transaction_id ASC" +
" LIMIT 1"
const selectQueuePDUsByTransactionSQL = "" +
"SELECT json_nid FROM federationsender_queue_pdus" + "SELECT json_nid FROM federationsender_queue_pdus" +
" WHERE server_name = $1 AND transaction_id = $2" + " WHERE server_name = $1 AND transaction_id = $2" +
" LIMIT $3" " LIMIT $3"
@ -71,8 +65,7 @@ type queuePDUsStatements struct {
db *sql.DB db *sql.DB
insertQueuePDUStmt *sql.Stmt insertQueuePDUStmt *sql.Stmt
deleteQueuePDUsStmt *sql.Stmt deleteQueuePDUsStmt *sql.Stmt
selectQueuePDUNextTransactionIDStmt *sql.Stmt selectQueuePDUsStmt *sql.Stmt
selectQueuePDUsByTransactionStmt *sql.Stmt
selectQueuePDUReferenceJSONCountStmt *sql.Stmt selectQueuePDUReferenceJSONCountStmt *sql.Stmt
selectQueuePDUsCountStmt *sql.Stmt selectQueuePDUsCountStmt *sql.Stmt
selectQueuePDUServerNamesStmt *sql.Stmt selectQueuePDUServerNamesStmt *sql.Stmt
@ -92,10 +85,7 @@ func NewPostgresQueuePDUsTable(db *sql.DB) (s *queuePDUsStatements, err error) {
if s.deleteQueuePDUsStmt, err = s.db.Prepare(deleteQueuePDUSQL); err != nil { if s.deleteQueuePDUsStmt, err = s.db.Prepare(deleteQueuePDUSQL); err != nil {
return return
} }
if s.selectQueuePDUNextTransactionIDStmt, err = s.db.Prepare(selectQueuePDUNextTransactionIDSQL); err != nil { if s.selectQueuePDUsStmt, err = s.db.Prepare(selectQueuePDUsSQL); err != nil {
return
}
if s.selectQueuePDUsByTransactionStmt, err = s.db.Prepare(selectQueuePDUsByTransactionSQL); err != nil {
return return
} }
if s.selectQueuePDUReferenceJSONCountStmt, err = s.db.Prepare(selectQueuePDUReferenceJSONCountSQL); err != nil { if s.selectQueuePDUReferenceJSONCountStmt, err = s.db.Prepare(selectQueuePDUReferenceJSONCountSQL); err != nil {
@ -137,18 +127,6 @@ func (s *queuePDUsStatements) DeleteQueuePDUs(
return err return err
} }
func (s *queuePDUsStatements) SelectQueuePDUNextTransactionID(
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName,
) (gomatrixserverlib.TransactionID, error) {
var transactionID gomatrixserverlib.TransactionID
stmt := sqlutil.TxStmt(txn, s.selectQueuePDUNextTransactionIDStmt)
err := stmt.QueryRowContext(ctx, serverName).Scan(&transactionID)
if err == sql.ErrNoRows {
return "", nil
}
return transactionID, err
}
func (s *queuePDUsStatements) SelectQueuePDUReferenceJSONCount( func (s *queuePDUsStatements) SelectQueuePDUReferenceJSONCount(
ctx context.Context, txn *sql.Tx, jsonNID int64, ctx context.Context, txn *sql.Tx, jsonNID int64,
) (int64, error) { ) (int64, error) {
@ -182,11 +160,10 @@ func (s *queuePDUsStatements) SelectQueuePDUCount(
func (s *queuePDUsStatements) SelectQueuePDUs( func (s *queuePDUsStatements) SelectQueuePDUs(
ctx context.Context, txn *sql.Tx, ctx context.Context, txn *sql.Tx,
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
transactionID gomatrixserverlib.TransactionID,
limit int, limit int,
) ([]int64, error) { ) ([]int64, error) {
stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsByTransactionStmt) stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsStmt)
rows, err := stmt.QueryContext(ctx, serverName, transactionID, limit) rows, err := stmt.QueryContext(ctx, serverName, limit)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -17,7 +17,6 @@ package shared
import ( import (
"context" "context"
"database/sql" "database/sql"
"encoding/json"
"fmt" "fmt"
"github.com/matrix-org/dendrite/federationsender/storage/tables" "github.com/matrix-org/dendrite/federationsender/storage/tables"
@ -44,16 +43,7 @@ type Database struct {
// to pass them back so that we can clean up if the transaction sends // to pass them back so that we can clean up if the transaction sends
// successfully. // successfully.
type Receipt struct { type Receipt struct {
nids []int64 nid int64
}
func (e *Receipt) Empty() bool {
return len(e.nids) == 0
}
func (e *Receipt) String() string {
j, _ := json.Marshal(e.nids)
return string(j)
} }
// UpdateRoom updates the joined hosts for a room and returns what the joined // UpdateRoom updates the joined hosts for a room and returns what the joined
@ -146,7 +136,7 @@ func (d *Database) StoreJSON(
return nil, fmt.Errorf("d.insertQueueJSON: %w", err) return nil, fmt.Errorf("d.insertQueueJSON: %w", err)
} }
return &Receipt{ return &Receipt{
nids: []int64{nid}, nid: nid,
}, nil }, nil
} }

View file

@ -33,46 +33,40 @@ func (d *Database) AssociateEDUWithDestination(
receipt *Receipt, receipt *Receipt,
) error { ) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
for _, nid := range receipt.nids {
if err := d.FederationSenderQueueEDUs.InsertQueueEDU( if err := d.FederationSenderQueueEDUs.InsertQueueEDU(
ctx, // context ctx, // context
txn, // SQL transaction txn, // SQL transaction
"", // TODO: EDU type for coalescing "", // TODO: EDU type for coalescing
serverName, // destination server name serverName, // destination server name
nid, // NID from the federationsender_queue_json table receipt.nid, // NID from the federationsender_queue_json table
); err != nil { ); err != nil {
return fmt.Errorf("InsertQueueEDU: %w", err) return fmt.Errorf("InsertQueueEDU: %w", err)
} }
}
return nil return nil
}) })
} }
// GetNextTransactionEDUs retrieves events from the database for // GetNextTransactionEDUs retrieves events from the database for
// the next pending transaction, up to the limit specified. // the next pending transaction, up to the limit specified.
func (d *Database) GetNextTransactionEDUs( func (d *Database) GetPendingEDUs(
ctx context.Context, ctx context.Context,
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
limit int, limit int,
) ( ) (
edus []*gomatrixserverlib.EDU, edus map[*Receipt]*gomatrixserverlib.EDU,
receipt *Receipt,
err error, err error,
) { ) {
edus = make(map[*Receipt]*gomatrixserverlib.EDU)
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
nids, err := d.FederationSenderQueueEDUs.SelectQueueEDUs(ctx, txn, serverName, limit) nids, err := d.FederationSenderQueueEDUs.SelectQueueEDUs(ctx, txn, serverName, limit)
if err != nil { if err != nil {
return fmt.Errorf("SelectQueueEDUs: %w", err) return fmt.Errorf("SelectQueueEDUs: %w", err)
} }
receipt = &Receipt{
nids: nids,
}
retrieve := make([]int64, 0, len(nids)) retrieve := make([]int64, 0, len(nids))
for _, nid := range nids { for _, nid := range nids {
if edu, ok := d.Cache.GetFederationSenderQueuedEDU(nid); ok { if edu, ok := d.Cache.GetFederationSenderQueuedEDU(nid); ok {
edus = append(edus, edu) edus[&Receipt{nid}] = edu
} else { } else {
retrieve = append(retrieve, nid) retrieve = append(retrieve, nid)
} }
@ -83,12 +77,12 @@ func (d *Database) GetNextTransactionEDUs(
return fmt.Errorf("SelectQueueJSON: %w", err) return fmt.Errorf("SelectQueueJSON: %w", err)
} }
for _, blob := range blobs { for nid, blob := range blobs {
var event gomatrixserverlib.EDU var event gomatrixserverlib.EDU
if err := json.Unmarshal(blob, &event); err != nil { if err := json.Unmarshal(blob, &event); err != nil {
return fmt.Errorf("json.Unmarshal: %w", err) return fmt.Errorf("json.Unmarshal: %w", err)
} }
edus = append(edus, &event) edus[&Receipt{nid}] = &event
} }
return nil return nil
@ -101,19 +95,24 @@ func (d *Database) GetNextTransactionEDUs(
func (d *Database) CleanEDUs( func (d *Database) CleanEDUs(
ctx context.Context, ctx context.Context,
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
receipt *Receipt, receipts []*Receipt,
) error { ) error {
if receipt == nil { if len(receipts) == 0 {
return errors.New("expected receipt") return errors.New("expected receipt")
} }
nids := make([]int64, len(receipts))
for i := range receipts {
nids[i] = receipts[i].nid
}
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
if err := d.FederationSenderQueueEDUs.DeleteQueueEDUs(ctx, txn, serverName, receipt.nids); err != nil { if err := d.FederationSenderQueueEDUs.DeleteQueueEDUs(ctx, txn, serverName, nids); err != nil {
return err return err
} }
var deleteNIDs []int64 var deleteNIDs []int64
for _, nid := range receipt.nids { for _, nid := range nids {
count, err := d.FederationSenderQueueEDUs.SelectQueueEDUReferenceJSONCount(ctx, txn, nid) count, err := d.FederationSenderQueueEDUs.SelectQueueEDUReferenceJSONCount(ctx, txn, nid)
if err != nil { if err != nil {
return fmt.Errorf("SelectQueueEDUReferenceJSONCount: %w", err) return fmt.Errorf("SelectQueueEDUReferenceJSONCount: %w", err)

View file

@ -34,31 +34,27 @@ func (d *Database) AssociatePDUWithDestination(
receipt *Receipt, receipt *Receipt,
) error { ) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
for _, nid := range receipt.nids {
if err := d.FederationSenderQueuePDUs.InsertQueuePDU( if err := d.FederationSenderQueuePDUs.InsertQueuePDU(
ctx, // context ctx, // context
txn, // SQL transaction txn, // SQL transaction
transactionID, // transaction ID transactionID, // transaction ID
serverName, // destination server name serverName, // destination server name
nid, // NID from the federationsender_queue_json table receipt.nid, // NID from the federationsender_queue_json table
); err != nil { ); err != nil {
return fmt.Errorf("InsertQueuePDU: %w", err) return fmt.Errorf("InsertQueuePDU: %w", err)
} }
}
return nil return nil
}) })
} }
// GetNextTransactionPDUs retrieves events from the database for // GetNextTransactionPDUs retrieves events from the database for
// the next pending transaction, up to the limit specified. // the next pending transaction, up to the limit specified.
func (d *Database) GetNextTransactionPDUs( func (d *Database) GetPendingPDUs(
ctx context.Context, ctx context.Context,
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
limit int, limit int,
) ( ) (
transactionID gomatrixserverlib.TransactionID, events map[*Receipt]*gomatrixserverlib.HeaderedEvent,
events []*gomatrixserverlib.HeaderedEvent,
receipt *Receipt,
err error, err error,
) { ) {
// Strictly speaking this doesn't need to be using the writer // Strictly speaking this doesn't need to be using the writer
@ -66,29 +62,17 @@ func (d *Database) GetNextTransactionPDUs(
// a guarantee of transactional isolation, it's actually useful // a guarantee of transactional isolation, it's actually useful
// to know in SQLite mode that nothing else is trying to modify // to know in SQLite mode that nothing else is trying to modify
// the database. // the database.
events = make(map[*Receipt]*gomatrixserverlib.HeaderedEvent)
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
transactionID, err = d.FederationSenderQueuePDUs.SelectQueuePDUNextTransactionID(ctx, txn, serverName) nids, err := d.FederationSenderQueuePDUs.SelectQueuePDUs(ctx, txn, serverName, limit)
if err != nil {
return fmt.Errorf("SelectQueuePDUNextTransactionID: %w", err)
}
if transactionID == "" {
return nil
}
nids, err := d.FederationSenderQueuePDUs.SelectQueuePDUs(ctx, txn, serverName, transactionID, limit)
if err != nil { if err != nil {
return fmt.Errorf("SelectQueuePDUs: %w", err) return fmt.Errorf("SelectQueuePDUs: %w", err)
} }
receipt = &Receipt{
nids: nids,
}
retrieve := make([]int64, 0, len(nids)) retrieve := make([]int64, 0, len(nids))
for _, nid := range nids { for _, nid := range nids {
if event, ok := d.Cache.GetFederationSenderQueuedPDU(nid); ok { if event, ok := d.Cache.GetFederationSenderQueuedPDU(nid); ok {
events = append(events, event) events[&Receipt{nid}] = event
} else { } else {
retrieve = append(retrieve, nid) retrieve = append(retrieve, nid)
} }
@ -104,7 +88,7 @@ func (d *Database) GetNextTransactionPDUs(
if err := json.Unmarshal(blob, &event); err != nil { if err := json.Unmarshal(blob, &event); err != nil {
return fmt.Errorf("json.Unmarshal: %w", err) return fmt.Errorf("json.Unmarshal: %w", err)
} }
events = append(events, &event) events[&Receipt{nid}] = &event
d.Cache.StoreFederationSenderQueuedPDU(nid, &event) d.Cache.StoreFederationSenderQueuedPDU(nid, &event)
} }
@ -119,19 +103,24 @@ func (d *Database) GetNextTransactionPDUs(
func (d *Database) CleanPDUs( func (d *Database) CleanPDUs(
ctx context.Context, ctx context.Context,
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
receipt *Receipt, receipts []*Receipt,
) error { ) error {
if receipt == nil { if len(receipts) == 0 {
return errors.New("expected receipt") return errors.New("expected receipt")
} }
nids := make([]int64, len(receipts))
for i := range receipts {
nids[i] = receipts[i].nid
}
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
if err := d.FederationSenderQueuePDUs.DeleteQueuePDUs(ctx, txn, serverName, receipt.nids); err != nil { if err := d.FederationSenderQueuePDUs.DeleteQueuePDUs(ctx, txn, serverName, nids); err != nil {
return err return err
} }
var deleteNIDs []int64 var deleteNIDs []int64
for _, nid := range receipt.nids { for _, nid := range nids {
count, err := d.FederationSenderQueuePDUs.SelectQueuePDUReferenceJSONCount(ctx, txn, nid) count, err := d.FederationSenderQueuePDUs.SelectQueuePDUReferenceJSONCount(ctx, txn, nid)
if err != nil { if err != nil {
return fmt.Errorf("SelectQueuePDUReferenceJSONCount: %w", err) return fmt.Errorf("SelectQueuePDUReferenceJSONCount: %w", err)

View file

@ -53,9 +53,9 @@ const selectQueueNextTransactionIDSQL = "" +
" ORDER BY transaction_id ASC" + " ORDER BY transaction_id ASC" +
" LIMIT 1" " LIMIT 1"
const selectQueuePDUsByTransactionSQL = "" + const selectQueuePDUsSQL = "" +
"SELECT json_nid FROM federationsender_queue_pdus" + "SELECT json_nid FROM federationsender_queue_pdus" +
" WHERE server_name = $1 AND transaction_id = $2" + " WHERE server_name = $1" +
" LIMIT $3" " LIMIT $3"
const selectQueuePDUsReferenceJSONCountSQL = "" + const selectQueuePDUsReferenceJSONCountSQL = "" +
@ -73,7 +73,7 @@ type queuePDUsStatements struct {
db *sql.DB db *sql.DB
insertQueuePDUStmt *sql.Stmt insertQueuePDUStmt *sql.Stmt
selectQueueNextTransactionIDStmt *sql.Stmt selectQueueNextTransactionIDStmt *sql.Stmt
selectQueuePDUsByTransactionStmt *sql.Stmt selectQueuePDUsStmt *sql.Stmt
selectQueueReferenceJSONCountStmt *sql.Stmt selectQueueReferenceJSONCountStmt *sql.Stmt
selectQueuePDUsCountStmt *sql.Stmt selectQueuePDUsCountStmt *sql.Stmt
selectQueueServerNamesStmt *sql.Stmt selectQueueServerNamesStmt *sql.Stmt
@ -97,7 +97,7 @@ func NewSQLiteQueuePDUsTable(db *sql.DB) (s *queuePDUsStatements, err error) {
if s.selectQueueNextTransactionIDStmt, err = db.Prepare(selectQueueNextTransactionIDSQL); err != nil { if s.selectQueueNextTransactionIDStmt, err = db.Prepare(selectQueueNextTransactionIDSQL); err != nil {
return return
} }
if s.selectQueuePDUsByTransactionStmt, err = db.Prepare(selectQueuePDUsByTransactionSQL); err != nil { if s.selectQueuePDUsStmt, err = db.Prepare(selectQueuePDUsSQL); err != nil {
return return
} }
if s.selectQueueReferenceJSONCountStmt, err = db.Prepare(selectQueuePDUsReferenceJSONCountSQL); err != nil { if s.selectQueueReferenceJSONCountStmt, err = db.Prepare(selectQueuePDUsReferenceJSONCountSQL); err != nil {
@ -193,11 +193,10 @@ func (s *queuePDUsStatements) SelectQueuePDUCount(
func (s *queuePDUsStatements) SelectQueuePDUs( func (s *queuePDUsStatements) SelectQueuePDUs(
ctx context.Context, txn *sql.Tx, ctx context.Context, txn *sql.Tx,
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
transactionID gomatrixserverlib.TransactionID,
limit int, limit int,
) ([]int64, error) { ) ([]int64, error) {
stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsByTransactionStmt) stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsStmt)
rows, err := stmt.QueryContext(ctx, serverName, transactionID, limit) rows, err := stmt.QueryContext(ctx, serverName, limit)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -25,10 +25,9 @@ import (
type FederationSenderQueuePDUs interface { type FederationSenderQueuePDUs interface {
InsertQueuePDU(ctx context.Context, txn *sql.Tx, transactionID gomatrixserverlib.TransactionID, serverName gomatrixserverlib.ServerName, nid int64) error InsertQueuePDU(ctx context.Context, txn *sql.Tx, transactionID gomatrixserverlib.TransactionID, serverName gomatrixserverlib.ServerName, nid int64) error
DeleteQueuePDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, jsonNIDs []int64) error DeleteQueuePDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, jsonNIDs []int64) error
SelectQueuePDUNextTransactionID(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) (gomatrixserverlib.TransactionID, error)
SelectQueuePDUReferenceJSONCount(ctx context.Context, txn *sql.Tx, jsonNID int64) (int64, error) SelectQueuePDUReferenceJSONCount(ctx context.Context, txn *sql.Tx, jsonNID int64) (int64, error)
SelectQueuePDUCount(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) (int64, error) SelectQueuePDUCount(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) (int64, error)
SelectQueuePDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, transactionID gomatrixserverlib.TransactionID, limit int) ([]int64, error) SelectQueuePDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, limit int) ([]int64, error)
SelectQueuePDUServerNames(ctx context.Context, txn *sql.Tx) ([]gomatrixserverlib.ServerName, error) SelectQueuePDUServerNames(ctx context.Context, txn *sql.Tx) ([]gomatrixserverlib.ServerName, error)
} }