Export NID instead of receipts, remove federation event cache as no longer needed

This commit is contained in:
Neil Alexander 2020-12-08 16:29:13 +00:00
parent 1d868763f8
commit de9520b408
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
21 changed files with 166 additions and 271 deletions

View file

@ -46,7 +46,7 @@ func NewInternalAPI(
) api.FederationSenderInternalAPI { ) api.FederationSenderInternalAPI {
cfg := &base.Cfg.FederationSender cfg := &base.Cfg.FederationSender
federationSenderDB, err := storage.NewDatabase(&cfg.Database, base.Caches) federationSenderDB, err := storage.NewDatabase(&cfg.Database)
if err != nil { if err != nil {
logrus.WithError(err).Panic("failed to connect to federation sender db") logrus.WithError(err).Panic("failed to connect to federation sender db")
} }

View file

@ -23,7 +23,7 @@ import (
"github.com/matrix-org/dendrite/federationsender/statistics" "github.com/matrix-org/dendrite/federationsender/statistics"
"github.com/matrix-org/dendrite/federationsender/storage" "github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/dendrite/federationsender/storage/shared" "github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrix" "github.com/matrix-org/gomatrix"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -67,7 +67,7 @@ type destinationQueue struct {
// Send event adds the event to the pending queue for the destination. // Send event adds the event to the pending queue for the destination.
// If the queue is empty then it starts a background goroutine to // If the queue is empty then it starts a background goroutine to
// start sending events to that destination. // start sending events to that destination.
func (oq *destinationQueue) sendEvent(event *gomatrixserverlib.HeaderedEvent, receipt *shared.Receipt) { func (oq *destinationQueue) sendEvent(event *gomatrixserverlib.HeaderedEvent, nid types.ContentNID) {
if event == nil { if event == nil {
log.Errorf("attempt to send nil PDU with destination %q", oq.destination) log.Errorf("attempt to send nil PDU with destination %q", oq.destination)
return return
@ -79,7 +79,7 @@ func (oq *destinationQueue) sendEvent(event *gomatrixserverlib.HeaderedEvent, re
context.TODO(), context.TODO(),
"", // TODO: remove this, as we don't need to persist the transaction ID "", // TODO: remove this, as we don't need to persist the transaction ID
oq.destination, // the destination server name oq.destination, // the destination server name
receipt, // NIDs from federationsender_queue_json table nid, // NIDs from federationsender_queue_json table
); err != nil { ); err != nil {
log.WithError(err).Errorf("failed to associate PDU %q with destination %q", event.EventID(), oq.destination) log.WithError(err).Errorf("failed to associate PDU %q with destination %q", event.EventID(), oq.destination)
return return
@ -93,7 +93,7 @@ func (oq *destinationQueue) sendEvent(event *gomatrixserverlib.HeaderedEvent, re
if len(oq.pendingPDUs) < maxPDUsInMemory { if len(oq.pendingPDUs) < maxPDUsInMemory {
oq.pendingPDUs = append(oq.pendingPDUs, &queuedPDU{ oq.pendingPDUs = append(oq.pendingPDUs, &queuedPDU{
pdu: event, pdu: event,
receipt: receipt, nid: nid,
}) })
} else { } else {
oq.overflowed.Store(true) oq.overflowed.Store(true)
@ -111,7 +111,7 @@ func (oq *destinationQueue) sendEvent(event *gomatrixserverlib.HeaderedEvent, re
// sendEDU adds the EDU event to the pending queue for the destination. // sendEDU adds the EDU event to the pending queue for the destination.
// If the queue is empty then it starts a background goroutine to // If the queue is empty then it starts a background goroutine to
// start sending events to that destination. // start sending events to that destination.
func (oq *destinationQueue) sendEDU(event *gomatrixserverlib.EDU, receipt *shared.Receipt) { func (oq *destinationQueue) sendEDU(event *gomatrixserverlib.EDU, nid types.ContentNID) {
if event == nil { if event == nil {
log.Errorf("attempt to send nil EDU with destination %q", oq.destination) log.Errorf("attempt to send nil EDU with destination %q", oq.destination)
return return
@ -122,7 +122,7 @@ func (oq *destinationQueue) sendEDU(event *gomatrixserverlib.EDU, receipt *share
if err := oq.db.AssociateEDUWithDestination( if err := oq.db.AssociateEDUWithDestination(
context.TODO(), context.TODO(),
oq.destination, // the destination server name oq.destination, // the destination server name
receipt, // NIDs from federationsender_queue_json table nid, // NIDs from federationsender_queue_json table
); err != nil { ); err != nil {
log.WithError(err).Errorf("failed to associate EDU with destination %q", oq.destination) log.WithError(err).Errorf("failed to associate EDU with destination %q", oq.destination)
return return
@ -136,7 +136,7 @@ func (oq *destinationQueue) sendEDU(event *gomatrixserverlib.EDU, receipt *share
if len(oq.pendingEDUs) < maxEDUsInMemory { if len(oq.pendingEDUs) < maxEDUsInMemory {
oq.pendingEDUs = append(oq.pendingEDUs, &queuedEDU{ oq.pendingEDUs = append(oq.pendingEDUs, &queuedEDU{
edu: event, edu: event,
receipt: receipt, nid: nid,
}) })
} else { } else {
oq.overflowed.Store(true) oq.overflowed.Store(true)
@ -183,20 +183,20 @@ func (oq *destinationQueue) getPendingFromDatabase() {
// have cached. We will index them based on the receipt, // have cached. We will index them based on the receipt,
// which ultimately just contains the index of the PDU/EDU // which ultimately just contains the index of the PDU/EDU
// in the database. // in the database.
gotPDUs := map[string]struct{}{} gotPDUs := map[types.ContentNID]struct{}{}
gotEDUs := map[string]struct{}{} gotEDUs := map[types.ContentNID]struct{}{}
for _, pdu := range oq.pendingPDUs { for _, pdu := range oq.pendingPDUs {
gotPDUs[pdu.receipt.String()] = struct{}{} gotPDUs[pdu.nid] = struct{}{}
} }
for _, edu := range oq.pendingEDUs { for _, edu := range oq.pendingEDUs {
gotEDUs[edu.receipt.String()] = struct{}{} gotEDUs[edu.nid] = struct{}{}
} }
if pduCapacity := maxPDUsInMemory - len(oq.pendingPDUs); pduCapacity > 0 { if pduCapacity := maxPDUsInMemory - len(oq.pendingPDUs); pduCapacity > 0 {
// We have room in memory for some PDUs - let's request no more than that. // We have room in memory for some PDUs - let's request no more than that.
if pdus, err := oq.db.GetPendingPDUs(ctx, oq.destination, pduCapacity); err == nil { if pdus, err := oq.db.GetPendingPDUs(ctx, oq.destination, pduCapacity); err == nil {
for receipt, pdu := range pdus { for receipt, pdu := range pdus {
if _, ok := gotPDUs[receipt.String()]; ok { if _, ok := gotPDUs[receipt]; ok {
continue continue
} }
oq.pendingPDUs = append(oq.pendingPDUs, &queuedPDU{receipt, pdu}) oq.pendingPDUs = append(oq.pendingPDUs, &queuedPDU{receipt, pdu})
@ -210,7 +210,7 @@ func (oq *destinationQueue) getPendingFromDatabase() {
// We have room in memory for some EDUs - let's request no more than that. // We have room in memory for some EDUs - let's request no more than that.
if edus, err := oq.db.GetPendingEDUs(ctx, oq.destination, eduCapacity); err == nil { if edus, err := oq.db.GetPendingEDUs(ctx, oq.destination, eduCapacity); err == nil {
for receipt, edu := range edus { for receipt, edu := range edus {
if _, ok := gotEDUs[receipt.String()]; ok { if _, ok := gotEDUs[receipt]; ok {
continue continue
} }
oq.pendingEDUs = append(oq.pendingEDUs, &queuedEDU{receipt, edu}) oq.pendingEDUs = append(oq.pendingEDUs, &queuedEDU{receipt, edu})
@ -375,8 +375,8 @@ func (oq *destinationQueue) nextTransaction(
return false, 0, 0, nil return false, 0, 0, nil
} }
var pduReceipts []*shared.Receipt var pduReceipts []types.ContentNID
var eduReceipts []*shared.Receipt var eduReceipts []types.ContentNID
// Go through PDUs that we retrieved from the database, if any, // Go through PDUs that we retrieved from the database, if any,
// and add them into the transaction. // and add them into the transaction.
@ -387,7 +387,7 @@ func (oq *destinationQueue) nextTransaction(
// Append the JSON of the event, since this is a json.RawMessage type in the // Append the JSON of the event, since this is a json.RawMessage type in the
// gomatrixserverlib.Transaction struct // gomatrixserverlib.Transaction struct
t.PDUs = append(t.PDUs, pdu.pdu.JSON()) t.PDUs = append(t.PDUs, pdu.pdu.JSON())
pduReceipts = append(pduReceipts, pdu.receipt) pduReceipts = append(pduReceipts, pdu.nid)
} }
// Do the same for pending EDUS in the queue. // Do the same for pending EDUS in the queue.
@ -396,7 +396,7 @@ func (oq *destinationQueue) nextTransaction(
continue continue
} }
t.EDUs = append(t.EDUs, *edu.edu) t.EDUs = append(t.EDUs, *edu.edu)
eduReceipts = append(eduReceipts, edu.receipt) eduReceipts = append(eduReceipts, edu.nid)
} }
logrus.WithField("server_name", oq.destination).Debugf("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs)) logrus.WithField("server_name", oq.destination).Debugf("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs))

View file

@ -24,7 +24,7 @@ import (
"github.com/matrix-org/dendrite/federationsender/statistics" "github.com/matrix-org/dendrite/federationsender/statistics"
"github.com/matrix-org/dendrite/federationsender/storage" "github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/dendrite/federationsender/storage/shared" "github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -102,12 +102,12 @@ type SigningInfo struct {
} }
type queuedPDU struct { type queuedPDU struct {
receipt *shared.Receipt nid types.ContentNID
pdu *gomatrixserverlib.HeaderedEvent pdu *gomatrixserverlib.HeaderedEvent
} }
type queuedEDU struct { type queuedEDU struct {
receipt *shared.Receipt nid types.ContentNID
edu *gomatrixserverlib.EDU edu *gomatrixserverlib.EDU
} }

View file

@ -17,7 +17,6 @@ package storage
import ( import (
"context" "context"
"github.com/matrix-org/dendrite/federationsender/storage/shared"
"github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -34,19 +33,19 @@ type Database interface {
GetJoinedHostsForRooms(ctx context.Context, roomIDs []string) ([]gomatrixserverlib.ServerName, error) GetJoinedHostsForRooms(ctx context.Context, roomIDs []string) ([]gomatrixserverlib.ServerName, error)
PurgeRoomState(ctx context.Context, roomID string) error PurgeRoomState(ctx context.Context, roomID string) error
StoreJSON(ctx context.Context, js string) (*shared.Receipt, error) StoreJSON(ctx context.Context, js string) (types.ContentNID, error)
GetPendingPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, limit int) (pdus map[*shared.Receipt]*gomatrixserverlib.HeaderedEvent, err error) GetPendingPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, limit int) (pdus map[types.ContentNID]*gomatrixserverlib.HeaderedEvent, err error)
GetPendingEDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, limit int) (edus map[*shared.Receipt]*gomatrixserverlib.EDU, err error) GetPendingEDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, limit int) (edus map[types.ContentNID]*gomatrixserverlib.EDU, err error)
AssociatePDUWithDestination(ctx context.Context, transactionID gomatrixserverlib.TransactionID, serverName gomatrixserverlib.ServerName, receipt *shared.Receipt) error AssociatePDUWithDestination(ctx context.Context, transactionID gomatrixserverlib.TransactionID, serverName gomatrixserverlib.ServerName, nid types.ContentNID) error
AssociateEDUWithDestination(ctx context.Context, serverName gomatrixserverlib.ServerName, receipt *shared.Receipt) error AssociateEDUWithDestination(ctx context.Context, serverName gomatrixserverlib.ServerName, nid types.ContentNID) error
CleanPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, receipts []*shared.Receipt) error CleanPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, nids []types.ContentNID) error
CleanEDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, receipts []*shared.Receipt) error CleanEDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, nids []types.ContentNID) error
GetPendingPDUCount(ctx context.Context, serverName gomatrixserverlib.ServerName) (int64, error) GetPendingPDUCount(ctx context.Context, serverName gomatrixserverlib.ServerName) (types.ContentNID, error)
GetPendingEDUCount(ctx context.Context, serverName gomatrixserverlib.ServerName) (int64, error) GetPendingEDUCount(ctx context.Context, serverName gomatrixserverlib.ServerName) (types.ContentNID, error)
GetPendingPDUServerNames(ctx context.Context) ([]gomatrixserverlib.ServerName, error) GetPendingPDUServerNames(ctx context.Context) ([]gomatrixserverlib.ServerName, error)
GetPendingEDUServerNames(ctx context.Context) ([]gomatrixserverlib.ServerName, error) GetPendingEDUServerNames(ctx context.Context) ([]gomatrixserverlib.ServerName, error)

View file

@ -19,6 +19,7 @@ import (
"database/sql" "database/sql"
"github.com/lib/pq" "github.com/lib/pq"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -105,7 +106,7 @@ func (s *queueEDUsStatements) InsertQueueEDU(
txn *sql.Tx, txn *sql.Tx,
eduType string, eduType string,
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
nid int64, nid types.ContentNID,
) error { ) error {
stmt := sqlutil.TxStmt(txn, s.insertQueueEDUStmt) stmt := sqlutil.TxStmt(txn, s.insertQueueEDUStmt)
_, err := stmt.ExecContext( _, err := stmt.ExecContext(
@ -120,10 +121,14 @@ func (s *queueEDUsStatements) InsertQueueEDU(
func (s *queueEDUsStatements) DeleteQueueEDUs( func (s *queueEDUsStatements) DeleteQueueEDUs(
ctx context.Context, txn *sql.Tx, ctx context.Context, txn *sql.Tx,
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
jsonNIDs []int64, jsonNIDs []types.ContentNID,
) error { ) error {
stmt := sqlutil.TxStmt(txn, s.deleteQueueEDUStmt) stmt := sqlutil.TxStmt(txn, s.deleteQueueEDUStmt)
_, err := stmt.ExecContext(ctx, serverName, pq.Int64Array(jsonNIDs)) nids := make([]int64, 0, len(jsonNIDs))
for _, n := range jsonNIDs {
nids = append(nids, int64(n))
}
_, err := stmt.ExecContext(ctx, serverName, pq.Int64Array(nids))
return err return err
} }
@ -131,16 +136,16 @@ func (s *queueEDUsStatements) SelectQueueEDUs(
ctx context.Context, txn *sql.Tx, ctx context.Context, txn *sql.Tx,
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
limit int, limit int,
) ([]int64, error) { ) ([]types.ContentNID, error) {
stmt := sqlutil.TxStmt(txn, s.selectQueueEDUStmt) stmt := sqlutil.TxStmt(txn, s.selectQueueEDUStmt)
rows, err := stmt.QueryContext(ctx, serverName, limit) rows, err := stmt.QueryContext(ctx, serverName, limit)
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer internal.CloseAndLogIfError(ctx, rows, "queueFromStmt: rows.close() failed") defer internal.CloseAndLogIfError(ctx, rows, "queueFromStmt: rows.close() failed")
var result []int64 var result []types.ContentNID
for rows.Next() { for rows.Next() {
var nid int64 var nid types.ContentNID
if err = rows.Scan(&nid); err != nil { if err = rows.Scan(&nid); err != nil {
return nil, err return nil, err
} }
@ -150,9 +155,9 @@ func (s *queueEDUsStatements) SelectQueueEDUs(
} }
func (s *queueEDUsStatements) SelectQueueEDUReferenceJSONCount( func (s *queueEDUsStatements) SelectQueueEDUReferenceJSONCount(
ctx context.Context, txn *sql.Tx, jsonNID int64, ctx context.Context, txn *sql.Tx, jsonNID types.ContentNID,
) (int64, error) { ) (types.ContentNID, error) {
var count int64 var count types.ContentNID
stmt := sqlutil.TxStmt(txn, s.selectQueueEDUReferenceJSONCountStmt) stmt := sqlutil.TxStmt(txn, s.selectQueueEDUReferenceJSONCountStmt)
err := stmt.QueryRowContext(ctx, jsonNID).Scan(&count) err := stmt.QueryRowContext(ctx, jsonNID).Scan(&count)
if err == sql.ErrNoRows { if err == sql.ErrNoRows {
@ -163,8 +168,8 @@ func (s *queueEDUsStatements) SelectQueueEDUReferenceJSONCount(
func (s *queueEDUsStatements) SelectQueueEDUCount( func (s *queueEDUsStatements) SelectQueueEDUCount(
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName,
) (int64, error) { ) (types.ContentNID, error) {
var count int64 var count types.ContentNID
stmt := sqlutil.TxStmt(txn, s.selectQueueEDUCountStmt) stmt := sqlutil.TxStmt(txn, s.selectQueueEDUCountStmt)
err := stmt.QueryRowContext(ctx, serverName).Scan(&count) err := stmt.QueryRowContext(ctx, serverName).Scan(&count)
if err == sql.ErrNoRows { if err == sql.ErrNoRows {

View file

@ -19,6 +19,7 @@ import (
"database/sql" "database/sql"
"github.com/lib/pq" "github.com/lib/pq"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
) )
@ -76,9 +77,9 @@ func NewPostgresQueueJSONTable(db *sql.DB) (s *queueJSONStatements, err error) {
func (s *queueJSONStatements) InsertQueueJSON( func (s *queueJSONStatements) InsertQueueJSON(
ctx context.Context, txn *sql.Tx, json string, ctx context.Context, txn *sql.Tx, json string,
) (int64, error) { ) (types.ContentNID, error) {
stmt := sqlutil.TxStmt(txn, s.insertJSONStmt) stmt := sqlutil.TxStmt(txn, s.insertJSONStmt)
var lastid int64 var lastid types.ContentNID
if err := stmt.QueryRowContext(ctx, json).Scan(&lastid); err != nil { if err := stmt.QueryRowContext(ctx, json).Scan(&lastid); err != nil {
return 0, err return 0, err
} }
@ -86,25 +87,33 @@ func (s *queueJSONStatements) InsertQueueJSON(
} }
func (s *queueJSONStatements) DeleteQueueJSON( func (s *queueJSONStatements) DeleteQueueJSON(
ctx context.Context, txn *sql.Tx, nids []int64, ctx context.Context, txn *sql.Tx, jsonNIDs []types.ContentNID,
) error { ) error {
stmt := sqlutil.TxStmt(txn, s.deleteJSONStmt) stmt := sqlutil.TxStmt(txn, s.deleteJSONStmt)
nids := make([]int64, 0, len(jsonNIDs))
for _, n := range jsonNIDs {
nids = append(nids, int64(n))
}
_, err := stmt.ExecContext(ctx, pq.Int64Array(nids)) _, err := stmt.ExecContext(ctx, pq.Int64Array(nids))
return err return err
} }
func (s *queueJSONStatements) SelectQueueJSON( func (s *queueJSONStatements) SelectQueueJSON(
ctx context.Context, txn *sql.Tx, jsonNIDs []int64, ctx context.Context, txn *sql.Tx, jsonNIDs []types.ContentNID,
) (map[int64][]byte, error) { ) (map[types.ContentNID][]byte, error) {
blobs := map[int64][]byte{} blobs := map[types.ContentNID][]byte{}
stmt := sqlutil.TxStmt(txn, s.selectJSONStmt) stmt := sqlutil.TxStmt(txn, s.selectJSONStmt)
rows, err := stmt.QueryContext(ctx, pq.Int64Array(jsonNIDs)) nids := make([]int64, 0, len(jsonNIDs))
for _, n := range jsonNIDs {
nids = append(nids, int64(n))
}
rows, err := stmt.QueryContext(ctx, pq.Int64Array(nids))
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer internal.CloseAndLogIfError(ctx, rows, "selectJSON: rows.close() failed") defer internal.CloseAndLogIfError(ctx, rows, "selectJSON: rows.close() failed")
for rows.Next() { for rows.Next() {
var nid int64 var nid types.ContentNID
var blob []byte var blob []byte
if err = rows.Scan(&nid, &blob); err != nil { if err = rows.Scan(&nid, &blob); err != nil {
return nil, err return nil, err

View file

@ -19,6 +19,7 @@ import (
"database/sql" "database/sql"
"github.com/lib/pq" "github.com/lib/pq"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -105,7 +106,7 @@ func (s *queuePDUsStatements) InsertQueuePDU(
txn *sql.Tx, txn *sql.Tx,
transactionID gomatrixserverlib.TransactionID, transactionID gomatrixserverlib.TransactionID,
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
nid int64, nid types.ContentNID,
) error { ) error {
stmt := sqlutil.TxStmt(txn, s.insertQueuePDUStmt) stmt := sqlutil.TxStmt(txn, s.insertQueuePDUStmt)
_, err := stmt.ExecContext( _, err := stmt.ExecContext(
@ -120,17 +121,21 @@ func (s *queuePDUsStatements) InsertQueuePDU(
func (s *queuePDUsStatements) DeleteQueuePDUs( func (s *queuePDUsStatements) DeleteQueuePDUs(
ctx context.Context, txn *sql.Tx, ctx context.Context, txn *sql.Tx,
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
jsonNIDs []int64, jsonNIDs []types.ContentNID,
) error { ) error {
stmt := sqlutil.TxStmt(txn, s.deleteQueuePDUsStmt) stmt := sqlutil.TxStmt(txn, s.deleteQueuePDUsStmt)
_, err := stmt.ExecContext(ctx, serverName, pq.Int64Array(jsonNIDs)) nids := make([]int64, 0, len(jsonNIDs))
for _, n := range jsonNIDs {
nids = append(nids, int64(n))
}
_, err := stmt.ExecContext(ctx, serverName, pq.Int64Array(nids))
return err return err
} }
func (s *queuePDUsStatements) SelectQueuePDUReferenceJSONCount( func (s *queuePDUsStatements) SelectQueuePDUReferenceJSONCount(
ctx context.Context, txn *sql.Tx, jsonNID int64, ctx context.Context, txn *sql.Tx, jsonNID types.ContentNID,
) (int64, error) { ) (types.ContentNID, error) {
var count int64 var count types.ContentNID
stmt := sqlutil.TxStmt(txn, s.selectQueuePDUReferenceJSONCountStmt) stmt := sqlutil.TxStmt(txn, s.selectQueuePDUReferenceJSONCountStmt)
err := stmt.QueryRowContext(ctx, jsonNID).Scan(&count) err := stmt.QueryRowContext(ctx, jsonNID).Scan(&count)
if err == sql.ErrNoRows { if err == sql.ErrNoRows {
@ -144,8 +149,8 @@ func (s *queuePDUsStatements) SelectQueuePDUReferenceJSONCount(
func (s *queuePDUsStatements) SelectQueuePDUCount( func (s *queuePDUsStatements) SelectQueuePDUCount(
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName,
) (int64, error) { ) (types.ContentNID, error) {
var count int64 var count types.ContentNID
stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsCountStmt) stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsCountStmt)
err := stmt.QueryRowContext(ctx, serverName).Scan(&count) err := stmt.QueryRowContext(ctx, serverName).Scan(&count)
if err == sql.ErrNoRows { if err == sql.ErrNoRows {
@ -161,16 +166,16 @@ func (s *queuePDUsStatements) SelectQueuePDUs(
ctx context.Context, txn *sql.Tx, ctx context.Context, txn *sql.Tx,
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
limit int, limit int,
) ([]int64, error) { ) ([]types.ContentNID, error) {
stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsStmt) stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsStmt)
rows, err := stmt.QueryContext(ctx, serverName, limit) rows, err := stmt.QueryContext(ctx, serverName, limit)
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer internal.CloseAndLogIfError(ctx, rows, "queueFromStmt: rows.close() failed") defer internal.CloseAndLogIfError(ctx, rows, "queueFromStmt: rows.close() failed")
var result []int64 var result []types.ContentNID
for rows.Next() { for rows.Next() {
var nid int64 var nid types.ContentNID
if err = rows.Scan(&nid); err != nil { if err = rows.Scan(&nid); err != nil {
return nil, err return nil, err
} }

View file

@ -19,7 +19,6 @@ import (
"database/sql" "database/sql"
"github.com/matrix-org/dendrite/federationsender/storage/shared" "github.com/matrix-org/dendrite/federationsender/storage/shared"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
) )
@ -33,7 +32,7 @@ type Database struct {
} }
// NewDatabase opens a new database // NewDatabase opens a new database
func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationSenderCache) (*Database, error) { func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
var d Database var d Database
var err error var err error
if d.db, err = sqlutil.Open(dbProperties); err != nil { if d.db, err = sqlutil.Open(dbProperties); err != nil {
@ -66,7 +65,6 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS
} }
d.Database = shared.Database{ d.Database = shared.Database{
DB: d.db, DB: d.db,
Cache: cache,
Writer: d.writer, Writer: d.writer,
FederationSenderJoinedHosts: joinedHosts, FederationSenderJoinedHosts: joinedHosts,
FederationSenderQueuePDUs: queuePDUs, FederationSenderQueuePDUs: queuePDUs,

View file

@ -21,14 +21,12 @@ import (
"github.com/matrix-org/dendrite/federationsender/storage/tables" "github.com/matrix-org/dendrite/federationsender/storage/tables"
"github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
type Database struct { type Database struct {
DB *sql.DB DB *sql.DB
Cache caching.FederationSenderCache
Writer sqlutil.Writer Writer sqlutil.Writer
FederationSenderQueuePDUs tables.FederationSenderQueuePDUs FederationSenderQueuePDUs tables.FederationSenderQueuePDUs
FederationSenderQueueEDUs tables.FederationSenderQueueEDUs FederationSenderQueueEDUs tables.FederationSenderQueueEDUs
@ -38,18 +36,6 @@ type Database struct {
FederationSenderBlacklist tables.FederationSenderBlacklist FederationSenderBlacklist tables.FederationSenderBlacklist
} }
// An Receipt contains the NIDs of a call to GetNextTransactionPDUs/EDUs.
// We don't actually export the NIDs but we need the caller to be able
// to pass them back so that we can clean up if the transaction sends
// successfully.
type Receipt struct {
nid int64
}
func (r *Receipt) String() string {
return fmt.Sprintf("%d", r.nid)
}
// UpdateRoom updates the joined hosts for a room and returns what the joined // UpdateRoom updates the joined hosts for a room and returns what the joined
// hosts were before the update, or nil if this was a duplicate message. // hosts were before the update, or nil if this was a duplicate message.
// This is called when we receive a message from kafka, so we pass in // This is called when we receive a message from kafka, so we pass in
@ -129,19 +115,17 @@ func (d *Database) GetJoinedHostsForRooms(ctx context.Context, roomIDs []string)
// metadata entries. // metadata entries.
func (d *Database) StoreJSON( func (d *Database) StoreJSON(
ctx context.Context, js string, ctx context.Context, js string,
) (*Receipt, error) { ) (types.ContentNID, error) {
var nid int64 var nid types.ContentNID
var err error var err error
_ = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { _ = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
nid, err = d.FederationSenderQueueJSON.InsertQueueJSON(ctx, txn, js) nid, err = d.FederationSenderQueueJSON.InsertQueueJSON(ctx, txn, js)
return err return err
}) })
if err != nil { if err != nil {
return nil, fmt.Errorf("d.insertQueueJSON: %w", err) return 0, fmt.Errorf("d.insertQueueJSON: %w", err)
} }
return &Receipt{ return nid, nil
nid: nid,
}, nil
} }
func (d *Database) PurgeRoomState( func (d *Database) PurgeRoomState(

View file

@ -21,6 +21,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
@ -30,7 +31,7 @@ import (
func (d *Database) AssociateEDUWithDestination( func (d *Database) AssociateEDUWithDestination(
ctx context.Context, ctx context.Context,
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
receipt *Receipt, nid types.ContentNID,
) error { ) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
if err := d.FederationSenderQueueEDUs.InsertQueueEDU( if err := d.FederationSenderQueueEDUs.InsertQueueEDU(
@ -38,7 +39,7 @@ func (d *Database) AssociateEDUWithDestination(
txn, // SQL transaction txn, // SQL transaction
"", // TODO: EDU type for coalescing "", // TODO: EDU type for coalescing
serverName, // destination server name serverName, // destination server name
receipt.nid, // NID from the federationsender_queue_json table nid, // NID from the federationsender_queue_json table
); err != nil { ); err != nil {
return fmt.Errorf("InsertQueueEDU: %w", err) return fmt.Errorf("InsertQueueEDU: %w", err)
} }
@ -53,26 +54,17 @@ func (d *Database) GetPendingEDUs(
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
limit int, limit int,
) ( ) (
edus map[*Receipt]*gomatrixserverlib.EDU, edus map[types.ContentNID]*gomatrixserverlib.EDU,
err error, err error,
) { ) {
edus = make(map[*Receipt]*gomatrixserverlib.EDU) edus = make(map[types.ContentNID]*gomatrixserverlib.EDU)
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
nids, err := d.FederationSenderQueueEDUs.SelectQueueEDUs(ctx, txn, serverName, limit) nids, err := d.FederationSenderQueueEDUs.SelectQueueEDUs(ctx, txn, serverName, limit)
if err != nil { if err != nil {
return fmt.Errorf("SelectQueueEDUs: %w", err) return fmt.Errorf("SelectQueueEDUs: %w", err)
} }
retrieve := make([]int64, 0, len(nids)) blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, txn, nids)
for _, nid := range nids {
if edu, ok := d.Cache.GetFederationSenderQueuedEDU(nid); ok {
edus[&Receipt{nid}] = edu
} else {
retrieve = append(retrieve, nid)
}
}
blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, txn, retrieve)
if err != nil { if err != nil {
return fmt.Errorf("SelectQueueJSON: %w", err) return fmt.Errorf("SelectQueueJSON: %w", err)
} }
@ -82,7 +74,7 @@ func (d *Database) GetPendingEDUs(
if err := json.Unmarshal(blob, &event); err != nil { if err := json.Unmarshal(blob, &event); err != nil {
return fmt.Errorf("json.Unmarshal: %w", err) return fmt.Errorf("json.Unmarshal: %w", err)
} }
edus[&Receipt{nid}] = &event edus[nid] = &event
} }
return nil return nil
@ -95,15 +87,10 @@ func (d *Database) GetPendingEDUs(
func (d *Database) CleanEDUs( func (d *Database) CleanEDUs(
ctx context.Context, ctx context.Context,
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
receipts []*Receipt, nids []types.ContentNID,
) error { ) error {
if len(receipts) == 0 { if len(nids) == 0 {
return errors.New("expected receipt") return errors.New("expected one or more NIDs")
}
nids := make([]int64, len(receipts))
for i := range receipts {
nids[i] = receipts[i].nid
} }
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
@ -111,7 +98,7 @@ func (d *Database) CleanEDUs(
return err return err
} }
var deleteNIDs []int64 var deleteNIDs []types.ContentNID
for _, nid := range nids { for _, nid := range nids {
count, err := d.FederationSenderQueueEDUs.SelectQueueEDUReferenceJSONCount(ctx, txn, nid) count, err := d.FederationSenderQueueEDUs.SelectQueueEDUReferenceJSONCount(ctx, txn, nid)
if err != nil { if err != nil {
@ -119,7 +106,6 @@ func (d *Database) CleanEDUs(
} }
if count == 0 { if count == 0 {
deleteNIDs = append(deleteNIDs, nid) deleteNIDs = append(deleteNIDs, nid)
d.Cache.EvictFederationSenderQueuedEDU(nid)
} }
} }
@ -138,7 +124,7 @@ func (d *Database) CleanEDUs(
func (d *Database) GetPendingEDUCount( func (d *Database) GetPendingEDUCount(
ctx context.Context, ctx context.Context,
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
) (int64, error) { ) (types.ContentNID, error) {
return d.FederationSenderQueueEDUs.SelectQueueEDUCount(ctx, nil, serverName) return d.FederationSenderQueueEDUs.SelectQueueEDUCount(ctx, nil, serverName)
} }

View file

@ -21,6 +21,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
@ -31,7 +32,7 @@ func (d *Database) AssociatePDUWithDestination(
ctx context.Context, ctx context.Context,
transactionID gomatrixserverlib.TransactionID, transactionID gomatrixserverlib.TransactionID,
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
receipt *Receipt, nid types.ContentNID,
) error { ) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
if err := d.FederationSenderQueuePDUs.InsertQueuePDU( if err := d.FederationSenderQueuePDUs.InsertQueuePDU(
@ -39,7 +40,7 @@ func (d *Database) AssociatePDUWithDestination(
txn, // SQL transaction txn, // SQL transaction
transactionID, // transaction ID transactionID, // transaction ID
serverName, // destination server name serverName, // destination server name
receipt.nid, // NID from the federationsender_queue_json table nid, // NID from the federationsender_queue_json table
); err != nil { ); err != nil {
return fmt.Errorf("InsertQueuePDU: %w", err) return fmt.Errorf("InsertQueuePDU: %w", err)
} }
@ -54,7 +55,7 @@ func (d *Database) GetPendingPDUs(
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
limit int, limit int,
) ( ) (
events map[*Receipt]*gomatrixserverlib.HeaderedEvent, events map[types.ContentNID]*gomatrixserverlib.HeaderedEvent,
err error, err error,
) { ) {
// Strictly speaking this doesn't need to be using the writer // Strictly speaking this doesn't need to be using the writer
@ -62,23 +63,14 @@ func (d *Database) GetPendingPDUs(
// a guarantee of transactional isolation, it's actually useful // a guarantee of transactional isolation, it's actually useful
// to know in SQLite mode that nothing else is trying to modify // to know in SQLite mode that nothing else is trying to modify
// the database. // the database.
events = make(map[*Receipt]*gomatrixserverlib.HeaderedEvent) events = make(map[types.ContentNID]*gomatrixserverlib.HeaderedEvent)
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
nids, err := d.FederationSenderQueuePDUs.SelectQueuePDUs(ctx, txn, serverName, limit) nids, err := d.FederationSenderQueuePDUs.SelectQueuePDUs(ctx, txn, serverName, limit)
if err != nil { if err != nil {
return fmt.Errorf("SelectQueuePDUs: %w", err) return fmt.Errorf("SelectQueuePDUs: %w", err)
} }
retrieve := make([]int64, 0, len(nids)) blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, txn, nids)
for _, nid := range nids {
if event, ok := d.Cache.GetFederationSenderQueuedPDU(nid); ok {
events[&Receipt{nid}] = event
} else {
retrieve = append(retrieve, nid)
}
}
blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, txn, retrieve)
if err != nil { if err != nil {
return fmt.Errorf("SelectQueueJSON: %w", err) return fmt.Errorf("SelectQueueJSON: %w", err)
} }
@ -88,8 +80,7 @@ func (d *Database) GetPendingPDUs(
if err := json.Unmarshal(blob, &event); err != nil { if err := json.Unmarshal(blob, &event); err != nil {
return fmt.Errorf("json.Unmarshal: %w", err) return fmt.Errorf("json.Unmarshal: %w", err)
} }
events[&Receipt{nid}] = &event events[nid] = &event
d.Cache.StoreFederationSenderQueuedPDU(nid, &event)
} }
return nil return nil
@ -103,15 +94,10 @@ func (d *Database) GetPendingPDUs(
func (d *Database) CleanPDUs( func (d *Database) CleanPDUs(
ctx context.Context, ctx context.Context,
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
receipts []*Receipt, nids []types.ContentNID,
) error { ) error {
if len(receipts) == 0 { if len(nids) == 0 {
return errors.New("expected receipt") return errors.New("expected one or more ContentNIDs")
}
nids := make([]int64, len(receipts))
for i := range receipts {
nids[i] = receipts[i].nid
} }
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
@ -119,7 +105,7 @@ func (d *Database) CleanPDUs(
return err return err
} }
var deleteNIDs []int64 var deleteNIDs []types.ContentNID
for _, nid := range nids { for _, nid := range nids {
count, err := d.FederationSenderQueuePDUs.SelectQueuePDUReferenceJSONCount(ctx, txn, nid) count, err := d.FederationSenderQueuePDUs.SelectQueuePDUReferenceJSONCount(ctx, txn, nid)
if err != nil { if err != nil {
@ -127,7 +113,6 @@ func (d *Database) CleanPDUs(
} }
if count == 0 { if count == 0 {
deleteNIDs = append(deleteNIDs, nid) deleteNIDs = append(deleteNIDs, nid)
d.Cache.EvictFederationSenderQueuedPDU(nid)
} }
} }
@ -146,7 +131,7 @@ func (d *Database) CleanPDUs(
func (d *Database) GetPendingPDUCount( func (d *Database) GetPendingPDUCount(
ctx context.Context, ctx context.Context,
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
) (int64, error) { ) (types.ContentNID, error) {
return d.FederationSenderQueuePDUs.SelectQueuePDUCount(ctx, nil, serverName) return d.FederationSenderQueuePDUs.SelectQueuePDUCount(ctx, nil, serverName)
} }

View file

@ -20,6 +20,7 @@ import (
"fmt" "fmt"
"strings" "strings"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -102,7 +103,7 @@ func (s *queueEDUsStatements) InsertQueueEDU(
txn *sql.Tx, txn *sql.Tx,
eduType string, eduType string,
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
nid int64, nid types.ContentNID,
) error { ) error {
stmt := sqlutil.TxStmt(txn, s.insertQueueEDUStmt) stmt := sqlutil.TxStmt(txn, s.insertQueueEDUStmt)
_, err := stmt.ExecContext( _, err := stmt.ExecContext(
@ -117,7 +118,7 @@ func (s *queueEDUsStatements) InsertQueueEDU(
func (s *queueEDUsStatements) DeleteQueueEDUs( func (s *queueEDUsStatements) DeleteQueueEDUs(
ctx context.Context, txn *sql.Tx, ctx context.Context, txn *sql.Tx,
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
jsonNIDs []int64, jsonNIDs []types.ContentNID,
) error { ) error {
deleteSQL := strings.Replace(deleteQueueEDUsSQL, "($2)", sqlutil.QueryVariadicOffset(len(jsonNIDs), 1), 1) deleteSQL := strings.Replace(deleteQueueEDUsSQL, "($2)", sqlutil.QueryVariadicOffset(len(jsonNIDs), 1), 1)
deleteStmt, err := txn.Prepare(deleteSQL) deleteStmt, err := txn.Prepare(deleteSQL)
@ -140,16 +141,16 @@ func (s *queueEDUsStatements) SelectQueueEDUs(
ctx context.Context, txn *sql.Tx, ctx context.Context, txn *sql.Tx,
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
limit int, limit int,
) ([]int64, error) { ) ([]types.ContentNID, error) {
stmt := sqlutil.TxStmt(txn, s.selectQueueEDUStmt) stmt := sqlutil.TxStmt(txn, s.selectQueueEDUStmt)
rows, err := stmt.QueryContext(ctx, serverName, limit) rows, err := stmt.QueryContext(ctx, serverName, limit)
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer internal.CloseAndLogIfError(ctx, rows, "queueFromStmt: rows.close() failed") defer internal.CloseAndLogIfError(ctx, rows, "queueFromStmt: rows.close() failed")
var result []int64 var result []types.ContentNID
for rows.Next() { for rows.Next() {
var nid int64 var nid types.ContentNID
if err = rows.Scan(&nid); err != nil { if err = rows.Scan(&nid); err != nil {
return nil, err return nil, err
} }
@ -159,9 +160,9 @@ func (s *queueEDUsStatements) SelectQueueEDUs(
} }
func (s *queueEDUsStatements) SelectQueueEDUReferenceJSONCount( func (s *queueEDUsStatements) SelectQueueEDUReferenceJSONCount(
ctx context.Context, txn *sql.Tx, jsonNID int64, ctx context.Context, txn *sql.Tx, jsonNID types.ContentNID,
) (int64, error) { ) (types.ContentNID, error) {
var count int64 var count types.ContentNID
stmt := sqlutil.TxStmt(txn, s.selectQueueEDUReferenceJSONCountStmt) stmt := sqlutil.TxStmt(txn, s.selectQueueEDUReferenceJSONCountStmt)
err := stmt.QueryRowContext(ctx, jsonNID).Scan(&count) err := stmt.QueryRowContext(ctx, jsonNID).Scan(&count)
if err == sql.ErrNoRows { if err == sql.ErrNoRows {
@ -172,8 +173,8 @@ func (s *queueEDUsStatements) SelectQueueEDUReferenceJSONCount(
func (s *queueEDUsStatements) SelectQueueEDUCount( func (s *queueEDUsStatements) SelectQueueEDUCount(
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName,
) (int64, error) { ) (types.ContentNID, error) {
var count int64 var count types.ContentNID
stmt := sqlutil.TxStmt(txn, s.selectQueueEDUCountStmt) stmt := sqlutil.TxStmt(txn, s.selectQueueEDUCountStmt)
err := stmt.QueryRowContext(ctx, serverName).Scan(&count) err := stmt.QueryRowContext(ctx, serverName).Scan(&count)
if err == sql.ErrNoRows { if err == sql.ErrNoRows {

View file

@ -21,6 +21,7 @@ import (
"fmt" "fmt"
"strings" "strings"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
) )
@ -71,21 +72,21 @@ func NewSQLiteQueueJSONTable(db *sql.DB) (s *queueJSONStatements, err error) {
func (s *queueJSONStatements) InsertQueueJSON( func (s *queueJSONStatements) InsertQueueJSON(
ctx context.Context, txn *sql.Tx, json string, ctx context.Context, txn *sql.Tx, json string,
) (lastid int64, err error) { ) (types.ContentNID, error) {
stmt := sqlutil.TxStmt(txn, s.insertJSONStmt) stmt := sqlutil.TxStmt(txn, s.insertJSONStmt)
res, err := stmt.ExecContext(ctx, json) res, err := stmt.ExecContext(ctx, json)
if err != nil { if err != nil {
return 0, fmt.Errorf("stmt.QueryContext: %w", err) return 0, fmt.Errorf("stmt.QueryContext: %w", err)
} }
lastid, err = res.LastInsertId() lastid, err := res.LastInsertId()
if err != nil { if err != nil {
return 0, fmt.Errorf("res.LastInsertId: %w", err) return 0, fmt.Errorf("res.LastInsertId: %w", err)
} }
return return types.ContentNID(lastid), nil
} }
func (s *queueJSONStatements) DeleteQueueJSON( func (s *queueJSONStatements) DeleteQueueJSON(
ctx context.Context, txn *sql.Tx, nids []int64, ctx context.Context, txn *sql.Tx, nids []types.ContentNID,
) error { ) error {
deleteSQL := strings.Replace(deleteJSONSQL, "($1)", sqlutil.QueryVariadic(len(nids)), 1) deleteSQL := strings.Replace(deleteJSONSQL, "($1)", sqlutil.QueryVariadic(len(nids)), 1)
deleteStmt, err := txn.Prepare(deleteSQL) deleteStmt, err := txn.Prepare(deleteSQL)
@ -104,8 +105,8 @@ func (s *queueJSONStatements) DeleteQueueJSON(
} }
func (s *queueJSONStatements) SelectQueueJSON( func (s *queueJSONStatements) SelectQueueJSON(
ctx context.Context, txn *sql.Tx, jsonNIDs []int64, ctx context.Context, txn *sql.Tx, jsonNIDs []types.ContentNID,
) (map[int64][]byte, error) { ) (map[types.ContentNID][]byte, error) {
selectSQL := strings.Replace(selectJSONSQL, "($1)", sqlutil.QueryVariadic(len(jsonNIDs)), 1) selectSQL := strings.Replace(selectJSONSQL, "($1)", sqlutil.QueryVariadic(len(jsonNIDs)), 1)
selectStmt, err := txn.Prepare(selectSQL) selectStmt, err := txn.Prepare(selectSQL)
if err != nil { if err != nil {
@ -117,7 +118,7 @@ func (s *queueJSONStatements) SelectQueueJSON(
iNIDs[k] = v iNIDs[k] = v
} }
blobs := map[int64][]byte{} blobs := map[types.ContentNID][]byte{}
stmt := sqlutil.TxStmt(txn, selectStmt) stmt := sqlutil.TxStmt(txn, selectStmt)
rows, err := stmt.QueryContext(ctx, iNIDs...) rows, err := stmt.QueryContext(ctx, iNIDs...)
if err != nil { if err != nil {
@ -125,7 +126,7 @@ func (s *queueJSONStatements) SelectQueueJSON(
} }
defer internal.CloseAndLogIfError(ctx, rows, "selectJSON: rows.close() failed") defer internal.CloseAndLogIfError(ctx, rows, "selectJSON: rows.close() failed")
for rows.Next() { for rows.Next() {
var nid int64 var nid types.ContentNID
var blob []byte var blob []byte
if err = rows.Scan(&nid, &blob); err != nil { if err = rows.Scan(&nid, &blob); err != nil {
return nil, fmt.Errorf("s.selectQueueJSON rows.Scan: %w", err) return nil, fmt.Errorf("s.selectQueueJSON rows.Scan: %w", err)

View file

@ -21,6 +21,7 @@ import (
"fmt" "fmt"
"strings" "strings"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -117,7 +118,7 @@ func (s *queuePDUsStatements) InsertQueuePDU(
txn *sql.Tx, txn *sql.Tx,
transactionID gomatrixserverlib.TransactionID, transactionID gomatrixserverlib.TransactionID,
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
nid int64, nid types.ContentNID,
) error { ) error {
stmt := sqlutil.TxStmt(txn, s.insertQueuePDUStmt) stmt := sqlutil.TxStmt(txn, s.insertQueuePDUStmt)
_, err := stmt.ExecContext( _, err := stmt.ExecContext(
@ -132,7 +133,7 @@ func (s *queuePDUsStatements) InsertQueuePDU(
func (s *queuePDUsStatements) DeleteQueuePDUs( func (s *queuePDUsStatements) DeleteQueuePDUs(
ctx context.Context, txn *sql.Tx, ctx context.Context, txn *sql.Tx,
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
jsonNIDs []int64, jsonNIDs []types.ContentNID,
) error { ) error {
deleteSQL := strings.Replace(deleteQueuePDUsSQL, "($2)", sqlutil.QueryVariadicOffset(len(jsonNIDs), 1), 1) deleteSQL := strings.Replace(deleteQueuePDUsSQL, "($2)", sqlutil.QueryVariadicOffset(len(jsonNIDs), 1), 1)
deleteStmt, err := txn.Prepare(deleteSQL) deleteStmt, err := txn.Prepare(deleteSQL)
@ -164,9 +165,9 @@ func (s *queuePDUsStatements) SelectQueuePDUNextTransactionID(
} }
func (s *queuePDUsStatements) SelectQueuePDUReferenceJSONCount( func (s *queuePDUsStatements) SelectQueuePDUReferenceJSONCount(
ctx context.Context, txn *sql.Tx, jsonNID int64, ctx context.Context, txn *sql.Tx, jsonNID types.ContentNID,
) (int64, error) { ) (types.ContentNID, error) {
var count int64 var count types.ContentNID
stmt := sqlutil.TxStmt(txn, s.selectQueueReferenceJSONCountStmt) stmt := sqlutil.TxStmt(txn, s.selectQueueReferenceJSONCountStmt)
err := stmt.QueryRowContext(ctx, jsonNID).Scan(&count) err := stmt.QueryRowContext(ctx, jsonNID).Scan(&count)
if err == sql.ErrNoRows { if err == sql.ErrNoRows {
@ -177,8 +178,8 @@ func (s *queuePDUsStatements) SelectQueuePDUReferenceJSONCount(
func (s *queuePDUsStatements) SelectQueuePDUCount( func (s *queuePDUsStatements) SelectQueuePDUCount(
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName,
) (int64, error) { ) (types.ContentNID, error) {
var count int64 var count types.ContentNID
stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsCountStmt) stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsCountStmt)
err := stmt.QueryRowContext(ctx, serverName).Scan(&count) err := stmt.QueryRowContext(ctx, serverName).Scan(&count)
if err == sql.ErrNoRows { if err == sql.ErrNoRows {
@ -194,16 +195,16 @@ func (s *queuePDUsStatements) SelectQueuePDUs(
ctx context.Context, txn *sql.Tx, ctx context.Context, txn *sql.Tx,
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
limit int, limit int,
) ([]int64, error) { ) ([]types.ContentNID, error) {
stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsStmt) stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsStmt)
rows, err := stmt.QueryContext(ctx, serverName, limit) rows, err := stmt.QueryContext(ctx, serverName, limit)
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer internal.CloseAndLogIfError(ctx, rows, "queueFromStmt: rows.close() failed") defer internal.CloseAndLogIfError(ctx, rows, "queueFromStmt: rows.close() failed")
var result []int64 var result []types.ContentNID
for rows.Next() { for rows.Next() {
var nid int64 var nid types.ContentNID
if err = rows.Scan(&nid); err != nil { if err = rows.Scan(&nid); err != nil {
return nil, err return nil, err
} }

View file

@ -21,7 +21,6 @@ import (
_ "github.com/mattn/go-sqlite3" _ "github.com/mattn/go-sqlite3"
"github.com/matrix-org/dendrite/federationsender/storage/shared" "github.com/matrix-org/dendrite/federationsender/storage/shared"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
) )
@ -35,7 +34,7 @@ type Database struct {
} }
// NewDatabase opens a new database // NewDatabase opens a new database
func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationSenderCache) (*Database, error) { func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
var d Database var d Database
var err error var err error
if d.db, err = sqlutil.Open(dbProperties); err != nil { if d.db, err = sqlutil.Open(dbProperties); err != nil {
@ -68,7 +67,6 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS
} }
d.Database = shared.Database{ d.Database = shared.Database{
DB: d.db, DB: d.db,
Cache: cache,
Writer: d.writer, Writer: d.writer,
FederationSenderJoinedHosts: joinedHosts, FederationSenderJoinedHosts: joinedHosts,
FederationSenderQueuePDUs: queuePDUs, FederationSenderQueuePDUs: queuePDUs,

View file

@ -21,17 +21,16 @@ import (
"github.com/matrix-org/dendrite/federationsender/storage/postgres" "github.com/matrix-org/dendrite/federationsender/storage/postgres"
"github.com/matrix-org/dendrite/federationsender/storage/sqlite3" "github.com/matrix-org/dendrite/federationsender/storage/sqlite3"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
) )
// NewDatabase opens a new database // NewDatabase opens a new database
func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationSenderCache) (Database, error) { func NewDatabase(dbProperties *config.DatabaseOptions) (Database, error) {
switch { switch {
case dbProperties.ConnectionString.IsSQLite(): case dbProperties.ConnectionString.IsSQLite():
return sqlite3.NewDatabase(dbProperties, cache) return sqlite3.NewDatabase(dbProperties)
case dbProperties.ConnectionString.IsPostgres(): case dbProperties.ConnectionString.IsPostgres():
return postgres.NewDatabase(dbProperties, cache) return postgres.NewDatabase(dbProperties)
default: default:
return nil, fmt.Errorf("unexpected database type") return nil, fmt.Errorf("unexpected database type")
} }

View file

@ -23,27 +23,27 @@ import (
) )
type FederationSenderQueuePDUs interface { type FederationSenderQueuePDUs interface {
InsertQueuePDU(ctx context.Context, txn *sql.Tx, transactionID gomatrixserverlib.TransactionID, serverName gomatrixserverlib.ServerName, nid int64) error InsertQueuePDU(ctx context.Context, txn *sql.Tx, transactionID gomatrixserverlib.TransactionID, serverName gomatrixserverlib.ServerName, nid types.ContentNID) error
DeleteQueuePDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, jsonNIDs []int64) error DeleteQueuePDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, jsonNIDs []types.ContentNID) error
SelectQueuePDUReferenceJSONCount(ctx context.Context, txn *sql.Tx, jsonNID int64) (int64, error) SelectQueuePDUReferenceJSONCount(ctx context.Context, txn *sql.Tx, jsonNID types.ContentNID) (types.ContentNID, error)
SelectQueuePDUCount(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) (int64, error) SelectQueuePDUCount(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) (types.ContentNID, error)
SelectQueuePDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, limit int) ([]int64, error) SelectQueuePDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, limit int) ([]types.ContentNID, error)
SelectQueuePDUServerNames(ctx context.Context, txn *sql.Tx) ([]gomatrixserverlib.ServerName, error) SelectQueuePDUServerNames(ctx context.Context, txn *sql.Tx) ([]gomatrixserverlib.ServerName, error)
} }
type FederationSenderQueueEDUs interface { type FederationSenderQueueEDUs interface {
InsertQueueEDU(ctx context.Context, txn *sql.Tx, eduType string, serverName gomatrixserverlib.ServerName, nid int64) error InsertQueueEDU(ctx context.Context, txn *sql.Tx, eduType string, serverName gomatrixserverlib.ServerName, nid types.ContentNID) error
DeleteQueueEDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, jsonNIDs []int64) error DeleteQueueEDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, jsonNIDs []types.ContentNID) error
SelectQueueEDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, limit int) ([]int64, error) SelectQueueEDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, limit int) ([]types.ContentNID, error)
SelectQueueEDUReferenceJSONCount(ctx context.Context, txn *sql.Tx, jsonNID int64) (int64, error) SelectQueueEDUReferenceJSONCount(ctx context.Context, txn *sql.Tx, jsonNID types.ContentNID) (types.ContentNID, error)
SelectQueueEDUCount(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) (int64, error) SelectQueueEDUCount(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) (types.ContentNID, error)
SelectQueueEDUServerNames(ctx context.Context, txn *sql.Tx) ([]gomatrixserverlib.ServerName, error) SelectQueueEDUServerNames(ctx context.Context, txn *sql.Tx) ([]gomatrixserverlib.ServerName, error)
} }
type FederationSenderQueueJSON interface { type FederationSenderQueueJSON interface {
InsertQueueJSON(ctx context.Context, txn *sql.Tx, json string) (int64, error) InsertQueueJSON(ctx context.Context, txn *sql.Tx, json string) (types.ContentNID, error)
DeleteQueueJSON(ctx context.Context, txn *sql.Tx, nids []int64) error DeleteQueueJSON(ctx context.Context, txn *sql.Tx, nids []types.ContentNID) error
SelectQueueJSON(ctx context.Context, txn *sql.Tx, jsonNIDs []int64) (map[int64][]byte, error) SelectQueueJSON(ctx context.Context, txn *sql.Tx, jsonNIDs []types.ContentNID) (map[types.ContentNID][]byte, error)
} }
type FederationSenderJoinedHosts interface { type FederationSenderJoinedHosts interface {

View file

@ -20,6 +20,8 @@ import (
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
type ContentNID int64
// A JoinedHost is a server that is joined to a matrix room. // A JoinedHost is a server that is joined to a matrix room.
type JoinedHost struct { type JoinedHost struct {
// The MemberEventID of a m.room.member join event. // The MemberEventID of a m.room.member join event.

View file

@ -1,67 +0,0 @@
package caching
import (
"fmt"
"github.com/matrix-org/gomatrixserverlib"
)
const (
FederationEventCacheName = "federation_event"
FederationEventCacheMaxEntries = 256
FederationEventCacheMutable = true // to allow use of Unset only
)
// FederationSenderCache contains the subset of functions needed for
// a federation event cache.
type FederationSenderCache interface {
GetFederationSenderQueuedPDU(eventNID int64) (event *gomatrixserverlib.HeaderedEvent, ok bool)
StoreFederationSenderQueuedPDU(eventNID int64, event *gomatrixserverlib.HeaderedEvent)
EvictFederationSenderQueuedPDU(eventNID int64)
GetFederationSenderQueuedEDU(eventNID int64) (event *gomatrixserverlib.EDU, ok bool)
StoreFederationSenderQueuedEDU(eventNID int64, event *gomatrixserverlib.EDU)
EvictFederationSenderQueuedEDU(eventNID int64)
}
func (c Caches) GetFederationSenderQueuedPDU(eventNID int64) (*gomatrixserverlib.HeaderedEvent, bool) {
key := fmt.Sprintf("%d", eventNID)
val, found := c.FederationEvents.Get(key)
if found && val != nil {
if event, ok := val.(*gomatrixserverlib.HeaderedEvent); ok {
return event, true
}
}
return nil, false
}
func (c Caches) StoreFederationSenderQueuedPDU(eventNID int64, event *gomatrixserverlib.HeaderedEvent) {
key := fmt.Sprintf("%d", eventNID)
c.FederationEvents.Set(key, event)
}
func (c Caches) EvictFederationSenderQueuedPDU(eventNID int64) {
key := fmt.Sprintf("%d", eventNID)
c.FederationEvents.Unset(key)
}
func (c Caches) GetFederationSenderQueuedEDU(eventNID int64) (*gomatrixserverlib.EDU, bool) {
key := fmt.Sprintf("%d", eventNID)
val, found := c.FederationEvents.Get(key)
if found && val != nil {
if event, ok := val.(*gomatrixserverlib.EDU); ok {
return event, true
}
}
return nil, false
}
func (c Caches) StoreFederationSenderQueuedEDU(eventNID int64, event *gomatrixserverlib.EDU) {
key := fmt.Sprintf("%d", eventNID)
c.FederationEvents.Set(key, event)
}
func (c Caches) EvictFederationSenderQueuedEDU(eventNID int64) {
key := fmt.Sprintf("%d", eventNID)
c.FederationEvents.Unset(key)
}

View file

@ -10,7 +10,6 @@ type Caches struct {
RoomServerEventTypeNIDs Cache // RoomServerNIDsCache RoomServerEventTypeNIDs Cache // RoomServerNIDsCache
RoomServerRoomNIDs Cache // RoomServerNIDsCache RoomServerRoomNIDs Cache // RoomServerNIDsCache
RoomServerRoomIDs Cache // RoomServerNIDsCache RoomServerRoomIDs Cache // RoomServerNIDsCache
FederationEvents Cache // FederationEventsCache
} }
// Cache is the interface that an implementation must satisfy. // Cache is the interface that an implementation must satisfy.

View file

@ -63,15 +63,6 @@ func NewInMemoryLRUCache(enablePrometheus bool) (*Caches, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
federationEvents, err := NewInMemoryLRUCachePartition(
FederationEventCacheName,
FederationEventCacheMutable,
FederationEventCacheMaxEntries,
enablePrometheus,
)
if err != nil {
return nil, err
}
return &Caches{ return &Caches{
RoomVersions: roomVersions, RoomVersions: roomVersions,
ServerKeys: serverKeys, ServerKeys: serverKeys,
@ -79,7 +70,6 @@ func NewInMemoryLRUCache(enablePrometheus bool) (*Caches, error) {
RoomServerEventTypeNIDs: roomServerEventTypeNIDs, RoomServerEventTypeNIDs: roomServerEventTypeNIDs,
RoomServerRoomNIDs: roomServerRoomNIDs, RoomServerRoomNIDs: roomServerRoomNIDs,
RoomServerRoomIDs: roomServerRoomIDs, RoomServerRoomIDs: roomServerRoomIDs,
FederationEvents: federationEvents,
}, nil }, nil
} }