Actually store EDUs once we retrieved from the database (#2651)
We now actually cache the EDUs once we got them from the database and ensures we only evict them if we successfully deleted them.
This commit is contained in:
parent
59bc0a6f4e
commit
7484689ad1
|
@ -110,6 +110,7 @@ func (d *Database) GetPendingEDUs(
|
||||||
return fmt.Errorf("json.Unmarshal: %w", err)
|
return fmt.Errorf("json.Unmarshal: %w", err)
|
||||||
}
|
}
|
||||||
edus[&Receipt{nid}] = &event
|
edus[&Receipt{nid}] = &event
|
||||||
|
d.Cache.StoreFederationQueuedEDU(nid, &event)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -177,20 +178,18 @@ func (d *Database) GetPendingEDUServerNames(
|
||||||
return d.FederationQueueEDUs.SelectQueueEDUServerNames(ctx, nil)
|
return d.FederationQueueEDUs.SelectQueueEDUServerNames(ctx, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteExpiredEDUs deletes expired EDUs
|
// DeleteExpiredEDUs deletes expired EDUs and evicts them from the cache.
|
||||||
func (d *Database) DeleteExpiredEDUs(ctx context.Context) error {
|
func (d *Database) DeleteExpiredEDUs(ctx context.Context) error {
|
||||||
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
var jsonNIDs []int64
|
||||||
|
err := d.Writer.Do(d.DB, nil, func(txn *sql.Tx) (err error) {
|
||||||
expiredBefore := gomatrixserverlib.AsTimestamp(time.Now())
|
expiredBefore := gomatrixserverlib.AsTimestamp(time.Now())
|
||||||
jsonNIDs, err := d.FederationQueueEDUs.SelectExpiredEDUs(ctx, txn, expiredBefore)
|
jsonNIDs, err = d.FederationQueueEDUs.SelectExpiredEDUs(ctx, txn, expiredBefore)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if len(jsonNIDs) == 0 {
|
if len(jsonNIDs) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
for i := range jsonNIDs {
|
|
||||||
d.Cache.EvictFederationQueuedEDU(jsonNIDs[i])
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = d.FederationQueueJSON.DeleteQueueJSON(ctx, txn, jsonNIDs); err != nil {
|
if err = d.FederationQueueJSON.DeleteQueueJSON(ctx, txn, jsonNIDs); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -198,4 +197,14 @@ func (d *Database) DeleteExpiredEDUs(ctx context.Context) error {
|
||||||
|
|
||||||
return d.FederationQueueEDUs.DeleteExpiredEDUs(ctx, txn, expiredBefore)
|
return d.FederationQueueEDUs.DeleteExpiredEDUs(ctx, txn, expiredBefore)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range jsonNIDs {
|
||||||
|
d.Cache.EvictFederationQueuedEDU(jsonNIDs[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue