Try to resume transaction re-sends

This commit is contained in:
Neil Alexander 2021-11-03 17:27:18 +00:00
parent 134ec18614
commit 5b67f47547
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944

View file

@ -148,6 +148,7 @@ type inputWorker struct {
input *sendFIFOQueue input *sendFIFOQueue
} }
var txnIDs sync.Map // transaction ID -> chan struct{}
var inputWorkers sync.Map // room ID -> *inputWorker var inputWorkers sync.Map // room ID -> *inputWorker
// Send implements /_matrix/federation/v1/send/{txnID} // Send implements /_matrix/federation/v1/send/{txnID}
@ -164,6 +165,19 @@ func Send(
mu *internal.MutexByRoom, mu *internal.MutexByRoom,
servers federationAPI.ServersInRoomProvider, servers federationAPI.ServersInRoomProvider,
) util.JSONResponse { ) util.JSONResponse {
index := string(request.Origin()) + string(txnID)
if v, ok := txnIDs.Load(index); ok {
select {
case <-httpReq.Context().Done():
return util.JSONResponse{Code: http.StatusRequestTimeout}
case response := <-v.(chan util.JSONResponse):
return response
}
}
ch := make(chan util.JSONResponse, 1)
txnIDs.Store(index, ch)
defer close(ch)
t := txnReq{ t := txnReq{
rsAPI: rsAPI, rsAPI: rsAPI,
eduAPI: eduAPI, eduAPI: eduAPI,
@ -205,7 +219,7 @@ func Send(
util.GetLogger(httpReq.Context()).Infof("Received transaction %q from %q containing %d PDUs, %d EDUs", txnID, request.Origin(), len(t.PDUs), len(t.EDUs)) util.GetLogger(httpReq.Context()).Infof("Received transaction %q from %q containing %d PDUs, %d EDUs", txnID, request.Origin(), len(t.PDUs), len(t.EDUs))
resp, jsonErr := t.processTransaction(httpReq.Context()) resp, jsonErr := t.processTransaction(context.Background())
if jsonErr != nil { if jsonErr != nil {
util.GetLogger(httpReq.Context()).WithField("jsonErr", jsonErr).Error("t.processTransaction failed") util.GetLogger(httpReq.Context()).WithField("jsonErr", jsonErr).Error("t.processTransaction failed")
return *jsonErr return *jsonErr
@ -215,10 +229,12 @@ func Send(
// Status code 200: // Status code 200:
// The result of processing the transaction. The server is to use this response // The result of processing the transaction. The server is to use this response
// even in the event of one or more PDUs failing to be processed. // even in the event of one or more PDUs failing to be processed.
return util.JSONResponse{ res := util.JSONResponse{
Code: http.StatusOK, Code: http.StatusOK,
JSON: resp, JSON: resp,
} }
ch <- res
return res
} }
type txnReq struct { type txnReq struct {