Process federated joins in background context (#1434)
* Return early from federated room join * Synchronous perform-join as long as possible * Don't allow multiple federated joins to the same room by the same user
This commit is contained in:
parent
45de9dc1c0
commit
a7563ede3d
|
@ -2,6 +2,7 @@ package internal
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/federationsender/api"
|
"github.com/matrix-org/dendrite/federationsender/api"
|
||||||
|
@ -23,6 +24,7 @@ type FederationSenderInternalAPI struct {
|
||||||
federation *gomatrixserverlib.FederationClient
|
federation *gomatrixserverlib.FederationClient
|
||||||
keyRing *gomatrixserverlib.KeyRing
|
keyRing *gomatrixserverlib.KeyRing
|
||||||
queues *queue.OutgoingQueues
|
queues *queue.OutgoingQueues
|
||||||
|
joins sync.Map // joins currently in progress
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewFederationSenderInternalAPI(
|
func NewFederationSenderInternalAPI(
|
||||||
|
|
|
@ -37,12 +37,32 @@ func (r *FederationSenderInternalAPI) PerformDirectoryLookup(
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type federatedJoin struct {
|
||||||
|
UserID string
|
||||||
|
RoomID string
|
||||||
|
}
|
||||||
|
|
||||||
// PerformJoinRequest implements api.FederationSenderInternalAPI
|
// PerformJoinRequest implements api.FederationSenderInternalAPI
|
||||||
func (r *FederationSenderInternalAPI) PerformJoin(
|
func (r *FederationSenderInternalAPI) PerformJoin(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
request *api.PerformJoinRequest,
|
request *api.PerformJoinRequest,
|
||||||
response *api.PerformJoinResponse,
|
response *api.PerformJoinResponse,
|
||||||
) {
|
) {
|
||||||
|
// Check that a join isn't already in progress for this user/room.
|
||||||
|
j := federatedJoin{request.UserID, request.RoomID}
|
||||||
|
if _, found := r.joins.Load(j); found {
|
||||||
|
response.LastError = &gomatrix.HTTPError{
|
||||||
|
Code: 429,
|
||||||
|
Message: `{
|
||||||
|
"errcode": "M_LIMIT_EXCEEDED",
|
||||||
|
"error": "There is already a federated join to this room in progress. Please wait for it to finish."
|
||||||
|
}`, // TODO: Why do none of our error types play nicely with each other?
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
r.joins.Store(j, nil)
|
||||||
|
defer r.joins.Delete(j)
|
||||||
|
|
||||||
// Look up the supported room versions.
|
// Look up the supported room versions.
|
||||||
var supportedVersions []gomatrixserverlib.RoomVersion
|
var supportedVersions []gomatrixserverlib.RoomVersion
|
||||||
for version := range version.SupportedRoomVersions() {
|
for version := range version.SupportedRoomVersions() {
|
||||||
|
@ -186,27 +206,47 @@ func (r *FederationSenderInternalAPI) performJoinUsingServer(
|
||||||
}
|
}
|
||||||
r.statistics.ForServer(serverName).Success()
|
r.statistics.ForServer(serverName).Success()
|
||||||
|
|
||||||
// Check that the send_join response was valid.
|
// Process the join response in a goroutine. The idea here is
|
||||||
joinCtx := perform.JoinContext(r.federation, r.keyRing)
|
// that we'll try and wait for as long as possible for the work
|
||||||
respState, err := joinCtx.CheckSendJoinResponse(
|
// to complete, but if the client does give up waiting, we'll
|
||||||
ctx, event, serverName, respMakeJoin, respSendJoin,
|
// still continue to process the join anyway so that we don't
|
||||||
)
|
// waste the effort.
|
||||||
if err != nil {
|
var cancel context.CancelFunc
|
||||||
return fmt.Errorf("joinCtx.CheckSendJoinResponse: %w", err)
|
ctx, cancel = context.WithCancel(context.Background())
|
||||||
}
|
go func() {
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
// If we successfully performed a send_join above then the other
|
// Check that the send_join response was valid.
|
||||||
// server now thinks we're a part of the room. Send the newly
|
joinCtx := perform.JoinContext(r.federation, r.keyRing)
|
||||||
// returned state to the roomserver to update our local view.
|
respState, err := joinCtx.CheckSendJoinResponse(
|
||||||
if err = roomserverAPI.SendEventWithRewrite(
|
ctx, event, serverName, respMakeJoin, respSendJoin,
|
||||||
ctx, r.rsAPI,
|
)
|
||||||
respState,
|
if err != nil {
|
||||||
event.Headered(respMakeJoin.RoomVersion),
|
logrus.WithFields(logrus.Fields{
|
||||||
nil,
|
"room_id": roomID,
|
||||||
); err != nil {
|
"user_id": userID,
|
||||||
return fmt.Errorf("r.producer.SendEventWithState: %w", err)
|
}).WithError(err).Error("Failed to process room join response")
|
||||||
}
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we successfully performed a send_join above then the other
|
||||||
|
// server now thinks we're a part of the room. Send the newly
|
||||||
|
// returned state to the roomserver to update our local view.
|
||||||
|
if err = roomserverAPI.SendEventWithRewrite(
|
||||||
|
ctx, r.rsAPI,
|
||||||
|
respState,
|
||||||
|
event.Headered(respMakeJoin.RoomVersion),
|
||||||
|
nil,
|
||||||
|
); err != nil {
|
||||||
|
logrus.WithFields(logrus.Fields{
|
||||||
|
"room_id": roomID,
|
||||||
|
"user_id": userID,
|
||||||
|
}).WithError(err).Error("Failed to send room join response to roomserver")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
<-ctx.Done()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue