reimplement SS /peek to prod the RS to tell the FS about the peek

This commit is contained in:
Matthew Hodgson 2020-09-12 22:45:00 +01:00
parent 803647be56
commit 0ae0d11446
12 changed files with 140 additions and 84 deletions

View file

@ -15,7 +15,6 @@
package routing package routing
import ( import (
"context"
"net/http" "net/http"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
@ -25,7 +24,7 @@ import (
"github.com/matrix-org/util" "github.com/matrix-org/util"
) )
// Peek implements the /peek API // Peek implements the SS /peek API
func Peek( func Peek(
httpReq *http.Request, httpReq *http.Request,
request *gomatrixserverlib.FederationRequest, request *gomatrixserverlib.FederationRequest,
@ -34,6 +33,8 @@ func Peek(
roomID, peekID string, roomID, peekID string,
remoteVersions []gomatrixserverlib.RoomVersion, remoteVersions []gomatrixserverlib.RoomVersion,
) util.JSONResponse { ) util.JSONResponse {
// TODO: check if we're just refreshing an existing peek. Somehow.
verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID} verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID}
verRes := api.QueryRoomVersionForRoomResponse{} verRes := api.QueryRoomVersionForRoomResponse{}
if err := rsAPI.QueryRoomVersionForRoom(httpReq.Context(), &verReq, &verRes); err != nil { if err := rsAPI.QueryRoomVersionForRoom(httpReq.Context(), &verReq, &verRes); err != nil {
@ -43,7 +44,7 @@ func Peek(
} }
} }
// Check that the room that the remote side is trying to join is actually // Check that the room that the peeking server is trying to peek is actually
// one of the room versions that they listed in their supported ?ver= in // one of the room versions that they listed in their supported ?ver= in
// the peek URL. // the peek URL.
remoteSupportsVersion := false remoteSupportsVersion := false
@ -53,7 +54,7 @@ func Peek(
break break
} }
} }
// If it isn't, stop trying to join the room. // If it isn't, stop trying to peek the room.
if !remoteSupportsVersion { if !remoteSupportsVersion {
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusBadRequest, Code: http.StatusBadRequest,
@ -63,50 +64,33 @@ func Peek(
// TODO: Check history visibility // TODO: Check history visibility
state, err := getCurrentState(httpReq.Context(), request, rsAPI, roomID) var response api.PerformHandleRemotePeekResponse
if err != nil { err := rsAPI.PerformHandleRemotePeek(
return *err httpReq.Context(),
} &api.PerformHandleRemotePeekRequest{
return util.JSONResponse{
Code: http.StatusOK,
JSON: map[string]interface{}{
"state": state,
"room_version": verRes.RoomVersion,
},
}
}
func getCurrentState(
ctx context.Context,
request *gomatrixserverlib.FederationRequest,
rsAPI api.RoomserverInternalAPI,
roomID string,
) (*gomatrixserverlib.RespPeek, *util.JSONResponse) {
var response api.QueryStateAndAuthChainResponse
err := rsAPI.QueryStateAndAuthChain(
ctx,
&api.QueryStateAndAuthChainRequest{
RoomID: roomID, RoomID: roomID,
PrevEventIDs: []string{}, PeekID: peekID,
AuthEventIDs: []string{}, ServerName: request.Origin(),
}, },
&response, &response,
) )
if err != nil { if err != nil {
resErr := util.ErrorResponse(err) resErr := util.ErrorResponse(err)
return nil, &resErr return resErr
} }
if !response.RoomExists { if !response.RoomExists {
return nil, &util.JSONResponse{Code: http.StatusNotFound, JSON: nil} return util.JSONResponse{Code: http.StatusNotFound, JSON: nil}
} }
return &gomatrixserverlib.RespPeek{ return util.JSONResponse{
StateEvents: gomatrixserverlib.UnwrapEventHeaders(response.StateEvents), Code: http.StatusOK,
AuthEvents: gomatrixserverlib.UnwrapEventHeaders(response.AuthChainEvents), JSON: gomatrixserverlib.RespPeek{
RoomVersion: response.RoomVersion, StateEvents: gomatrixserverlib.UnwrapEventHeaders(response.StateEvents),
RenewalInterval: 60 * 60 * 1000 * 1000, // one hour AuthEvents: gomatrixserverlib.UnwrapEventHeaders(response.AuthChainEvents),
}, nil RoomVersion: response.RoomVersion,
RenewalInterval: 60 * 60 * 1000 * 1000, // one hour
},
}
} }

View file

@ -97,6 +97,14 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
}).Panicf("roomserver output log: write room event failure") }).Panicf("roomserver output log: write room event failure")
return nil return nil
} }
case api.OutputTypeNewRemotePeek:
if err := s.processRemotePeek(*output.NewRemotePeek); err != nil {
log.WithFields(log.Fields{
"event": output.NewRemotePeek,
log.ErrorKey: err,
}).Panicf("roomserver output log: remote peek event failure")
return nil
}
default: default:
log.WithField("type", output.Type).Debug( log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type", "roomserver output log: ignoring unknown output type",
@ -107,6 +115,12 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
return nil return nil
} }
// processMessage updates the list of currently joined hosts in the room
// and then sends the event to the hosts that were joined before the event.
func (s *OutputRoomEventConsumer) processRemotePeek(orp api.OutputNewRemotePeek) error {
return nil
}
// processMessage updates the list of currently joined hosts in the room // processMessage updates the list of currently joined hosts in the room
// and then sends the event to the hosts that were joined before the event. // and then sends the event to the hosts that were joined before the event.
func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error { func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error {
@ -150,6 +164,15 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err
return err return err
} }
// TODO: track what hosts are peeking (federationsender_received_peeks)
// TODO: rename federationsender_remote_peeks as federationsender_sent_peeks
// TOOD: add peeking hosts to the joinedHosts list
// TODO: do housekeeping to evict unrenewed peeking hosts
// TODO: implement query to let the fedapi check whether a given peek is live or not
// Send the event. // Send the event.
return s.queues.SendEvent( return s.queues.SendEvent(
&ore.Event, gomatrixserverlib.ServerName(ore.SendAsServer), joinedHostsAtEvent, &ore.Event, gomatrixserverlib.ServerName(ore.SendAsServer), joinedHostsAtEvent,

View file

@ -48,6 +48,12 @@ type RoomserverInternalAPI interface {
res *PerformPublishResponse, res *PerformPublishResponse,
) )
PerformHandleRemotePeek(
ctx context.Context,
req *PerformHandleRemotePeekRequest,
res *PerformHandleRemotePeekResponse,
) error
QueryPublishedRooms( QueryPublishedRooms(
ctx context.Context, ctx context.Context,
req *QueryPublishedRoomsRequest, req *QueryPublishedRoomsRequest,
@ -181,4 +187,4 @@ type RoomserverInternalAPI interface {
req *RemoveRoomAliasRequest, req *RemoveRoomAliasRequest,
response *RemoveRoomAliasResponse, response *RemoveRoomAliasResponse,
) error ) error
} }

View file

@ -75,6 +75,16 @@ func (t *RoomserverInternalAPITrace) PerformPublish(
util.GetLogger(ctx).Infof("PerformPublish req=%+v res=%+v", js(req), js(res)) util.GetLogger(ctx).Infof("PerformPublish req=%+v res=%+v", js(req), js(res))
} }
func (t *RoomserverInternalAPITrace) PerformHandleRemotePeek(
ctx context.Context,
req *PerformHandleRemotePeekRequest,
res *PerformHandleRemotePeekResponse,
) error {
err := t.Impl.PerformHandleRemotePeek(ctx, req, res)
util.GetLogger(ctx).Infof("PerformHandleRemotePeek req=%+v res=%+v", js(req), js(res))
return err
}
func (t *RoomserverInternalAPITrace) QueryPublishedRooms( func (t *RoomserverInternalAPITrace) QueryPublishedRooms(
ctx context.Context, ctx context.Context,
req *QueryPublishedRoomsRequest, req *QueryPublishedRoomsRequest,

View file

@ -49,6 +49,9 @@ const (
// OutputTypeNewPeek indicates that the kafka event is an OutputNewPeek // OutputTypeNewPeek indicates that the kafka event is an OutputNewPeek
OutputTypeNewPeek OutputType = "new_peek" OutputTypeNewPeek OutputType = "new_peek"
// OutputTypeNewRemotePeek indicates that the kafka event is an OutputNewRemotePeek
OutputTypeNewRemotePeek OutputType = "new_remote_peek"
) )
// An OutputEvent is an entry in the roomserver output kafka log. // An OutputEvent is an entry in the roomserver output kafka log.
@ -66,6 +69,8 @@ type OutputEvent struct {
RedactedEvent *OutputRedactedEvent `json:"redacted_event,omitempty"` RedactedEvent *OutputRedactedEvent `json:"redacted_event,omitempty"`
// The content of event with type OutputTypeNewPeek // The content of event with type OutputTypeNewPeek
NewPeek *OutputNewPeek `json:"new_peek,omitempty"` NewPeek *OutputNewPeek `json:"new_peek,omitempty"`
// The content of event with type OutputTypeNewRemotePeek
NewRemotePeek *OutputNewRemotePeek `json:"new_remote_peek,omitempty"`
} }
// An OutputNewRoomEvent is written when the roomserver receives a new event. // An OutputNewRoomEvent is written when the roomserver receives a new event.
@ -208,3 +213,10 @@ type OutputNewPeek struct {
UserID string UserID string
DeviceID string DeviceID string
} }
// An OutputNewRemotePeek is written whenever a server starts peeking into a room
type OutputNewRemotePeek struct {
RoomID string
PeekID string
ServerName gomatrixserverlib.ServerName
}

View file

@ -159,3 +159,22 @@ type PerformPublishResponse struct {
// If non-nil, the publish request failed. Contains more information why it failed. // If non-nil, the publish request failed. Contains more information why it failed.
Error *PerformError Error *PerformError
} }
type PerformHandleRemotePeekRequest struct {
UserID string `json:"user_id"`
RoomID string `json:"room_id"`
PeekID string `json:"peek_id"`
ServerName gomatrixserverlib.ServerName `json:"server_name"`
}
type PerformHandleRemotePeekResponse struct {
// Does the room exist on this roomserver?
// If the room doesn't exist this will be false and StateEvents will be empty.
RoomExists bool `json:"room_exists"`
// The room version of the room.
RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"`
// The current state and auth chain events.
// The lists will be in an arbitrary order.
StateEvents []gomatrixserverlib.HeaderedEvent `json:"state_events"`
AuthChainEvents []gomatrixserverlib.HeaderedEvent `json:"auth_chain_events"`
}

View file

@ -177,7 +177,7 @@ type QueryStateAndAuthChainRequest struct {
// The room ID to query the state in. // The room ID to query the state in.
RoomID string `json:"room_id"` RoomID string `json:"room_id"`
// The list of prev events for the event. Used to calculate the state at // The list of prev events for the event. Used to calculate the state at
// the event. If empty, assumes current state. // the event.
PrevEventIDs []string `json:"prev_event_ids"` PrevEventIDs []string `json:"prev_event_ids"`
// The list of auth events for the event. Used to calculate the auth chain // The list of auth events for the event. Used to calculate the auth chain
AuthEventIDs []string `json:"auth_event_ids"` AuthEventIDs []string `json:"auth_event_ids"`

View file

@ -23,6 +23,7 @@ type RoomserverInternalAPI struct {
*perform.Inviter *perform.Inviter
*perform.Joiner *perform.Joiner
*perform.Peeker *perform.Peeker
*perform.HandleRemotePeeker
*perform.Leaver *perform.Leaver
*perform.Publisher *perform.Publisher
*perform.Backfiller *perform.Backfiller
@ -91,6 +92,10 @@ func (r *RoomserverInternalAPI) SetFederationSenderAPI(fsAPI fsAPI.FederationSen
FSAPI: r.fsAPI, FSAPI: r.fsAPI,
Inputer: r.Inputer, Inputer: r.Inputer,
} }
r.HandleRemotePeeker = &perform.HandleRemotePeeker{
DB: r.DB,
Inputer: r.Inputer,
}
r.Leaver = &perform.Leaver{ r.Leaver = &perform.Leaver{
Cfg: r.Cfg, Cfg: r.Cfg,
DB: r.DB, DB: r.DB,
@ -137,3 +142,4 @@ func (r *RoomserverInternalAPI) PerformLeave(
} }
return r.WriteOutputEvents(req.RoomID, outputEvents) return r.WriteOutputEvents(req.RoomID, outputEvents)
} }

View file

@ -198,8 +198,6 @@ func (r *Peeker) performPeekRoomByID(
} }
} }
// TODO: handle federated peeks
err = r.Inputer.WriteOutputEvents(roomID, []api.OutputEvent{ err = r.Inputer.WriteOutputEvents(roomID, []api.OutputEvent{
{ {
Type: api.OutputTypeNewPeek, Type: api.OutputTypeNewPeek,

View file

@ -335,38 +335,9 @@ func (r *Queryer) QueryStateAndAuthChain(
response.RoomVersion = info.RoomVersion response.RoomVersion = info.RoomVersion
var stateEvents []gomatrixserverlib.Event var stateEvents []gomatrixserverlib.Event
if len(request.PrevEventIDs) > 0 { stateEvents, err = r.loadStateAtEventIDs(ctx, *info, request.PrevEventIDs)
stateEvents, err = r.loadStateAtEventIDs(ctx, *info, request.PrevEventIDs) if err != nil {
if err != nil { return err
return err
}
} else {
// no PrevEventIDs or AuthEventIDs were provided, so return current state instead.
// XXX: is this right?
roomState := state.NewStateResolution(r.DB, *info)
// no need to resolve state again later
request.ResolveState = false
var currentStateSnapshotNID types.StateSnapshotNID
_, currentStateSnapshotNID, _, err =
r.DB.LatestEventIDs(ctx, info.RoomNID)
if err != nil {
return err
}
var stateEntries []types.StateEntry
stateEntries, err = roomState.LoadStateAtSnapshot(
ctx, currentStateSnapshotNID,
)
if err != nil {
return err
}
stateEvents, err = helpers.LoadStateEvents(ctx, r.DB, stateEntries)
if err != nil {
return err
}
} }
response.PrevEventsExist = true response.PrevEventsExist = true
@ -379,7 +350,7 @@ func (r *Queryer) QueryStateAndAuthChain(
} }
authEventIDs = util.UniqueStrings(authEventIDs) // de-dupe authEventIDs = util.UniqueStrings(authEventIDs) // de-dupe
authEvents, err := getAuthChain(ctx, r.DB.EventsFromIDs, authEventIDs) authEvents, err := GetAuthChain(ctx, r.DB.EventsFromIDs, authEventIDs)
if err != nil { if err != nil {
return err return err
} }
@ -428,11 +399,11 @@ func (r *Queryer) loadStateAtEventIDs(ctx context.Context, roomInfo types.RoomIn
type eventsFromIDs func(context.Context, []string) ([]types.Event, error) type eventsFromIDs func(context.Context, []string) ([]types.Event, error)
// getAuthChain fetches the auth chain for the given auth events. An auth chain // GetAuthChain fetches the auth chain for the given auth events. An auth chain
// is the list of all events that are referenced in the auth_events section, and // is the list of all events that are referenced in the auth_events section, and
// all their auth_events, recursively. The returned set of events contain the // all their auth_events, recursively. The returned set of events contain the
// given events. Will *not* error if we don't have all auth events. // given events. Will *not* error if we don't have all auth events.
func getAuthChain( func GetAuthChain(
ctx context.Context, fn eventsFromIDs, authEventIDs []string, ctx context.Context, fn eventsFromIDs, authEventIDs []string,
) ([]gomatrixserverlib.Event, error) { ) ([]gomatrixserverlib.Event, error) {
// List of event IDs to fetch. On each pass, these events will be requested // List of event IDs to fetch. On each pass, these events will be requested

View file

@ -25,12 +25,13 @@ const (
RoomserverInputRoomEventsPath = "/roomserver/inputRoomEvents" RoomserverInputRoomEventsPath = "/roomserver/inputRoomEvents"
// Perform operations // Perform operations
RoomserverPerformInvitePath = "/roomserver/performInvite" RoomserverPerformInvitePath = "/roomserver/performInvite"
RoomserverPerformPeekPath = "/roomserver/performPeek" RoomserverPerformPeekPath = "/roomserver/performPeek"
RoomserverPerformJoinPath = "/roomserver/performJoin" RoomserverPerformJoinPath = "/roomserver/performJoin"
RoomserverPerformLeavePath = "/roomserver/performLeave" RoomserverPerformLeavePath = "/roomserver/performLeave"
RoomserverPerformBackfillPath = "/roomserver/performBackfill" RoomserverPerformBackfillPath = "/roomserver/performBackfill"
RoomserverPerformPublishPath = "/roomserver/performPublish" RoomserverPerformPublishPath = "/roomserver/performPublish"
RoomserverPerformHandleRemotePeekPath = "/roomserver/performHandleRemotePeek"
// Query operations // Query operations
RoomserverQueryLatestEventsAndStatePath = "/roomserver/queryLatestEventsAndState" RoomserverQueryLatestEventsAndStatePath = "/roomserver/queryLatestEventsAndState"
@ -203,6 +204,19 @@ func (h *httpRoomserverInternalAPI) PerformPeek(
} }
} }
func (h *httpRoomserverInternalAPI) PerformHandleRemotePeek(
ctx context.Context,
request *api.PerformHandleRemotePeekRequest,
response *api.PerformHandleRemotePeekResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "PerformHandleRemotePeek")
defer span.Finish()
apiURL := h.roomserverURL + RoomserverPerformHandleRemotePeekPath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
// XXX: why do some of these Perform's return errors, and others don't?
func (h *httpRoomserverInternalAPI) PerformLeave( func (h *httpRoomserverInternalAPI) PerformLeave(
ctx context.Context, ctx context.Context,
request *api.PerformLeaveRequest, request *api.PerformLeaveRequest,

View file

@ -74,6 +74,19 @@ func AddRoutes(r api.RoomserverInternalAPI, internalAPIMux *mux.Router) {
return util.JSONResponse{Code: http.StatusOK, JSON: &response} return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}), }),
) )
internalAPIMux.Handle(RoomserverPerformHandleRemotePeekPath,
httputil.MakeInternalAPI("performHandleRemotePeek", func(req *http.Request) util.JSONResponse {
var request api.PerformHandleRemotePeekRequest
var response api.PerformHandleRemotePeekResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
if err := r.PerformHandleRemotePeek(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(RoomserverPerformPublishPath, internalAPIMux.Handle(RoomserverPerformPublishPath,
httputil.MakeInternalAPI("performPublish", func(req *http.Request) util.JSONResponse { httputil.MakeInternalAPI("performPublish", func(req *http.Request) util.JSONResponse {
var request api.PerformPublishRequest var request api.PerformPublishRequest