Merge input API into query API

This commit is contained in:
Neil Alexander 2020-04-29 10:01:12 +01:00
parent bd750aaf00
commit 29ce2b78e7
8 changed files with 204 additions and 254 deletions

View file

@ -0,0 +1,53 @@
package api
import (
"context"
"errors"
"net/http"
)
// FederationSenderQueryAPI is used to query information from the federation sender.
type FederationSenderQueryAPI interface {
// Query the joined hosts and the membership events accounting for their participation in a room.
// Note that if a server has multiple users in the room, it will have multiple entries in the returned slice.
// See `QueryJoinedHostServerNamesInRoom` for a de-duplicated version.
QueryJoinedHostsInRoom(
ctx context.Context,
request *QueryJoinedHostsInRoomRequest,
response *QueryJoinedHostsInRoomResponse,
) error
// Query the server names of the joined hosts in a room.
// Unlike QueryJoinedHostsInRoom, this function returns a de-duplicated slice
// containing only the server names (without information for membership events).
QueryJoinedHostServerNamesInRoom(
ctx context.Context,
request *QueryJoinedHostServerNamesInRoomRequest,
response *QueryJoinedHostServerNamesInRoomResponse,
) error
// Handle an instruction to make_join & send_join with a remote server.
InputJoinRequest(
ctx context.Context,
request *InputJoinRequest,
response *InputJoinResponse,
) error
// Handle an instruction to make_leave & send_leave with a remote server.
InputLeaveRequest(
ctx context.Context,
request *InputLeaveRequest,
response *InputLeaveResponse,
) error
}
// NewFederationSenderQueryAPIHTTP creates a FederationSenderQueryAPI implemented by talking to a HTTP POST API.
// If httpClient is nil an error is returned
func NewFederationSenderQueryAPIHTTP(federationSenderURL string, httpClient *http.Client) (FederationSenderQueryAPI, error) {
if httpClient == nil {
return nil, errors.New("NewFederationSenderQueryAPIHTTP: httpClient is <nil>")
}
return &httpFederationSenderQueryAPI{federationSenderURL, httpClient}, nil
}
type httpFederationSenderQueryAPI struct {
federationSenderURL string
httpClient *http.Client
}

View file

