Add initial api for getting async_events

This commit is contained in:
Devon Hudson 2022-12-02 11:48:55 -07:00
parent 9dbfcd9e06
commit bfa784b224
No known key found for this signature in database
GPG key ID: CD06B18E77F6A628
8 changed files with 132 additions and 4 deletions

View file

@ -36,6 +36,11 @@ type FederationInternalAPI interface {
request *PerformStoreAsyncRequest,
response *PerformStoreAsyncResponse,
) error
QueryAsyncTransactions(
ctx context.Context,
request *QueryAsyncTransactionsRequest,
response *QueryAsyncTransactionsResponse,
) error
}
type ClientFederationAPI interface {
@ -228,6 +233,15 @@ type PerformStoreAsyncRequest struct {
type PerformStoreAsyncResponse struct {
}
type QueryAsyncTransactionsRequest struct {
UserID gomatrixserverlib.UserID `json:"user_id"`
}
type QueryAsyncTransactionsResponse struct {
Txn gomatrixserverlib.Transaction `json:"transaction"`
RemainingCount uint32 `json:"remaining"`
}
type InputPublicKeysRequest struct {
Keys map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult `json:"keys"`
}

View file

@ -694,6 +694,15 @@ func (r *FederationInternalAPI) PerformStoreAsync(
return err
}
// QueryAsyncTransactions implements api.FederationInternalAPI
func (r *FederationInternalAPI) QueryAsyncTransactions(
ctx context.Context,
request *api.QueryAsyncTransactionsRequest,
response *api.QueryAsyncTransactionsResponse,
) error {
return nil
}
func (r *FederationInternalAPI) MarkServersAlive(destinations []gomatrixserverlib.ServerName) {
for _, srv := range destinations {
_ = r.db.RemoveServerFromBlacklist(srv)

View file

@ -24,6 +24,7 @@ const (
FederationAPIPerformOutboundPeekRequestPath = "/federationapi/performOutboundPeekRequest"
FederationAPIPerformBroadcastEDUPath = "/federationapi/performBroadcastEDU"
FederationAPIPerformStoreAsyncPath = "/federationapi/performStoreAsync"
FederationAPIQueryAsyncTransactionsPath = "/federationapi/queryAsyncTransactions"
FederationAPIGetUserDevicesPath = "/federationapi/client/getUserDevices"
FederationAPIClaimKeysPath = "/federationapi/client/claimKeys"
@ -162,6 +163,17 @@ func (h *httpFederationInternalAPI) PerformStoreAsync(
)
}
func (h *httpFederationInternalAPI) QueryAsyncTransactions(
ctx context.Context,
request *api.QueryAsyncTransactionsRequest,
response *api.QueryAsyncTransactionsResponse,
) error {
return httputil.CallInternalRPCAPI(
"QueryAsyncTransactions", h.federationAPIURL+FederationAPIQueryAsyncTransactionsPath,
h.httpClient, ctx, request, response,
)
}
type getUserDevices struct {
S gomatrixserverlib.ServerName
UserID string

View file

@ -48,6 +48,11 @@ func AddRoutes(intAPI api.FederationInternalAPI, internalAPIMux *mux.Router) {
httputil.MakeInternalRPCAPI("FederationAPIPerformStoreAsync", intAPI.PerformStoreAsync),
)
internalAPIMux.Handle(
FederationAPIQueryAsyncTransactionsPath,
httputil.MakeInternalRPCAPI("FederationAPIQueryAsyncTransactions", intAPI.QueryAsyncTransactions),
)
internalAPIMux.Handle(
FederationAPIPerformJoinRequestPath,
httputil.MakeInternalRPCAPI(

View file

@ -391,9 +391,9 @@ func (oq *destinationQueue) nextTransaction(
mailservers := oq.statistics.KnownMailservers()
if oq.statistics.AssumedOffline() && len(mailservers) > 0 {
// TODO : how to pass through actual userID here?!?!?!?!
userID, err := gomatrixserverlib.NewUserID("@user:"+string(oq.origin), false)
if err != nil {
return err, false
userID, userErr := gomatrixserverlib.NewUserID("@user:"+string(oq.origin), false)
if userErr != nil {
return userErr, false
}
for _, mailserver := range mailservers {
_, asyncErr := oq.client.SendAsyncTransaction(ctx, *userID, t, mailserver)

View file

@ -0,0 +1,37 @@
package routing
import (
"net/http"
"github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
type AsyncEventsResponse struct {
Transaction gomatrixserverlib.Transaction `json:"transaction"`
Remaining uint32 `json:"remaining"`
}
// GetAsyncEvents implements /_matrix/federation/v1/async_events/{userID}
func GetAsyncEvents(
httpReq *http.Request,
fedAPI api.FederationInternalAPI,
userID gomatrixserverlib.UserID,
) util.JSONResponse {
var response api.QueryAsyncTransactionsResponse
err := fedAPI.QueryAsyncTransactions(httpReq.Context(), &api.QueryAsyncTransactionsRequest{UserID: userID}, &response)
if err != nil {
return util.JSONResponse{
Code: http.StatusInternalServerError,
}
}
return util.JSONResponse{
Code: http.StatusOK,
JSON: AsyncEventsResponse{
Transaction: gomatrixserverlib.Transaction{},
Remaining: response.RemainingCount,
},
}
}

View file

@ -0,0 +1,46 @@
package routing_test
import (
"context"
"net/http"
"testing"
"github.com/matrix-org/dendrite/federationapi/internal"
"github.com/matrix-org/dendrite/federationapi/routing"
"github.com/matrix-org/dendrite/federationapi/storage/shared"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/gomatrixserverlib"
"github.com/stretchr/testify/assert"
)
func TestGetAsyncEmptyDatabaseReturnsNothing(t *testing.T) {
testDB := createDatabase()
db := shared.Database{
Writer: sqlutil.NewDummyWriter(),
FederationQueueTransactions: testDB,
FederationTransactionJSON: testDB,
}
httpReq := &http.Request{}
userID, err := gomatrixserverlib.NewUserID("@local:domain", false)
if err != nil {
t.Fatalf("Invalid userID: %s", err.Error())
}
transaction := createTransaction()
_, err = db.StoreAsyncTransaction(context.Background(), transaction)
if err != nil {
t.Fatalf("Failed to store transaction: %s", err.Error())
}
fedAPI := internal.NewFederationInternalAPI(
&db, &config.FederationAPI{}, nil, nil, nil, nil, nil, nil,
)
response := routing.GetAsyncEvents(httpReq, fedAPI, *userID)
assert.Equal(t, response.Code, http.StatusOK)
jsonResponse := response.JSON.(routing.AsyncEventsResponse)
assert.Equal(t, jsonResponse.Remaining, uint32(0))
assert.Equal(t, jsonResponse.Transaction, gomatrixserverlib.Transaction{})
}

View file

@ -106,7 +106,7 @@ func (d *testDatabase) SelectTransactionJSON(ctx context.Context, txn *sql.Tx, j
return result, nil
}
func createFederationRequest(userID gomatrixserverlib.UserID) (gomatrixserverlib.Transaction, gomatrixserverlib.FederationRequest) {
func createTransaction() gomatrixserverlib.Transaction {
txn := gomatrixserverlib.Transaction{}
txn.PDUs = []json.RawMessage{
[]byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}],["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"content":{"body":"Test Message"},"depth":5,"event_id":"$gl2T9l3qm0kUbiIJ:kaer.morhen","hashes":{"sha256":"Qx3nRMHLDPSL5hBAzuX84FiSSP0K0Kju2iFoBWH4Za8"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$UKNe10XzYzG0TeA9:kaer.morhen",{"sha256":"KtSRyMjt0ZSjsv2koixTRCxIRCGoOp6QrKscsW97XRo"}]],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"sqDgv3EG7ml5VREzmT9aZeBpS4gAPNIaIeJOwqjDhY0GPU/BcpX5wY4R7hYLrNe5cChgV+eFy/GWm1Zfg5FfDg"}},"type":"m.room.message"}`),
@ -114,6 +114,11 @@ func createFederationRequest(userID gomatrixserverlib.UserID) (gomatrixserverlib
txn.Origin = testOrigin
txn.TransactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d", time.Now().UnixNano()))
txn.Destination = testDestination
return txn
}
func createFederationRequest(userID gomatrixserverlib.UserID) (gomatrixserverlib.Transaction, gomatrixserverlib.FederationRequest) {
txn := createTransaction()
var federationPathPrefixV1 = "/_matrix/federation/v1"
path := federationPathPrefixV1 + "/forward_async/" + string(txn.TransactionID) + "/" + userID.Raw()
request := gomatrixserverlib.NewFederationRequest("PUT", txn.Destination, path)