diff --git a/docs/peeking.md b/docs/peeking.md index 194da6520..60f359072 100644 --- a/docs/peeking.md +++ b/docs/peeking.md @@ -13,8 +13,8 @@ Peeking over federation is implemented as per [MSC2444](https://github.com/matri For requests to peek our rooms ("inbound peeks"): * Remote servers call `/peek` on federationapi * The federationapi queries the federationsender to check if this is renewing an inbound peek or not. - * If not, it hits the PerformHandleRemotePeek on the roomserver to ask it for the current state of the room. - * The roomserver atomically (in theory) adds a NewRemotePeek to its kafka stream to tell the federationserver to start peeking. + * If not, it hits the PerformInboundPeek on the roomserver to ask it for the current state of the room. + * The roomserver atomically (in theory) adds a NewInboundPeek to its kafka stream to tell the federationserver to start peeking. * The federationsender receives the event, tracks the inbound peek in the federationsender_inbound_peeks table, and starts sending events to the peeking server. * The federationsender evicts stale inbound peeks which haven't been renewed. diff --git a/federationapi/routing/peek.go b/federationapi/routing/peek.go index a6d0739c6..6fb3aa7ce 100644 --- a/federationapi/routing/peek.go +++ b/federationapi/routing/peek.go @@ -24,7 +24,7 @@ import ( "github.com/matrix-org/util" ) -// Peek implements the SS /peek API +// Peek implements the SS /peek API, handling inbound peeks func Peek( httpReq *http.Request, request *gomatrixserverlib.FederationRequest, @@ -67,10 +67,10 @@ func Peek( // tell the peeking server to renew every hour renewalInterval := int64(60 * 60 * 1000 * 1000) - var response api.PerformHandleRemotePeekResponse - err := rsAPI.PerformHandleRemotePeek( + var response api.PerformInboundPeekResponse + err := rsAPI.PerformInboundPeek( httpReq.Context(), - &api.PerformHandleRemotePeekRequest{ + &api.PerformInboundPeekRequest{ RoomID: roomID, PeekID: peekID, ServerName: request.Origin(), diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go index 6215edb24..182feb49c 100644 --- a/federationsender/consumers/roomserver.go +++ b/federationsender/consumers/roomserver.go @@ -97,10 +97,10 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { }).Panicf("roomserver output log: write room event failure") return nil } - case api.OutputTypeNewRemotePeek: - if err := s.processRemotePeek(*output.NewRemotePeek); err != nil { + case api.OutputTypeNewInboundPeek: + if err := s.processInboundPeek(*output.NewInboundPeek); err != nil { log.WithFields(log.Fields{ - "event": output.NewRemotePeek, + "event": output.NewInboundPeek, log.ErrorKey: err, }).Panicf("roomserver output log: remote peek event failure") return nil @@ -115,9 +115,9 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { return nil } -// processRemotePeek adds a new inbound peek (replacing the existing one if any) +// processInboundPeek starts tracking a new federated inbound peek (replacing the existing one if any) // causing the federationsender to start sending messages to the peeking server -func (s *OutputRoomEventConsumer) processRemotePeek(orp api.OutputNewRemotePeek) error { +func (s *OutputRoomEventConsumer) processInboundPeek(orp api.OutputNewInboundPeek) error { return s.db.AddInboundPeek(context.TODO(), orp.ServerName, orp.RoomID, orp.PeekID, orp.RenewalInterval) } diff --git a/roomserver/api/api.go b/roomserver/api/api.go index 08a269a82..cc0237e6e 100644 --- a/roomserver/api/api.go +++ b/roomserver/api/api.go @@ -48,10 +48,10 @@ type RoomserverInternalAPI interface { res *PerformPublishResponse, ) - PerformHandleRemotePeek( + PerformInboundPeek( ctx context.Context, - req *PerformHandleRemotePeekRequest, - res *PerformHandleRemotePeekResponse, + req *PerformInboundPeekRequest, + res *PerformInboundPeekResponse, ) error QueryPublishedRooms( diff --git a/roomserver/api/api_trace.go b/roomserver/api/api_trace.go index 4be566d4f..6a1442163 100644 --- a/roomserver/api/api_trace.go +++ b/roomserver/api/api_trace.go @@ -75,13 +75,13 @@ func (t *RoomserverInternalAPITrace) PerformPublish( util.GetLogger(ctx).Infof("PerformPublish req=%+v res=%+v", js(req), js(res)) } -func (t *RoomserverInternalAPITrace) PerformHandleRemotePeek( +func (t *RoomserverInternalAPITrace) PerformInboundPeek( ctx context.Context, - req *PerformHandleRemotePeekRequest, - res *PerformHandleRemotePeekResponse, + req *PerformInboundPeekRequest, + res *PerformInboundPeekResponse, ) error { - err := t.Impl.PerformHandleRemotePeek(ctx, req, res) - util.GetLogger(ctx).Infof("PerformHandleRemotePeek req=%+v res=%+v", js(req), js(res)) + err := t.Impl.PerformInboundPeek(ctx, req, res) + util.GetLogger(ctx).Infof("PerformInboundPeek req=%+v res=%+v", js(req), js(res)) return err } diff --git a/roomserver/api/output.go b/roomserver/api/output.go index ec4186a5e..ea0c7eefe 100644 --- a/roomserver/api/output.go +++ b/roomserver/api/output.go @@ -50,8 +50,8 @@ const ( // OutputTypeNewPeek indicates that the kafka event is an OutputNewPeek OutputTypeNewPeek OutputType = "new_peek" - // OutputTypeNewRemotePeek indicates that the kafka event is an OutputNewRemotePeek - OutputTypeNewRemotePeek OutputType = "new_remote_peek" + // OutputTypeNewInboundPeek indicates that the kafka event is an OutputNewInboundPeek + OutputTypeNewInboundPeek OutputType = "new_remote_peek" ) // An OutputEvent is an entry in the roomserver output kafka log. @@ -69,8 +69,8 @@ type OutputEvent struct { RedactedEvent *OutputRedactedEvent `json:"redacted_event,omitempty"` // The content of event with type OutputTypeNewPeek NewPeek *OutputNewPeek `json:"new_peek,omitempty"` - // The content of event with type OutputTypeNewRemotePeek - NewRemotePeek *OutputNewRemotePeek `json:"new_remote_peek,omitempty"` + // The content of event with type OutputTypeNewInboundPeek + NewInboundPeek *OutputNewInboundPeek `json:"new_remote_peek,omitempty"` } // An OutputNewRoomEvent is written when the roomserver receives a new event. @@ -214,8 +214,8 @@ type OutputNewPeek struct { DeviceID string } -// An OutputNewRemotePeek is written whenever a server starts peeking into a room -type OutputNewRemotePeek struct { +// An OutputNewInboundPeek is written whenever a server starts peeking into a room +type OutputNewInboundPeek struct { RoomID string PeekID string ServerName gomatrixserverlib.ServerName diff --git a/roomserver/api/perform.go b/roomserver/api/perform.go index b7c713330..3b002474e 100644 --- a/roomserver/api/perform.go +++ b/roomserver/api/perform.go @@ -160,7 +160,7 @@ type PerformPublishResponse struct { Error *PerformError } -type PerformHandleRemotePeekRequest struct { +type PerformInboundPeekRequest struct { UserID string `json:"user_id"` RoomID string `json:"room_id"` PeekID string `json:"peek_id"` @@ -168,7 +168,7 @@ type PerformHandleRemotePeekRequest struct { RenewalInterval int64 `json:"renewal_interval"` } -type PerformHandleRemotePeekResponse struct { +type PerformInboundPeekResponse struct { // Does the room exist on this roomserver? // If the room doesn't exist this will be false and StateEvents will be empty. RoomExists bool `json:"room_exists"` diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go index e4f645c1e..c1414c09d 100644 --- a/roomserver/internal/api.go +++ b/roomserver/internal/api.go @@ -23,7 +23,7 @@ type RoomserverInternalAPI struct { *perform.Inviter *perform.Joiner *perform.Peeker - *perform.HandleRemotePeeker + *perform.InboundPeeker *perform.Leaver *perform.Publisher *perform.Backfiller @@ -92,7 +92,7 @@ func (r *RoomserverInternalAPI) SetFederationSenderAPI(fsAPI fsAPI.FederationSen FSAPI: r.fsAPI, Inputer: r.Inputer, } - r.HandleRemotePeeker = &perform.HandleRemotePeeker{ + r.InboundPeeker = &perform.InboundPeeker{ DB: r.DB, Inputer: r.Inputer, } diff --git a/roomserver/internal/perform/perform_handle_remote_peek.go b/roomserver/internal/perform/perform_inbound_peek.go similarity index 89% rename from roomserver/internal/perform/perform_handle_remote_peek.go rename to roomserver/internal/perform/perform_inbound_peek.go index a2aec73a7..e6520b573 100644 --- a/roomserver/internal/perform/perform_handle_remote_peek.go +++ b/roomserver/internal/perform/perform_inbound_peek.go @@ -28,23 +28,23 @@ import ( "github.com/matrix-org/util" ) -type HandleRemotePeeker struct { +type InboundPeeker struct { DB storage.Database Inputer *input.Inputer } -// PerformHandleRemotePeek handles peeking into matrix rooms, including over +// PerformInboundPeek handles peeking into matrix rooms, including over // federation by talking to the federationsender. called when a remote server // initiates a /peek over federation. // // It should atomically figure out the current state of the room (for the -// response to /peek) while adding the remotepeek to the kafka stream so the +// response to /peek) while adding the new inbound peek to the kafka stream so the // fed sender can start sending peeked events without a race between the state // snapshot and the stream of peeked events. -func (r *HandleRemotePeeker) PerformHandleRemotePeek( +func (r *InboundPeeker) PerformInboundPeek( ctx context.Context, - request *api.PerformHandleRemotePeekRequest, - response *api.PerformHandleRemotePeekResponse, + request *api.PerformInboundPeekRequest, + response *api.PerformInboundPeekResponse, ) error { info, err := r.DB.RoomInfo(ctx, request.RoomID) if err != nil { @@ -105,8 +105,8 @@ func (r *HandleRemotePeeker) PerformHandleRemotePeek( err = r.Inputer.WriteOutputEvents(request.RoomID, []api.OutputEvent{ { - Type: api.OutputTypeNewRemotePeek, - NewRemotePeek: &api.OutputNewRemotePeek{ + Type: api.OutputTypeNewInboundPeek, + NewInboundPeek: &api.OutputNewInboundPeek{ RoomID: request.RoomID, PeekID: request.PeekID, ServerName: request.ServerName, diff --git a/roomserver/inthttp/client.go b/roomserver/inthttp/client.go index b013bb2c7..e70d2f0ce 100644 --- a/roomserver/inthttp/client.go +++ b/roomserver/inthttp/client.go @@ -31,7 +31,7 @@ const ( RoomserverPerformLeavePath = "/roomserver/performLeave" RoomserverPerformBackfillPath = "/roomserver/performBackfill" RoomserverPerformPublishPath = "/roomserver/performPublish" - RoomserverPerformHandleRemotePeekPath = "/roomserver/performHandleRemotePeek" + RoomserverPerformInboundPeekPath = "/roomserver/performInboundPeek" // Query operations RoomserverQueryLatestEventsAndStatePath = "/roomserver/queryLatestEventsAndState" @@ -204,15 +204,15 @@ func (h *httpRoomserverInternalAPI) PerformPeek( } } -func (h *httpRoomserverInternalAPI) PerformHandleRemotePeek( +func (h *httpRoomserverInternalAPI) PerformInboundPeek( ctx context.Context, - request *api.PerformHandleRemotePeekRequest, - response *api.PerformHandleRemotePeekResponse, + request *api.PerformInboundPeekRequest, + response *api.PerformInboundPeekResponse, ) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "PerformHandleRemotePeek") + span, ctx := opentracing.StartSpanFromContext(ctx, "PerformInboundPeek") defer span.Finish() - apiURL := h.roomserverURL + RoomserverPerformHandleRemotePeekPath + apiURL := h.roomserverURL + RoomserverPerformInboundPeekPath return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) } diff --git a/roomserver/inthttp/server.go b/roomserver/inthttp/server.go index 032e2b6e4..ffa8acf6e 100644 --- a/roomserver/inthttp/server.go +++ b/roomserver/inthttp/server.go @@ -74,14 +74,14 @@ func AddRoutes(r api.RoomserverInternalAPI, internalAPIMux *mux.Router) { return util.JSONResponse{Code: http.StatusOK, JSON: &response} }), ) - internalAPIMux.Handle(RoomserverPerformHandleRemotePeekPath, - httputil.MakeInternalAPI("performHandleRemotePeek", func(req *http.Request) util.JSONResponse { - var request api.PerformHandleRemotePeekRequest - var response api.PerformHandleRemotePeekResponse + internalAPIMux.Handle(RoomserverPerformInboundPeekPath, + httputil.MakeInternalAPI("performInboundPeek", func(req *http.Request) util.JSONResponse { + var request api.PerformInboundPeekRequest + var response api.PerformInboundPeekResponse if err := json.NewDecoder(req.Body).Decode(&request); err != nil { return util.MessageResponse(http.StatusBadRequest, err.Error()) } - if err := r.PerformHandleRemotePeek(req.Context(), &request, &response); err != nil { + if err := r.PerformInboundPeek(req.Context(), &request, &response); err != nil { return util.ErrorResponse(err) } return util.JSONResponse{Code: http.StatusOK, JSON: &response}