@ -1,45 +1,16 @@
// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package api provides the types that are used to communicate with the roomserver.
package api package api
import ( import (
"context" "context"
"errors"
"net/http"
commonHTTP "github.com/matrix-org/dendrite/common/http" commonHTTP "github.com/matrix-org/dendrite/common/http"
opentracing "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go"
) )
// FederationSenderInputAPI is used to write events to the room server. const (
type FederationSenderInputAPI interface { FederationSenderInputJoinRequestPath = "/api/federationsender/inputJoinRequest"
InputJoinRequest( FederationSenderInputLeaveRequestPath = "/api/federationsender/inputLeaveRequest"
ctx context.Context, )
request *InputJoinRequest,
response *InputJoinResponse,
) error
InputLeaveRequest(
ctx context.Context,
request *InputLeaveRequest,
response *InputLeaveResponse,
) error
}
const RoomserverInputJoinRequestPath = "/api/roomserver/inputJoinRequest"
const RoomserverInputLeaveRequestPath = "/api/roomserver/inputLeaveRequest"
type InputJoinRequest struct { type InputJoinRequest struct {
RoomID string `json:"room_id"` RoomID string `json:"room_id"`
@ -48,6 +19,19 @@ type InputJoinRequest struct {
type InputJoinResponse struct { type InputJoinResponse struct {
} }
// Handle an instruction to make_join & send_join with a remote server.
func (h *httpFederationSenderQueryAPI) InputJoinRequest(
ctx context.Context,
request *InputJoinRequest,
response *InputJoinResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "InputJoinRequest")
defer span.Finish()
apiURL := h.federationSenderURL + FederationSenderInputJoinRequestPath
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
type InputLeaveRequest struct { type InputLeaveRequest struct {
RoomID string `json:"room_id"` RoomID string `json:"room_id"`
} }
@ -55,40 +39,15 @@ type InputLeaveRequest struct {
type InputLeaveResponse struct { type InputLeaveResponse struct {
} }
// NewFederationSenderInputAPIHTTP creates a FederationSenderInputAPI implemented by talking to a HTTP POST API. // Handle an instruction to make_leave & send_leave with a remote server.
// If httpClient is nil an error is returned func (h *httpFederationSenderQueryAPI) InputLeaveRequest(
func NewFederationSenderInputAPIHTTP(roomserverURL string, httpClient *http.Client) (FederationSenderInputAPI, error) {
if httpClient == nil {
return nil, errors.New("NewFederationSenderInputAPIHTTP: httpClient is <nil>")
}
return &httpFederationSenderInputAPI{roomserverURL, httpClient}, nil
}
type httpFederationSenderInputAPI struct {
roomserverURL string
httpClient *http.Client
}
func (h *httpFederationSenderInputAPI) InputJoinRequest(
ctx context.Context,
request *InputJoinRequest,
response *InputJoinResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "InputRoomEvents")
defer span.Finish()
apiURL := h.roomserverURL + RoomserverInputJoinRequestPath
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
func (h *httpFederationSenderInputAPI) InputLeaveRequest(
ctx context.Context, ctx context.Context,
request *InputLeaveRequest, request *InputLeaveRequest,
response *InputLeaveResponse, response *InputLeaveResponse,
) error { ) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "InputRoomEvents") span, ctx := opentracing.StartSpanFromContext(ctx, "InputLeaveRequest")
defer span.Finish() defer span.Finish()
apiURL := h.roomserverURL + RoomserverInputLeaveRequestPath apiURL := h.federationSenderURL + FederationSenderInputLeaveRequestPath
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response) return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
} }

View file

