diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 3cae837c9..446be8c9c 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -148,6 +148,7 @@ type inputWorker struct { input *sendFIFOQueue } +var txnIDs sync.Map // transaction ID -> chan struct{} var inputWorkers sync.Map // room ID -> *inputWorker // Send implements /_matrix/federation/v1/send/{txnID} @@ -164,6 +165,19 @@ func Send( mu *internal.MutexByRoom, servers federationAPI.ServersInRoomProvider, ) util.JSONResponse { + index := string(request.Origin()) + string(txnID) + if v, ok := txnIDs.Load(index); ok { + select { + case <-httpReq.Context().Done(): + return util.JSONResponse{Code: http.StatusRequestTimeout} + case response := <-v.(chan util.JSONResponse): + return response + } + } + ch := make(chan util.JSONResponse, 1) + txnIDs.Store(index, ch) + defer close(ch) + t := txnReq{ rsAPI: rsAPI, eduAPI: eduAPI, @@ -205,7 +219,7 @@ func Send( util.GetLogger(httpReq.Context()).Infof("Received transaction %q from %q containing %d PDUs, %d EDUs", txnID, request.Origin(), len(t.PDUs), len(t.EDUs)) - resp, jsonErr := t.processTransaction(httpReq.Context()) + resp, jsonErr := t.processTransaction(context.Background()) if jsonErr != nil { util.GetLogger(httpReq.Context()).WithField("jsonErr", jsonErr).Error("t.processTransaction failed") return *jsonErr @@ -215,10 +229,12 @@ func Send( // Status code 200: // The result of processing the transaction. The server is to use this response // even in the event of one or more PDUs failing to be processed. - return util.JSONResponse{ + res := util.JSONResponse{ Code: http.StatusOK, JSON: resp, } + ch <- res + return res } type txnReq struct {