diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 7c4d0b5a7..4b5f0d660 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -148,8 +148,8 @@ type inputWorker struct { input *sendFIFOQueue } -var txnIDs sync.Map // transaction ID -> chan struct{} -var inputWorkers sync.Map // room ID -> *inputWorker +var inFlightTxnsPerOrigin sync.Map // transaction ID -> chan util.JSONResponse +var inputWorkers sync.Map // room ID -> *inputWorker // Send implements /_matrix/federation/v1/send/{txnID} func Send( @@ -165,19 +165,36 @@ func Send( mu *internal.MutexByRoom, servers federationAPI.ServersInRoomProvider, ) util.JSONResponse { - index := string(request.Origin()) + string(txnID) - if v, ok := txnIDs.Load(index); ok { + // First we should check if this origin has already submitted this + // txn ID to us. If they have and the txnIDs map contains an entry, + // the transaction is still being worked on. The new client can wait + // for it to complete rather than creating more work. + index := string(request.Origin()) + "\000" + string(txnID) + v, ok := inFlightTxnsPerOrigin.LoadOrStore(index, make(chan util.JSONResponse, 1)) + ch := v.(chan util.JSONResponse) + if ok { + // This origin already submitted this txn ID to us, and the work + // is still taking place, so we'll just wait for it to finish. + ctx, cancel := context.WithTimeout(httpReq.Context(), time.Minute*5) + defer cancel() select { - case <-httpReq.Context().Done(): + case <-ctx.Done(): + // If the caller gives up then return straight away. We don't + // want to attempt to process what they sent us any further. return util.JSONResponse{Code: http.StatusRequestTimeout} - case response := <-v.(chan util.JSONResponse): - return response + case res := <-ch: + // The original task just finished processing so let's return + // the result of it. + if res.Code == 0 { + return util.JSONResponse{Code: http.StatusAccepted} + } + return res } } - ch := make(chan util.JSONResponse, 1) - txnIDs.Store(index, ch) + // Otherwise, store that we're currently working on this txn from + // this origin. When we're done processing, close the channel. defer close(ch) - defer txnIDs.Delete(index) + defer inFlightTxnsPerOrigin.Delete(index) t := txnReq{ rsAPI: rsAPI, diff --git a/go.mod b/go.mod index 4ec7ec4f1..65049ac50 100644 --- a/go.mod +++ b/go.mod @@ -39,7 +39,7 @@ require ( github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4 github.com/matrix-org/go-sqlite3-js v0.0.0-20210709140738-b0d1ba599a6d github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 - github.com/matrix-org/gomatrixserverlib v0.0.0-20211104103430-62945aeee964 + github.com/matrix-org/gomatrixserverlib v0.0.0-20211108094458-ed7020868a9a github.com/matrix-org/pinecone v0.0.0-20211022090602-08a50945ac89 github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 github.com/mattn/go-sqlite3 v1.14.8 diff --git a/go.sum b/go.sum index 2016ab16b..c64ba823a 100644 --- a/go.sum +++ b/go.sum @@ -988,8 +988,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20210709140738-b0d1ba599a6d/go.mod h1 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= -github.com/matrix-org/gomatrixserverlib v0.0.0-20211104103430-62945aeee964 h1:YW/wrSyzxB5G0snH6IlJ5u1/HjqtichZj9qem2hqraE= -github.com/matrix-org/gomatrixserverlib v0.0.0-20211104103430-62945aeee964/go.mod h1:rB8tBUUUo1rzUqpzklRDSooxZ6YMhoaEPx4SO5fGeUc= +github.com/matrix-org/gomatrixserverlib v0.0.0-20211108094458-ed7020868a9a h1:iBNcIIYr0WYl7PTuin9IXFSmrFGUAjUUyw14p29bkxw= +github.com/matrix-org/gomatrixserverlib v0.0.0-20211108094458-ed7020868a9a/go.mod h1:rB8tBUUUo1rzUqpzklRDSooxZ6YMhoaEPx4SO5fGeUc= github.com/matrix-org/pinecone v0.0.0-20211022090602-08a50945ac89 h1:6JkIymZ1vxfI0shSpg6gNPTJaF4/95Evy34slPVZGKM= github.com/matrix-org/pinecone v0.0.0-20211022090602-08a50945ac89/go.mod h1:r6dsL+ylE0yXe/7zh8y/Bdh6aBYI1r+u4yZni9A4iyk= github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U= diff --git a/sytest-whitelist b/sytest-whitelist index afe8a7e0d..d074d42b4 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -571,3 +571,19 @@ A prev_batch token from incremental sync can be used in the v1 messages API Inbound federation rejects invites which are not signed by the sender Invited user can reject invite over federation several times Test that we can be reinvited to a room we created +User can create and send/receive messages in a room with version 8 +local user can join room with version 8 +User can invite local user to room with version 8 +remote user can join room with version 8 +User can invite remote user to room with version 8 +Remote user can backfill in a room with version 8 +Can reject invites over federation for rooms with version 8 +Can receive redactions from regular users over federation in room version 8 +User can create and send/receive messages in a room with version 9 +local user can join room with version 9 +User can invite local user to room with version 9 +remote user can join room with version 9 +User can invite remote user to room with version 9 +Remote user can backfill in a room with version 9 +Can reject invites over federation for rooms with version 9 +Can receive redactions from regular users over federation in room version 9