From ebaa0cf5d46260783676703078032406cc12d370 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 30 Jun 2020 15:05:08 +0100 Subject: [PATCH] Debug logging --- federationsender/queue/destinationqueue.go | 2 +- federationsender/storage/postgres/queue_json_table.go | 2 +- federationsender/storage/postgres/storage.go | 9 ++++++++- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index c30567250..2d81bc67b 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -107,7 +107,7 @@ func (oq *destinationQueue) sendEvent(nid int64) { oq.destination, // the destination server name []int64{nid}, // NID from federationsender_queue_json table ); err != nil { - log.WithError(err).Errorf("failed to associate PDU with ID %d with destination %q", oq.destination) + log.WithError(err).Errorf("failed to associate PDU NID %d with destination %q", nid, oq.destination) return } // We've successfully added a PDU to the transaction so increase diff --git a/federationsender/storage/postgres/queue_json_table.go b/federationsender/storage/postgres/queue_json_table.go index 1095c6ad4..c2fa46455 100644 --- a/federationsender/storage/postgres/queue_json_table.go +++ b/federationsender/storage/postgres/queue_json_table.go @@ -90,7 +90,7 @@ func (s *queueJSONStatements) deleteQueueJSON( return err } -func (s *queueJSONStatements) selectJSON( +func (s *queueJSONStatements) selectQueueJSON( ctx context.Context, txn *sql.Tx, jsonNIDs []int64, ) (map[int64][]byte, error) { blobs := map[int64][]byte{} diff --git a/federationsender/storage/postgres/storage.go b/federationsender/storage/postgres/storage.go index b90ac874a..619904919 100644 --- a/federationsender/storage/postgres/storage.go +++ b/federationsender/storage/postgres/storage.go @@ -191,7 +191,7 @@ func (d *Database) GetNextTransactionPDUs( return "", nil, fmt.Errorf("d.selectQueuePDUs: %w", err) } - blobs, err := d.selectJSON(ctx, nil, nids) + blobs, err := d.selectQueueJSON(ctx, nil, nids) if err != nil { return "", nil, fmt.Errorf("d.selectJSON: %w", err) } @@ -216,11 +216,15 @@ func (d *Database) CleanTransactionPDUs( serverName gomatrixserverlib.ServerName, transactionID gomatrixserverlib.TransactionID, ) error { + fmt.Println("Cleaning up after transaction", transactionID) + nids, err := d.selectQueuePDUs(ctx, nil, serverName, transactionID, 50) if err != nil { return fmt.Errorf("d.selectQueuePDUs: %w", err) } + fmt.Println("Transaction", transactionID, "has", len(nids), "NIDs") + if err = d.deleteQueueTransaction(ctx, nil, serverName, transactionID); err != nil { return fmt.Errorf("d.deleteQueueTransaction: %w", err) } @@ -237,6 +241,9 @@ func (d *Database) CleanTransactionPDUs( } } + fmt.Println("There are", len(deleteNIDs), "unreferenced NIDs ready for deletion") + fmt.Println("There are", len(nids)-len(deleteNIDs), "NIDs still referenced") + if len(deleteNIDs) > 0 { if err = d.deleteQueueJSON(ctx, nil, deleteNIDs); err != nil { return fmt.Errorf("d.deleteQueueJSON: %w", err)