rename ambiguous RemotePeek as InboundPeek

This commit is contained in:
Matthew Hodgson 2020-09-13 00:04:02 +01:00
parent 71732f2c28
commit 8f203febc1
11 changed files with 48 additions and 48 deletions

View file

@ -13,8 +13,8 @@ Peeking over federation is implemented as per [MSC2444](https://github.com/matri
For requests to peek our rooms ("inbound peeks"): For requests to peek our rooms ("inbound peeks"):
* Remote servers call `/peek` on federationapi * Remote servers call `/peek` on federationapi
* The federationapi queries the federationsender to check if this is renewing an inbound peek or not. * The federationapi queries the federationsender to check if this is renewing an inbound peek or not.
* If not, it hits the PerformHandleRemotePeek on the roomserver to ask it for the current state of the room. * If not, it hits the PerformInboundPeek on the roomserver to ask it for the current state of the room.
* The roomserver atomically (in theory) adds a NewRemotePeek to its kafka stream to tell the federationserver to start peeking. * The roomserver atomically (in theory) adds a NewInboundPeek to its kafka stream to tell the federationserver to start peeking.
* The federationsender receives the event, tracks the inbound peek in the federationsender_inbound_peeks table, and starts sending events to the peeking server. * The federationsender receives the event, tracks the inbound peek in the federationsender_inbound_peeks table, and starts sending events to the peeking server.
* The federationsender evicts stale inbound peeks which haven't been renewed. * The federationsender evicts stale inbound peeks which haven't been renewed.

View file

@ -24,7 +24,7 @@ import (
"github.com/matrix-org/util" "github.com/matrix-org/util"
) )
// Peek implements the SS /peek API // Peek implements the SS /peek API, handling inbound peeks
func Peek( func Peek(
httpReq *http.Request, httpReq *http.Request,
request *gomatrixserverlib.FederationRequest, request *gomatrixserverlib.FederationRequest,
@ -67,10 +67,10 @@ func Peek(
// tell the peeking server to renew every hour // tell the peeking server to renew every hour
renewalInterval := int64(60 * 60 * 1000 * 1000) renewalInterval := int64(60 * 60 * 1000 * 1000)
var response api.PerformHandleRemotePeekResponse var response api.PerformInboundPeekResponse
err := rsAPI.PerformHandleRemotePeek( err := rsAPI.PerformInboundPeek(
httpReq.Context(), httpReq.Context(),
&api.PerformHandleRemotePeekRequest{ &api.PerformInboundPeekRequest{
RoomID: roomID, RoomID: roomID,
PeekID: peekID, PeekID: peekID,
ServerName: request.Origin(), ServerName: request.Origin(),

View file

@ -97,10 +97,10 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
}).Panicf("roomserver output log: write room event failure") }).Panicf("roomserver output log: write room event failure")
return nil return nil
} }
case api.OutputTypeNewRemotePeek: case api.OutputTypeNewInboundPeek:
if err := s.processRemotePeek(*output.NewRemotePeek); err != nil { if err := s.processInboundPeek(*output.NewInboundPeek); err != nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"event": output.NewRemotePeek, "event": output.NewInboundPeek,
log.ErrorKey: err, log.ErrorKey: err,
}).Panicf("roomserver output log: remote peek event failure") }).Panicf("roomserver output log: remote peek event failure")
return nil return nil
@ -115,9 +115,9 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
return nil return nil
} }
// processRemotePeek adds a new inbound peek (replacing the existing one if any) // processInboundPeek starts tracking a new federated inbound peek (replacing the existing one if any)
// causing the federationsender to start sending messages to the peeking server // causing the federationsender to start sending messages to the peeking server
func (s *OutputRoomEventConsumer) processRemotePeek(orp api.OutputNewRemotePeek) error { func (s *OutputRoomEventConsumer) processInboundPeek(orp api.OutputNewInboundPeek) error {
return s.db.AddInboundPeek(context.TODO(), orp.ServerName, orp.RoomID, orp.PeekID, orp.RenewalInterval) return s.db.AddInboundPeek(context.TODO(), orp.ServerName, orp.RoomID, orp.PeekID, orp.RenewalInterval)
} }

View file

@ -48,10 +48,10 @@ type RoomserverInternalAPI interface {
res *PerformPublishResponse, res *PerformPublishResponse,
) )
PerformHandleRemotePeek( PerformInboundPeek(
ctx context.Context, ctx context.Context,
req *PerformHandleRemotePeekRequest, req *PerformInboundPeekRequest,
res *PerformHandleRemotePeekResponse, res *PerformInboundPeekResponse,
) error ) error
QueryPublishedRooms( QueryPublishedRooms(

View file

@ -75,13 +75,13 @@ func (t *RoomserverInternalAPITrace) PerformPublish(
util.GetLogger(ctx).Infof("PerformPublish req=%+v res=%+v", js(req), js(res)) util.GetLogger(ctx).Infof("PerformPublish req=%+v res=%+v", js(req), js(res))
} }
func (t *RoomserverInternalAPITrace) PerformHandleRemotePeek( func (t *RoomserverInternalAPITrace) PerformInboundPeek(
ctx context.Context, ctx context.Context,
req *PerformHandleRemotePeekRequest, req *PerformInboundPeekRequest,
res *PerformHandleRemotePeekResponse, res *PerformInboundPeekResponse,
) error { ) error {
err := t.Impl.PerformHandleRemotePeek(ctx, req, res) err := t.Impl.PerformInboundPeek(ctx, req, res)
util.GetLogger(ctx).Infof("PerformHandleRemotePeek req=%+v res=%+v", js(req), js(res)) util.GetLogger(ctx).Infof("PerformInboundPeek req=%+v res=%+v", js(req), js(res))
return err return err
} }

View file

@ -50,8 +50,8 @@ const (
// OutputTypeNewPeek indicates that the kafka event is an OutputNewPeek // OutputTypeNewPeek indicates that the kafka event is an OutputNewPeek
OutputTypeNewPeek OutputType = "new_peek" OutputTypeNewPeek OutputType = "new_peek"
// OutputTypeNewRemotePeek indicates that the kafka event is an OutputNewRemotePeek // OutputTypeNewInboundPeek indicates that the kafka event is an OutputNewInboundPeek
OutputTypeNewRemotePeek OutputType = "new_remote_peek" OutputTypeNewInboundPeek OutputType = "new_remote_peek"
) )
// An OutputEvent is an entry in the roomserver output kafka log. // An OutputEvent is an entry in the roomserver output kafka log.
@ -69,8 +69,8 @@ type OutputEvent struct {
RedactedEvent *OutputRedactedEvent `json:"redacted_event,omitempty"` RedactedEvent *OutputRedactedEvent `json:"redacted_event,omitempty"`
// The content of event with type OutputTypeNewPeek // The content of event with type OutputTypeNewPeek
NewPeek *OutputNewPeek `json:"new_peek,omitempty"` NewPeek *OutputNewPeek `json:"new_peek,omitempty"`
// The content of event with type OutputTypeNewRemotePeek // The content of event with type OutputTypeNewInboundPeek
NewRemotePeek *OutputNewRemotePeek `json:"new_remote_peek,omitempty"` NewInboundPeek *OutputNewInboundPeek `json:"new_remote_peek,omitempty"`
} }
// An OutputNewRoomEvent is written when the roomserver receives a new event. // An OutputNewRoomEvent is written when the roomserver receives a new event.
@ -214,8 +214,8 @@ type OutputNewPeek struct {
DeviceID string DeviceID string
} }
// An OutputNewRemotePeek is written whenever a server starts peeking into a room // An OutputNewInboundPeek is written whenever a server starts peeking into a room
type OutputNewRemotePeek struct { type OutputNewInboundPeek struct {
RoomID string RoomID string
PeekID string PeekID string
ServerName gomatrixserverlib.ServerName ServerName gomatrixserverlib.ServerName

View file

@ -160,7 +160,7 @@ type PerformPublishResponse struct {
Error *PerformError Error *PerformError
} }
type PerformHandleRemotePeekRequest struct { type PerformInboundPeekRequest struct {
UserID string `json:"user_id"` UserID string `json:"user_id"`
RoomID string `json:"room_id"` RoomID string `json:"room_id"`
PeekID string `json:"peek_id"` PeekID string `json:"peek_id"`
@ -168,7 +168,7 @@ type PerformHandleRemotePeekRequest struct {
RenewalInterval int64 `json:"renewal_interval"` RenewalInterval int64 `json:"renewal_interval"`
} }
type PerformHandleRemotePeekResponse struct { type PerformInboundPeekResponse struct {
// Does the room exist on this roomserver? // Does the room exist on this roomserver?
// If the room doesn't exist this will be false and StateEvents will be empty. // If the room doesn't exist this will be false and StateEvents will be empty.
RoomExists bool `json:"room_exists"` RoomExists bool `json:"room_exists"`

View file

@ -23,7 +23,7 @@ type RoomserverInternalAPI struct {
*perform.Inviter *perform.Inviter
*perform.Joiner *perform.Joiner
*perform.Peeker *perform.Peeker
*perform.HandleRemotePeeker *perform.InboundPeeker
*perform.Leaver *perform.Leaver
*perform.Publisher *perform.Publisher
*perform.Backfiller *perform.Backfiller
@ -92,7 +92,7 @@ func (r *RoomserverInternalAPI) SetFederationSenderAPI(fsAPI fsAPI.FederationSen
FSAPI: r.fsAPI, FSAPI: r.fsAPI,
Inputer: r.Inputer, Inputer: r.Inputer,
} }
r.HandleRemotePeeker = &perform.HandleRemotePeeker{ r.InboundPeeker = &perform.InboundPeeker{
DB: r.DB, DB: r.DB,
Inputer: r.Inputer, Inputer: r.Inputer,
} }

View file

@ -28,23 +28,23 @@ import (
"github.com/matrix-org/util" "github.com/matrix-org/util"
) )
type HandleRemotePeeker struct { type InboundPeeker struct {
DB storage.Database DB storage.Database
Inputer *input.Inputer Inputer *input.Inputer
} }
// PerformHandleRemotePeek handles peeking into matrix rooms, including over // PerformInboundPeek handles peeking into matrix rooms, including over
// federation by talking to the federationsender. called when a remote server // federation by talking to the federationsender. called when a remote server
// initiates a /peek over federation. // initiates a /peek over federation.
// //
// It should atomically figure out the current state of the room (for the // It should atomically figure out the current state of the room (for the
// response to /peek) while adding the remotepeek to the kafka stream so the // response to /peek) while adding the new inbound peek to the kafka stream so the
// fed sender can start sending peeked events without a race between the state // fed sender can start sending peeked events without a race between the state
// snapshot and the stream of peeked events. // snapshot and the stream of peeked events.
func (r *HandleRemotePeeker) PerformHandleRemotePeek( func (r *InboundPeeker) PerformInboundPeek(
ctx context.Context, ctx context.Context,
request *api.PerformHandleRemotePeekRequest, request *api.PerformInboundPeekRequest,
response *api.PerformHandleRemotePeekResponse, response *api.PerformInboundPeekResponse,
) error { ) error {
info, err := r.DB.RoomInfo(ctx, request.RoomID) info, err := r.DB.RoomInfo(ctx, request.RoomID)
if err != nil { if err != nil {
@ -105,8 +105,8 @@ func (r *HandleRemotePeeker) PerformHandleRemotePeek(
err = r.Inputer.WriteOutputEvents(request.RoomID, []api.OutputEvent{ err = r.Inputer.WriteOutputEvents(request.RoomID, []api.OutputEvent{
{ {
Type: api.OutputTypeNewRemotePeek, Type: api.OutputTypeNewInboundPeek,
NewRemotePeek: &api.OutputNewRemotePeek{ NewInboundPeek: &api.OutputNewInboundPeek{
RoomID: request.RoomID, RoomID: request.RoomID,
PeekID: request.PeekID, PeekID: request.PeekID,
ServerName: request.ServerName, ServerName: request.ServerName,

View file

@ -31,7 +31,7 @@ const (
RoomserverPerformLeavePath = "/roomserver/performLeave" RoomserverPerformLeavePath = "/roomserver/performLeave"
RoomserverPerformBackfillPath = "/roomserver/performBackfill" RoomserverPerformBackfillPath = "/roomserver/performBackfill"
RoomserverPerformPublishPath = "/roomserver/performPublish" RoomserverPerformPublishPath = "/roomserver/performPublish"
RoomserverPerformHandleRemotePeekPath = "/roomserver/performHandleRemotePeek" RoomserverPerformInboundPeekPath = "/roomserver/performInboundPeek"
// Query operations // Query operations
RoomserverQueryLatestEventsAndStatePath = "/roomserver/queryLatestEventsAndState" RoomserverQueryLatestEventsAndStatePath = "/roomserver/queryLatestEventsAndState"
@ -204,15 +204,15 @@ func (h *httpRoomserverInternalAPI) PerformPeek(
} }
} }
func (h *httpRoomserverInternalAPI) PerformHandleRemotePeek( func (h *httpRoomserverInternalAPI) PerformInboundPeek(
ctx context.Context, ctx context.Context,
request *api.PerformHandleRemotePeekRequest, request *api.PerformInboundPeekRequest,
response *api.PerformHandleRemotePeekResponse, response *api.PerformInboundPeekResponse,
) error { ) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "PerformHandleRemotePeek") span, ctx := opentracing.StartSpanFromContext(ctx, "PerformInboundPeek")
defer span.Finish() defer span.Finish()
apiURL := h.roomserverURL + RoomserverPerformHandleRemotePeekPath apiURL := h.roomserverURL + RoomserverPerformInboundPeekPath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
} }

View file

@ -74,14 +74,14 @@ func AddRoutes(r api.RoomserverInternalAPI, internalAPIMux *mux.Router) {
return util.JSONResponse{Code: http.StatusOK, JSON: &response} return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}), }),
) )
internalAPIMux.Handle(RoomserverPerformHandleRemotePeekPath, internalAPIMux.Handle(RoomserverPerformInboundPeekPath,
httputil.MakeInternalAPI("performHandleRemotePeek", func(req *http.Request) util.JSONResponse { httputil.MakeInternalAPI("performInboundPeek", func(req *http.Request) util.JSONResponse {
var request api.PerformHandleRemotePeekRequest var request api.PerformInboundPeekRequest
var response api.PerformHandleRemotePeekResponse var response api.PerformInboundPeekResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil { if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error()) return util.MessageResponse(http.StatusBadRequest, err.Error())
} }
if err := r.PerformHandleRemotePeek(req.Context(), &request, &response); err != nil { if err := r.PerformInboundPeek(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
return util.JSONResponse{Code: http.StatusOK, JSON: &response} return util.JSONResponse{Code: http.StatusOK, JSON: &response}