Tweak logging
This commit is contained in:
parent
6029c79dfa
commit
91ac6e9880
|
@ -411,13 +411,13 @@ func (oq *destinationQueue) nextTransaction(
|
||||||
if pduReceipts != nil {
|
if pduReceipts != nil {
|
||||||
//logrus.Infof("Cleaning PDUs %q", pduReceipt.String())
|
//logrus.Infof("Cleaning PDUs %q", pduReceipt.String())
|
||||||
if err = oq.db.CleanPDUs(context.Background(), oq.destination, pduReceipts); err != nil {
|
if err = oq.db.CleanPDUs(context.Background(), oq.destination, pduReceipts); err != nil {
|
||||||
log.WithError(err).Errorf("failed to clean PDUs for server %q", t.Destination)
|
log.WithError(err).Errorf("Failed to clean PDUs for server %q", t.Destination)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if eduReceipts != nil {
|
if eduReceipts != nil {
|
||||||
//logrus.Infof("Cleaning EDUs %q", eduReceipt.String())
|
//logrus.Infof("Cleaning EDUs %q", eduReceipt.String())
|
||||||
if err = oq.db.CleanEDUs(context.Background(), oq.destination, eduReceipts); err != nil {
|
if err = oq.db.CleanEDUs(context.Background(), oq.destination, eduReceipts); err != nil {
|
||||||
log.WithError(err).Errorf("failed to clean EDUs for server %q", t.Destination)
|
log.WithError(err).Errorf("Failed to clean EDUs for server %q", t.Destination)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return true, len(t.PDUs), len(t.EDUs), nil
|
return true, len(t.PDUs), len(t.EDUs), nil
|
||||||
|
@ -429,7 +429,7 @@ func (oq *destinationQueue) nextTransaction(
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
"destination": oq.destination,
|
"destination": oq.destination,
|
||||||
log.ErrorKey: err,
|
log.ErrorKey: err,
|
||||||
}).Info("problem sending transaction")
|
}).Infof("Failed to send transaction %q", t.TransactionID)
|
||||||
return false, 0, 0, err
|
return false, 0, 0, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,36 +52,42 @@ func (d *Database) GetPendingEDUs(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
serverName gomatrixserverlib.ServerName,
|
serverName gomatrixserverlib.ServerName,
|
||||||
limit int,
|
limit int,
|
||||||
) (map[*Receipt]*gomatrixserverlib.EDU, error) {
|
) (
|
||||||
edus := make(map[*Receipt]*gomatrixserverlib.EDU)
|
edus map[*Receipt]*gomatrixserverlib.EDU,
|
||||||
nids, err := d.FederationSenderQueueEDUs.SelectQueueEDUs(ctx, nil, serverName, limit)
|
err error,
|
||||||
if err != nil {
|
) {
|
||||||
return nil, fmt.Errorf("SelectQueueEDUs: %w", err)
|
edus = make(map[*Receipt]*gomatrixserverlib.EDU)
|
||||||
}
|
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||||
|
nids, err := d.FederationSenderQueueEDUs.SelectQueueEDUs(ctx, txn, serverName, limit)
|
||||||
retrieve := make([]int64, 0, len(nids))
|
if err != nil {
|
||||||
for _, nid := range nids {
|
return fmt.Errorf("SelectQueueEDUs: %w", err)
|
||||||
if edu, ok := d.Cache.GetFederationSenderQueuedEDU(nid); ok {
|
|
||||||
edus[&Receipt{nid}] = edu
|
|
||||||
} else {
|
|
||||||
retrieve = append(retrieve, nid)
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, nil, retrieve)
|
retrieve := make([]int64, 0, len(nids))
|
||||||
if err != nil {
|
for _, nid := range nids {
|
||||||
return nil, fmt.Errorf("SelectQueueJSON: %w", err)
|
if edu, ok := d.Cache.GetFederationSenderQueuedEDU(nid); ok {
|
||||||
}
|
edus[&Receipt{nid}] = edu
|
||||||
|
} else {
|
||||||
for nid, blob := range blobs {
|
retrieve = append(retrieve, nid)
|
||||||
var event gomatrixserverlib.EDU
|
}
|
||||||
if err := json.Unmarshal(blob, &event); err != nil {
|
|
||||||
return nil, fmt.Errorf("json.Unmarshal: %w", err)
|
|
||||||
}
|
}
|
||||||
edus[&Receipt{nid}] = &event
|
|
||||||
}
|
|
||||||
|
|
||||||
return edus, nil
|
blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, txn, retrieve)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("SelectQueueJSON: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for nid, blob := range blobs {
|
||||||
|
var event gomatrixserverlib.EDU
|
||||||
|
if err := json.Unmarshal(blob, &event); err != nil {
|
||||||
|
return fmt.Errorf("json.Unmarshal: %w", err)
|
||||||
|
}
|
||||||
|
edus[&Receipt{nid}] = &event
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// CleanEDUs cleans up all specified EDUs. This is done when a
|
// CleanEDUs cleans up all specified EDUs. This is done when a
|
||||||
|
|
|
@ -53,42 +53,48 @@ func (d *Database) GetPendingPDUs(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
serverName gomatrixserverlib.ServerName,
|
serverName gomatrixserverlib.ServerName,
|
||||||
limit int,
|
limit int,
|
||||||
) (map[*Receipt]*gomatrixserverlib.HeaderedEvent, error) {
|
) (
|
||||||
|
events map[*Receipt]*gomatrixserverlib.HeaderedEvent,
|
||||||
|
err error,
|
||||||
|
) {
|
||||||
// Strictly speaking this doesn't need to be using the writer
|
// Strictly speaking this doesn't need to be using the writer
|
||||||
// since we are only performing selects, but since we don't have
|
// since we are only performing selects, but since we don't have
|
||||||
// a guarantee of transactional isolation, it's actually useful
|
// a guarantee of transactional isolation, it's actually useful
|
||||||
// to know in SQLite mode that nothing else is trying to modify
|
// to know in SQLite mode that nothing else is trying to modify
|
||||||
// the database.
|
// the database.
|
||||||
events := make(map[*Receipt]*gomatrixserverlib.HeaderedEvent)
|
events = make(map[*Receipt]*gomatrixserverlib.HeaderedEvent)
|
||||||
nids, err := d.FederationSenderQueuePDUs.SelectQueuePDUs(ctx, nil, serverName, limit)
|
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||||
if err != nil {
|
nids, err := d.FederationSenderQueuePDUs.SelectQueuePDUs(ctx, txn, serverName, limit)
|
||||||
return nil, fmt.Errorf("SelectQueuePDUs: %w", err)
|
if err != nil {
|
||||||
}
|
return fmt.Errorf("SelectQueuePDUs: %w", err)
|
||||||
|
|
||||||
retrieve := make([]int64, 0, len(nids))
|
|
||||||
for _, nid := range nids {
|
|
||||||
if event, ok := d.Cache.GetFederationSenderQueuedPDU(nid); ok {
|
|
||||||
events[&Receipt{nid}] = event
|
|
||||||
} else {
|
|
||||||
retrieve = append(retrieve, nid)
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, nil, retrieve)
|
retrieve := make([]int64, 0, len(nids))
|
||||||
if err != nil {
|
for _, nid := range nids {
|
||||||
return nil, fmt.Errorf("SelectQueueJSON: %w", err)
|
if event, ok := d.Cache.GetFederationSenderQueuedPDU(nid); ok {
|
||||||
}
|
events[&Receipt{nid}] = event
|
||||||
|
} else {
|
||||||
for nid, blob := range blobs {
|
retrieve = append(retrieve, nid)
|
||||||
var event gomatrixserverlib.HeaderedEvent
|
}
|
||||||
if err := json.Unmarshal(blob, &event); err != nil {
|
|
||||||
return nil, fmt.Errorf("json.Unmarshal: %w", err)
|
|
||||||
}
|
}
|
||||||
events[&Receipt{nid}] = &event
|
|
||||||
d.Cache.StoreFederationSenderQueuedPDU(nid, &event)
|
|
||||||
}
|
|
||||||
|
|
||||||
return events, nil
|
blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, txn, retrieve)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("SelectQueueJSON: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for nid, blob := range blobs {
|
||||||
|
var event gomatrixserverlib.HeaderedEvent
|
||||||
|
if err := json.Unmarshal(blob, &event); err != nil {
|
||||||
|
return fmt.Errorf("json.Unmarshal: %w", err)
|
||||||
|
}
|
||||||
|
events[&Receipt{nid}] = &event
|
||||||
|
d.Cache.StoreFederationSenderQueuedPDU(nid, &event)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// CleanTransactionPDUs cleans up all associated events for a
|
// CleanTransactionPDUs cleans up all associated events for a
|
||||||
|
|
Loading…
Reference in a new issue