Use a conventional JSON POST API rather than go net/rpc

net/rpc doesn't automatically handle reconnecting and we have better
logging and metrics infrastructure for monitoring HTTP apis.
This commit is contained in:
Mark Haines 2017-03-03 15:04:28 +00:00
parent e5d25d81ae
commit 0448ffc891
8 changed files with 96 additions and 42 deletions

View file

@ -1,7 +1,11 @@
package api package api
import ( import (
"bytes"
"encoding/json"
"fmt"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"net/http"
) )
// StateKeyTuple is a pair of an event type and state_key. // StateKeyTuple is a pair of an event type and state_key.
@ -38,34 +42,53 @@ type RoomserverQueryAPI interface {
) error ) error
} }
// RPCServer is used to register a roomserver implementation with an RPC server. // RoomserverQueryLatestEventsAndStatePath is the HTTP path for the QueryLatestEventsAndState API.
type RPCServer interface { const RoomserverQueryLatestEventsAndStatePath = "/api/Roomserver/QueryLatestEventsAndState"
RegisterName(name string, rcvr interface{}) error
// NewRoomserverQueryAPIHTTP creates a RoomserverQueryAPI implemented by talking to a HTTP POST API.
// If httpClient is nil then it uses the http.DefaultClient
func NewRoomserverQueryAPIHTTP(roomserverURL string, httpClient *http.Client) RoomserverQueryAPI {
if httpClient == nil {
httpClient = http.DefaultClient
}
return &httpRoomserverQueryAPI{roomserverURL, *httpClient}
} }
// RegisterRoomserverQueryAPI registers a RoomserverQueryAPI implementation with an RPC server. type httpRoomserverQueryAPI struct {
func RegisterRoomserverQueryAPI(rpcServer RPCServer, roomserver RoomserverQueryAPI) error { roomserverURL string
return rpcServer.RegisterName("Roomserver", roomserver) httpClient http.Client
}
// RPCClient is used to invoke roomserver APIs on a remote server.
type RPCClient interface {
Call(serviceMethod string, args interface{}, reply interface{}) error
}
// NewRoomserverQueryAPIFromClient creates a new query API from an RPC client.
func NewRoomserverQueryAPIFromClient(client RPCClient) RoomserverQueryAPI {
return &remoteRoomserver{client}
}
type remoteRoomserver struct {
client RPCClient
} }
// QueryLatestEventsAndState implements RoomserverQueryAPI // QueryLatestEventsAndState implements RoomserverQueryAPI
func (r *remoteRoomserver) QueryLatestEventsAndState( func (h *httpRoomserverQueryAPI) QueryLatestEventsAndState(
request *QueryLatestEventsAndStateRequest, request *QueryLatestEventsAndStateRequest,
response *QueryLatestEventsAndStateResponse, response *QueryLatestEventsAndStateResponse,
) error { ) error {
return r.client.Call("Roomserver.QueryLatestEventsAndState", request, response) apiURL := h.roomserverURL + RoomserverQueryLatestEventsAndStatePath
return postJSON(h.httpClient, apiURL, request, response)
}
func postJSON(httpClient http.Client, apiURL string, request, response interface{}) error {
jsonBytes, err := json.Marshal(request)
if err != nil {
return err
}
res, err := httpClient.Post(apiURL, "application/json", bytes.NewReader(jsonBytes))
if res != nil {
defer res.Body.Close()
}
if err != nil {
return err
}
if res.StatusCode != 200 {
var errorBody struct {
Message string `json:"message"`
}
if err = json.NewDecoder(res.Body).Decode(&errorBody); err != nil {
return err
} else {
return fmt.Errorf("api: %d: %s", res.StatusCode, errorBody.Message)
}
}
return json.NewDecoder(res.Body).Decode(response)
} }

View file

@ -1,8 +1,12 @@
package query package query
import ( import (
"encoding/json"
"fmt" "fmt"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/util"
"github.com/prometheus/client_golang/prometheus"
"net/http"
) )
// RoomserverQueryAPI is an implemenation of RoomserverQueryAPI // RoomserverQueryAPI is an implemenation of RoomserverQueryAPI
@ -16,3 +20,24 @@ func (r *RoomserverQueryAPI) QueryLatestEventsAndState(
) error { ) error {
return fmt.Errorf("Not Implemented") return fmt.Errorf("Not Implemented")
} }
func (r *RoomserverQueryAPI) SetupHTTP(servMux *http.ServeMux) {
servMux.Handle(
api.RoomserverQueryLatestEventsAndStatePath,
makeAPI("query_latest_events_and_state", func(req *http.Request) util.JSONResponse {
var request api.QueryLatestEventsAndStateRequest
var response api.QueryLatestEventsAndStateResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.ErrorResponse(err)
}
if err := r.QueryLatestEventsAndState(&request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: 200, JSON: &response}
}),
)
}
func makeAPI(metric string, apiFunc func(req *http.Request) util.JSONResponse) http.Handler {
return prometheus.InstrumentHandler(metric, util.MakeJSONAPI(util.NewJSONRequestHandler(apiFunc)))
}

View file

