diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 3cae837c9..b0cf361ad 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -148,6 +148,7 @@ type inputWorker struct { input *sendFIFOQueue } +var txnIDs sync.Map // transaction ID -> chan util.JSONResponse var inputWorkers sync.Map // room ID -> *inputWorker // Send implements /_matrix/federation/v1/send/{txnID} @@ -164,6 +165,31 @@ func Send( mu *internal.MutexByRoom, servers federationAPI.ServersInRoomProvider, ) util.JSONResponse { + // First we should check if this origin has already submitted this + // txn ID to us. If they have and the txnIDs map contains an entry, + // the transaction is still being worked on. The new client can wait + // for it to complete rather than creating more work. + index := string(request.Origin()) + string(txnID) + if v, ok := txnIDs.Load(index); ok { + // This origin already submitted this txn ID to us, and the work + // is still taking place, so we'll just wait for it to finish. + select { + case <-httpReq.Context().Done(): + return util.JSONResponse{Code: http.StatusRequestTimeout} + case res := <-v.(chan util.JSONResponse): + if res.Code == 0 { + return util.JSONResponse{Code: http.StatusAccepted} + } + return res + } + } + // Otherwise, store that we're currently working on this txn from + // this origin. When we're done processing, close the channel. + ch := make(chan util.JSONResponse, 1) + txnIDs.Store(index, ch) + defer close(ch) + defer txnIDs.Delete(index) + t := txnReq{ rsAPI: rsAPI, eduAPI: eduAPI, @@ -205,7 +231,7 @@ func Send( util.GetLogger(httpReq.Context()).Infof("Received transaction %q from %q containing %d PDUs, %d EDUs", txnID, request.Origin(), len(t.PDUs), len(t.EDUs)) - resp, jsonErr := t.processTransaction(httpReq.Context()) + resp, jsonErr := t.processTransaction(context.Background()) if jsonErr != nil { util.GetLogger(httpReq.Context()).WithField("jsonErr", jsonErr).Error("t.processTransaction failed") return *jsonErr @@ -215,10 +241,12 @@ func Send( // Status code 200: // The result of processing the transaction. The server is to use this response // even in the event of one or more PDUs failing to be processed. - return util.JSONResponse{ + res := util.JSONResponse{ Code: http.StatusOK, JSON: resp, } + ch <- res + return res } type txnReq struct {