Give EDUs same caching treatment as PDUs

This commit is contained in:
Neil Alexander 2020-12-04 13:10:19 +00:00
parent 37279c996d
commit 2a6af202f0
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
8 changed files with 52 additions and 17 deletions

View file

@ -33,7 +33,7 @@ type Database struct {
}
// NewDatabase opens a new database
func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationEventCache) (*Database, error) {
func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationSenderCache) (*Database, error) {
var d Database
var err error
if d.db, err = sqlutil.Open(dbProperties); err != nil {

View file

@ -29,7 +29,7 @@ import (
type Database struct {
DB *sql.DB
Cache caching.FederationEventCache
Cache caching.FederationSenderCache
Writer sqlutil.Writer
FederationSenderQueuePDUs tables.FederationSenderQueuePDUs
FederationSenderQueueEDUs tables.FederationSenderQueueEDUs

View file

@ -69,7 +69,16 @@ func (d *Database) GetNextTransactionEDUs(
nids: nids,
}
blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, txn, nids)
retrieve := make([]int64, 0, len(nids))
for _, nid := range nids {
if edu, ok := d.Cache.GetFederationSenderQueuedEDU(nid); ok {
edus = append(edus, edu)
} else {
retrieve = append(retrieve, nid)
}
}
blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, txn, retrieve)
if err != nil {
return fmt.Errorf("SelectQueueJSON: %w", err)
}
@ -111,6 +120,7 @@ func (d *Database) CleanEDUs(
}
if count == 0 {
deleteNIDs = append(deleteNIDs, nid)
d.Cache.EvictFederationSenderQueuedEDU(nid)
}
}

View file

@ -87,7 +87,7 @@ func (d *Database) GetNextTransactionPDUs(
retrieve := make([]int64, 0, len(nids))
for _, nid := range nids {
if event, ok := d.Cache.GetFederationEvent(nid); ok {
if event, ok := d.Cache.GetFederationSenderQueuedPDU(nid); ok {
events = append(events, event)
} else {
retrieve = append(retrieve, nid)
@ -105,7 +105,7 @@ func (d *Database) GetNextTransactionPDUs(
return fmt.Errorf("json.Unmarshal: %w", err)
}
events = append(events, &event)
d.Cache.StoreFederationEvent(nid, &event)
d.Cache.StoreFederationSenderQueuedPDU(nid, &event)
}
return nil
@ -138,7 +138,7 @@ func (d *Database) CleanPDUs(
}
if count == 0 {
deleteNIDs = append(deleteNIDs, nid)
d.Cache.EvictFederationEvent(nid)
d.Cache.EvictFederationSenderQueuedPDU(nid)
}
}

View file

@ -35,7 +35,7 @@ type Database struct {
}
// NewDatabase opens a new database
func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationEventCache) (*Database, error) {
func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationSenderCache) (*Database, error) {
var d Database
var err error
if d.db, err = sqlutil.Open(dbProperties); err != nil {

View file

@ -26,7 +26,7 @@ import (
)
// NewDatabase opens a new database
func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationEventCache) (Database, error) {
func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationSenderCache) (Database, error) {
switch {
case dbProperties.ConnectionString.IsSQLite():
return sqlite3.NewDatabase(dbProperties, cache)

View file

@ -23,7 +23,7 @@ import (
)
// NewDatabase opens a new database
func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationEventCache) (Database, error) {
func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationSenderCache) (Database, error) {
switch {
case dbProperties.ConnectionString.IsSQLite():
return sqlite3.NewDatabase(dbProperties, cache)

View file

@ -12,15 +12,19 @@ const (
FederationEventCacheMutable = true // to allow use of Unset only
)
// FederationEventCache contains the subset of functions needed for
// FederationSenderCache contains the subset of functions needed for
// a federation event cache.
type FederationEventCache interface {
GetFederationEvent(eventNID int64) (event *gomatrixserverlib.HeaderedEvent, ok bool)
StoreFederationEvent(eventNID int64, event *gomatrixserverlib.HeaderedEvent)
EvictFederationEvent(eventNID int64)
type FederationSenderCache interface {
GetFederationSenderQueuedPDU(eventNID int64) (event *gomatrixserverlib.HeaderedEvent, ok bool)
StoreFederationSenderQueuedPDU(eventNID int64, event *gomatrixserverlib.HeaderedEvent)
EvictFederationSenderQueuedPDU(eventNID int64)
GetFederationSenderQueuedEDU(eventNID int64) (event *gomatrixserverlib.EDU, ok bool)
StoreFederationSenderQueuedEDU(eventNID int64, event *gomatrixserverlib.EDU)
EvictFederationSenderQueuedEDU(eventNID int64)
}
func (c Caches) GetFederationEvent(eventNID int64) (*gomatrixserverlib.HeaderedEvent, bool) {
func (c Caches) GetFederationSenderQueuedPDU(eventNID int64) (*gomatrixserverlib.HeaderedEvent, bool) {
key := fmt.Sprintf("%d", eventNID)
val, found := c.FederationEvents.Get(key)
if found && val != nil {
@ -31,12 +35,33 @@ func (c Caches) GetFederationEvent(eventNID int64) (*gomatrixserverlib.HeaderedE
return nil, false
}
func (c Caches) StoreFederationEvent(eventNID int64, event *gomatrixserverlib.HeaderedEvent) {
func (c Caches) StoreFederationSenderQueuedPDU(eventNID int64, event *gomatrixserverlib.HeaderedEvent) {
key := fmt.Sprintf("%d", eventNID)
c.FederationEvents.Set(key, event)
}
func (c Caches) EvictFederationEvent(eventNID int64) {
func (c Caches) EvictFederationSenderQueuedPDU(eventNID int64) {
key := fmt.Sprintf("%d", eventNID)
c.FederationEvents.Unset(key)
}
func (c Caches) GetFederationSenderQueuedEDU(eventNID int64) (*gomatrixserverlib.EDU, bool) {
key := fmt.Sprintf("%d", eventNID)
val, found := c.FederationEvents.Get(key)
if found && val != nil {
if event, ok := val.(*gomatrixserverlib.EDU); ok {
return event, true
}
}
return nil, false
}
func (c Caches) StoreFederationSenderQueuedEDU(eventNID int64, event *gomatrixserverlib.EDU) {
key := fmt.Sprintf("%d", eventNID)
c.FederationEvents.Set(key, event)
}
func (c Caches) EvictFederationSenderQueuedEDU(eventNID int64) {
key := fmt.Sprintf("%d", eventNID)
c.FederationEvents.Unset(key)
}