@ -4,7 +4,6 @@ import (
"fmt" "fmt"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"net/rpc"
"os" "os"
"os/exec" "os/exec"
"path/filepath" "path/filepath"
@ -210,12 +209,8 @@ func testRoomServer(input []string, wantOutput []string) {
cmd.Stderr = os.Stderr cmd.Stderr = os.Stderr
gotOutput, err := runAndReadFromTopic(cmd, outputTopic, 1, func() { gotOutput, err := runAndReadFromTopic(cmd, outputTopic, 1, func() {
client, err := rpc.DialHTTP("tcp", roomserverAddr) queryAPI := api.NewRoomserverQueryAPIHTTP("http://"+roomserverAddr, nil)
if err != nil { if err := queryAPI.QueryLatestEventsAndState(
panic(err)
}
queryAPI := api.NewRoomserverQueryAPIFromClient(client)
if err = queryAPI.QueryLatestEventsAndState(
&api.QueryLatestEventsAndStateRequest{}, &api.QueryLatestEventsAndStateRequest{},
&api.QueryLatestEventsAndStateResponse{}, &api.QueryLatestEventsAndStateResponse{},
); err != nil { ); err != nil {

View file

@ -2,13 +2,11 @@ package main
import ( import (
"fmt" "fmt"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/input" "github.com/matrix-org/dendrite/roomserver/input"
"github.com/matrix-org/dendrite/roomserver/query" "github.com/matrix-org/dendrite/roomserver/query"
"github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/roomserver/storage"
sarama "gopkg.in/Shopify/sarama.v1" sarama "gopkg.in/Shopify/sarama.v1"
"net/http" "net/http"
"net/rpc"
"os" "os"
"strings" "strings"
) )
@ -51,11 +49,7 @@ func main() {
queryAPI := query.RoomserverQueryAPI{} queryAPI := query.RoomserverQueryAPI{}
if err = api.RegisterRoomserverQueryAPI(rpc.DefaultServer, &queryAPI); err != nil { queryAPI.SetupHTTP(http.DefaultServeMux)
panic(err)
}
rpc.HandleHTTP()
fmt.Println("Started roomserver") fmt.Println("Started roomserver")

2
vendor/manifest vendored
View file

@ -98,7 +98,7 @@
{ {
"importpath": "github.com/matrix-org/util", "importpath": "github.com/matrix-org/util",
"repository": "https://github.com/matrix-org/util", "repository": "https://github.com/matrix-org/util",
"revision": "ccef6dc7c24a7c896d96b433a9107b7c47ecf828", "revision": "28bd7491c8aafbf346ca23821664f0f9911ef52b",
"branch": "master" "branch": "master"
}, },
{ {

View file

@ -25,11 +25,13 @@ func GetRequestID(ctx context.Context) string {
// ctxValueLogger is the key to extract the logrus Logger. // ctxValueLogger is the key to extract the logrus Logger.
const ctxValueLogger = contextKeys("logger") const ctxValueLogger = contextKeys("logger")
// GetLogger retrieves the logrus logger from the supplied context. Returns nil if there is no logger. // GetLogger retrieves the logrus logger from the supplied context. Always returns a logger,
// even if there wasn't one originally supplied.
func GetLogger(ctx context.Context) *log.Entry { func GetLogger(ctx context.Context) *log.Entry {
l := ctx.Value(ctxValueLogger) l := ctx.Value(ctxValueLogger)
if l == nil { if l == nil {
return nil // Always return a logger so callers don't need to constantly nil check.
return log.WithField("context", "missing")
} }
return l.(*log.Entry) return l.(*log.Entry)
} }

View file

@ -58,6 +58,21 @@ type JSONRequestHandler interface {
OnIncomingRequest(req *http.Request) JSONResponse OnIncomingRequest(req *http.Request) JSONResponse
} }
// jsonRequestHandlerWrapper is a wrapper to allow in-line functions to conform to util.JSONRequestHandler
type jsonRequestHandlerWrapper struct {
function func(req *http.Request) JSONResponse
}
// OnIncomingRequest implements util.JSONRequestHandler
func (r *jsonRequestHandlerWrapper) OnIncomingRequest(req *http.Request) JSONResponse {
return r.function(req)
}
// NewJSONRequestHandler converts the given OnIncomingRequest function into a JSONRequestHandler
func NewJSONRequestHandler(f func(req *http.Request) JSONResponse) JSONRequestHandler {
return &jsonRequestHandlerWrapper{f}
}
// Protect panicking HTTP requests from taking down the entire process, and log them using // Protect panicking HTTP requests from taking down the entire process, and log them using
// the correct logger, returning a 500 with a JSON response rather than abruptly closing the // the correct logger, returning a 500 with a JSON response rather than abruptly closing the
// connection. The http.Request MUST have a ctxValueLogger. // connection. The http.Request MUST have a ctxValueLogger.

View file

@ -164,8 +164,8 @@ func TestGetLogger(t *testing.T) {
noLoggerInReq, _ := http.NewRequest("GET", "http://example.com/foo", nil) noLoggerInReq, _ := http.NewRequest("GET", "http://example.com/foo", nil)
ctxLogger = GetLogger(noLoggerInReq.Context()) ctxLogger = GetLogger(noLoggerInReq.Context())
if ctxLogger != nil { if ctxLogger == nil {
t.Errorf("TestGetLogger wanted nil logger, got '%v'", ctxLogger) t.Errorf("TestGetLogger wanted logger, got nil")
} }
} }