diff --git a/federationsender/api/input.go b/federationsender/api/input.go new file mode 100644 index 000000000..83c517cc2 --- /dev/null +++ b/federationsender/api/input.go @@ -0,0 +1,94 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package api provides the types that are used to communicate with the roomserver. +package api + +import ( + "context" + "errors" + "net/http" + + commonHTTP "github.com/matrix-org/dendrite/common/http" + opentracing "github.com/opentracing/opentracing-go" +) + +// FederationSenderInputAPI is used to write events to the room server. +type FederationSenderInputAPI interface { + InputJoinRequest( + ctx context.Context, + request *InputJoinRequest, + response *InputJoinResponse, + ) error + InputLeaveRequest( + ctx context.Context, + request *InputLeaveRequest, + response *InputLeaveResponse, + ) error +} + +const RoomserverInputJoinRequestPath = "/api/roomserver/inputJoinRequest" +const RoomserverInputLeaveRequestPath = "/api/roomserver/inputLeaveRequest" + +type InputJoinRequest struct { + RoomID string `json:"room_id"` +} + +type InputJoinResponse struct { +} + +type InputLeaveRequest struct { + RoomID string `json:"room_id"` +} + +type InputLeaveResponse struct { +} + +// NewFederationSenderInputAPIHTTP creates a FederationSenderInputAPI implemented by talking to a HTTP POST API. +// If httpClient is nil an error is returned +func NewFederationSenderInputAPIHTTP(roomserverURL string, httpClient *http.Client) (FederationSenderInputAPI, error) { + if httpClient == nil { + return nil, errors.New("NewFederationSenderInputAPIHTTP: httpClient is ") + } + return &httpFederationSenderInputAPI{roomserverURL, httpClient}, nil +} + +type httpFederationSenderInputAPI struct { + roomserverURL string + httpClient *http.Client +} + +func (h *httpFederationSenderInputAPI) InputJoinRequest( + ctx context.Context, + request *InputJoinRequest, + response *InputJoinResponse, +) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "InputRoomEvents") + defer span.Finish() + + apiURL := h.roomserverURL + RoomserverInputJoinRequestPath + return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response) +} + +func (h *httpFederationSenderInputAPI) InputLeaveRequest( + ctx context.Context, + request *InputLeaveRequest, + response *InputLeaveResponse, +) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "InputRoomEvents") + defer span.Finish() + + apiURL := h.roomserverURL + RoomserverInputLeaveRequestPath + return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response) +} diff --git a/federationsender/input/input.go b/federationsender/input/input.go new file mode 100644 index 000000000..4a323af60 --- /dev/null +++ b/federationsender/input/input.go @@ -0,0 +1,84 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package input + +import ( + "context" + "encoding/json" + "net/http" + + "github.com/Shopify/sarama" + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/federationsender/api" + "github.com/matrix-org/dendrite/federationsender/storage" + "github.com/matrix-org/util" +) + +// FederationSenderInputAPI implements api.FederationSenderInputAPI +type FederationSenderInputAPI struct { + DB storage.Database + Producer sarama.SyncProducer + // The kafkaesque topic to output new room events to. + // This is the name used in kafka to identify the stream to write events to. + OutputRoomEventTopic string +} + +// SetupHTTP adds the FederationSenderInputAPI handlers to the http.ServeMux. +func (r *FederationSenderInputAPI) SetupHTTP(servMux *http.ServeMux) { + servMux.Handle(api.RoomserverInputJoinRequestPath, + common.MakeInternalAPI("inputJoinRequest", func(req *http.Request) util.JSONResponse { + var request api.InputJoinRequest + var response api.InputJoinResponse + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + if err := r.InputJoinRequest(req.Context(), &request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) + servMux.Handle(api.RoomserverInputLeaveRequestPath, + common.MakeInternalAPI("inputLeaveRequest", func(req *http.Request) util.JSONResponse { + var request api.InputLeaveRequest + var response api.InputLeaveResponse + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + if err := r.InputLeaveRequest(req.Context(), &request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) +} + +// InputJoinRequest implements api.FederationSenderInputAPI +func (r *FederationSenderInputAPI) InputJoinRequest( + ctx context.Context, + request *api.InputJoinRequest, + response *api.InputJoinResponse, +) (err error) { + return nil +} + +// InputLeaveRequest implements api.FederationSenderInputAPI +func (r *FederationSenderInputAPI) InputLeaveRequest( + ctx context.Context, + request *api.InputLeaveRequest, + response *api.InputLeaveResponse, +) (err error) { + return nil +}