diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index a1cc7992a..b0dc54bf4 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -411,13 +411,13 @@ func (oq *destinationQueue) nextTransaction( if pduReceipts != nil { //logrus.Infof("Cleaning PDUs %q", pduReceipt.String()) if err = oq.db.CleanPDUs(context.Background(), oq.destination, pduReceipts); err != nil { - log.WithError(err).Errorf("failed to clean PDUs for server %q", t.Destination) + log.WithError(err).Errorf("Failed to clean PDUs for server %q", t.Destination) } } if eduReceipts != nil { //logrus.Infof("Cleaning EDUs %q", eduReceipt.String()) if err = oq.db.CleanEDUs(context.Background(), oq.destination, eduReceipts); err != nil { - log.WithError(err).Errorf("failed to clean EDUs for server %q", t.Destination) + log.WithError(err).Errorf("Failed to clean EDUs for server %q", t.Destination) } } return true, len(t.PDUs), len(t.EDUs), nil @@ -429,7 +429,7 @@ func (oq *destinationQueue) nextTransaction( log.WithFields(log.Fields{ "destination": oq.destination, log.ErrorKey: err, - }).Info("problem sending transaction") + }).Infof("Failed to send transaction %q", t.TransactionID) return false, 0, 0, err } } diff --git a/federationsender/storage/shared/storage_edus.go b/federationsender/storage/shared/storage_edus.go index 2b9e2622e..86fee1a37 100644 --- a/federationsender/storage/shared/storage_edus.go +++ b/federationsender/storage/shared/storage_edus.go @@ -52,36 +52,42 @@ func (d *Database) GetPendingEDUs( ctx context.Context, serverName gomatrixserverlib.ServerName, limit int, -) (map[*Receipt]*gomatrixserverlib.EDU, error) { - edus := make(map[*Receipt]*gomatrixserverlib.EDU) - nids, err := d.FederationSenderQueueEDUs.SelectQueueEDUs(ctx, nil, serverName, limit) - if err != nil { - return nil, fmt.Errorf("SelectQueueEDUs: %w", err) - } - - retrieve := make([]int64, 0, len(nids)) - for _, nid := range nids { - if edu, ok := d.Cache.GetFederationSenderQueuedEDU(nid); ok { - edus[&Receipt{nid}] = edu - } else { - retrieve = append(retrieve, nid) +) ( + edus map[*Receipt]*gomatrixserverlib.EDU, + err error, +) { + edus = make(map[*Receipt]*gomatrixserverlib.EDU) + err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + nids, err := d.FederationSenderQueueEDUs.SelectQueueEDUs(ctx, txn, serverName, limit) + if err != nil { + return fmt.Errorf("SelectQueueEDUs: %w", err) } - } - blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, nil, retrieve) - if err != nil { - return nil, fmt.Errorf("SelectQueueJSON: %w", err) - } - - for nid, blob := range blobs { - var event gomatrixserverlib.EDU - if err := json.Unmarshal(blob, &event); err != nil { - return nil, fmt.Errorf("json.Unmarshal: %w", err) + retrieve := make([]int64, 0, len(nids)) + for _, nid := range nids { + if edu, ok := d.Cache.GetFederationSenderQueuedEDU(nid); ok { + edus[&Receipt{nid}] = edu + } else { + retrieve = append(retrieve, nid) + } } - edus[&Receipt{nid}] = &event - } - return edus, nil + blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, txn, retrieve) + if err != nil { + return fmt.Errorf("SelectQueueJSON: %w", err) + } + + for nid, blob := range blobs { + var event gomatrixserverlib.EDU + if err := json.Unmarshal(blob, &event); err != nil { + return fmt.Errorf("json.Unmarshal: %w", err) + } + edus[&Receipt{nid}] = &event + } + + return nil + }) + return } // CleanEDUs cleans up all specified EDUs. This is done when a diff --git a/federationsender/storage/shared/storage_pdus.go b/federationsender/storage/shared/storage_pdus.go index a9c4c447b..bc298a905 100644 --- a/federationsender/storage/shared/storage_pdus.go +++ b/federationsender/storage/shared/storage_pdus.go @@ -53,42 +53,48 @@ func (d *Database) GetPendingPDUs( ctx context.Context, serverName gomatrixserverlib.ServerName, limit int, -) (map[*Receipt]*gomatrixserverlib.HeaderedEvent, error) { +) ( + events map[*Receipt]*gomatrixserverlib.HeaderedEvent, + err error, +) { // Strictly speaking this doesn't need to be using the writer // since we are only performing selects, but since we don't have // a guarantee of transactional isolation, it's actually useful // to know in SQLite mode that nothing else is trying to modify // the database. - events := make(map[*Receipt]*gomatrixserverlib.HeaderedEvent) - nids, err := d.FederationSenderQueuePDUs.SelectQueuePDUs(ctx, nil, serverName, limit) - if err != nil { - return nil, fmt.Errorf("SelectQueuePDUs: %w", err) - } - - retrieve := make([]int64, 0, len(nids)) - for _, nid := range nids { - if event, ok := d.Cache.GetFederationSenderQueuedPDU(nid); ok { - events[&Receipt{nid}] = event - } else { - retrieve = append(retrieve, nid) + events = make(map[*Receipt]*gomatrixserverlib.HeaderedEvent) + err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + nids, err := d.FederationSenderQueuePDUs.SelectQueuePDUs(ctx, txn, serverName, limit) + if err != nil { + return fmt.Errorf("SelectQueuePDUs: %w", err) } - } - blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, nil, retrieve) - if err != nil { - return nil, fmt.Errorf("SelectQueueJSON: %w", err) - } - - for nid, blob := range blobs { - var event gomatrixserverlib.HeaderedEvent - if err := json.Unmarshal(blob, &event); err != nil { - return nil, fmt.Errorf("json.Unmarshal: %w", err) + retrieve := make([]int64, 0, len(nids)) + for _, nid := range nids { + if event, ok := d.Cache.GetFederationSenderQueuedPDU(nid); ok { + events[&Receipt{nid}] = event + } else { + retrieve = append(retrieve, nid) + } } - events[&Receipt{nid}] = &event - d.Cache.StoreFederationSenderQueuedPDU(nid, &event) - } - return events, nil + blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, txn, retrieve) + if err != nil { + return fmt.Errorf("SelectQueueJSON: %w", err) + } + + for nid, blob := range blobs { + var event gomatrixserverlib.HeaderedEvent + if err := json.Unmarshal(blob, &event); err != nil { + return fmt.Errorf("json.Unmarshal: %w", err) + } + events[&Receipt{nid}] = &event + d.Cache.StoreFederationSenderQueuedPDU(nid, &event) + } + + return nil + }) + return } // CleanTransactionPDUs cleans up all associated events for a