diff --git a/federationapi/api/api.go b/federationapi/api/api.go index f6ac8c552..9e651a963 100644 --- a/federationapi/api/api.go +++ b/federationapi/api/api.go @@ -36,6 +36,11 @@ type FederationInternalAPI interface { request *PerformStoreAsyncRequest, response *PerformStoreAsyncResponse, ) error + QueryAsyncTransactions( + ctx context.Context, + request *QueryAsyncTransactionsRequest, + response *QueryAsyncTransactionsResponse, + ) error } type ClientFederationAPI interface { @@ -228,6 +233,15 @@ type PerformStoreAsyncRequest struct { type PerformStoreAsyncResponse struct { } +type QueryAsyncTransactionsRequest struct { + UserID gomatrixserverlib.UserID `json:"user_id"` +} + +type QueryAsyncTransactionsResponse struct { + Txn gomatrixserverlib.Transaction `json:"transaction"` + RemainingCount uint32 `json:"remaining"` +} + type InputPublicKeysRequest struct { Keys map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult `json:"keys"` } diff --git a/federationapi/internal/perform.go b/federationapi/internal/perform.go index d808c4f10..a46c41054 100644 --- a/federationapi/internal/perform.go +++ b/federationapi/internal/perform.go @@ -694,6 +694,15 @@ func (r *FederationInternalAPI) PerformStoreAsync( return err } +// QueryAsyncTransactions implements api.FederationInternalAPI +func (r *FederationInternalAPI) QueryAsyncTransactions( + ctx context.Context, + request *api.QueryAsyncTransactionsRequest, + response *api.QueryAsyncTransactionsResponse, +) error { + return nil +} + func (r *FederationInternalAPI) MarkServersAlive(destinations []gomatrixserverlib.ServerName) { for _, srv := range destinations { _ = r.db.RemoveServerFromBlacklist(srv) diff --git a/federationapi/inthttp/client.go b/federationapi/inthttp/client.go index 9fdb30cef..326b87325 100644 --- a/federationapi/inthttp/client.go +++ b/federationapi/inthttp/client.go @@ -24,6 +24,7 @@ const ( FederationAPIPerformOutboundPeekRequestPath = "/federationapi/performOutboundPeekRequest" FederationAPIPerformBroadcastEDUPath = "/federationapi/performBroadcastEDU" FederationAPIPerformStoreAsyncPath = "/federationapi/performStoreAsync" + FederationAPIQueryAsyncTransactionsPath = "/federationapi/queryAsyncTransactions" FederationAPIGetUserDevicesPath = "/federationapi/client/getUserDevices" FederationAPIClaimKeysPath = "/federationapi/client/claimKeys" @@ -162,6 +163,17 @@ func (h *httpFederationInternalAPI) PerformStoreAsync( ) } +func (h *httpFederationInternalAPI) QueryAsyncTransactions( + ctx context.Context, + request *api.QueryAsyncTransactionsRequest, + response *api.QueryAsyncTransactionsResponse, +) error { + return httputil.CallInternalRPCAPI( + "QueryAsyncTransactions", h.federationAPIURL+FederationAPIQueryAsyncTransactionsPath, + h.httpClient, ctx, request, response, + ) +} + type getUserDevices struct { S gomatrixserverlib.ServerName UserID string diff --git a/federationapi/inthttp/server.go b/federationapi/inthttp/server.go index cc55fab64..beccaea7f 100644 --- a/federationapi/inthttp/server.go +++ b/federationapi/inthttp/server.go @@ -48,6 +48,11 @@ func AddRoutes(intAPI api.FederationInternalAPI, internalAPIMux *mux.Router) { httputil.MakeInternalRPCAPI("FederationAPIPerformStoreAsync", intAPI.PerformStoreAsync), ) + internalAPIMux.Handle( + FederationAPIQueryAsyncTransactionsPath, + httputil.MakeInternalRPCAPI("FederationAPIQueryAsyncTransactions", intAPI.QueryAsyncTransactions), + ) + internalAPIMux.Handle( FederationAPIPerformJoinRequestPath, httputil.MakeInternalRPCAPI( diff --git a/federationapi/queue/destinationqueue.go b/federationapi/queue/destinationqueue.go index a16a58b59..0a4dba6b1 100644 --- a/federationapi/queue/destinationqueue.go +++ b/federationapi/queue/destinationqueue.go @@ -391,9 +391,9 @@ func (oq *destinationQueue) nextTransaction( mailservers := oq.statistics.KnownMailservers() if oq.statistics.AssumedOffline() && len(mailservers) > 0 { // TODO : how to pass through actual userID here?!?!?!?! - userID, err := gomatrixserverlib.NewUserID("@user:"+string(oq.origin), false) - if err != nil { - return err, false + userID, userErr := gomatrixserverlib.NewUserID("@user:"+string(oq.origin), false) + if userErr != nil { + return userErr, false } for _, mailserver := range mailservers { _, asyncErr := oq.client.SendAsyncTransaction(ctx, *userID, t, mailserver) diff --git a/federationapi/routing/asyncevents.go b/federationapi/routing/asyncevents.go new file mode 100644 index 000000000..0348160ff --- /dev/null +++ b/federationapi/routing/asyncevents.go @@ -0,0 +1,37 @@ +package routing + +import ( + "net/http" + + "github.com/matrix-org/dendrite/federationapi/api" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" +) + +type AsyncEventsResponse struct { + Transaction gomatrixserverlib.Transaction `json:"transaction"` + Remaining uint32 `json:"remaining"` +} + +// GetAsyncEvents implements /_matrix/federation/v1/async_events/{userID} +func GetAsyncEvents( + httpReq *http.Request, + fedAPI api.FederationInternalAPI, + userID gomatrixserverlib.UserID, +) util.JSONResponse { + var response api.QueryAsyncTransactionsResponse + err := fedAPI.QueryAsyncTransactions(httpReq.Context(), &api.QueryAsyncTransactionsRequest{UserID: userID}, &response) + if err != nil { + return util.JSONResponse{ + Code: http.StatusInternalServerError, + } + } + + return util.JSONResponse{ + Code: http.StatusOK, + JSON: AsyncEventsResponse{ + Transaction: gomatrixserverlib.Transaction{}, + Remaining: response.RemainingCount, + }, + } +} diff --git a/federationapi/routing/asyncevents_test.go b/federationapi/routing/asyncevents_test.go new file mode 100644 index 000000000..b9258e210 --- /dev/null +++ b/federationapi/routing/asyncevents_test.go @@ -0,0 +1,46 @@ +package routing_test + +import ( + "context" + "net/http" + "testing" + + "github.com/matrix-org/dendrite/federationapi/internal" + "github.com/matrix-org/dendrite/federationapi/routing" + "github.com/matrix-org/dendrite/federationapi/storage/shared" + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/gomatrixserverlib" + "github.com/stretchr/testify/assert" +) + +func TestGetAsyncEmptyDatabaseReturnsNothing(t *testing.T) { + testDB := createDatabase() + db := shared.Database{ + Writer: sqlutil.NewDummyWriter(), + FederationQueueTransactions: testDB, + FederationTransactionJSON: testDB, + } + httpReq := &http.Request{} + userID, err := gomatrixserverlib.NewUserID("@local:domain", false) + if err != nil { + t.Fatalf("Invalid userID: %s", err.Error()) + } + transaction := createTransaction() + + _, err = db.StoreAsyncTransaction(context.Background(), transaction) + if err != nil { + t.Fatalf("Failed to store transaction: %s", err.Error()) + } + + fedAPI := internal.NewFederationInternalAPI( + &db, &config.FederationAPI{}, nil, nil, nil, nil, nil, nil, + ) + + response := routing.GetAsyncEvents(httpReq, fedAPI, *userID) + assert.Equal(t, response.Code, http.StatusOK) + + jsonResponse := response.JSON.(routing.AsyncEventsResponse) + assert.Equal(t, jsonResponse.Remaining, uint32(0)) + assert.Equal(t, jsonResponse.Transaction, gomatrixserverlib.Transaction{}) +} diff --git a/federationapi/routing/forwardasync_test.go b/federationapi/routing/forwardasync_test.go index 4b330b37f..fe3df24ab 100644 --- a/federationapi/routing/forwardasync_test.go +++ b/federationapi/routing/forwardasync_test.go @@ -106,7 +106,7 @@ func (d *testDatabase) SelectTransactionJSON(ctx context.Context, txn *sql.Tx, j return result, nil } -func createFederationRequest(userID gomatrixserverlib.UserID) (gomatrixserverlib.Transaction, gomatrixserverlib.FederationRequest) { +func createTransaction() gomatrixserverlib.Transaction { txn := gomatrixserverlib.Transaction{} txn.PDUs = []json.RawMessage{ []byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}],["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"content":{"body":"Test Message"},"depth":5,"event_id":"$gl2T9l3qm0kUbiIJ:kaer.morhen","hashes":{"sha256":"Qx3nRMHLDPSL5hBAzuX84FiSSP0K0Kju2iFoBWH4Za8"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$UKNe10XzYzG0TeA9:kaer.morhen",{"sha256":"KtSRyMjt0ZSjsv2koixTRCxIRCGoOp6QrKscsW97XRo"}]],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"sqDgv3EG7ml5VREzmT9aZeBpS4gAPNIaIeJOwqjDhY0GPU/BcpX5wY4R7hYLrNe5cChgV+eFy/GWm1Zfg5FfDg"}},"type":"m.room.message"}`), @@ -114,6 +114,11 @@ func createFederationRequest(userID gomatrixserverlib.UserID) (gomatrixserverlib txn.Origin = testOrigin txn.TransactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d", time.Now().UnixNano())) txn.Destination = testDestination + return txn +} + +func createFederationRequest(userID gomatrixserverlib.UserID) (gomatrixserverlib.Transaction, gomatrixserverlib.FederationRequest) { + txn := createTransaction() var federationPathPrefixV1 = "/_matrix/federation/v1" path := federationPathPrefixV1 + "/forward_async/" + string(txn.TransactionID) + "/" + userID.Raw() request := gomatrixserverlib.NewFederationRequest("PUT", txn.Destination, path)