Merge branch 'master' of https://github.com/matrix-org/dendrite into add-presence

This commit is contained in:
Till Faelligen 2021-11-12 19:27:21 +01:00
commit a9c6b405f7
4 changed files with 56 additions and 6 deletions

View file

@ -150,6 +150,7 @@ type inputWorker struct {
input *sendFIFOQueue
}
var inFlightTxnsPerOrigin sync.Map // transaction ID -> chan util.JSONResponse
var inputWorkers sync.Map // room ID -> *inputWorker
// Send implements /_matrix/federation/v1/send/{txnID}
@ -167,6 +168,37 @@ func Send(
mu *internal.MutexByRoom,
servers federationAPI.ServersInRoomProvider,
) util.JSONResponse {
// First we should check if this origin has already submitted this
// txn ID to us. If they have and the txnIDs map contains an entry,
// the transaction is still being worked on. The new client can wait
// for it to complete rather than creating more work.
index := string(request.Origin()) + "\000" + string(txnID)
v, ok := inFlightTxnsPerOrigin.LoadOrStore(index, make(chan util.JSONResponse, 1))
ch := v.(chan util.JSONResponse)
if ok {
// This origin already submitted this txn ID to us, and the work
// is still taking place, so we'll just wait for it to finish.
ctx, cancel := context.WithTimeout(httpReq.Context(), time.Minute*5)
defer cancel()
select {
case <-ctx.Done():
// If the caller gives up then return straight away. We don't
// want to attempt to process what they sent us any further.
return util.JSONResponse{Code: http.StatusRequestTimeout}
case res := <-ch:
// The original task just finished processing so let's return
// the result of it.
if res.Code == 0 {
return util.JSONResponse{Code: http.StatusAccepted}
}
return res
}
}
// Otherwise, store that we're currently working on this txn from
// this origin. When we're done processing, close the channel.
defer close(ch)
defer inFlightTxnsPerOrigin.Delete(index)
t := txnReq{
rsAPI: rsAPI,
eduAPI: eduAPI,
@ -210,7 +242,7 @@ func Send(
util.GetLogger(httpReq.Context()).Infof("Received transaction %q from %q containing %d PDUs, %d EDUs", txnID, request.Origin(), len(t.PDUs), len(t.EDUs))
resp, jsonErr := t.processTransaction(httpReq.Context())
resp, jsonErr := t.processTransaction(context.Background())
if jsonErr != nil {
util.GetLogger(httpReq.Context()).WithField("jsonErr", jsonErr).Error("t.processTransaction failed")
return *jsonErr
@ -220,10 +252,12 @@ func Send(
// Status code 200:
// The result of processing the transaction. The server is to use this response
// even in the event of one or more PDUs failing to be processed.
return util.JSONResponse{
res := util.JSONResponse{
Code: http.StatusOK,
JSON: resp,
}
ch <- res
return res
}
type txnReq struct {

2
go.mod
View file

@ -31,7 +31,7 @@ require (
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4
github.com/matrix-org/go-sqlite3-js v0.0.0-20210709140738-b0d1ba599a6d
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16
github.com/matrix-org/gomatrixserverlib v0.0.0-20211104103430-62945aeee964
github.com/matrix-org/gomatrixserverlib v0.0.0-20211112151542-af2616bf4c80
github.com/matrix-org/naffka v0.0.0-20210623111924-14ff508b58e0
github.com/matrix-org/pinecone v0.0.0-20211022090602-08a50945ac89
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4

4
go.sum
View file

@ -993,8 +993,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20210709140738-b0d1ba599a6d/go.mod h1
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4=
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
github.com/matrix-org/gomatrixserverlib v0.0.0-20211104103430-62945aeee964 h1:YW/wrSyzxB5G0snH6IlJ5u1/HjqtichZj9qem2hqraE=
github.com/matrix-org/gomatrixserverlib v0.0.0-20211104103430-62945aeee964/go.mod h1:rB8tBUUUo1rzUqpzklRDSooxZ6YMhoaEPx4SO5fGeUc=
github.com/matrix-org/gomatrixserverlib v0.0.0-20211112151542-af2616bf4c80 h1:8Fm/nsRuWsdCWNMfAji/Pi6ynUOch9eLkN15cqXOFx0=
github.com/matrix-org/gomatrixserverlib v0.0.0-20211112151542-af2616bf4c80/go.mod h1:rB8tBUUUo1rzUqpzklRDSooxZ6YMhoaEPx4SO5fGeUc=
github.com/matrix-org/naffka v0.0.0-20210623111924-14ff508b58e0 h1:HZCzy4oVzz55e+cOMiX/JtSF2UOY1evBl2raaE7ACcU=
github.com/matrix-org/naffka v0.0.0-20210623111924-14ff508b58e0/go.mod h1:sjyPyRxKM5uw1nD2cJ6O2OxI6GOqyVBfNXqKjBZTBZE=
github.com/matrix-org/pinecone v0.0.0-20211022090602-08a50945ac89 h1:6JkIymZ1vxfI0shSpg6gNPTJaF4/95Evy34slPVZGKM=

View file

@ -579,3 +579,19 @@ A prev_batch token from incremental sync can be used in the v1 messages API
Inbound federation rejects invites which are not signed by the sender
Invited user can reject invite over federation several times
Test that we can be reinvited to a room we created
User can create and send/receive messages in a room with version 8
local user can join room with version 8
User can invite local user to room with version 8
remote user can join room with version 8
User can invite remote user to room with version 8
Remote user can backfill in a room with version 8
Can reject invites over federation for rooms with version 8
Can receive redactions from regular users over federation in room version 8
User can create and send/receive messages in a room with version 9
local user can join room with version 9
User can invite local user to room with version 9
remote user can join room with version 9
User can invite remote user to room with version 9
Remote user can backfill in a room with version 9
Can reject invites over federation for rooms with version 9
Can receive redactions from regular users over federation in room version 9