From e7f70501634cd67b8101b8410146af47ab6fcd22 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Thu, 4 Nov 2021 16:28:20 +0000 Subject: [PATCH] Review comments --- federationapi/routing/send.go | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index b0cf361ad..2b272a92c 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -148,8 +148,8 @@ type inputWorker struct { input *sendFIFOQueue } -var txnIDs sync.Map // transaction ID -> chan util.JSONResponse -var inputWorkers sync.Map // room ID -> *inputWorker +var inFlightTxnsPerOrigin sync.Map // transaction ID -> chan util.JSONResponse +var inputWorkers sync.Map // room ID -> *inputWorker // Send implements /_matrix/federation/v1/send/{txnID} func Send( @@ -169,14 +169,21 @@ func Send( // txn ID to us. If they have and the txnIDs map contains an entry, // the transaction is still being worked on. The new client can wait // for it to complete rather than creating more work. - index := string(request.Origin()) + string(txnID) - if v, ok := txnIDs.Load(index); ok { + index := string(request.Origin()) + "\000" + string(txnID) + ch, ok := inFlightTxnsPerOrigin.LoadOrStore(index, make(chan util.JSONResponse, 1)) + if ok { // This origin already submitted this txn ID to us, and the work // is still taking place, so we'll just wait for it to finish. + ctx, cancel := context.WithTimeout(httpReq.Context(), time.Minute*5) + defer cancel() select { - case <-httpReq.Context().Done(): + case <-ctx.Done(): + // If the caller gives up then return straight away. We don't + // want to attempt to process what they sent us any further. return util.JSONResponse{Code: http.StatusRequestTimeout} - case res := <-v.(chan util.JSONResponse): + case res := <-ch.(chan util.JSONResponse): + // The original task just finished processing so let's return + // the result of it. if res.Code == 0 { return util.JSONResponse{Code: http.StatusAccepted} } @@ -185,10 +192,8 @@ func Send( } // Otherwise, store that we're currently working on this txn from // this origin. When we're done processing, close the channel. - ch := make(chan util.JSONResponse, 1) - txnIDs.Store(index, ch) - defer close(ch) - defer txnIDs.Delete(index) + defer close(ch.(chan util.JSONResponse)) + defer inFlightTxnsPerOrigin.Delete(index) t := txnReq{ rsAPI: rsAPI,