Merge branch 'master' into neilalexander/natsrsinput

This commit is contained in:
Neil Alexander 2021-11-08 09:53:18 +00:00
commit 4b3ae031af
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
4 changed files with 46 additions and 13 deletions

View file

@ -148,8 +148,8 @@ type inputWorker struct {
input *sendFIFOQueue input *sendFIFOQueue
} }
var txnIDs sync.Map // transaction ID -> chan struct{} var inFlightTxnsPerOrigin sync.Map // transaction ID -> chan util.JSONResponse
var inputWorkers sync.Map // room ID -> *inputWorker var inputWorkers sync.Map // room ID -> *inputWorker
// Send implements /_matrix/federation/v1/send/{txnID} // Send implements /_matrix/federation/v1/send/{txnID}
func Send( func Send(
@ -165,19 +165,36 @@ func Send(
mu *internal.MutexByRoom, mu *internal.MutexByRoom,
servers federationAPI.ServersInRoomProvider, servers federationAPI.ServersInRoomProvider,
) util.JSONResponse { ) util.JSONResponse {
index := string(request.Origin()) + string(txnID) // First we should check if this origin has already submitted this
if v, ok := txnIDs.Load(index); ok { // txn ID to us. If they have and the txnIDs map contains an entry,
// the transaction is still being worked on. The new client can wait
// for it to complete rather than creating more work.
index := string(request.Origin()) + "\000" + string(txnID)
v, ok := inFlightTxnsPerOrigin.LoadOrStore(index, make(chan util.JSONResponse, 1))
ch := v.(chan util.JSONResponse)
if ok {
// This origin already submitted this txn ID to us, and the work
// is still taking place, so we'll just wait for it to finish.
ctx, cancel := context.WithTimeout(httpReq.Context(), time.Minute*5)
defer cancel()
select { select {
case <-httpReq.Context().Done(): case <-ctx.Done():
// If the caller gives up then return straight away. We don't
// want to attempt to process what they sent us any further.
return util.JSONResponse{Code: http.StatusRequestTimeout} return util.JSONResponse{Code: http.StatusRequestTimeout}
case response := <-v.(chan util.JSONResponse): case res := <-ch:
return response // The original task just finished processing so let's return
// the result of it.
if res.Code == 0 {
return util.JSONResponse{Code: http.StatusAccepted}
}
return res
} }
} }
ch := make(chan util.JSONResponse, 1) // Otherwise, store that we're currently working on this txn from
txnIDs.Store(index, ch) // this origin. When we're done processing, close the channel.
defer close(ch) defer close(ch)
defer txnIDs.Delete(index) defer inFlightTxnsPerOrigin.Delete(index)
t := txnReq{ t := txnReq{
rsAPI: rsAPI, rsAPI: rsAPI,

2
go.mod
View file

@ -39,7 +39,7 @@ require (
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4 github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4
github.com/matrix-org/go-sqlite3-js v0.0.0-20210709140738-b0d1ba599a6d github.com/matrix-org/go-sqlite3-js v0.0.0-20210709140738-b0d1ba599a6d
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16
github.com/matrix-org/gomatrixserverlib v0.0.0-20211104103430-62945aeee964 github.com/matrix-org/gomatrixserverlib v0.0.0-20211108094458-ed7020868a9a
github.com/matrix-org/pinecone v0.0.0-20211022090602-08a50945ac89 github.com/matrix-org/pinecone v0.0.0-20211022090602-08a50945ac89
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
github.com/mattn/go-sqlite3 v1.14.8 github.com/mattn/go-sqlite3 v1.14.8

4
go.sum
View file

@ -988,8 +988,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20210709140738-b0d1ba599a6d/go.mod h1
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4=
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
github.com/matrix-org/gomatrixserverlib v0.0.0-20211104103430-62945aeee964 h1:YW/wrSyzxB5G0snH6IlJ5u1/HjqtichZj9qem2hqraE= github.com/matrix-org/gomatrixserverlib v0.0.0-20211108094458-ed7020868a9a h1:iBNcIIYr0WYl7PTuin9IXFSmrFGUAjUUyw14p29bkxw=
github.com/matrix-org/gomatrixserverlib v0.0.0-20211104103430-62945aeee964/go.mod h1:rB8tBUUUo1rzUqpzklRDSooxZ6YMhoaEPx4SO5fGeUc= github.com/matrix-org/gomatrixserverlib v0.0.0-20211108094458-ed7020868a9a/go.mod h1:rB8tBUUUo1rzUqpzklRDSooxZ6YMhoaEPx4SO5fGeUc=
github.com/matrix-org/pinecone v0.0.0-20211022090602-08a50945ac89 h1:6JkIymZ1vxfI0shSpg6gNPTJaF4/95Evy34slPVZGKM= github.com/matrix-org/pinecone v0.0.0-20211022090602-08a50945ac89 h1:6JkIymZ1vxfI0shSpg6gNPTJaF4/95Evy34slPVZGKM=
github.com/matrix-org/pinecone v0.0.0-20211022090602-08a50945ac89/go.mod h1:r6dsL+ylE0yXe/7zh8y/Bdh6aBYI1r+u4yZni9A4iyk= github.com/matrix-org/pinecone v0.0.0-20211022090602-08a50945ac89/go.mod h1:r6dsL+ylE0yXe/7zh8y/Bdh6aBYI1r+u4yZni9A4iyk=
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U= github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=

View file

@ -571,3 +571,19 @@ A prev_batch token from incremental sync can be used in the v1 messages API
Inbound federation rejects invites which are not signed by the sender Inbound federation rejects invites which are not signed by the sender
Invited user can reject invite over federation several times Invited user can reject invite over federation several times
Test that we can be reinvited to a room we created Test that we can be reinvited to a room we created
User can create and send/receive messages in a room with version 8
local user can join room with version 8
User can invite local user to room with version 8
remote user can join room with version 8
User can invite remote user to room with version 8
Remote user can backfill in a room with version 8
Can reject invites over federation for rooms with version 8
Can receive redactions from regular users over federation in room version 8
User can create and send/receive messages in a room with version 9
local user can join room with version 9
User can invite local user to room with version 9
remote user can join room with version 9
User can invite remote user to room with version 9
Remote user can backfill in a room with version 9
Can reject invites over federation for rooms with version 9
Can receive redactions from regular users over federation in room version 9