Review comments, make FS API take list of servernames, dedupe them, break out of loop properly on success

This commit is contained in:
Neil Alexander 2020-05-04 12:52:20 +01:00
parent f5963308e0
commit 56e980782b
4 changed files with 127 additions and 114 deletions

View file

@ -4,6 +4,7 @@ import (
"context"
commonHTTP "github.com/matrix-org/dendrite/common/http"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/opentracing/opentracing-go"
)
@ -45,7 +46,7 @@ func (h *httpFederationSenderInternalAPI) PerformDirectoryLookup(
type PerformJoinRequest struct {
RoomID string `json:"room_id"`
UserID string `json:"user_id"`
ServerName gomatrixserverlib.ServerName `json:"server_name"`
ServerNames types.ServerNames `json:"server_names"`
Content map[string]interface{} `json:"content"`
}

View file

@ -9,6 +9,8 @@ import (
"github.com/matrix-org/dendrite/federationsender/internal/perform"
"github.com/matrix-org/dendrite/roomserver/version"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
)
// PerformLeaveRequest implements api.FederationSenderInternalAPI
@ -42,11 +44,18 @@ func (r *FederationSenderInternalAPI) PerformJoin(
supportedVersions = append(supportedVersions, version)
}
// Deduplicate the server names we were provided.
util.Unique(request.ServerNames)
// Try each server that we were provided until we land on one that
// successfully completes the make-join send-join dance.
succeeded := false
for _, serverName := range request.ServerNames {
// Try to perform a make_join using the information supplied in the
// request.
respMakeJoin, err := r.federation.MakeJoin(
ctx,
request.ServerName,
serverName,
request.RoomID,
request.UserID,
supportedVersions,
@ -98,20 +107,22 @@ func (r *FederationSenderInternalAPI) PerformJoin(
// Try to perform a send_join using the newly built event.
respSendJoin, err := r.federation.SendJoin(
ctx,
request.ServerName,
serverName,
event,
respMakeJoin.RoomVersion,
)
if err != nil {
return fmt.Errorf("r.federation.SendJoin: %w", err)
logrus.WithError(err).Warnf("r.federation.SendJoin failed")
continue
}
// Check that the send_join response was valid.
joinCtx := perform.JoinContext(r.federation, r.keyRing)
if err = joinCtx.CheckSendJoinResponse(
ctx, event, request.ServerName, respMakeJoin, respSendJoin,
ctx, event, serverName, respMakeJoin, respSendJoin,
); err != nil {
return fmt.Errorf("perform.JoinRequest.CheckSendJoinResponse: %w", err)
logrus.WithError(err).Warnf("joinCtx.CheckSendJoinResponse failed")
continue
}
// If we successfully performed a send_join above then the other
@ -122,11 +133,25 @@ func (r *FederationSenderInternalAPI) PerformJoin(
respSendJoin.ToRespState(),
event.Headered(respMakeJoin.RoomVersion),
); err != nil {
return fmt.Errorf("r.producer.SendEventWithState: %w", err)
logrus.WithError(err).Warnf("r.producer.SendEventWithState failed")
continue
}
// We're all good.
succeeded = true
break
}
if succeeded {
// Everything went to plan.
return nil
} else {
// We didn't complete a join for some reason.
return fmt.Errorf(
"failed to join user %q to room %q through %d server(s)",
request.UserID, request.RoomID, len(request.ServerNames),
)
}
}
// PerformLeaveRequest implements api.FederationSenderInternalAPI

View file

@ -28,6 +28,12 @@ type JoinedHost struct {
ServerName gomatrixserverlib.ServerName
}
type ServerNames []gomatrixserverlib.ServerName
func (s ServerNames) Len() int { return len(s) }
func (s ServerNames) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s ServerNames) Less(i, j int) bool { return s[i] < s[j] }
// A EventIDMismatchError indicates that we have got out of sync with the
// room server.
type EventIDMismatchError struct {

View file

@ -179,35 +179,16 @@ func (r *RoomserverInternalAPI) performJoinRoomByID(
}
// Try joining by all of the supplied server names.
// TODO: Update the FS API so that it accepts a list of server names and
// does this bit by itself.
joined := false
for _, serverName := range req.ServerNames {
// Otherwise, if we've reached this point, the room isn't a local room
// and we should ask the federation sender to try and join for us.
fedReq := fsAPI.PerformJoinRequest{
RoomID: req.RoomIDOrAlias, // the room ID to try and join
UserID: req.UserID, // the user ID joining the room
ServerName: serverName, // the server to try joining with
ServerNames: req.ServerNames, // the server to try joining with
Content: req.Content, // the membership event content
}
fedRes := fsAPI.PerformJoinResponse{}
err = r.fsAPI.PerformJoin(ctx, &fedReq, &fedRes)
if err != nil {
logrus.WithError(err).Errorf("error joining federated room %q", req.RoomIDOrAlias)
continue
}
joined = true
break
}
// If we didn't successfully join the room using any of the supplied
// servers then return an error saying such.
if !joined {
return fmt.Errorf(
"Failed to join %q using %d server(s)",
req.RoomIDOrAlias, len(req.ServerNames),
)
return fmt.Errorf("Error joining federated room: %q", err)
}
default: