Integrate initial s&f interactions with db

This commit is contained in:
Devon Hudson 2022-11-23 11:21:07 -07:00
parent b237f2d62d
commit f218daaf64
No known key found for this signature in database
GPG key ID: CD06B18E77F6A628
14 changed files with 410 additions and 130 deletions

View file

@ -30,6 +30,12 @@ type FederationInternalAPI interface {
request *PerformBroadcastEDURequest, request *PerformBroadcastEDURequest,
response *PerformBroadcastEDUResponse, response *PerformBroadcastEDUResponse,
) error ) error
PerformStoreAsync(
ctx context.Context,
request *PerformStoreAsyncRequest,
response *PerformStoreAsyncResponse,
) error
} }
type ClientFederationAPI interface { type ClientFederationAPI interface {
@ -213,6 +219,14 @@ type PerformBroadcastEDURequest struct {
type PerformBroadcastEDUResponse struct { type PerformBroadcastEDUResponse struct {
} }
type PerformStoreAsyncRequest struct {
Txn gomatrixserverlib.Transaction `json:"transaction"`
UserID gomatrixserverlib.UserID `json:"user_id"`
}
type PerformStoreAsyncResponse struct {
}
type InputPublicKeysRequest struct { type InputPublicKeysRequest struct {
Keys map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult `json:"keys"` Keys map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult `json:"keys"`
} }

View file

@ -648,6 +648,27 @@ func (r *FederationInternalAPI) PerformBroadcastEDU(
return nil return nil
} }
// PerformStoreAsync implements api.FederationInternalAPI
func (r *FederationInternalAPI) PerformStoreAsync(
ctx context.Context,
request *api.PerformStoreAsyncRequest,
response *api.PerformStoreAsyncResponse,
) error {
receipt, err := r.db.StoreAsyncTransaction(ctx, request.Txn)
if err != nil {
return err
}
err = r.db.AssociateAsyncTransactionWithDestinations(
ctx,
map[gomatrixserverlib.UserID]struct{}{
request.UserID: {},
},
request.Txn.TransactionID,
receipt)
return err
}
func (r *FederationInternalAPI) MarkServersAlive(destinations []gomatrixserverlib.ServerName) { func (r *FederationInternalAPI) MarkServersAlive(destinations []gomatrixserverlib.ServerName) {
for _, srv := range destinations { for _, srv := range destinations {
_ = r.db.RemoveServerFromBlacklist(srv) _ = r.db.RemoveServerFromBlacklist(srv)

View file

@ -23,6 +23,7 @@ const (
FederationAPIPerformInviteRequestPath = "/federationapi/performInviteRequest" FederationAPIPerformInviteRequestPath = "/federationapi/performInviteRequest"
FederationAPIPerformOutboundPeekRequestPath = "/federationapi/performOutboundPeekRequest" FederationAPIPerformOutboundPeekRequestPath = "/federationapi/performOutboundPeekRequest"
FederationAPIPerformBroadcastEDUPath = "/federationapi/performBroadcastEDU" FederationAPIPerformBroadcastEDUPath = "/federationapi/performBroadcastEDU"
FederationAPIPerformStoreAsyncPath = "/federationapi/performStoreAsync"
FederationAPIGetUserDevicesPath = "/federationapi/client/getUserDevices" FederationAPIGetUserDevicesPath = "/federationapi/client/getUserDevices"
FederationAPIClaimKeysPath = "/federationapi/client/claimKeys" FederationAPIClaimKeysPath = "/federationapi/client/claimKeys"
@ -150,6 +151,17 @@ func (h *httpFederationInternalAPI) PerformBroadcastEDU(
) )
} }
func (h *httpFederationInternalAPI) PerformStoreAsync(
ctx context.Context,
request *api.PerformStoreAsyncRequest,
response *api.PerformStoreAsyncResponse,
) error {
return httputil.CallInternalRPCAPI(
"PerformStoreAsync", h.federationAPIURL+FederationAPIPerformStoreAsyncPath,
h.httpClient, ctx, request, response,
)
}
type getUserDevices struct { type getUserDevices struct {
S gomatrixserverlib.ServerName S gomatrixserverlib.ServerName
UserID string UserID string

View file

@ -43,6 +43,11 @@ func AddRoutes(intAPI api.FederationInternalAPI, internalAPIMux *mux.Router) {
httputil.MakeInternalRPCAPI("FederationAPIPerformBroadcastEDU", intAPI.PerformBroadcastEDU), httputil.MakeInternalRPCAPI("FederationAPIPerformBroadcastEDU", intAPI.PerformBroadcastEDU),
) )
internalAPIMux.Handle(
FederationAPIPerformStoreAsyncPath,
httputil.MakeInternalRPCAPI("FederationAPIPerformStoreAsync", intAPI.PerformStoreAsync),
)
internalAPIMux.Handle( internalAPIMux.Handle(
FederationAPIPerformJoinRequestPath, FederationAPIPerformJoinRequestPath,
httputil.MakeInternalRPCAPI( httputil.MakeInternalRPCAPI(

View file

@ -1,8 +1,10 @@
package routing package routing
import ( import (
"encoding/json"
"net/http" "net/http"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
@ -13,12 +15,56 @@ func ForwardAsync(
httpReq *http.Request, httpReq *http.Request,
fedReq *gomatrixserverlib.FederationRequest, fedReq *gomatrixserverlib.FederationRequest,
fedAPI api.FederationInternalAPI, fedAPI api.FederationInternalAPI,
txnId gomatrixserverlib.TransactionID, txnID gomatrixserverlib.TransactionID,
userID gomatrixserverlib.UserID, userID gomatrixserverlib.UserID,
) util.JSONResponse { ) util.JSONResponse {
var txnEvents struct {
PDUs []json.RawMessage `json:"pdus"`
EDUs []gomatrixserverlib.EDU `json:"edus"`
}
// TODO: wrap in fedAPI call if err := json.Unmarshal(fedReq.Content(), &txnEvents); err != nil {
// fedAPI.db.AssociateAsyncTransactionWithDestinations(context.TODO(), userID, nil) println("The request body could not be decoded into valid JSON. " + err.Error())
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.NotJSON("The request body could not be decoded into valid JSON. " + err.Error()),
}
}
// Transactions are limited in size; they can have at most 50 PDUs and 100 EDUs.
// https://matrix.org/docs/spec/server_server/latest#transactions
if len(txnEvents.PDUs) > 50 || len(txnEvents.EDUs) > 100 {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.BadJSON("max 50 pdus / 100 edus"),
}
}
t := gomatrixserverlib.Transaction{}
t.PDUs = txnEvents.PDUs
t.EDUs = txnEvents.EDUs
t.Origin = fedReq.Origin()
t.TransactionID = txnID
t.Destination = userID.Domain()
util.GetLogger(httpReq.Context()).Debugf("Received transaction %q from %q containing %d PDUs, %d EDUs", txnID, fedReq.Origin(), len(t.PDUs), len(t.EDUs))
req := api.PerformStoreAsyncRequest{
Txn: t,
UserID: userID,
}
res := api.PerformStoreAsyncResponse{}
err := fedAPI.PerformStoreAsync(httpReq.Context(), &req, &res)
if err != nil {
return util.JSONResponse{
Code: http.StatusInternalServerError,
JSON: jsonerror.BadJSON("could not store the transaction for forwarding"),
}
}
// Naming:
// mailServer? assign mailserver for user?
// configure my mailserver
// Homeserver, idendity server, mailserver... why not?
return util.JSONResponse{Code: 200} return util.JSONResponse{Code: 200}
} }

View file

@ -1,27 +1,146 @@
package routing_test package routing_test
import ( import (
// "context" "context"
"database/sql"
"encoding/json"
"fmt"
"net/http" "net/http"
"sync"
"testing" "testing"
"time"
"github.com/matrix-org/dendrite/federationapi/internal" "github.com/matrix-org/dendrite/federationapi/internal"
"github.com/matrix-org/dendrite/federationapi/routing" "github.com/matrix-org/dendrite/federationapi/routing"
// "github.com/matrix-org/dendrite/federationapi/storage/shared" "github.com/matrix-org/dendrite/federationapi/storage/shared"
// "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
const (
testOrigin = gomatrixserverlib.ServerName("kaer.morhen")
testDestination = gomatrixserverlib.ServerName("white.orchard")
)
type testDatabase struct {
nid int64
nidMutex sync.Mutex
transactions map[int64]json.RawMessage
associations map[gomatrixserverlib.ServerName][]int64
}
func createDatabase() *testDatabase {
return &testDatabase{
nid: 1,
nidMutex: sync.Mutex{},
transactions: make(map[int64]json.RawMessage),
associations: make(map[gomatrixserverlib.ServerName][]int64),
}
}
func (d *testDatabase) InsertQueueTransaction(ctx context.Context, txn *sql.Tx, transactionID gomatrixserverlib.TransactionID, serverName gomatrixserverlib.ServerName, nid int64) error {
if _, ok := d.associations[serverName]; !ok {
d.associations[serverName] = []int64{}
}
d.associations[serverName] = append(d.associations[serverName], nid)
return nil
}
func (d *testDatabase) DeleteQueueTransactions(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, jsonNIDs []int64) error {
for _, nid := range jsonNIDs {
for index, associatedNID := range d.associations[serverName] {
if associatedNID == nid {
d.associations[serverName] = append(d.associations[serverName][:index], d.associations[serverName][index+1:]...)
}
}
}
return nil
}
func (d *testDatabase) SelectQueueTransactions(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, limit int) ([]int64, error) {
results := []int64{}
resultCount := limit
if limit > len(d.associations[serverName]) {
resultCount = len(d.associations[serverName])
}
for i := 0; i < resultCount; i++ {
results = append(results, d.associations[serverName][i])
}
return results, nil
}
func (d *testDatabase) SelectQueueTransactionCount(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) (int64, error) {
return int64(len(d.associations[serverName])), nil
}
func (d *testDatabase) InsertTransactionJSON(ctx context.Context, txn *sql.Tx, json string) (int64, error) {
d.nidMutex.Lock()
defer d.nidMutex.Unlock()
nid := d.nid
d.transactions[nid] = []byte(json)
d.nid++
return nid, nil
}
func (d *testDatabase) DeleteTransactionJSON(ctx context.Context, txn *sql.Tx, nids []int64) error {
for _, nid := range nids {
delete(d.transactions, nid)
}
return nil
}
func (d *testDatabase) SelectTransactionJSON(ctx context.Context, txn *sql.Tx, jsonNIDs []int64) (map[int64][]byte, error) {
result := make(map[int64][]byte)
for _, nid := range jsonNIDs {
if transaction, ok := d.transactions[nid]; ok {
result[nid] = transaction
}
}
return result, nil
}
func createFederationRequest(userID gomatrixserverlib.UserID) (gomatrixserverlib.Transaction, gomatrixserverlib.FederationRequest) {
txn := gomatrixserverlib.Transaction{}
txn.PDUs = []json.RawMessage{
[]byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}],["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"content":{"body":"Test Message"},"depth":5,"event_id":"$gl2T9l3qm0kUbiIJ:kaer.morhen","hashes":{"sha256":"Qx3nRMHLDPSL5hBAzuX84FiSSP0K0Kju2iFoBWH4Za8"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$UKNe10XzYzG0TeA9:kaer.morhen",{"sha256":"KtSRyMjt0ZSjsv2koixTRCxIRCGoOp6QrKscsW97XRo"}]],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"sqDgv3EG7ml5VREzmT9aZeBpS4gAPNIaIeJOwqjDhY0GPU/BcpX5wY4R7hYLrNe5cChgV+eFy/GWm1Zfg5FfDg"}},"type":"m.room.message"}`),
}
txn.Origin = testOrigin
txn.TransactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d", time.Now().UnixNano()))
txn.Destination = testDestination
var federationPathPrefixV1 = "/_matrix/federation/v1"
path := federationPathPrefixV1 + "/forward_async/" + string(txn.TransactionID) + "/" + userID.Raw()
request := gomatrixserverlib.NewFederationRequest("PUT", txn.Destination, path)
request.SetContent(txn)
return txn, request
}
func TestEmptyForwardReturnsOk(t *testing.T) { func TestEmptyForwardReturnsOk(t *testing.T) {
testDB := createDatabase()
db := shared.Database{
Writer: sqlutil.NewDummyWriter(),
FederationQueueTransactions: testDB,
FederationTransactionJSON: testDB,
}
httpReq := &http.Request{} httpReq := &http.Request{}
request := &gomatrixserverlib.FederationRequest{}
fedAPI := internal.FederationInternalAPI{}
userID, err := gomatrixserverlib.NewUserID("@local:domain", false) userID, err := gomatrixserverlib.NewUserID("@local:domain", false)
if err != nil { if err != nil {
t.Fatalf("Invalid userID: %s", err.Error()) t.Fatalf("Invalid userID: %s", err.Error())
} }
_, request := createFederationRequest(*userID)
response := routing.ForwardAsync(httpReq, request, &fedAPI, "1", *userID) fedAPI := internal.NewFederationInternalAPI(
&db, &config.FederationAPI{}, nil, nil, nil, nil, nil, nil,
)
response := routing.ForwardAsync(httpReq, &request, fedAPI, "1", *userID)
expected := 200 expected := 200
if response.Code != expected { if response.Code != expected {
@ -29,36 +148,45 @@ func TestEmptyForwardReturnsOk(t *testing.T) {
} }
} }
// func TestUniqueTransactionStoredInDatabase(t *testing.T) { func TestUniqueTransactionStoredInDatabase(t *testing.T) {
// db := shared.Database{} testDB := createDatabase()
// httpReq := &http.Request{} db := shared.Database{
// inputTransaction := gomatrixserverlib.Transaction{} Writer: sqlutil.NewDummyWriter(),
// request := &gomatrixserverlib.FederationRequest{} FederationQueueTransactions: testDB,
// fedAPI := internal.NewFederationInternalAPI( FederationTransactionJSON: testDB,
// &db, &config.FederationAPI{}, nil, nil, nil, nil, nil, nil, }
// )
// userID, err := gomatrixserverlib.NewUserID("@local:domain", false)
// if err != nil {
// t.Fatalf("Invalid userID: %s", err.Error())
// }
// response := routing.ForwardAsync(httpReq, request, fedAPI, "1", *userID) httpReq := &http.Request{}
// transaction, err := db.GetAsyncTransaction(context.TODO(), *userID) userID, err := gomatrixserverlib.NewUserID("@local:domain", false)
// transactionCount, err := db.GetAsyncTransactionCount(context.TODO(), *userID) if err != nil {
t.Fatalf("Invalid userID: %s", err.Error())
}
inputTransaction, request := createFederationRequest(*userID)
// expected := 200 fedAPI := internal.NewFederationInternalAPI(
// if response.Code != expected { &db, &config.FederationAPI{}, nil, nil, nil, nil, nil, nil,
// t.Fatalf("Expected Return Code: %v, Actual: %v", expected, response.Code) )
// }
// if transactionCount != 1 {
// t.Fatalf("Expected count of 1, Actual: %d", transactionCount)
// }
// if transaction.TransactionID != inputTransaction.TransactionID {
// t.Fatalf("Expected Transaction ID: %s, Actual: %s",
// inputTransaction.TransactionID, transaction.TransactionID)
// }
// }
// func TestDuplicateTransactionNotStoredInDatabase(t *testing.T) { response := routing.ForwardAsync(
httpReq, &request, fedAPI, inputTransaction.TransactionID, *userID)
transaction, err := db.GetAsyncTransaction(context.TODO(), *userID)
if err != nil {
t.Fatalf("Failed retrieving transaction: %s", err.Error())
}
transactionCount, err := db.GetAsyncTransactionCount(context.TODO(), *userID)
if err != nil {
t.Fatalf("Failed retrieving transaction count: %s", err.Error())
}
// } expected := 200
if response.Code != expected {
t.Fatalf("Expected Return Code: %v, Actual: %v", expected, response.Code)
}
if transactionCount != 1 {
t.Fatalf("Expected count of 1, Actual: %d", transactionCount)
}
if transaction.TransactionID != inputTransaction.TransactionID {
t.Fatalf("Expected Transaction ID: %s, Actual: %s",
inputTransaction.TransactionID, transaction.TransactionID)
}
}

View file

@ -133,6 +133,23 @@ func Setup(
}, },
)).Methods(http.MethodPut, http.MethodOptions) )).Methods(http.MethodPut, http.MethodOptions)
v1fedmux.Handle("/forward_async/{txnID}/{userID}", MakeFedAPI(
"federation_forward_async", "", cfg.Matrix.IsLocalServerName, keys, wakeup,
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse {
userID, err := gomatrixserverlib.NewUserID(vars["userID"], false)
if err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.InvalidUsername("Username was invalid"),
}
}
return ForwardAsync(
httpReq, request, fsAPI, gomatrixserverlib.TransactionID(vars["txnID"]),
*userID,
)
},
)).Methods(http.MethodPut, http.MethodOptions)
v1fedmux.Handle("/invite/{roomID}/{eventID}", MakeFedAPI( v1fedmux.Handle("/invite/{roomID}/{eventID}", MakeFedAPI(
"federation_invite", cfg.Matrix.ServerName, cfg.Matrix.IsLocalServerName, keys, wakeup, "federation_invite", cfg.Matrix.ServerName, cfg.Matrix.IsLocalServerName, keys, wakeup,
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse { func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse {

View file

@ -51,9 +51,10 @@ type Database interface {
GetPendingPDUServerNames(ctx context.Context) ([]gomatrixserverlib.ServerName, error) GetPendingPDUServerNames(ctx context.Context) ([]gomatrixserverlib.ServerName, error)
GetPendingEDUServerNames(ctx context.Context) ([]gomatrixserverlib.ServerName, error) GetPendingEDUServerNames(ctx context.Context) ([]gomatrixserverlib.ServerName, error)
GetAsyncTransaction(ctx context.Context, userID gomatrixserverlib.UserID) (gomatrixserverlib.Transaction, error) StoreAsyncTransaction(ctx context.Context, txn gomatrixserverlib.Transaction) (*shared.Receipt, error)
GetAsyncTransaction(ctx context.Context, userID gomatrixserverlib.UserID) (*gomatrixserverlib.Transaction, error)
GetAsyncTransactionCount(ctx context.Context, userID gomatrixserverlib.UserID) (int64, error) GetAsyncTransactionCount(ctx context.Context, userID gomatrixserverlib.UserID) (int64, error)
AssociateAsyncTransactionWithDestinations(ctx context.Context, destinations map[gomatrixserverlib.UserID]struct{}, receipt *shared.Receipt) error AssociateAsyncTransactionWithDestinations(ctx context.Context, destinations map[gomatrixserverlib.UserID]struct{}, transactionID gomatrixserverlib.TransactionID, receipt *shared.Receipt) error
// these don't have contexts passed in as we want things to happen regardless of the request context // these don't have contexts passed in as we want things to happen regardless of the request context
AddServerToBlacklist(serverName gomatrixserverlib.ServerName) error AddServerToBlacklist(serverName gomatrixserverlib.ServerName) error

View file

@ -54,10 +54,18 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions,
if err != nil { if err != nil {
return nil, err return nil, err
} }
queueTransactions, err := NewPostgresQueueTransactionsTable(d.db)
if err != nil {
return nil, err
}
queueJSON, err := NewPostgresQueueJSONTable(d.db) queueJSON, err := NewPostgresQueueJSONTable(d.db)
if err != nil { if err != nil {
return nil, err return nil, err
} }
transactionJSON, err := NewPostgresTransactionJSONTable(d.db)
if err != nil {
return nil, err
}
blacklist, err := NewPostgresBlacklistTable(d.db) blacklist, err := NewPostgresBlacklistTable(d.db)
if err != nil { if err != nil {
return nil, err return nil, err
@ -103,6 +111,8 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions,
FederationQueuePDUs: queuePDUs, FederationQueuePDUs: queuePDUs,
FederationQueueEDUs: queueEDUs, FederationQueueEDUs: queueEDUs,
FederationQueueJSON: queueJSON, FederationQueueJSON: queueJSON,
FederationQueueTransactions: queueTransactions,
FederationTransactionJSON: transactionJSON,
FederationBlacklist: blacklist, FederationBlacklist: blacklist,
FederationInboundPeeks: inboundPeeks, FederationInboundPeeks: inboundPeeks,
FederationOutboundPeeks: outboundPeeks, FederationOutboundPeeks: outboundPeeks,

View file

@ -17,6 +17,7 @@ package shared
import ( import (
"context" "context"
"database/sql" "database/sql"
"encoding/json"
"fmt" "fmt"
"time" "time"
@ -27,11 +28,6 @@ import (
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
type transactionEntry struct {
transaction gomatrixserverlib.Transaction
userID []gomatrixserverlib.UserID
}
type Database struct { type Database struct {
DB *sql.DB DB *sql.DB
IsLocalServerName func(gomatrixserverlib.ServerName) bool IsLocalServerName func(gomatrixserverlib.ServerName) bool
@ -47,7 +43,8 @@ type Database struct {
NotaryServerKeysJSON tables.FederationNotaryServerKeysJSON NotaryServerKeysJSON tables.FederationNotaryServerKeysJSON
NotaryServerKeysMetadata tables.FederationNotaryServerKeysMetadata NotaryServerKeysMetadata tables.FederationNotaryServerKeysMetadata
ServerSigningKeys tables.FederationServerSigningKeys ServerSigningKeys tables.FederationServerSigningKeys
transactionDB map[Receipt]transactionEntry FederationQueueTransactions tables.FederationQueueTransactions
FederationTransactionJSON tables.FederationTransactionJSON
} }
// An Receipt contains the NIDs of a call to GetNextTransactionPDUs/EDUs. // An Receipt contains the NIDs of a call to GetNextTransactionPDUs/EDUs.
@ -264,18 +261,43 @@ func (d *Database) GetNotaryKeys(
return sks, err return sks, err
} }
func (d *Database) StoreAsyncTransaction(
ctx context.Context, txn gomatrixserverlib.Transaction,
) (*Receipt, error) {
var err error
json, err := json.Marshal(txn)
if err != nil {
return nil, fmt.Errorf("d.JSONUnmarshall: %w", err)
}
var nid int64
_ = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
nid, err = d.FederationTransactionJSON.InsertTransactionJSON(ctx, txn, string(json))
return err
})
if err != nil {
return nil, fmt.Errorf("d.insertTransactionJSON: %w", err)
}
return &Receipt{
nid: nid,
}, nil
}
func (d *Database) AssociateAsyncTransactionWithDestinations( func (d *Database) AssociateAsyncTransactionWithDestinations(
ctx context.Context, ctx context.Context,
destinations map[gomatrixserverlib.UserID]struct{}, destinations map[gomatrixserverlib.UserID]struct{},
transactionID gomatrixserverlib.TransactionID,
receipt *Receipt, receipt *Receipt,
) error { ) error {
if transaction, ok := d.transactionDB[*receipt]; ok { for destination := range destinations {
for k := range destinations { err := d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
transaction.userID = append(transaction.userID, k) err := d.FederationQueueTransactions.InsertQueueTransaction(
ctx, txn, transactionID, destination.Domain(), receipt.nid)
return err
})
if err != nil {
return fmt.Errorf("d.insertQueueTransaction: %w", err)
} }
d.transactionDB[*receipt] = transaction
} else {
return fmt.Errorf("No transactions exist with that NID")
} }
return nil return nil
@ -284,21 +306,33 @@ func (d *Database) AssociateAsyncTransactionWithDestinations(
func (d *Database) GetAsyncTransaction( func (d *Database) GetAsyncTransaction(
ctx context.Context, ctx context.Context,
userID gomatrixserverlib.UserID, userID gomatrixserverlib.UserID,
) (gomatrixserverlib.Transaction, error) { ) (*gomatrixserverlib.Transaction, error) {
return gomatrixserverlib.Transaction{}, nil nids, err := d.FederationQueueTransactions.SelectQueueTransactions(ctx, nil, userID.Domain(), 1)
if err != nil {
return nil, fmt.Errorf("d.SelectQueueTransaction: %w", err)
}
txn, err := d.FederationTransactionJSON.SelectTransactionJSON(ctx, nil, nids)
if err != nil {
return nil, fmt.Errorf("d.SelectTransactionJSON: %w", err)
}
transaction := &gomatrixserverlib.Transaction{}
err = json.Unmarshal(txn[nids[0]], transaction)
if err != nil {
return nil, fmt.Errorf("Unmarshall transaction: %w", err)
}
return transaction, nil
} }
func (d *Database) GetAsyncTransactionCount( func (d *Database) GetAsyncTransactionCount(
ctx context.Context, ctx context.Context,
userID gomatrixserverlib.UserID, userID gomatrixserverlib.UserID,
) (int64, error) { ) (int64, error) {
count := int64(0) count, err := d.FederationQueueTransactions.SelectQueueTransactionCount(ctx, nil, userID.Domain())
for _, transaction := range d.transactionDB { if err != nil {
for _, user := range transaction.userID { return 0, fmt.Errorf("d.SelectQueueTransactionCount: %w", err)
if user == userID {
count++
}
}
} }
return count, nil return count, nil
} }

View file

@ -1,11 +1,5 @@
// Copyright 2017-2018 New Vector Ltd // Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C. // Copyright 2019-2020 The Matrix.org Foundation C.I.C. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// //
// Unless required by applicable law or agreed to in writing, software // Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, // distributed under the License is distributed on an "AS IS" BASIS,
@ -57,6 +51,14 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions,
if err != nil { if err != nil {
return nil, err return nil, err
} }
queueTransactions, err := NewSQLiteQueueTransactionsTable(d.db)
if err != nil {
return nil, err
}
transactionJSON, err := NewSQLiteTransactionJSONTable(d.db)
if err != nil {
return nil, err
}
blacklist, err := NewSQLiteBlacklistTable(d.db) blacklist, err := NewSQLiteBlacklistTable(d.db)
if err != nil { if err != nil {
return nil, err return nil, err
@ -102,6 +104,8 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions,
FederationQueuePDUs: queuePDUs, FederationQueuePDUs: queuePDUs,
FederationQueueEDUs: queueEDUs, FederationQueueEDUs: queueEDUs,
FederationQueueJSON: queueJSON, FederationQueueJSON: queueJSON,
FederationQueueTransactions: queueTransactions,
FederationTransactionJSON: transactionJSON,
FederationBlacklist: blacklist, FederationBlacklist: blacklist,
FederationOutboundPeeks: outboundPeeks, FederationOutboundPeeks: outboundPeeks,
FederationInboundPeeks: inboundPeeks, FederationInboundPeeks: inboundPeeks,

View file

@ -23,7 +23,7 @@ const (
testDestination = gomatrixserverlib.ServerName("white.orchard") testDestination = gomatrixserverlib.ServerName("white.orchard")
) )
func mustCreateTransaction(userID gomatrixserverlib.UserID) gomatrixserverlib.Transaction { func mustCreateTransaction() gomatrixserverlib.Transaction {
txn := gomatrixserverlib.Transaction{} txn := gomatrixserverlib.Transaction{}
txn.PDUs = []json.RawMessage{ txn.PDUs = []json.RawMessage{
[]byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}],["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"content":{"body":"Test Message"},"depth":5,"event_id":"$gl2T9l3qm0kUbiIJ:kaer.morhen","hashes":{"sha256":"Qx3nRMHLDPSL5hBAzuX84FiSSP0K0Kju2iFoBWH4Za8"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$UKNe10XzYzG0TeA9:kaer.morhen",{"sha256":"KtSRyMjt0ZSjsv2koixTRCxIRCGoOp6QrKscsW97XRo"}]],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"sqDgv3EG7ml5VREzmT9aZeBpS4gAPNIaIeJOwqjDhY0GPU/BcpX5wY4R7hYLrNe5cChgV+eFy/GWm1Zfg5FfDg"}},"type":"m.room.message"}`), []byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}],["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"content":{"body":"Test Message"},"depth":5,"event_id":"$gl2T9l3qm0kUbiIJ:kaer.morhen","hashes":{"sha256":"Qx3nRMHLDPSL5hBAzuX84FiSSP0K0Kju2iFoBWH4Za8"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$UKNe10XzYzG0TeA9:kaer.morhen",{"sha256":"KtSRyMjt0ZSjsv2koixTRCxIRCGoOp6QrKscsW97XRo"}]],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"sqDgv3EG7ml5VREzmT9aZeBpS4gAPNIaIeJOwqjDhY0GPU/BcpX5wY4R7hYLrNe5cChgV+eFy/GWm1Zfg5FfDg"}},"type":"m.room.message"}`),
@ -73,11 +73,7 @@ func TestShoudInsertTransaction(t *testing.T) {
db, close := mustCreateTransactionJSONTable(t, dbType) db, close := mustCreateTransactionJSONTable(t, dbType)
defer close() defer close()
userID, err := gomatrixserverlib.NewUserID("@local:domain", false) transaction := mustCreateTransaction()
if err != nil {
t.Fatalf("Invalid userID: %s", err.Error())
}
transaction := mustCreateTransaction(*userID)
tx, err := json.Marshal(transaction) tx, err := json.Marshal(transaction)
if err != nil { if err != nil {
t.Fatalf("Invalid transaction: %s", err.Error()) t.Fatalf("Invalid transaction: %s", err.Error())
@ -96,11 +92,7 @@ func TestShouldRetrieveInsertedTransaction(t *testing.T) {
db, close := mustCreateTransactionJSONTable(t, dbType) db, close := mustCreateTransactionJSONTable(t, dbType)
defer close() defer close()
userID, err := gomatrixserverlib.NewUserID("@local:domain", false) transaction := mustCreateTransaction()
if err != nil {
t.Fatalf("Invalid userID: %s", err.Error())
}
transaction := mustCreateTransaction(*userID)
tx, err := json.Marshal(transaction) tx, err := json.Marshal(transaction)
if err != nil { if err != nil {
t.Fatalf("Invalid transaction: %s", err.Error()) t.Fatalf("Invalid transaction: %s", err.Error())
@ -135,11 +127,7 @@ func TestShouldDeleteTransaction(t *testing.T) {
db, close := mustCreateTransactionJSONTable(t, dbType) db, close := mustCreateTransactionJSONTable(t, dbType)
defer close() defer close()
userID, err := gomatrixserverlib.NewUserID("@local:domain", false) transaction := mustCreateTransaction()
if err != nil {
t.Fatalf("Invalid userID: %s", err.Error())
}
transaction := mustCreateTransaction(*userID)
tx, err := json.Marshal(transaction) tx, err := json.Marshal(transaction)
if err != nil { if err != nil {
t.Fatalf("Invalid transaction: %s", err.Error()) t.Fatalf("Invalid transaction: %s", err.Error())

2
go.mod
View file

@ -143,4 +143,4 @@ require (
go 1.18 go 1.18
replace github.com/matrix-org/gomatrixserverlib => ../../gomatrixserverlib/mailbox replace github.com/matrix-org/gomatrixserverlib => github.com/matrix-org/gomatrixserverlib v0.0.0-20221110204444-22af9cae40c5

4
go.sum
View file

@ -348,8 +348,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91 h1:s7fexw
github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1:e+cg2q7C7yE5QnAXgzo512tgFh1RbQLC0+jozuegKgo= github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1:e+cg2q7C7yE5QnAXgzo512tgFh1RbQLC0+jozuegKgo=
github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530 h1:kHKxCOLcHH8r4Fzarl4+Y3K5hjothkVW5z7T1dUM11U= github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530 h1:kHKxCOLcHH8r4Fzarl4+Y3K5hjothkVW5z7T1dUM11U=
github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
github.com/matrix-org/gomatrixserverlib v0.0.0-20221101165746-0e4a8bb6db7e h1:6I34fdyiHMRCxL6GOb/G8ZyI1WWlb6ZxCF2hIGSMSCc= github.com/matrix-org/gomatrixserverlib v0.0.0-20221110204444-22af9cae40c5 h1:06o3BPKc0CeYK6rOn/tzP9SZKTDAE2zF4AmWmZei1CU=
github.com/matrix-org/gomatrixserverlib v0.0.0-20221101165746-0e4a8bb6db7e/go.mod h1:Mtifyr8q8htcBeugvlDnkBcNUy5LO8OzUoplAf1+mb4= github.com/matrix-org/gomatrixserverlib v0.0.0-20221110204444-22af9cae40c5/go.mod h1:Mtifyr8q8htcBeugvlDnkBcNUy5LO8OzUoplAf1+mb4=
github.com/matrix-org/pinecone v0.0.0-20221103125849-37f2e9b9ba37 h1:CQWFrgH9TJOU2f2qCDhGwaSdAnmgSu3/f+2xcf/Fse4= github.com/matrix-org/pinecone v0.0.0-20221103125849-37f2e9b9ba37 h1:CQWFrgH9TJOU2f2qCDhGwaSdAnmgSu3/f+2xcf/Fse4=
github.com/matrix-org/pinecone v0.0.0-20221103125849-37f2e9b9ba37/go.mod h1:F3GHppRuHCTDeoOmmgjZMeJdbql91+RSGGsATWfC7oc= github.com/matrix-org/pinecone v0.0.0-20221103125849-37f2e9b9ba37/go.mod h1:F3GHppRuHCTDeoOmmgjZMeJdbql91+RSGGsATWfC7oc=
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk= github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk=