From 0ae0d1144660a08a5af5a2fbf857db91b9b4319b Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Sat, 12 Sep 2020 22:45:00 +0100 Subject: [PATCH] reimplement SS /peek to prod the RS to tell the FS about the peek --- federationapi/routing/peek.go | 62 ++++++++------------- federationsender/consumers/roomserver.go | 23 ++++++++ roomserver/api/api.go | 8 ++- roomserver/api/api_trace.go | 10 ++++ roomserver/api/output.go | 12 ++++ roomserver/api/perform.go | 19 +++++++ roomserver/api/query.go | 2 +- roomserver/internal/api.go | 6 ++ roomserver/internal/perform/perform_peek.go | 2 - roomserver/internal/query/query.go | 41 ++------------ roomserver/inthttp/client.go | 26 +++++++-- roomserver/inthttp/server.go | 13 +++++ 12 files changed, 140 insertions(+), 84 deletions(-) diff --git a/federationapi/routing/peek.go b/federationapi/routing/peek.go index 12df15448..423f84731 100644 --- a/federationapi/routing/peek.go +++ b/federationapi/routing/peek.go @@ -15,7 +15,6 @@ package routing import ( - "context" "net/http" "github.com/matrix-org/dendrite/clientapi/jsonerror" @@ -25,7 +24,7 @@ import ( "github.com/matrix-org/util" ) -// Peek implements the /peek API +// Peek implements the SS /peek API func Peek( httpReq *http.Request, request *gomatrixserverlib.FederationRequest, @@ -34,6 +33,8 @@ func Peek( roomID, peekID string, remoteVersions []gomatrixserverlib.RoomVersion, ) util.JSONResponse { + // TODO: check if we're just refreshing an existing peek. Somehow. + verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID} verRes := api.QueryRoomVersionForRoomResponse{} if err := rsAPI.QueryRoomVersionForRoom(httpReq.Context(), &verReq, &verRes); err != nil { @@ -43,7 +44,7 @@ func Peek( } } - // Check that the room that the remote side is trying to join is actually + // Check that the room that the peeking server is trying to peek is actually // one of the room versions that they listed in their supported ?ver= in // the peek URL. remoteSupportsVersion := false @@ -53,7 +54,7 @@ func Peek( break } } - // If it isn't, stop trying to join the room. + // If it isn't, stop trying to peek the room. if !remoteSupportsVersion { return util.JSONResponse{ Code: http.StatusBadRequest, @@ -63,50 +64,33 @@ func Peek( // TODO: Check history visibility - state, err := getCurrentState(httpReq.Context(), request, rsAPI, roomID) - if err != nil { - return *err - } - - return util.JSONResponse{ - Code: http.StatusOK, - JSON: map[string]interface{}{ - "state": state, - "room_version": verRes.RoomVersion, - }, - } -} - - -func getCurrentState( - ctx context.Context, - request *gomatrixserverlib.FederationRequest, - rsAPI api.RoomserverInternalAPI, - roomID string, -) (*gomatrixserverlib.RespPeek, *util.JSONResponse) { - var response api.QueryStateAndAuthChainResponse - err := rsAPI.QueryStateAndAuthChain( - ctx, - &api.QueryStateAndAuthChainRequest{ + var response api.PerformHandleRemotePeekResponse + err := rsAPI.PerformHandleRemotePeek( + httpReq.Context(), + &api.PerformHandleRemotePeekRequest{ RoomID: roomID, - PrevEventIDs: []string{}, - AuthEventIDs: []string{}, + PeekID: peekID, + ServerName: request.Origin(), }, &response, ) if err != nil { resErr := util.ErrorResponse(err) - return nil, &resErr + return resErr } if !response.RoomExists { - return nil, &util.JSONResponse{Code: http.StatusNotFound, JSON: nil} + return util.JSONResponse{Code: http.StatusNotFound, JSON: nil} } - return &gomatrixserverlib.RespPeek{ - StateEvents: gomatrixserverlib.UnwrapEventHeaders(response.StateEvents), - AuthEvents: gomatrixserverlib.UnwrapEventHeaders(response.AuthChainEvents), - RoomVersion: response.RoomVersion, - RenewalInterval: 60 * 60 * 1000 * 1000, // one hour - }, nil + return util.JSONResponse{ + Code: http.StatusOK, + JSON: gomatrixserverlib.RespPeek{ + StateEvents: gomatrixserverlib.UnwrapEventHeaders(response.StateEvents), + AuthEvents: gomatrixserverlib.UnwrapEventHeaders(response.AuthChainEvents), + RoomVersion: response.RoomVersion, + RenewalInterval: 60 * 60 * 1000 * 1000, // one hour + }, + } } + diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go index efeb53fa6..1f16621f0 100644 --- a/federationsender/consumers/roomserver.go +++ b/federationsender/consumers/roomserver.go @@ -97,6 +97,14 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { }).Panicf("roomserver output log: write room event failure") return nil } + case api.OutputTypeNewRemotePeek: + if err := s.processRemotePeek(*output.NewRemotePeek); err != nil { + log.WithFields(log.Fields{ + "event": output.NewRemotePeek, + log.ErrorKey: err, + }).Panicf("roomserver output log: remote peek event failure") + return nil + } default: log.WithField("type", output.Type).Debug( "roomserver output log: ignoring unknown output type", @@ -107,6 +115,12 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { return nil } +// processMessage updates the list of currently joined hosts in the room +// and then sends the event to the hosts that were joined before the event. +func (s *OutputRoomEventConsumer) processRemotePeek(orp api.OutputNewRemotePeek) error { + return nil +} + // processMessage updates the list of currently joined hosts in the room // and then sends the event to the hosts that were joined before the event. func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error { @@ -150,6 +164,15 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err return err } + // TODO: track what hosts are peeking (federationsender_received_peeks) + // TODO: rename federationsender_remote_peeks as federationsender_sent_peeks + + // TOOD: add peeking hosts to the joinedHosts list + + // TODO: do housekeeping to evict unrenewed peeking hosts + + // TODO: implement query to let the fedapi check whether a given peek is live or not + // Send the event. return s.queues.SendEvent( &ore.Event, gomatrixserverlib.ServerName(ore.SendAsServer), joinedHostsAtEvent, diff --git a/roomserver/api/api.go b/roomserver/api/api.go index eecefe322..08a269a82 100644 --- a/roomserver/api/api.go +++ b/roomserver/api/api.go @@ -48,6 +48,12 @@ type RoomserverInternalAPI interface { res *PerformPublishResponse, ) + PerformHandleRemotePeek( + ctx context.Context, + req *PerformHandleRemotePeekRequest, + res *PerformHandleRemotePeekResponse, + ) error + QueryPublishedRooms( ctx context.Context, req *QueryPublishedRoomsRequest, @@ -181,4 +187,4 @@ type RoomserverInternalAPI interface { req *RemoveRoomAliasRequest, response *RemoveRoomAliasResponse, ) error -} +} \ No newline at end of file diff --git a/roomserver/api/api_trace.go b/roomserver/api/api_trace.go index 643309307..4be566d4f 100644 --- a/roomserver/api/api_trace.go +++ b/roomserver/api/api_trace.go @@ -75,6 +75,16 @@ func (t *RoomserverInternalAPITrace) PerformPublish( util.GetLogger(ctx).Infof("PerformPublish req=%+v res=%+v", js(req), js(res)) } +func (t *RoomserverInternalAPITrace) PerformHandleRemotePeek( + ctx context.Context, + req *PerformHandleRemotePeekRequest, + res *PerformHandleRemotePeekResponse, +) error { + err := t.Impl.PerformHandleRemotePeek(ctx, req, res) + util.GetLogger(ctx).Infof("PerformHandleRemotePeek req=%+v res=%+v", js(req), js(res)) + return err +} + func (t *RoomserverInternalAPITrace) QueryPublishedRooms( ctx context.Context, req *QueryPublishedRoomsRequest, diff --git a/roomserver/api/output.go b/roomserver/api/output.go index 013ebdc83..4163cc2d0 100644 --- a/roomserver/api/output.go +++ b/roomserver/api/output.go @@ -49,6 +49,9 @@ const ( // OutputTypeNewPeek indicates that the kafka event is an OutputNewPeek OutputTypeNewPeek OutputType = "new_peek" + + // OutputTypeNewRemotePeek indicates that the kafka event is an OutputNewRemotePeek + OutputTypeNewRemotePeek OutputType = "new_remote_peek" ) // An OutputEvent is an entry in the roomserver output kafka log. @@ -66,6 +69,8 @@ type OutputEvent struct { RedactedEvent *OutputRedactedEvent `json:"redacted_event,omitempty"` // The content of event with type OutputTypeNewPeek NewPeek *OutputNewPeek `json:"new_peek,omitempty"` + // The content of event with type OutputTypeNewRemotePeek + NewRemotePeek *OutputNewRemotePeek `json:"new_remote_peek,omitempty"` } // An OutputNewRoomEvent is written when the roomserver receives a new event. @@ -208,3 +213,10 @@ type OutputNewPeek struct { UserID string DeviceID string } + +// An OutputNewRemotePeek is written whenever a server starts peeking into a room +type OutputNewRemotePeek struct { + RoomID string + PeekID string + ServerName gomatrixserverlib.ServerName +} diff --git a/roomserver/api/perform.go b/roomserver/api/perform.go index 0c2d96a7d..a8967dac1 100644 --- a/roomserver/api/perform.go +++ b/roomserver/api/perform.go @@ -159,3 +159,22 @@ type PerformPublishResponse struct { // If non-nil, the publish request failed. Contains more information why it failed. Error *PerformError } + +type PerformHandleRemotePeekRequest struct { + UserID string `json:"user_id"` + RoomID string `json:"room_id"` + PeekID string `json:"peek_id"` + ServerName gomatrixserverlib.ServerName `json:"server_name"` +} + +type PerformHandleRemotePeekResponse struct { + // Does the room exist on this roomserver? + // If the room doesn't exist this will be false and StateEvents will be empty. + RoomExists bool `json:"room_exists"` + // The room version of the room. + RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"` + // The current state and auth chain events. + // The lists will be in an arbitrary order. + StateEvents []gomatrixserverlib.HeaderedEvent `json:"state_events"` + AuthChainEvents []gomatrixserverlib.HeaderedEvent `json:"auth_chain_events"` +} \ No newline at end of file diff --git a/roomserver/api/query.go b/roomserver/api/query.go index a80fe684c..0af030749 100644 --- a/roomserver/api/query.go +++ b/roomserver/api/query.go @@ -177,7 +177,7 @@ type QueryStateAndAuthChainRequest struct { // The room ID to query the state in. RoomID string `json:"room_id"` // The list of prev events for the event. Used to calculate the state at - // the event. If empty, assumes current state. + // the event. PrevEventIDs []string `json:"prev_event_ids"` // The list of auth events for the event. Used to calculate the auth chain AuthEventIDs []string `json:"auth_event_ids"` diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go index 8dc1a170b..e4f645c1e 100644 --- a/roomserver/internal/api.go +++ b/roomserver/internal/api.go @@ -23,6 +23,7 @@ type RoomserverInternalAPI struct { *perform.Inviter *perform.Joiner *perform.Peeker + *perform.HandleRemotePeeker *perform.Leaver *perform.Publisher *perform.Backfiller @@ -91,6 +92,10 @@ func (r *RoomserverInternalAPI) SetFederationSenderAPI(fsAPI fsAPI.FederationSen FSAPI: r.fsAPI, Inputer: r.Inputer, } + r.HandleRemotePeeker = &perform.HandleRemotePeeker{ + DB: r.DB, + Inputer: r.Inputer, + } r.Leaver = &perform.Leaver{ Cfg: r.Cfg, DB: r.DB, @@ -137,3 +142,4 @@ func (r *RoomserverInternalAPI) PerformLeave( } return r.WriteOutputEvents(req.RoomID, outputEvents) } + diff --git a/roomserver/internal/perform/perform_peek.go b/roomserver/internal/perform/perform_peek.go index 3d0919356..8fdb23bf7 100644 --- a/roomserver/internal/perform/perform_peek.go +++ b/roomserver/internal/perform/perform_peek.go @@ -198,8 +198,6 @@ func (r *Peeker) performPeekRoomByID( } } - // TODO: handle federated peeks - err = r.Inputer.WriteOutputEvents(roomID, []api.OutputEvent{ { Type: api.OutputTypeNewPeek, diff --git a/roomserver/internal/query/query.go b/roomserver/internal/query/query.go index 640210298..4fddcdfa4 100644 --- a/roomserver/internal/query/query.go +++ b/roomserver/internal/query/query.go @@ -335,38 +335,9 @@ func (r *Queryer) QueryStateAndAuthChain( response.RoomVersion = info.RoomVersion var stateEvents []gomatrixserverlib.Event - if len(request.PrevEventIDs) > 0 { - stateEvents, err = r.loadStateAtEventIDs(ctx, *info, request.PrevEventIDs) - if err != nil { - return err - } - } else { - // no PrevEventIDs or AuthEventIDs were provided, so return current state instead. - - // XXX: is this right? - roomState := state.NewStateResolution(r.DB, *info) - // no need to resolve state again later - request.ResolveState = false - - var currentStateSnapshotNID types.StateSnapshotNID - _, currentStateSnapshotNID, _, err = - r.DB.LatestEventIDs(ctx, info.RoomNID) - if err != nil { - return err - } - - var stateEntries []types.StateEntry - stateEntries, err = roomState.LoadStateAtSnapshot( - ctx, currentStateSnapshotNID, - ) - if err != nil { - return err - } - - stateEvents, err = helpers.LoadStateEvents(ctx, r.DB, stateEntries) - if err != nil { - return err - } + stateEvents, err = r.loadStateAtEventIDs(ctx, *info, request.PrevEventIDs) + if err != nil { + return err } response.PrevEventsExist = true @@ -379,7 +350,7 @@ func (r *Queryer) QueryStateAndAuthChain( } authEventIDs = util.UniqueStrings(authEventIDs) // de-dupe - authEvents, err := getAuthChain(ctx, r.DB.EventsFromIDs, authEventIDs) + authEvents, err := GetAuthChain(ctx, r.DB.EventsFromIDs, authEventIDs) if err != nil { return err } @@ -428,11 +399,11 @@ func (r *Queryer) loadStateAtEventIDs(ctx context.Context, roomInfo types.RoomIn type eventsFromIDs func(context.Context, []string) ([]types.Event, error) -// getAuthChain fetches the auth chain for the given auth events. An auth chain +// GetAuthChain fetches the auth chain for the given auth events. An auth chain // is the list of all events that are referenced in the auth_events section, and // all their auth_events, recursively. The returned set of events contain the // given events. Will *not* error if we don't have all auth events. -func getAuthChain( +func GetAuthChain( ctx context.Context, fn eventsFromIDs, authEventIDs []string, ) ([]gomatrixserverlib.Event, error) { // List of event IDs to fetch. On each pass, these events will be requested diff --git a/roomserver/inthttp/client.go b/roomserver/inthttp/client.go index 1ff1fc82b..b013bb2c7 100644 --- a/roomserver/inthttp/client.go +++ b/roomserver/inthttp/client.go @@ -25,12 +25,13 @@ const ( RoomserverInputRoomEventsPath = "/roomserver/inputRoomEvents" // Perform operations - RoomserverPerformInvitePath = "/roomserver/performInvite" - RoomserverPerformPeekPath = "/roomserver/performPeek" - RoomserverPerformJoinPath = "/roomserver/performJoin" - RoomserverPerformLeavePath = "/roomserver/performLeave" - RoomserverPerformBackfillPath = "/roomserver/performBackfill" - RoomserverPerformPublishPath = "/roomserver/performPublish" + RoomserverPerformInvitePath = "/roomserver/performInvite" + RoomserverPerformPeekPath = "/roomserver/performPeek" + RoomserverPerformJoinPath = "/roomserver/performJoin" + RoomserverPerformLeavePath = "/roomserver/performLeave" + RoomserverPerformBackfillPath = "/roomserver/performBackfill" + RoomserverPerformPublishPath = "/roomserver/performPublish" + RoomserverPerformHandleRemotePeekPath = "/roomserver/performHandleRemotePeek" // Query operations RoomserverQueryLatestEventsAndStatePath = "/roomserver/queryLatestEventsAndState" @@ -203,6 +204,19 @@ func (h *httpRoomserverInternalAPI) PerformPeek( } } +func (h *httpRoomserverInternalAPI) PerformHandleRemotePeek( + ctx context.Context, + request *api.PerformHandleRemotePeekRequest, + response *api.PerformHandleRemotePeekResponse, +) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "PerformHandleRemotePeek") + defer span.Finish() + + apiURL := h.roomserverURL + RoomserverPerformHandleRemotePeekPath + return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) +} + +// XXX: why do some of these Perform's return errors, and others don't? func (h *httpRoomserverInternalAPI) PerformLeave( ctx context.Context, request *api.PerformLeaveRequest, diff --git a/roomserver/inthttp/server.go b/roomserver/inthttp/server.go index 5816d4d82..032e2b6e4 100644 --- a/roomserver/inthttp/server.go +++ b/roomserver/inthttp/server.go @@ -74,6 +74,19 @@ func AddRoutes(r api.RoomserverInternalAPI, internalAPIMux *mux.Router) { return util.JSONResponse{Code: http.StatusOK, JSON: &response} }), ) + internalAPIMux.Handle(RoomserverPerformHandleRemotePeekPath, + httputil.MakeInternalAPI("performHandleRemotePeek", func(req *http.Request) util.JSONResponse { + var request api.PerformHandleRemotePeekRequest + var response api.PerformHandleRemotePeekResponse + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + if err := r.PerformHandleRemotePeek(req.Context(), &request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) internalAPIMux.Handle(RoomserverPerformPublishPath, httputil.MakeInternalAPI("performPublish", func(req *http.Request) util.JSONResponse { var request api.PerformPublishRequest