@ -2,16 +2,20 @@ package api
import ( import (
"context" "context"
"errors"
"net/http"
commonHTTP "github.com/matrix-org/dendrite/common/http" commonHTTP "github.com/matrix-org/dendrite/common/http"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go"
) )
// FederationSenderQueryJoinedHostsInRoomPath is the HTTP path for the QueryJoinedHostsInRoom API.
const FederationSenderQueryJoinedHostsInRoomPath = "/api/federationsender/queryJoinedHostsInRoom"
// FederationSenderQueryJoinedHostServerNamesInRoomPath is the HTTP path for the QueryJoinedHostServerNamesInRoom API.
const FederationSenderQueryJoinedHostServerNamesInRoomPath = "/api/federationsender/queryJoinedHostServerNamesInRoom"
// QueryJoinedHostsInRoomRequest is a request to QueryJoinedHostsInRoom // QueryJoinedHostsInRoomRequest is a request to QueryJoinedHostsInRoom
type QueryJoinedHostsInRoomRequest struct { type QueryJoinedHostsInRoomRequest struct {
RoomID string `json:"room_id"` RoomID string `json:"room_id"`
@ -22,56 +26,6 @@ type QueryJoinedHostsInRoomResponse struct {
JoinedHosts []types.JoinedHost `json:"joined_hosts"` JoinedHosts []types.JoinedHost `json:"joined_hosts"`
} }
// QueryJoinedHostServerNamesRequest is a request to QueryJoinedHostServerNames
type QueryJoinedHostServerNamesInRoomRequest struct {
RoomID string `json:"room_id"`
}
// QueryJoinedHostServerNamesResponse is a response to QueryJoinedHostServerNames
type QueryJoinedHostServerNamesInRoomResponse struct {
ServerNames []gomatrixserverlib.ServerName `json:"server_names"`
}
// FederationSenderQueryAPI is used to query information from the federation sender.
type FederationSenderQueryAPI interface {
// Query the joined hosts and the membership events accounting for their participation in a room.
// Note that if a server has multiple users in the room, it will have multiple entries in the returned slice.
// See `QueryJoinedHostServerNamesInRoom` for a de-duplicated version.
QueryJoinedHostsInRoom(
ctx context.Context,
request *QueryJoinedHostsInRoomRequest,
response *QueryJoinedHostsInRoomResponse,
) error
// Query the server names of the joined hosts in a room.
// Unlike QueryJoinedHostsInRoom, this function returns a de-duplicated slice
// containing only the server names (without information for membership events).
QueryJoinedHostServerNamesInRoom(
ctx context.Context,
request *QueryJoinedHostServerNamesInRoomRequest,
response *QueryJoinedHostServerNamesInRoomResponse,
) error
}
// FederationSenderQueryJoinedHostsInRoomPath is the HTTP path for the QueryJoinedHostsInRoom API.
const FederationSenderQueryJoinedHostsInRoomPath = "/api/federationsender/queryJoinedHostsInRoom"
// FederationSenderQueryJoinedHostServerNamesInRoomPath is the HTTP path for the QueryJoinedHostServerNamesInRoom API.
const FederationSenderQueryJoinedHostServerNamesInRoomPath = "/api/federationsender/queryJoinedHostServerNamesInRoom"
// NewFederationSenderQueryAPIHTTP creates a FederationSenderQueryAPI implemented by talking to a HTTP POST API.
// If httpClient is nil an error is returned
func NewFederationSenderQueryAPIHTTP(federationSenderURL string, httpClient *http.Client) (FederationSenderQueryAPI, error) {
if httpClient == nil {
return nil, errors.New("NewFederationSenderQueryAPIHTTP: httpClient is <nil>")
}
return &httpFederationSenderQueryAPI{federationSenderURL, httpClient}, nil
}
type httpFederationSenderQueryAPI struct {
federationSenderURL string
httpClient *http.Client
}
// QueryJoinedHostsInRoom implements FederationSenderQueryAPI // QueryJoinedHostsInRoom implements FederationSenderQueryAPI
func (h *httpFederationSenderQueryAPI) QueryJoinedHostsInRoom( func (h *httpFederationSenderQueryAPI) QueryJoinedHostsInRoom(
ctx context.Context, ctx context.Context,
@ -85,6 +39,16 @@ func (h *httpFederationSenderQueryAPI) QueryJoinedHostsInRoom(
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response) return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
} }
// QueryJoinedHostServerNamesRequest is a request to QueryJoinedHostServerNames
type QueryJoinedHostServerNamesInRoomRequest struct {
RoomID string `json:"room_id"`
}
// QueryJoinedHostServerNamesResponse is a response to QueryJoinedHostServerNames
type QueryJoinedHostServerNamesInRoomResponse struct {
ServerNames []gomatrixserverlib.ServerName `json:"server_names"`
}
// QueryJoinedHostServerNamesInRoom implements FederationSenderQueryAPI // QueryJoinedHostServerNamesInRoom implements FederationSenderQueryAPI
func (h *httpFederationSenderQueryAPI) QueryJoinedHostServerNamesInRoom( func (h *httpFederationSenderQueryAPI) QueryJoinedHostServerNamesInRoom(
ctx context.Context, ctx context.Context,

View file

@ -20,7 +20,6 @@ import (
"github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/federationsender/consumers" "github.com/matrix-org/dendrite/federationsender/consumers"
"github.com/matrix-org/dendrite/federationsender/input"
"github.com/matrix-org/dendrite/federationsender/producers" "github.com/matrix-org/dendrite/federationsender/producers"
"github.com/matrix-org/dendrite/federationsender/query" "github.com/matrix-org/dendrite/federationsender/query"
"github.com/matrix-org/dendrite/federationsender/queue" "github.com/matrix-org/dendrite/federationsender/queue"
@ -37,7 +36,7 @@ func SetupFederationSenderComponent(
federation *gomatrixserverlib.FederationClient, federation *gomatrixserverlib.FederationClient,
rsQueryAPI roomserverAPI.RoomserverQueryAPI, rsQueryAPI roomserverAPI.RoomserverQueryAPI,
rsInputAPI roomserverAPI.RoomserverInputAPI, rsInputAPI roomserverAPI.RoomserverInputAPI,
) (api.FederationSenderQueryAPI, api.FederationSenderInputAPI) { ) api.FederationSenderQueryAPI {
federationSenderDB, err := storage.NewDatabase(string(base.Cfg.Database.FederationSender)) federationSenderDB, err := storage.NewDatabase(string(base.Cfg.Database.FederationSender))
if err != nil { if err != nil {
logrus.WithError(err).Panic("failed to connect to federation sender db") logrus.WithError(err).Panic("failed to connect to federation sender db")
@ -62,16 +61,10 @@ func SetupFederationSenderComponent(
logrus.WithError(err).Panic("failed to start typing server consumer") logrus.WithError(err).Panic("failed to start typing server consumer")
} }
inputAPI := input.FederationSenderInputAPI{
RoomserverInputAPI: rsInputAPI,
}
inputAPI.SetupHTTP(http.DefaultServeMux)
queryAPI := query.FederationSenderQueryAPI{ queryAPI := query.FederationSenderQueryAPI{
DB: federationSenderDB, DB: federationSenderDB,
} }
queryAPI.SetupHTTP(http.DefaultServeMux) queryAPI.SetupHTTP(http.DefaultServeMux)
return &queryAPI, &inputAPI return &queryAPI
} }

View file

@ -1,80 +0,0 @@
// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package input
import (
"context"
"encoding/json"
"net/http"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/util"
rsAPI "github.com/matrix-org/dendrite/roomserver/api"
)
// FederationSenderInputAPI implements api.FederationSenderInputAPI
type FederationSenderInputAPI struct {
RoomserverInputAPI rsAPI.RoomserverInputAPI
}
// SetupHTTP adds the FederationSenderInputAPI handlers to the http.ServeMux.
func (r *FederationSenderInputAPI) SetupHTTP(servMux *http.ServeMux) {
servMux.Handle(api.RoomserverInputJoinRequestPath,
common.MakeInternalAPI("inputJoinRequest", func(req *http.Request) util.JSONResponse {
var request api.InputJoinRequest
var response api.InputJoinResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
if err := r.InputJoinRequest(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
servMux.Handle(api.RoomserverInputLeaveRequestPath,
common.MakeInternalAPI("inputLeaveRequest", func(req *http.Request) util.JSONResponse {
var request api.InputLeaveRequest
var response api.InputLeaveResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
if err := r.InputLeaveRequest(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
}
// InputJoinRequest implements api.FederationSenderInputAPI
func (r *FederationSenderInputAPI) InputJoinRequest(
ctx context.Context,
request *api.InputJoinRequest,
response *api.InputJoinResponse,
) (err error) {
return nil
}
// InputLeaveRequest implements api.FederationSenderInputAPI
func (r *FederationSenderInputAPI) InputLeaveRequest(
ctx context.Context,
request *api.InputLeaveRequest,
response *api.InputLeaveResponse,
) (err error) {
return nil
}

View file

@ -0,0 +1,85 @@
package query
import (
"context"
"encoding/json"
"net/http"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/federationsender/types"
rsAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/util"
)
// FederationSenderQueryDatabase has the APIs needed to implement the query API.
type FederationSenderQueryDatabase interface {
GetJoinedHosts(
ctx context.Context, roomID string,
) ([]types.JoinedHost, error)
}
// FederationSenderQueryAPI is an implementation of api.FederationSenderQueryAPI
type FederationSenderQueryAPI struct {
api.FederationSenderQueryAPI
DB FederationSenderQueryDatabase
RoomserverInputAPI rsAPI.RoomserverInputAPI
}
// SetupHTTP adds the FederationSenderQueryAPI handlers to the http.ServeMux.
func (f *FederationSenderQueryAPI) SetupHTTP(servMux *http.ServeMux) {
servMux.Handle(
api.FederationSenderQueryJoinedHostsInRoomPath,
common.MakeInternalAPI("QueryJoinedHostsInRoom", func(req *http.Request) util.JSONResponse {
var request api.QueryJoinedHostsInRoomRequest
var response api.QueryJoinedHostsInRoomResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.ErrorResponse(err)
}
if err := f.QueryJoinedHostsInRoom(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
servMux.Handle(
api.FederationSenderQueryJoinedHostServerNamesInRoomPath,
common.MakeInternalAPI("QueryJoinedHostServerNamesInRoom", func(req *http.Request) util.JSONResponse {
var request api.QueryJoinedHostServerNamesInRoomRequest
var response api.QueryJoinedHostServerNamesInRoomResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.ErrorResponse(err)
}
if err := f.QueryJoinedHostServerNamesInRoom(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
servMux.Handle(api.FederationSenderInputJoinRequestPath,
common.MakeInternalAPI("inputJoinRequest", func(req *http.Request) util.JSONResponse {
var request api.InputJoinRequest
var response api.InputJoinResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
if err := f.InputJoinRequest(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
servMux.Handle(api.FederationSenderInputLeaveRequestPath,
common.MakeInternalAPI("inputLeaveRequest", func(req *http.Request) util.JSONResponse {
var request api.InputLeaveRequest
var response api.InputLeaveResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
if err := f.InputLeaveRequest(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
}

View file

@ -0,0 +1,25 @@
package query
import (
"context"
"github.com/matrix-org/dendrite/federationsender/api"
)
// InputJoinRequest implements api.FederationSenderQueryAPI
func (r *FederationSenderQueryAPI) InputJoinRequest(
ctx context.Context,
request *api.InputJoinRequest,
response *api.InputJoinResponse,
) (err error) {
return nil
}
// InputLeaveRequest implements api.FederationSenderQueryAPI
func (r *FederationSenderQueryAPI) InputLeaveRequest(
ctx context.Context,
request *api.InputLeaveRequest,
response *api.InputLeaveResponse,
) (err error) {
return nil
}

View file

@ -2,28 +2,11 @@ package query
import ( import (
"context" "context"
"encoding/json"
"net/http"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
) )
// FederationSenderQueryDatabase has the APIs needed to implement the query API.
type FederationSenderQueryDatabase interface {
GetJoinedHosts(
ctx context.Context, roomID string,
) ([]types.JoinedHost, error)
}
// FederationSenderQueryAPI is an implementation of api.FederationSenderQueryAPI
type FederationSenderQueryAPI struct {
DB FederationSenderQueryDatabase
}
// QueryJoinedHostsInRoom implements api.FederationSenderQueryAPI // QueryJoinedHostsInRoom implements api.FederationSenderQueryAPI
func (f *FederationSenderQueryAPI) QueryJoinedHostsInRoom( func (f *FederationSenderQueryAPI) QueryJoinedHostsInRoom(
ctx context.Context, ctx context.Context,
@ -54,35 +37,3 @@ func (f *FederationSenderQueryAPI) QueryJoinedHostServerNamesInRoom(
return return
} }
// SetupHTTP adds the FederationSenderQueryAPI handlers to the http.ServeMux.
func (f *FederationSenderQueryAPI) SetupHTTP(servMux *http.ServeMux) {
servMux.Handle(
api.FederationSenderQueryJoinedHostsInRoomPath,
common.MakeInternalAPI("QueryJoinedHostsInRoom", func(req *http.Request) util.JSONResponse {
var request api.QueryJoinedHostsInRoomRequest
var response api.QueryJoinedHostsInRoomResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.ErrorResponse(err)
}
if err := f.QueryJoinedHostsInRoom(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
servMux.Handle(
api.FederationSenderQueryJoinedHostServerNamesInRoomPath,
common.MakeInternalAPI("QueryJoinedHostServerNamesInRoom", func(req *http.Request) util.JSONResponse {
var request api.QueryJoinedHostServerNamesInRoomRequest
var response api.QueryJoinedHostServerNamesInRoomResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.ErrorResponse(err)
}
if err := f.QueryJoinedHostServerNamesInRoom(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
}