diff --git a/federationsender/internal/api.go b/federationsender/internal/api.go index 2a70f7ed3..49c537553 100644 --- a/federationsender/internal/api.go +++ b/federationsender/internal/api.go @@ -2,6 +2,7 @@ package internal import ( "context" + "sync" "time" "github.com/matrix-org/dendrite/federationsender/api" @@ -23,6 +24,7 @@ type FederationSenderInternalAPI struct { federation *gomatrixserverlib.FederationClient keyRing *gomatrixserverlib.KeyRing queues *queue.OutgoingQueues + joins sync.Map // joins currently in progress } func NewFederationSenderInternalAPI( diff --git a/federationsender/internal/perform.go b/federationsender/internal/perform.go index f9d07e887..6aea296bd 100644 --- a/federationsender/internal/perform.go +++ b/federationsender/internal/perform.go @@ -37,12 +37,32 @@ func (r *FederationSenderInternalAPI) PerformDirectoryLookup( return nil } +type federatedJoin struct { + UserID string + RoomID string +} + // PerformJoinRequest implements api.FederationSenderInternalAPI func (r *FederationSenderInternalAPI) PerformJoin( ctx context.Context, request *api.PerformJoinRequest, response *api.PerformJoinResponse, ) { + // Check that a join isn't already in progress for this user/room. + j := federatedJoin{request.UserID, request.RoomID} + if _, found := r.joins.Load(j); found { + response.LastError = &gomatrix.HTTPError{ + Code: 429, + Message: `{ + "errcode": "M_LIMIT_EXCEEDED", + "error": "There is already a federated join to this room in progress. Please wait for it to finish." + }`, // TODO: Why do none of our error types play nicely with each other? + } + return + } + r.joins.Store(j, nil) + defer r.joins.Delete(j) + // Look up the supported room versions. var supportedVersions []gomatrixserverlib.RoomVersion for version := range version.SupportedRoomVersions() {