Start implementing a query API for go using net/rpc

This commit is contained in:
Mark Haines 2017-03-02 19:14:34 +00:00
parent 37e0b6c4c6
commit e5d25d81ae
4 changed files with 132 additions and 4 deletions

View file

@ -0,0 +1,71 @@
package api
import (
"github.com/matrix-org/gomatrixserverlib"
)
// StateKeyTuple is a pair of an event type and state_key.
type StateKeyTuple struct {
EventType string
EventStateKey string
}
// QueryLatestEventsAndStateRequest is a request to QueryLatestEventsAndState
type QueryLatestEventsAndStateRequest struct {
// The roomID to query the latest events for.
RoomID string
// The state key tuples to fetch from the room current state.
// If this list is empty or nil then no events are returned.
StateToFetch []StateKeyTuple
}
// QueryLatestEventsAndStateResponse is a response to QueryLatestEventsAndState
type QueryLatestEventsAndStateResponse struct {
// Copy of the request for debugging.
QueryLatestEventsAndStateRequest
// The latest events in the room.
LatestEvents gomatrixserverlib.EventReference
// The state events requested.
StateEvents []gomatrixserverlib.Event
}
// RoomserverQueryAPI is used to query information from the room server.
type RoomserverQueryAPI interface {
// Query the latest events and state for a room from the room server.
QueryLatestEventsAndState(
request *QueryLatestEventsAndStateRequest,
response *QueryLatestEventsAndStateResponse,
) error
}
// RPCServer is used to register a roomserver implementation with an RPC server.
type RPCServer interface {
RegisterName(name string, rcvr interface{}) error
}
// RegisterRoomserverQueryAPI registers a RoomserverQueryAPI implementation with an RPC server.
func RegisterRoomserverQueryAPI(rpcServer RPCServer, roomserver RoomserverQueryAPI) error {
return rpcServer.RegisterName("Roomserver", roomserver)
}
// RPCClient is used to invoke roomserver APIs on a remote server.
type RPCClient interface {
Call(serviceMethod string, args interface{}, reply interface{}) error
}
// NewRoomserverQueryAPIFromClient creates a new query API from an RPC client.
func NewRoomserverQueryAPIFromClient(client RPCClient) RoomserverQueryAPI {
return &remoteRoomserver{client}
}
type remoteRoomserver struct {
client RPCClient
}
// QueryLatestEventsAndState implements RoomserverQueryAPI
func (r *remoteRoomserver) QueryLatestEventsAndState(
request *QueryLatestEventsAndStateRequest,
response *QueryLatestEventsAndStateResponse,
) error {
return r.client.Call("Roomserver.QueryLatestEventsAndState", request, response)
}

View file

@ -0,0 +1,18 @@
package query
import (
"fmt"
"github.com/matrix-org/dendrite/roomserver/api"
)
// RoomserverQueryAPI is an implemenation of RoomserverQueryAPI
type RoomserverQueryAPI struct {
}
// QueryLatestEventsAndState implements api.RoomserverQueryAPI
func (r *RoomserverQueryAPI) QueryLatestEventsAndState(
request *api.QueryLatestEventsAndStateRequest,
response *api.QueryLatestEventsAndStateResponse,
) error {
return fmt.Errorf("Not Implemented")
}

View file

@ -2,7 +2,9 @@ package main
import (
"fmt"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
"net/rpc"
"os"
"os/exec"
"path/filepath"
@ -17,6 +19,8 @@ var (
zookeeperURI = defaulting(os.Getenv("ZOOKEEPER_URI"), "localhost:2181")
// The URI the kafka server is listening on.
kafkaURI = defaulting(os.Getenv("KAFKA_URIS"), "localhost:9092")
// The address the roomserver should listen on.
roomserverAddr = defaulting(os.Getenv("ROOMSERVER_URI"), "localhost:9876")
// How long to wait for the roomserver to write the expected output messages.
timeoutString = defaulting(os.Getenv("TIMEOUT"), "10s")
// The name of maintenance database to connect to in order to create the test database.
@ -91,7 +95,7 @@ func writeToTopic(topic string, data []string) error {
// messages is reached or after a timeout. It kills the command before it returns.
// It returns a list of the messages read from the command on success or an error
// on failure.
func runAndReadFromTopic(runCmd *exec.Cmd, topic string, count int) ([]string, error) {
func runAndReadFromTopic(runCmd *exec.Cmd, topic string, count int, checkQueryAPI func()) ([]string, error) {
type result struct {
// data holds all of stdout on success.
data []byte
@ -112,6 +116,16 @@ func runAndReadFromTopic(runCmd *exec.Cmd, topic string, count int) ([]string, e
go func() {
// Read all of stdout.
data, err := readCmd.Output()
defer func() {
if err := recover(); err != nil {
if errv, ok := err.(error); ok {
done <- result{data, errv}
} else {
panic(err)
}
}
}()
checkQueryAPI()
done <- result{data, err}
}()
go func() {
@ -191,10 +205,23 @@ func testRoomServer(input []string, wantOutput []string) {
fmt.Sprintf("KAFKA_URIS=%s", kafkaURI),
fmt.Sprintf("TOPIC_INPUT_ROOM_EVENT=%s", inputTopic),
fmt.Sprintf("TOPIC_OUTPUT_ROOM_EVENT=%s", outputTopic),
fmt.Sprintf("BIND_ADDRESS=%s", roomserverAddr),
)
cmd.Stderr = os.Stderr
gotOutput, err := runAndReadFromTopic(cmd, outputTopic, 1)
gotOutput, err := runAndReadFromTopic(cmd, outputTopic, 1, func() {
client, err := rpc.DialHTTP("tcp", roomserverAddr)
if err != nil {
panic(err)
}
queryAPI := api.NewRoomserverQueryAPIFromClient(client)
if err = queryAPI.QueryLatestEventsAndState(
&api.QueryLatestEventsAndStateRequest{},
&api.QueryLatestEventsAndStateResponse{},
); err != nil {
panic(err)
}
})
if err != nil {
panic(err)
}

View file

@ -2,9 +2,13 @@ package main
import (
"fmt"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/input"
"github.com/matrix-org/dendrite/roomserver/query"
"github.com/matrix-org/dendrite/roomserver/storage"
sarama "gopkg.in/Shopify/sarama.v1"
"net/http"
"net/rpc"
"os"
"strings"
)
@ -14,6 +18,7 @@ var (
kafkaURIs = strings.Split(os.Getenv("KAFKA_URIS"), ",")
inputRoomEventTopic = os.Getenv("TOPIC_INPUT_ROOM_EVENT")
outputRoomEventTopic = os.Getenv("TOPIC_OUTPUT_ROOM_EVENT")
bindAddr = os.Getenv("BIND_ADDRESS")
)
func main() {
@ -44,9 +49,16 @@ func main() {
panic(err)
}
queryAPI := query.RoomserverQueryAPI{}
if err = api.RegisterRoomserverQueryAPI(rpc.DefaultServer, &queryAPI); err != nil {
panic(err)
}
rpc.HandleHTTP()
fmt.Println("Started roomserver")
// Wait forever.
// TODO: Implement clean shutdown.
select {}
http.ListenAndServe(bindAddr, nil)
}