From 0448ffc891499b2c5db60e7faa3a5388aecab99e Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 3 Mar 2017 15:04:28 +0000 Subject: [PATCH] Use a conventional JSON POST API rather than go net/rpc net/rpc doesn't automatically handle reconnecting and we have better logging and metrics infrastructure for monitoring HTTP apis. --- .../dendrite/roomserver/api/query.go | 67 +++++++++++++------ .../dendrite/roomserver/query/query.go | 25 +++++++ .../roomserver-integration-tests/main.go | 9 +-- .../roomserver/roomserver/roomserver.go | 8 +-- vendor/manifest | 4 +- .../src/github.com/matrix-org/util/context.go | 6 +- vendor/src/github.com/matrix-org/util/json.go | 15 +++++ .../github.com/matrix-org/util/json_test.go | 4 +- 8 files changed, 96 insertions(+), 42 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/roomserver/api/query.go b/src/github.com/matrix-org/dendrite/roomserver/api/query.go index 157674df4..f8314dd0b 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/api/query.go +++ b/src/github.com/matrix-org/dendrite/roomserver/api/query.go @@ -1,7 +1,11 @@ package api import ( + "bytes" + "encoding/json" + "fmt" "github.com/matrix-org/gomatrixserverlib" + "net/http" ) // StateKeyTuple is a pair of an event type and state_key. @@ -38,34 +42,53 @@ type RoomserverQueryAPI interface { ) error } -// RPCServer is used to register a roomserver implementation with an RPC server. -type RPCServer interface { - RegisterName(name string, rcvr interface{}) error +// RoomserverQueryLatestEventsAndStatePath is the HTTP path for the QueryLatestEventsAndState API. +const RoomserverQueryLatestEventsAndStatePath = "/api/Roomserver/QueryLatestEventsAndState" + +// NewRoomserverQueryAPIHTTP creates a RoomserverQueryAPI implemented by talking to a HTTP POST API. +// If httpClient is nil then it uses the http.DefaultClient +func NewRoomserverQueryAPIHTTP(roomserverURL string, httpClient *http.Client) RoomserverQueryAPI { + if httpClient == nil { + httpClient = http.DefaultClient + } + return &httpRoomserverQueryAPI{roomserverURL, *httpClient} } -// RegisterRoomserverQueryAPI registers a RoomserverQueryAPI implementation with an RPC server. -func RegisterRoomserverQueryAPI(rpcServer RPCServer, roomserver RoomserverQueryAPI) error { - return rpcServer.RegisterName("Roomserver", roomserver) -} - -// RPCClient is used to invoke roomserver APIs on a remote server. -type RPCClient interface { - Call(serviceMethod string, args interface{}, reply interface{}) error -} - -// NewRoomserverQueryAPIFromClient creates a new query API from an RPC client. -func NewRoomserverQueryAPIFromClient(client RPCClient) RoomserverQueryAPI { - return &remoteRoomserver{client} -} - -type remoteRoomserver struct { - client RPCClient +type httpRoomserverQueryAPI struct { + roomserverURL string + httpClient http.Client } // QueryLatestEventsAndState implements RoomserverQueryAPI -func (r *remoteRoomserver) QueryLatestEventsAndState( +func (h *httpRoomserverQueryAPI) QueryLatestEventsAndState( request *QueryLatestEventsAndStateRequest, response *QueryLatestEventsAndStateResponse, ) error { - return r.client.Call("Roomserver.QueryLatestEventsAndState", request, response) + apiURL := h.roomserverURL + RoomserverQueryLatestEventsAndStatePath + return postJSON(h.httpClient, apiURL, request, response) +} + +func postJSON(httpClient http.Client, apiURL string, request, response interface{}) error { + jsonBytes, err := json.Marshal(request) + if err != nil { + return err + } + res, err := httpClient.Post(apiURL, "application/json", bytes.NewReader(jsonBytes)) + if res != nil { + defer res.Body.Close() + } + if err != nil { + return err + } + if res.StatusCode != 200 { + var errorBody struct { + Message string `json:"message"` + } + if err = json.NewDecoder(res.Body).Decode(&errorBody); err != nil { + return err + } else { + return fmt.Errorf("api: %d: %s", res.StatusCode, errorBody.Message) + } + } + return json.NewDecoder(res.Body).Decode(response) } diff --git a/src/github.com/matrix-org/dendrite/roomserver/query/query.go b/src/github.com/matrix-org/dendrite/roomserver/query/query.go index db473bdfc..69924ab39 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/query/query.go +++ b/src/github.com/matrix-org/dendrite/roomserver/query/query.go @@ -1,8 +1,12 @@ package query import ( + "encoding/json" "fmt" "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/util" + "github.com/prometheus/client_golang/prometheus" + "net/http" ) // RoomserverQueryAPI is an implemenation of RoomserverQueryAPI @@ -16,3 +20,24 @@ func (r *RoomserverQueryAPI) QueryLatestEventsAndState( ) error { return fmt.Errorf("Not Implemented") } + +func (r *RoomserverQueryAPI) SetupHTTP(servMux *http.ServeMux) { + servMux.Handle( + api.RoomserverQueryLatestEventsAndStatePath, + makeAPI("query_latest_events_and_state", func(req *http.Request) util.JSONResponse { + var request api.QueryLatestEventsAndStateRequest + var response api.QueryLatestEventsAndStateResponse + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.ErrorResponse(err) + } + if err := r.QueryLatestEventsAndState(&request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: 200, JSON: &response} + }), + ) +} + +func makeAPI(metric string, apiFunc func(req *http.Request) util.JSONResponse) http.Handler { + return prometheus.InstrumentHandler(metric, util.MakeJSONAPI(util.NewJSONRequestHandler(apiFunc))) +} diff --git a/src/github.com/matrix-org/dendrite/roomserver/roomserver-integration-tests/main.go b/src/github.com/matrix-org/dendrite/roomserver/roomserver-integration-tests/main.go index fb9c9b0af..9094ef57f 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/roomserver-integration-tests/main.go +++ b/src/github.com/matrix-org/dendrite/roomserver/roomserver-integration-tests/main.go @@ -4,7 +4,6 @@ import ( "fmt" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" - "net/rpc" "os" "os/exec" "path/filepath" @@ -210,12 +209,8 @@ func testRoomServer(input []string, wantOutput []string) { cmd.Stderr = os.Stderr gotOutput, err := runAndReadFromTopic(cmd, outputTopic, 1, func() { - client, err := rpc.DialHTTP("tcp", roomserverAddr) - if err != nil { - panic(err) - } - queryAPI := api.NewRoomserverQueryAPIFromClient(client) - if err = queryAPI.QueryLatestEventsAndState( + queryAPI := api.NewRoomserverQueryAPIHTTP("http://"+roomserverAddr, nil) + if err := queryAPI.QueryLatestEventsAndState( &api.QueryLatestEventsAndStateRequest{}, &api.QueryLatestEventsAndStateResponse{}, ); err != nil { diff --git a/src/github.com/matrix-org/dendrite/roomserver/roomserver/roomserver.go b/src/github.com/matrix-org/dendrite/roomserver/roomserver/roomserver.go index c9a418707..7cf2074f1 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/roomserver/roomserver.go +++ b/src/github.com/matrix-org/dendrite/roomserver/roomserver/roomserver.go @@ -2,13 +2,11 @@ package main import ( "fmt" - "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/input" "github.com/matrix-org/dendrite/roomserver/query" "github.com/matrix-org/dendrite/roomserver/storage" sarama "gopkg.in/Shopify/sarama.v1" "net/http" - "net/rpc" "os" "strings" ) @@ -51,11 +49,7 @@ func main() { queryAPI := query.RoomserverQueryAPI{} - if err = api.RegisterRoomserverQueryAPI(rpc.DefaultServer, &queryAPI); err != nil { - panic(err) - } - - rpc.HandleHTTP() + queryAPI.SetupHTTP(http.DefaultServeMux) fmt.Println("Started roomserver") diff --git a/vendor/manifest b/vendor/manifest index 79bac494c..35065c88a 100644 --- a/vendor/manifest +++ b/vendor/manifest @@ -98,7 +98,7 @@ { "importpath": "github.com/matrix-org/util", "repository": "https://github.com/matrix-org/util", - "revision": "ccef6dc7c24a7c896d96b433a9107b7c47ecf828", + "revision": "28bd7491c8aafbf346ca23821664f0f9911ef52b", "branch": "master" }, { @@ -206,4 +206,4 @@ "branch": "master" } ] -} +} \ No newline at end of file diff --git a/vendor/src/github.com/matrix-org/util/context.go b/vendor/src/github.com/matrix-org/util/context.go index d8def4f9b..f2477a56a 100644 --- a/vendor/src/github.com/matrix-org/util/context.go +++ b/vendor/src/github.com/matrix-org/util/context.go @@ -25,11 +25,13 @@ func GetRequestID(ctx context.Context) string { // ctxValueLogger is the key to extract the logrus Logger. const ctxValueLogger = contextKeys("logger") -// GetLogger retrieves the logrus logger from the supplied context. Returns nil if there is no logger. +// GetLogger retrieves the logrus logger from the supplied context. Always returns a logger, +// even if there wasn't one originally supplied. func GetLogger(ctx context.Context) *log.Entry { l := ctx.Value(ctxValueLogger) if l == nil { - return nil + // Always return a logger so callers don't need to constantly nil check. + return log.WithField("context", "missing") } return l.(*log.Entry) } diff --git a/vendor/src/github.com/matrix-org/util/json.go b/vendor/src/github.com/matrix-org/util/json.go index b0834eac7..46c5396f5 100644 --- a/vendor/src/github.com/matrix-org/util/json.go +++ b/vendor/src/github.com/matrix-org/util/json.go @@ -58,6 +58,21 @@ type JSONRequestHandler interface { OnIncomingRequest(req *http.Request) JSONResponse } +// jsonRequestHandlerWrapper is a wrapper to allow in-line functions to conform to util.JSONRequestHandler +type jsonRequestHandlerWrapper struct { + function func(req *http.Request) JSONResponse +} + +// OnIncomingRequest implements util.JSONRequestHandler +func (r *jsonRequestHandlerWrapper) OnIncomingRequest(req *http.Request) JSONResponse { + return r.function(req) +} + +// NewJSONRequestHandler converts the given OnIncomingRequest function into a JSONRequestHandler +func NewJSONRequestHandler(f func(req *http.Request) JSONResponse) JSONRequestHandler { + return &jsonRequestHandlerWrapper{f} +} + // Protect panicking HTTP requests from taking down the entire process, and log them using // the correct logger, returning a 500 with a JSON response rather than abruptly closing the // connection. The http.Request MUST have a ctxValueLogger. diff --git a/vendor/src/github.com/matrix-org/util/json_test.go b/vendor/src/github.com/matrix-org/util/json_test.go index 687db277f..3ce03a883 100644 --- a/vendor/src/github.com/matrix-org/util/json_test.go +++ b/vendor/src/github.com/matrix-org/util/json_test.go @@ -164,8 +164,8 @@ func TestGetLogger(t *testing.T) { noLoggerInReq, _ := http.NewRequest("GET", "http://example.com/foo", nil) ctxLogger = GetLogger(noLoggerInReq.Context()) - if ctxLogger != nil { - t.Errorf("TestGetLogger wanted nil logger, got '%v'", ctxLogger) + if ctxLogger == nil { + t.Errorf("TestGetLogger wanted logger, got nil") } }