From de9520b408d4e7fbcf3ca6f0a6cfa209dca64287 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 8 Dec 2020 16:29:13 +0000 Subject: [PATCH] Export NID instead of receipts, remove federation event cache as no longer needed --- federationsender/federationsender.go | 2 +- federationsender/queue/destinationqueue.go | 38 +++++------ federationsender/queue/queue.go | 10 +-- federationsender/storage/interface.go | 19 +++--- .../storage/postgres/queue_edus_table.go | 27 +++++--- .../storage/postgres/queue_json_table.go | 25 ++++--- .../storage/postgres/queue_pdus_table.go | 27 +++++--- federationsender/storage/postgres/storage.go | 4 +- federationsender/storage/shared/storage.go | 24 ++----- .../storage/shared/storage_edus.go | 46 +++++-------- .../storage/shared/storage_pdus.go | 39 ++++------- .../storage/sqlite3/queue_edus_table.go | 21 +++--- .../storage/sqlite3/queue_json_table.go | 17 ++--- .../storage/sqlite3/queue_pdus_table.go | 21 +++--- federationsender/storage/sqlite3/storage.go | 4 +- federationsender/storage/storage.go | 7 +- federationsender/storage/tables/interface.go | 26 +++---- federationsender/types/types.go | 2 + internal/caching/cache_federationevents.go | 67 ------------------- internal/caching/caches.go | 1 - internal/caching/impl_inmemorylru.go | 10 --- 21 files changed, 166 insertions(+), 271 deletions(-) delete mode 100644 internal/caching/cache_federationevents.go diff --git a/federationsender/federationsender.go b/federationsender/federationsender.go index a24e0f488..fc0ba6d5f 100644 --- a/federationsender/federationsender.go +++ b/federationsender/federationsender.go @@ -46,7 +46,7 @@ func NewInternalAPI( ) api.FederationSenderInternalAPI { cfg := &base.Cfg.FederationSender - federationSenderDB, err := storage.NewDatabase(&cfg.Database, base.Caches) + federationSenderDB, err := storage.NewDatabase(&cfg.Database) if err != nil { logrus.WithError(err).Panic("failed to connect to federation sender db") } diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index 31eeaebc5..23294022a 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -23,7 +23,7 @@ import ( "github.com/matrix-org/dendrite/federationsender/statistics" "github.com/matrix-org/dendrite/federationsender/storage" - "github.com/matrix-org/dendrite/federationsender/storage/shared" + "github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrix" "github.com/matrix-org/gomatrixserverlib" @@ -67,7 +67,7 @@ type destinationQueue struct { // Send event adds the event to the pending queue for the destination. // If the queue is empty then it starts a background goroutine to // start sending events to that destination. -func (oq *destinationQueue) sendEvent(event *gomatrixserverlib.HeaderedEvent, receipt *shared.Receipt) { +func (oq *destinationQueue) sendEvent(event *gomatrixserverlib.HeaderedEvent, nid types.ContentNID) { if event == nil { log.Errorf("attempt to send nil PDU with destination %q", oq.destination) return @@ -79,7 +79,7 @@ func (oq *destinationQueue) sendEvent(event *gomatrixserverlib.HeaderedEvent, re context.TODO(), "", // TODO: remove this, as we don't need to persist the transaction ID oq.destination, // the destination server name - receipt, // NIDs from federationsender_queue_json table + nid, // NIDs from federationsender_queue_json table ); err != nil { log.WithError(err).Errorf("failed to associate PDU %q with destination %q", event.EventID(), oq.destination) return @@ -92,8 +92,8 @@ func (oq *destinationQueue) sendEvent(event *gomatrixserverlib.HeaderedEvent, re oq.pendingMutex.Lock() if len(oq.pendingPDUs) < maxPDUsInMemory { oq.pendingPDUs = append(oq.pendingPDUs, &queuedPDU{ - pdu: event, - receipt: receipt, + pdu: event, + nid: nid, }) } else { oq.overflowed.Store(true) @@ -111,7 +111,7 @@ func (oq *destinationQueue) sendEvent(event *gomatrixserverlib.HeaderedEvent, re // sendEDU adds the EDU event to the pending queue for the destination. // If the queue is empty then it starts a background goroutine to // start sending events to that destination. -func (oq *destinationQueue) sendEDU(event *gomatrixserverlib.EDU, receipt *shared.Receipt) { +func (oq *destinationQueue) sendEDU(event *gomatrixserverlib.EDU, nid types.ContentNID) { if event == nil { log.Errorf("attempt to send nil EDU with destination %q", oq.destination) return @@ -122,7 +122,7 @@ func (oq *destinationQueue) sendEDU(event *gomatrixserverlib.EDU, receipt *share if err := oq.db.AssociateEDUWithDestination( context.TODO(), oq.destination, // the destination server name - receipt, // NIDs from federationsender_queue_json table + nid, // NIDs from federationsender_queue_json table ); err != nil { log.WithError(err).Errorf("failed to associate EDU with destination %q", oq.destination) return @@ -135,8 +135,8 @@ func (oq *destinationQueue) sendEDU(event *gomatrixserverlib.EDU, receipt *share oq.pendingMutex.Lock() if len(oq.pendingEDUs) < maxEDUsInMemory { oq.pendingEDUs = append(oq.pendingEDUs, &queuedEDU{ - edu: event, - receipt: receipt, + edu: event, + nid: nid, }) } else { oq.overflowed.Store(true) @@ -183,20 +183,20 @@ func (oq *destinationQueue) getPendingFromDatabase() { // have cached. We will index them based on the receipt, // which ultimately just contains the index of the PDU/EDU // in the database. - gotPDUs := map[string]struct{}{} - gotEDUs := map[string]struct{}{} + gotPDUs := map[types.ContentNID]struct{}{} + gotEDUs := map[types.ContentNID]struct{}{} for _, pdu := range oq.pendingPDUs { - gotPDUs[pdu.receipt.String()] = struct{}{} + gotPDUs[pdu.nid] = struct{}{} } for _, edu := range oq.pendingEDUs { - gotEDUs[edu.receipt.String()] = struct{}{} + gotEDUs[edu.nid] = struct{}{} } if pduCapacity := maxPDUsInMemory - len(oq.pendingPDUs); pduCapacity > 0 { // We have room in memory for some PDUs - let's request no more than that. if pdus, err := oq.db.GetPendingPDUs(ctx, oq.destination, pduCapacity); err == nil { for receipt, pdu := range pdus { - if _, ok := gotPDUs[receipt.String()]; ok { + if _, ok := gotPDUs[receipt]; ok { continue } oq.pendingPDUs = append(oq.pendingPDUs, &queuedPDU{receipt, pdu}) @@ -210,7 +210,7 @@ func (oq *destinationQueue) getPendingFromDatabase() { // We have room in memory for some EDUs - let's request no more than that. if edus, err := oq.db.GetPendingEDUs(ctx, oq.destination, eduCapacity); err == nil { for receipt, edu := range edus { - if _, ok := gotEDUs[receipt.String()]; ok { + if _, ok := gotEDUs[receipt]; ok { continue } oq.pendingEDUs = append(oq.pendingEDUs, &queuedEDU{receipt, edu}) @@ -375,8 +375,8 @@ func (oq *destinationQueue) nextTransaction( return false, 0, 0, nil } - var pduReceipts []*shared.Receipt - var eduReceipts []*shared.Receipt + var pduReceipts []types.ContentNID + var eduReceipts []types.ContentNID // Go through PDUs that we retrieved from the database, if any, // and add them into the transaction. @@ -387,7 +387,7 @@ func (oq *destinationQueue) nextTransaction( // Append the JSON of the event, since this is a json.RawMessage type in the // gomatrixserverlib.Transaction struct t.PDUs = append(t.PDUs, pdu.pdu.JSON()) - pduReceipts = append(pduReceipts, pdu.receipt) + pduReceipts = append(pduReceipts, pdu.nid) } // Do the same for pending EDUS in the queue. @@ -396,7 +396,7 @@ func (oq *destinationQueue) nextTransaction( continue } t.EDUs = append(t.EDUs, *edu.edu) - eduReceipts = append(eduReceipts, edu.receipt) + eduReceipts = append(eduReceipts, edu.nid) } logrus.WithField("server_name", oq.destination).Debugf("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs)) diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go index da30e4de1..070c38f18 100644 --- a/federationsender/queue/queue.go +++ b/federationsender/queue/queue.go @@ -24,7 +24,7 @@ import ( "github.com/matrix-org/dendrite/federationsender/statistics" "github.com/matrix-org/dendrite/federationsender/storage" - "github.com/matrix-org/dendrite/federationsender/storage/shared" + "github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" log "github.com/sirupsen/logrus" @@ -102,13 +102,13 @@ type SigningInfo struct { } type queuedPDU struct { - receipt *shared.Receipt - pdu *gomatrixserverlib.HeaderedEvent + nid types.ContentNID + pdu *gomatrixserverlib.HeaderedEvent } type queuedEDU struct { - receipt *shared.Receipt - edu *gomatrixserverlib.EDU + nid types.ContentNID + edu *gomatrixserverlib.EDU } func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *destinationQueue { diff --git a/federationsender/storage/interface.go b/federationsender/storage/interface.go index 03d616f1b..9be3ab15a 100644 --- a/federationsender/storage/interface.go +++ b/federationsender/storage/interface.go @@ -17,7 +17,6 @@ package storage import ( "context" - "github.com/matrix-org/dendrite/federationsender/storage/shared" "github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/gomatrixserverlib" @@ -34,19 +33,19 @@ type Database interface { GetJoinedHostsForRooms(ctx context.Context, roomIDs []string) ([]gomatrixserverlib.ServerName, error) PurgeRoomState(ctx context.Context, roomID string) error - StoreJSON(ctx context.Context, js string) (*shared.Receipt, error) + StoreJSON(ctx context.Context, js string) (types.ContentNID, error) - GetPendingPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, limit int) (pdus map[*shared.Receipt]*gomatrixserverlib.HeaderedEvent, err error) - GetPendingEDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, limit int) (edus map[*shared.Receipt]*gomatrixserverlib.EDU, err error) + GetPendingPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, limit int) (pdus map[types.ContentNID]*gomatrixserverlib.HeaderedEvent, err error) + GetPendingEDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, limit int) (edus map[types.ContentNID]*gomatrixserverlib.EDU, err error) - AssociatePDUWithDestination(ctx context.Context, transactionID gomatrixserverlib.TransactionID, serverName gomatrixserverlib.ServerName, receipt *shared.Receipt) error - AssociateEDUWithDestination(ctx context.Context, serverName gomatrixserverlib.ServerName, receipt *shared.Receipt) error + AssociatePDUWithDestination(ctx context.Context, transactionID gomatrixserverlib.TransactionID, serverName gomatrixserverlib.ServerName, nid types.ContentNID) error + AssociateEDUWithDestination(ctx context.Context, serverName gomatrixserverlib.ServerName, nid types.ContentNID) error - CleanPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, receipts []*shared.Receipt) error - CleanEDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, receipts []*shared.Receipt) error + CleanPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, nids []types.ContentNID) error + CleanEDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, nids []types.ContentNID) error - GetPendingPDUCount(ctx context.Context, serverName gomatrixserverlib.ServerName) (int64, error) - GetPendingEDUCount(ctx context.Context, serverName gomatrixserverlib.ServerName) (int64, error) + GetPendingPDUCount(ctx context.Context, serverName gomatrixserverlib.ServerName) (types.ContentNID, error) + GetPendingEDUCount(ctx context.Context, serverName gomatrixserverlib.ServerName) (types.ContentNID, error) GetPendingPDUServerNames(ctx context.Context) ([]gomatrixserverlib.ServerName, error) GetPendingEDUServerNames(ctx context.Context) ([]gomatrixserverlib.ServerName, error) diff --git a/federationsender/storage/postgres/queue_edus_table.go b/federationsender/storage/postgres/queue_edus_table.go index 6cac489bf..8c7471c83 100644 --- a/federationsender/storage/postgres/queue_edus_table.go +++ b/federationsender/storage/postgres/queue_edus_table.go @@ -19,6 +19,7 @@ import ( "database/sql" "github.com/lib/pq" + "github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/gomatrixserverlib" @@ -105,7 +106,7 @@ func (s *queueEDUsStatements) InsertQueueEDU( txn *sql.Tx, eduType string, serverName gomatrixserverlib.ServerName, - nid int64, + nid types.ContentNID, ) error { stmt := sqlutil.TxStmt(txn, s.insertQueueEDUStmt) _, err := stmt.ExecContext( @@ -120,10 +121,14 @@ func (s *queueEDUsStatements) InsertQueueEDU( func (s *queueEDUsStatements) DeleteQueueEDUs( ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, - jsonNIDs []int64, + jsonNIDs []types.ContentNID, ) error { stmt := sqlutil.TxStmt(txn, s.deleteQueueEDUStmt) - _, err := stmt.ExecContext(ctx, serverName, pq.Int64Array(jsonNIDs)) + nids := make([]int64, 0, len(jsonNIDs)) + for _, n := range jsonNIDs { + nids = append(nids, int64(n)) + } + _, err := stmt.ExecContext(ctx, serverName, pq.Int64Array(nids)) return err } @@ -131,16 +136,16 @@ func (s *queueEDUsStatements) SelectQueueEDUs( ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, limit int, -) ([]int64, error) { +) ([]types.ContentNID, error) { stmt := sqlutil.TxStmt(txn, s.selectQueueEDUStmt) rows, err := stmt.QueryContext(ctx, serverName, limit) if err != nil { return nil, err } defer internal.CloseAndLogIfError(ctx, rows, "queueFromStmt: rows.close() failed") - var result []int64 + var result []types.ContentNID for rows.Next() { - var nid int64 + var nid types.ContentNID if err = rows.Scan(&nid); err != nil { return nil, err } @@ -150,9 +155,9 @@ func (s *queueEDUsStatements) SelectQueueEDUs( } func (s *queueEDUsStatements) SelectQueueEDUReferenceJSONCount( - ctx context.Context, txn *sql.Tx, jsonNID int64, -) (int64, error) { - var count int64 + ctx context.Context, txn *sql.Tx, jsonNID types.ContentNID, +) (types.ContentNID, error) { + var count types.ContentNID stmt := sqlutil.TxStmt(txn, s.selectQueueEDUReferenceJSONCountStmt) err := stmt.QueryRowContext(ctx, jsonNID).Scan(&count) if err == sql.ErrNoRows { @@ -163,8 +168,8 @@ func (s *queueEDUsStatements) SelectQueueEDUReferenceJSONCount( func (s *queueEDUsStatements) SelectQueueEDUCount( ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, -) (int64, error) { - var count int64 +) (types.ContentNID, error) { + var count types.ContentNID stmt := sqlutil.TxStmt(txn, s.selectQueueEDUCountStmt) err := stmt.QueryRowContext(ctx, serverName).Scan(&count) if err == sql.ErrNoRows { diff --git a/federationsender/storage/postgres/queue_json_table.go b/federationsender/storage/postgres/queue_json_table.go index 853073741..09976de71 100644 --- a/federationsender/storage/postgres/queue_json_table.go +++ b/federationsender/storage/postgres/queue_json_table.go @@ -19,6 +19,7 @@ import ( "database/sql" "github.com/lib/pq" + "github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" ) @@ -76,9 +77,9 @@ func NewPostgresQueueJSONTable(db *sql.DB) (s *queueJSONStatements, err error) { func (s *queueJSONStatements) InsertQueueJSON( ctx context.Context, txn *sql.Tx, json string, -) (int64, error) { +) (types.ContentNID, error) { stmt := sqlutil.TxStmt(txn, s.insertJSONStmt) - var lastid int64 + var lastid types.ContentNID if err := stmt.QueryRowContext(ctx, json).Scan(&lastid); err != nil { return 0, err } @@ -86,25 +87,33 @@ func (s *queueJSONStatements) InsertQueueJSON( } func (s *queueJSONStatements) DeleteQueueJSON( - ctx context.Context, txn *sql.Tx, nids []int64, + ctx context.Context, txn *sql.Tx, jsonNIDs []types.ContentNID, ) error { stmt := sqlutil.TxStmt(txn, s.deleteJSONStmt) + nids := make([]int64, 0, len(jsonNIDs)) + for _, n := range jsonNIDs { + nids = append(nids, int64(n)) + } _, err := stmt.ExecContext(ctx, pq.Int64Array(nids)) return err } func (s *queueJSONStatements) SelectQueueJSON( - ctx context.Context, txn *sql.Tx, jsonNIDs []int64, -) (map[int64][]byte, error) { - blobs := map[int64][]byte{} + ctx context.Context, txn *sql.Tx, jsonNIDs []types.ContentNID, +) (map[types.ContentNID][]byte, error) { + blobs := map[types.ContentNID][]byte{} stmt := sqlutil.TxStmt(txn, s.selectJSONStmt) - rows, err := stmt.QueryContext(ctx, pq.Int64Array(jsonNIDs)) + nids := make([]int64, 0, len(jsonNIDs)) + for _, n := range jsonNIDs { + nids = append(nids, int64(n)) + } + rows, err := stmt.QueryContext(ctx, pq.Int64Array(nids)) if err != nil { return nil, err } defer internal.CloseAndLogIfError(ctx, rows, "selectJSON: rows.close() failed") for rows.Next() { - var nid int64 + var nid types.ContentNID var blob []byte if err = rows.Scan(&nid, &blob); err != nil { return nil, err diff --git a/federationsender/storage/postgres/queue_pdus_table.go b/federationsender/storage/postgres/queue_pdus_table.go index f9a477483..2c3f4654c 100644 --- a/federationsender/storage/postgres/queue_pdus_table.go +++ b/federationsender/storage/postgres/queue_pdus_table.go @@ -19,6 +19,7 @@ import ( "database/sql" "github.com/lib/pq" + "github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/gomatrixserverlib" @@ -105,7 +106,7 @@ func (s *queuePDUsStatements) InsertQueuePDU( txn *sql.Tx, transactionID gomatrixserverlib.TransactionID, serverName gomatrixserverlib.ServerName, - nid int64, + nid types.ContentNID, ) error { stmt := sqlutil.TxStmt(txn, s.insertQueuePDUStmt) _, err := stmt.ExecContext( @@ -120,17 +121,21 @@ func (s *queuePDUsStatements) InsertQueuePDU( func (s *queuePDUsStatements) DeleteQueuePDUs( ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, - jsonNIDs []int64, + jsonNIDs []types.ContentNID, ) error { stmt := sqlutil.TxStmt(txn, s.deleteQueuePDUsStmt) - _, err := stmt.ExecContext(ctx, serverName, pq.Int64Array(jsonNIDs)) + nids := make([]int64, 0, len(jsonNIDs)) + for _, n := range jsonNIDs { + nids = append(nids, int64(n)) + } + _, err := stmt.ExecContext(ctx, serverName, pq.Int64Array(nids)) return err } func (s *queuePDUsStatements) SelectQueuePDUReferenceJSONCount( - ctx context.Context, txn *sql.Tx, jsonNID int64, -) (int64, error) { - var count int64 + ctx context.Context, txn *sql.Tx, jsonNID types.ContentNID, +) (types.ContentNID, error) { + var count types.ContentNID stmt := sqlutil.TxStmt(txn, s.selectQueuePDUReferenceJSONCountStmt) err := stmt.QueryRowContext(ctx, jsonNID).Scan(&count) if err == sql.ErrNoRows { @@ -144,8 +149,8 @@ func (s *queuePDUsStatements) SelectQueuePDUReferenceJSONCount( func (s *queuePDUsStatements) SelectQueuePDUCount( ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, -) (int64, error) { - var count int64 +) (types.ContentNID, error) { + var count types.ContentNID stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsCountStmt) err := stmt.QueryRowContext(ctx, serverName).Scan(&count) if err == sql.ErrNoRows { @@ -161,16 +166,16 @@ func (s *queuePDUsStatements) SelectQueuePDUs( ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, limit int, -) ([]int64, error) { +) ([]types.ContentNID, error) { stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsStmt) rows, err := stmt.QueryContext(ctx, serverName, limit) if err != nil { return nil, err } defer internal.CloseAndLogIfError(ctx, rows, "queueFromStmt: rows.close() failed") - var result []int64 + var result []types.ContentNID for rows.Next() { - var nid int64 + var nid types.ContentNID if err = rows.Scan(&nid); err != nil { return nil, err } diff --git a/federationsender/storage/postgres/storage.go b/federationsender/storage/postgres/storage.go index 75b54bbcb..f314f8493 100644 --- a/federationsender/storage/postgres/storage.go +++ b/federationsender/storage/postgres/storage.go @@ -19,7 +19,6 @@ import ( "database/sql" "github.com/matrix-org/dendrite/federationsender/storage/shared" - "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/setup/config" ) @@ -33,7 +32,7 @@ type Database struct { } // NewDatabase opens a new database -func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationSenderCache) (*Database, error) { +func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) { var d Database var err error if d.db, err = sqlutil.Open(dbProperties); err != nil { @@ -66,7 +65,6 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS } d.Database = shared.Database{ DB: d.db, - Cache: cache, Writer: d.writer, FederationSenderJoinedHosts: joinedHosts, FederationSenderQueuePDUs: queuePDUs, diff --git a/federationsender/storage/shared/storage.go b/federationsender/storage/shared/storage.go index fbf84c705..d0efc309a 100644 --- a/federationsender/storage/shared/storage.go +++ b/federationsender/storage/shared/storage.go @@ -21,14 +21,12 @@ import ( "github.com/matrix-org/dendrite/federationsender/storage/tables" "github.com/matrix-org/dendrite/federationsender/types" - "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/gomatrixserverlib" ) type Database struct { DB *sql.DB - Cache caching.FederationSenderCache Writer sqlutil.Writer FederationSenderQueuePDUs tables.FederationSenderQueuePDUs FederationSenderQueueEDUs tables.FederationSenderQueueEDUs @@ -38,18 +36,6 @@ type Database struct { FederationSenderBlacklist tables.FederationSenderBlacklist } -// An Receipt contains the NIDs of a call to GetNextTransactionPDUs/EDUs. -// We don't actually export the NIDs but we need the caller to be able -// to pass them back so that we can clean up if the transaction sends -// successfully. -type Receipt struct { - nid int64 -} - -func (r *Receipt) String() string { - return fmt.Sprintf("%d", r.nid) -} - // UpdateRoom updates the joined hosts for a room and returns what the joined // hosts were before the update, or nil if this was a duplicate message. // This is called when we receive a message from kafka, so we pass in @@ -129,19 +115,17 @@ func (d *Database) GetJoinedHostsForRooms(ctx context.Context, roomIDs []string) // metadata entries. func (d *Database) StoreJSON( ctx context.Context, js string, -) (*Receipt, error) { - var nid int64 +) (types.ContentNID, error) { + var nid types.ContentNID var err error _ = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { nid, err = d.FederationSenderQueueJSON.InsertQueueJSON(ctx, txn, js) return err }) if err != nil { - return nil, fmt.Errorf("d.insertQueueJSON: %w", err) + return 0, fmt.Errorf("d.insertQueueJSON: %w", err) } - return &Receipt{ - nid: nid, - }, nil + return nid, nil } func (d *Database) PurgeRoomState( diff --git a/federationsender/storage/shared/storage_edus.go b/federationsender/storage/shared/storage_edus.go index 86fee1a37..5558fef19 100644 --- a/federationsender/storage/shared/storage_edus.go +++ b/federationsender/storage/shared/storage_edus.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" + "github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/gomatrixserverlib" ) @@ -30,15 +31,15 @@ import ( func (d *Database) AssociateEDUWithDestination( ctx context.Context, serverName gomatrixserverlib.ServerName, - receipt *Receipt, + nid types.ContentNID, ) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { if err := d.FederationSenderQueueEDUs.InsertQueueEDU( - ctx, // context - txn, // SQL transaction - "", // TODO: EDU type for coalescing - serverName, // destination server name - receipt.nid, // NID from the federationsender_queue_json table + ctx, // context + txn, // SQL transaction + "", // TODO: EDU type for coalescing + serverName, // destination server name + nid, // NID from the federationsender_queue_json table ); err != nil { return fmt.Errorf("InsertQueueEDU: %w", err) } @@ -53,26 +54,17 @@ func (d *Database) GetPendingEDUs( serverName gomatrixserverlib.ServerName, limit int, ) ( - edus map[*Receipt]*gomatrixserverlib.EDU, + edus map[types.ContentNID]*gomatrixserverlib.EDU, err error, ) { - edus = make(map[*Receipt]*gomatrixserverlib.EDU) + edus = make(map[types.ContentNID]*gomatrixserverlib.EDU) err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { nids, err := d.FederationSenderQueueEDUs.SelectQueueEDUs(ctx, txn, serverName, limit) if err != nil { return fmt.Errorf("SelectQueueEDUs: %w", err) } - retrieve := make([]int64, 0, len(nids)) - for _, nid := range nids { - if edu, ok := d.Cache.GetFederationSenderQueuedEDU(nid); ok { - edus[&Receipt{nid}] = edu - } else { - retrieve = append(retrieve, nid) - } - } - - blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, txn, retrieve) + blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, txn, nids) if err != nil { return fmt.Errorf("SelectQueueJSON: %w", err) } @@ -82,7 +74,7 @@ func (d *Database) GetPendingEDUs( if err := json.Unmarshal(blob, &event); err != nil { return fmt.Errorf("json.Unmarshal: %w", err) } - edus[&Receipt{nid}] = &event + edus[nid] = &event } return nil @@ -95,15 +87,10 @@ func (d *Database) GetPendingEDUs( func (d *Database) CleanEDUs( ctx context.Context, serverName gomatrixserverlib.ServerName, - receipts []*Receipt, + nids []types.ContentNID, ) error { - if len(receipts) == 0 { - return errors.New("expected receipt") - } - - nids := make([]int64, len(receipts)) - for i := range receipts { - nids[i] = receipts[i].nid + if len(nids) == 0 { + return errors.New("expected one or more NIDs") } return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { @@ -111,7 +98,7 @@ func (d *Database) CleanEDUs( return err } - var deleteNIDs []int64 + var deleteNIDs []types.ContentNID for _, nid := range nids { count, err := d.FederationSenderQueueEDUs.SelectQueueEDUReferenceJSONCount(ctx, txn, nid) if err != nil { @@ -119,7 +106,6 @@ func (d *Database) CleanEDUs( } if count == 0 { deleteNIDs = append(deleteNIDs, nid) - d.Cache.EvictFederationSenderQueuedEDU(nid) } } @@ -138,7 +124,7 @@ func (d *Database) CleanEDUs( func (d *Database) GetPendingEDUCount( ctx context.Context, serverName gomatrixserverlib.ServerName, -) (int64, error) { +) (types.ContentNID, error) { return d.FederationSenderQueueEDUs.SelectQueueEDUCount(ctx, nil, serverName) } diff --git a/federationsender/storage/shared/storage_pdus.go b/federationsender/storage/shared/storage_pdus.go index bc298a905..dc2fea7e0 100644 --- a/federationsender/storage/shared/storage_pdus.go +++ b/federationsender/storage/shared/storage_pdus.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" + "github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/gomatrixserverlib" ) @@ -31,7 +32,7 @@ func (d *Database) AssociatePDUWithDestination( ctx context.Context, transactionID gomatrixserverlib.TransactionID, serverName gomatrixserverlib.ServerName, - receipt *Receipt, + nid types.ContentNID, ) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { if err := d.FederationSenderQueuePDUs.InsertQueuePDU( @@ -39,7 +40,7 @@ func (d *Database) AssociatePDUWithDestination( txn, // SQL transaction transactionID, // transaction ID serverName, // destination server name - receipt.nid, // NID from the federationsender_queue_json table + nid, // NID from the federationsender_queue_json table ); err != nil { return fmt.Errorf("InsertQueuePDU: %w", err) } @@ -54,7 +55,7 @@ func (d *Database) GetPendingPDUs( serverName gomatrixserverlib.ServerName, limit int, ) ( - events map[*Receipt]*gomatrixserverlib.HeaderedEvent, + events map[types.ContentNID]*gomatrixserverlib.HeaderedEvent, err error, ) { // Strictly speaking this doesn't need to be using the writer @@ -62,23 +63,14 @@ func (d *Database) GetPendingPDUs( // a guarantee of transactional isolation, it's actually useful // to know in SQLite mode that nothing else is trying to modify // the database. - events = make(map[*Receipt]*gomatrixserverlib.HeaderedEvent) + events = make(map[types.ContentNID]*gomatrixserverlib.HeaderedEvent) err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { nids, err := d.FederationSenderQueuePDUs.SelectQueuePDUs(ctx, txn, serverName, limit) if err != nil { return fmt.Errorf("SelectQueuePDUs: %w", err) } - retrieve := make([]int64, 0, len(nids)) - for _, nid := range nids { - if event, ok := d.Cache.GetFederationSenderQueuedPDU(nid); ok { - events[&Receipt{nid}] = event - } else { - retrieve = append(retrieve, nid) - } - } - - blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, txn, retrieve) + blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, txn, nids) if err != nil { return fmt.Errorf("SelectQueueJSON: %w", err) } @@ -88,8 +80,7 @@ func (d *Database) GetPendingPDUs( if err := json.Unmarshal(blob, &event); err != nil { return fmt.Errorf("json.Unmarshal: %w", err) } - events[&Receipt{nid}] = &event - d.Cache.StoreFederationSenderQueuedPDU(nid, &event) + events[nid] = &event } return nil @@ -103,15 +94,10 @@ func (d *Database) GetPendingPDUs( func (d *Database) CleanPDUs( ctx context.Context, serverName gomatrixserverlib.ServerName, - receipts []*Receipt, + nids []types.ContentNID, ) error { - if len(receipts) == 0 { - return errors.New("expected receipt") - } - - nids := make([]int64, len(receipts)) - for i := range receipts { - nids[i] = receipts[i].nid + if len(nids) == 0 { + return errors.New("expected one or more ContentNIDs") } return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { @@ -119,7 +105,7 @@ func (d *Database) CleanPDUs( return err } - var deleteNIDs []int64 + var deleteNIDs []types.ContentNID for _, nid := range nids { count, err := d.FederationSenderQueuePDUs.SelectQueuePDUReferenceJSONCount(ctx, txn, nid) if err != nil { @@ -127,7 +113,6 @@ func (d *Database) CleanPDUs( } if count == 0 { deleteNIDs = append(deleteNIDs, nid) - d.Cache.EvictFederationSenderQueuedPDU(nid) } } @@ -146,7 +131,7 @@ func (d *Database) CleanPDUs( func (d *Database) GetPendingPDUCount( ctx context.Context, serverName gomatrixserverlib.ServerName, -) (int64, error) { +) (types.ContentNID, error) { return d.FederationSenderQueuePDUs.SelectQueuePDUCount(ctx, nil, serverName) } diff --git a/federationsender/storage/sqlite3/queue_edus_table.go b/federationsender/storage/sqlite3/queue_edus_table.go index a6d609508..5e6fb0e2d 100644 --- a/federationsender/storage/sqlite3/queue_edus_table.go +++ b/federationsender/storage/sqlite3/queue_edus_table.go @@ -20,6 +20,7 @@ import ( "fmt" "strings" + "github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/gomatrixserverlib" @@ -102,7 +103,7 @@ func (s *queueEDUsStatements) InsertQueueEDU( txn *sql.Tx, eduType string, serverName gomatrixserverlib.ServerName, - nid int64, + nid types.ContentNID, ) error { stmt := sqlutil.TxStmt(txn, s.insertQueueEDUStmt) _, err := stmt.ExecContext( @@ -117,7 +118,7 @@ func (s *queueEDUsStatements) InsertQueueEDU( func (s *queueEDUsStatements) DeleteQueueEDUs( ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, - jsonNIDs []int64, + jsonNIDs []types.ContentNID, ) error { deleteSQL := strings.Replace(deleteQueueEDUsSQL, "($2)", sqlutil.QueryVariadicOffset(len(jsonNIDs), 1), 1) deleteStmt, err := txn.Prepare(deleteSQL) @@ -140,16 +141,16 @@ func (s *queueEDUsStatements) SelectQueueEDUs( ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, limit int, -) ([]int64, error) { +) ([]types.ContentNID, error) { stmt := sqlutil.TxStmt(txn, s.selectQueueEDUStmt) rows, err := stmt.QueryContext(ctx, serverName, limit) if err != nil { return nil, err } defer internal.CloseAndLogIfError(ctx, rows, "queueFromStmt: rows.close() failed") - var result []int64 + var result []types.ContentNID for rows.Next() { - var nid int64 + var nid types.ContentNID if err = rows.Scan(&nid); err != nil { return nil, err } @@ -159,9 +160,9 @@ func (s *queueEDUsStatements) SelectQueueEDUs( } func (s *queueEDUsStatements) SelectQueueEDUReferenceJSONCount( - ctx context.Context, txn *sql.Tx, jsonNID int64, -) (int64, error) { - var count int64 + ctx context.Context, txn *sql.Tx, jsonNID types.ContentNID, +) (types.ContentNID, error) { + var count types.ContentNID stmt := sqlutil.TxStmt(txn, s.selectQueueEDUReferenceJSONCountStmt) err := stmt.QueryRowContext(ctx, jsonNID).Scan(&count) if err == sql.ErrNoRows { @@ -172,8 +173,8 @@ func (s *queueEDUsStatements) SelectQueueEDUReferenceJSONCount( func (s *queueEDUsStatements) SelectQueueEDUCount( ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, -) (int64, error) { - var count int64 +) (types.ContentNID, error) { + var count types.ContentNID stmt := sqlutil.TxStmt(txn, s.selectQueueEDUCountStmt) err := stmt.QueryRowContext(ctx, serverName).Scan(&count) if err == sql.ErrNoRows { diff --git a/federationsender/storage/sqlite3/queue_json_table.go b/federationsender/storage/sqlite3/queue_json_table.go index 3e3f60f63..600ce743c 100644 --- a/federationsender/storage/sqlite3/queue_json_table.go +++ b/federationsender/storage/sqlite3/queue_json_table.go @@ -21,6 +21,7 @@ import ( "fmt" "strings" + "github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" ) @@ -71,21 +72,21 @@ func NewSQLiteQueueJSONTable(db *sql.DB) (s *queueJSONStatements, err error) { func (s *queueJSONStatements) InsertQueueJSON( ctx context.Context, txn *sql.Tx, json string, -) (lastid int64, err error) { +) (types.ContentNID, error) { stmt := sqlutil.TxStmt(txn, s.insertJSONStmt) res, err := stmt.ExecContext(ctx, json) if err != nil { return 0, fmt.Errorf("stmt.QueryContext: %w", err) } - lastid, err = res.LastInsertId() + lastid, err := res.LastInsertId() if err != nil { return 0, fmt.Errorf("res.LastInsertId: %w", err) } - return + return types.ContentNID(lastid), nil } func (s *queueJSONStatements) DeleteQueueJSON( - ctx context.Context, txn *sql.Tx, nids []int64, + ctx context.Context, txn *sql.Tx, nids []types.ContentNID, ) error { deleteSQL := strings.Replace(deleteJSONSQL, "($1)", sqlutil.QueryVariadic(len(nids)), 1) deleteStmt, err := txn.Prepare(deleteSQL) @@ -104,8 +105,8 @@ func (s *queueJSONStatements) DeleteQueueJSON( } func (s *queueJSONStatements) SelectQueueJSON( - ctx context.Context, txn *sql.Tx, jsonNIDs []int64, -) (map[int64][]byte, error) { + ctx context.Context, txn *sql.Tx, jsonNIDs []types.ContentNID, +) (map[types.ContentNID][]byte, error) { selectSQL := strings.Replace(selectJSONSQL, "($1)", sqlutil.QueryVariadic(len(jsonNIDs)), 1) selectStmt, err := txn.Prepare(selectSQL) if err != nil { @@ -117,7 +118,7 @@ func (s *queueJSONStatements) SelectQueueJSON( iNIDs[k] = v } - blobs := map[int64][]byte{} + blobs := map[types.ContentNID][]byte{} stmt := sqlutil.TxStmt(txn, selectStmt) rows, err := stmt.QueryContext(ctx, iNIDs...) if err != nil { @@ -125,7 +126,7 @@ func (s *queueJSONStatements) SelectQueueJSON( } defer internal.CloseAndLogIfError(ctx, rows, "selectJSON: rows.close() failed") for rows.Next() { - var nid int64 + var nid types.ContentNID var blob []byte if err = rows.Scan(&nid, &blob); err != nil { return nil, fmt.Errorf("s.selectQueueJSON rows.Scan: %w", err) diff --git a/federationsender/storage/sqlite3/queue_pdus_table.go b/federationsender/storage/sqlite3/queue_pdus_table.go index e0fdbda5f..454fb49e1 100644 --- a/federationsender/storage/sqlite3/queue_pdus_table.go +++ b/federationsender/storage/sqlite3/queue_pdus_table.go @@ -21,6 +21,7 @@ import ( "fmt" "strings" + "github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/gomatrixserverlib" @@ -117,7 +118,7 @@ func (s *queuePDUsStatements) InsertQueuePDU( txn *sql.Tx, transactionID gomatrixserverlib.TransactionID, serverName gomatrixserverlib.ServerName, - nid int64, + nid types.ContentNID, ) error { stmt := sqlutil.TxStmt(txn, s.insertQueuePDUStmt) _, err := stmt.ExecContext( @@ -132,7 +133,7 @@ func (s *queuePDUsStatements) InsertQueuePDU( func (s *queuePDUsStatements) DeleteQueuePDUs( ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, - jsonNIDs []int64, + jsonNIDs []types.ContentNID, ) error { deleteSQL := strings.Replace(deleteQueuePDUsSQL, "($2)", sqlutil.QueryVariadicOffset(len(jsonNIDs), 1), 1) deleteStmt, err := txn.Prepare(deleteSQL) @@ -164,9 +165,9 @@ func (s *queuePDUsStatements) SelectQueuePDUNextTransactionID( } func (s *queuePDUsStatements) SelectQueuePDUReferenceJSONCount( - ctx context.Context, txn *sql.Tx, jsonNID int64, -) (int64, error) { - var count int64 + ctx context.Context, txn *sql.Tx, jsonNID types.ContentNID, +) (types.ContentNID, error) { + var count types.ContentNID stmt := sqlutil.TxStmt(txn, s.selectQueueReferenceJSONCountStmt) err := stmt.QueryRowContext(ctx, jsonNID).Scan(&count) if err == sql.ErrNoRows { @@ -177,8 +178,8 @@ func (s *queuePDUsStatements) SelectQueuePDUReferenceJSONCount( func (s *queuePDUsStatements) SelectQueuePDUCount( ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, -) (int64, error) { - var count int64 +) (types.ContentNID, error) { + var count types.ContentNID stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsCountStmt) err := stmt.QueryRowContext(ctx, serverName).Scan(&count) if err == sql.ErrNoRows { @@ -194,16 +195,16 @@ func (s *queuePDUsStatements) SelectQueuePDUs( ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, limit int, -) ([]int64, error) { +) ([]types.ContentNID, error) { stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsStmt) rows, err := stmt.QueryContext(ctx, serverName, limit) if err != nil { return nil, err } defer internal.CloseAndLogIfError(ctx, rows, "queueFromStmt: rows.close() failed") - var result []int64 + var result []types.ContentNID for rows.Next() { - var nid int64 + var nid types.ContentNID if err = rows.Scan(&nid); err != nil { return nil, err } diff --git a/federationsender/storage/sqlite3/storage.go b/federationsender/storage/sqlite3/storage.go index e66d76909..4f663f645 100644 --- a/federationsender/storage/sqlite3/storage.go +++ b/federationsender/storage/sqlite3/storage.go @@ -21,7 +21,6 @@ import ( _ "github.com/mattn/go-sqlite3" "github.com/matrix-org/dendrite/federationsender/storage/shared" - "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/setup/config" ) @@ -35,7 +34,7 @@ type Database struct { } // NewDatabase opens a new database -func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationSenderCache) (*Database, error) { +func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) { var d Database var err error if d.db, err = sqlutil.Open(dbProperties); err != nil { @@ -68,7 +67,6 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS } d.Database = shared.Database{ DB: d.db, - Cache: cache, Writer: d.writer, FederationSenderJoinedHosts: joinedHosts, FederationSenderQueuePDUs: queuePDUs, diff --git a/federationsender/storage/storage.go b/federationsender/storage/storage.go index 5462c3523..f36138229 100644 --- a/federationsender/storage/storage.go +++ b/federationsender/storage/storage.go @@ -21,17 +21,16 @@ import ( "github.com/matrix-org/dendrite/federationsender/storage/postgres" "github.com/matrix-org/dendrite/federationsender/storage/sqlite3" - "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/setup/config" ) // NewDatabase opens a new database -func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationSenderCache) (Database, error) { +func NewDatabase(dbProperties *config.DatabaseOptions) (Database, error) { switch { case dbProperties.ConnectionString.IsSQLite(): - return sqlite3.NewDatabase(dbProperties, cache) + return sqlite3.NewDatabase(dbProperties) case dbProperties.ConnectionString.IsPostgres(): - return postgres.NewDatabase(dbProperties, cache) + return postgres.NewDatabase(dbProperties) default: return nil, fmt.Errorf("unexpected database type") } diff --git a/federationsender/storage/tables/interface.go b/federationsender/storage/tables/interface.go index 69e952de2..63b5cfffc 100644 --- a/federationsender/storage/tables/interface.go +++ b/federationsender/storage/tables/interface.go @@ -23,27 +23,27 @@ import ( ) type FederationSenderQueuePDUs interface { - InsertQueuePDU(ctx context.Context, txn *sql.Tx, transactionID gomatrixserverlib.TransactionID, serverName gomatrixserverlib.ServerName, nid int64) error - DeleteQueuePDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, jsonNIDs []int64) error - SelectQueuePDUReferenceJSONCount(ctx context.Context, txn *sql.Tx, jsonNID int64) (int64, error) - SelectQueuePDUCount(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) (int64, error) - SelectQueuePDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, limit int) ([]int64, error) + InsertQueuePDU(ctx context.Context, txn *sql.Tx, transactionID gomatrixserverlib.TransactionID, serverName gomatrixserverlib.ServerName, nid types.ContentNID) error + DeleteQueuePDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, jsonNIDs []types.ContentNID) error + SelectQueuePDUReferenceJSONCount(ctx context.Context, txn *sql.Tx, jsonNID types.ContentNID) (types.ContentNID, error) + SelectQueuePDUCount(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) (types.ContentNID, error) + SelectQueuePDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, limit int) ([]types.ContentNID, error) SelectQueuePDUServerNames(ctx context.Context, txn *sql.Tx) ([]gomatrixserverlib.ServerName, error) } type FederationSenderQueueEDUs interface { - InsertQueueEDU(ctx context.Context, txn *sql.Tx, eduType string, serverName gomatrixserverlib.ServerName, nid int64) error - DeleteQueueEDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, jsonNIDs []int64) error - SelectQueueEDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, limit int) ([]int64, error) - SelectQueueEDUReferenceJSONCount(ctx context.Context, txn *sql.Tx, jsonNID int64) (int64, error) - SelectQueueEDUCount(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) (int64, error) + InsertQueueEDU(ctx context.Context, txn *sql.Tx, eduType string, serverName gomatrixserverlib.ServerName, nid types.ContentNID) error + DeleteQueueEDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, jsonNIDs []types.ContentNID) error + SelectQueueEDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, limit int) ([]types.ContentNID, error) + SelectQueueEDUReferenceJSONCount(ctx context.Context, txn *sql.Tx, jsonNID types.ContentNID) (types.ContentNID, error) + SelectQueueEDUCount(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) (types.ContentNID, error) SelectQueueEDUServerNames(ctx context.Context, txn *sql.Tx) ([]gomatrixserverlib.ServerName, error) } type FederationSenderQueueJSON interface { - InsertQueueJSON(ctx context.Context, txn *sql.Tx, json string) (int64, error) - DeleteQueueJSON(ctx context.Context, txn *sql.Tx, nids []int64) error - SelectQueueJSON(ctx context.Context, txn *sql.Tx, jsonNIDs []int64) (map[int64][]byte, error) + InsertQueueJSON(ctx context.Context, txn *sql.Tx, json string) (types.ContentNID, error) + DeleteQueueJSON(ctx context.Context, txn *sql.Tx, nids []types.ContentNID) error + SelectQueueJSON(ctx context.Context, txn *sql.Tx, jsonNIDs []types.ContentNID) (map[types.ContentNID][]byte, error) } type FederationSenderJoinedHosts interface { diff --git a/federationsender/types/types.go b/federationsender/types/types.go index 398d32677..29d4d5569 100644 --- a/federationsender/types/types.go +++ b/federationsender/types/types.go @@ -20,6 +20,8 @@ import ( "github.com/matrix-org/gomatrixserverlib" ) +type ContentNID int64 + // A JoinedHost is a server that is joined to a matrix room. type JoinedHost struct { // The MemberEventID of a m.room.member join event. diff --git a/internal/caching/cache_federationevents.go b/internal/caching/cache_federationevents.go deleted file mode 100644 index a48c11fd2..000000000 --- a/internal/caching/cache_federationevents.go +++ /dev/null @@ -1,67 +0,0 @@ -package caching - -import ( - "fmt" - - "github.com/matrix-org/gomatrixserverlib" -) - -const ( - FederationEventCacheName = "federation_event" - FederationEventCacheMaxEntries = 256 - FederationEventCacheMutable = true // to allow use of Unset only -) - -// FederationSenderCache contains the subset of functions needed for -// a federation event cache. -type FederationSenderCache interface { - GetFederationSenderQueuedPDU(eventNID int64) (event *gomatrixserverlib.HeaderedEvent, ok bool) - StoreFederationSenderQueuedPDU(eventNID int64, event *gomatrixserverlib.HeaderedEvent) - EvictFederationSenderQueuedPDU(eventNID int64) - - GetFederationSenderQueuedEDU(eventNID int64) (event *gomatrixserverlib.EDU, ok bool) - StoreFederationSenderQueuedEDU(eventNID int64, event *gomatrixserverlib.EDU) - EvictFederationSenderQueuedEDU(eventNID int64) -} - -func (c Caches) GetFederationSenderQueuedPDU(eventNID int64) (*gomatrixserverlib.HeaderedEvent, bool) { - key := fmt.Sprintf("%d", eventNID) - val, found := c.FederationEvents.Get(key) - if found && val != nil { - if event, ok := val.(*gomatrixserverlib.HeaderedEvent); ok { - return event, true - } - } - return nil, false -} - -func (c Caches) StoreFederationSenderQueuedPDU(eventNID int64, event *gomatrixserverlib.HeaderedEvent) { - key := fmt.Sprintf("%d", eventNID) - c.FederationEvents.Set(key, event) -} - -func (c Caches) EvictFederationSenderQueuedPDU(eventNID int64) { - key := fmt.Sprintf("%d", eventNID) - c.FederationEvents.Unset(key) -} - -func (c Caches) GetFederationSenderQueuedEDU(eventNID int64) (*gomatrixserverlib.EDU, bool) { - key := fmt.Sprintf("%d", eventNID) - val, found := c.FederationEvents.Get(key) - if found && val != nil { - if event, ok := val.(*gomatrixserverlib.EDU); ok { - return event, true - } - } - return nil, false -} - -func (c Caches) StoreFederationSenderQueuedEDU(eventNID int64, event *gomatrixserverlib.EDU) { - key := fmt.Sprintf("%d", eventNID) - c.FederationEvents.Set(key, event) -} - -func (c Caches) EvictFederationSenderQueuedEDU(eventNID int64) { - key := fmt.Sprintf("%d", eventNID) - c.FederationEvents.Unset(key) -} diff --git a/internal/caching/caches.go b/internal/caching/caches.go index e7b7f550d..655cc037c 100644 --- a/internal/caching/caches.go +++ b/internal/caching/caches.go @@ -10,7 +10,6 @@ type Caches struct { RoomServerEventTypeNIDs Cache // RoomServerNIDsCache RoomServerRoomNIDs Cache // RoomServerNIDsCache RoomServerRoomIDs Cache // RoomServerNIDsCache - FederationEvents Cache // FederationEventsCache } // Cache is the interface that an implementation must satisfy. diff --git a/internal/caching/impl_inmemorylru.go b/internal/caching/impl_inmemorylru.go index f05e8f3c6..e99c18d74 100644 --- a/internal/caching/impl_inmemorylru.go +++ b/internal/caching/impl_inmemorylru.go @@ -63,15 +63,6 @@ func NewInMemoryLRUCache(enablePrometheus bool) (*Caches, error) { if err != nil { return nil, err } - federationEvents, err := NewInMemoryLRUCachePartition( - FederationEventCacheName, - FederationEventCacheMutable, - FederationEventCacheMaxEntries, - enablePrometheus, - ) - if err != nil { - return nil, err - } return &Caches{ RoomVersions: roomVersions, ServerKeys: serverKeys, @@ -79,7 +70,6 @@ func NewInMemoryLRUCache(enablePrometheus bool) (*Caches, error) { RoomServerEventTypeNIDs: roomServerEventTypeNIDs, RoomServerRoomNIDs: roomServerRoomNIDs, RoomServerRoomIDs: roomServerRoomIDs, - FederationEvents: federationEvents, }, nil }