diff --git a/src/github.com/matrix-org/dendrite/roomserver/api/query.go b/src/github.com/matrix-org/dendrite/roomserver/api/query.go new file mode 100644 index 000000000..157674df4 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/roomserver/api/query.go @@ -0,0 +1,71 @@ +package api + +import ( + "github.com/matrix-org/gomatrixserverlib" +) + +// StateKeyTuple is a pair of an event type and state_key. +type StateKeyTuple struct { + EventType string + EventStateKey string +} + +// QueryLatestEventsAndStateRequest is a request to QueryLatestEventsAndState +type QueryLatestEventsAndStateRequest struct { + // The roomID to query the latest events for. + RoomID string + // The state key tuples to fetch from the room current state. + // If this list is empty or nil then no events are returned. + StateToFetch []StateKeyTuple +} + +// QueryLatestEventsAndStateResponse is a response to QueryLatestEventsAndState +type QueryLatestEventsAndStateResponse struct { + // Copy of the request for debugging. + QueryLatestEventsAndStateRequest + // The latest events in the room. + LatestEvents gomatrixserverlib.EventReference + // The state events requested. + StateEvents []gomatrixserverlib.Event +} + +// RoomserverQueryAPI is used to query information from the room server. +type RoomserverQueryAPI interface { + // Query the latest events and state for a room from the room server. + QueryLatestEventsAndState( + request *QueryLatestEventsAndStateRequest, + response *QueryLatestEventsAndStateResponse, + ) error +} + +// RPCServer is used to register a roomserver implementation with an RPC server. +type RPCServer interface { + RegisterName(name string, rcvr interface{}) error +} + +// RegisterRoomserverQueryAPI registers a RoomserverQueryAPI implementation with an RPC server. +func RegisterRoomserverQueryAPI(rpcServer RPCServer, roomserver RoomserverQueryAPI) error { + return rpcServer.RegisterName("Roomserver", roomserver) +} + +// RPCClient is used to invoke roomserver APIs on a remote server. +type RPCClient interface { + Call(serviceMethod string, args interface{}, reply interface{}) error +} + +// NewRoomserverQueryAPIFromClient creates a new query API from an RPC client. +func NewRoomserverQueryAPIFromClient(client RPCClient) RoomserverQueryAPI { + return &remoteRoomserver{client} +} + +type remoteRoomserver struct { + client RPCClient +} + +// QueryLatestEventsAndState implements RoomserverQueryAPI +func (r *remoteRoomserver) QueryLatestEventsAndState( + request *QueryLatestEventsAndStateRequest, + response *QueryLatestEventsAndStateResponse, +) error { + return r.client.Call("Roomserver.QueryLatestEventsAndState", request, response) +} diff --git a/src/github.com/matrix-org/dendrite/roomserver/query/query.go b/src/github.com/matrix-org/dendrite/roomserver/query/query.go new file mode 100644 index 000000000..db473bdfc --- /dev/null +++ b/src/github.com/matrix-org/dendrite/roomserver/query/query.go @@ -0,0 +1,18 @@ +package query + +import ( + "fmt" + "github.com/matrix-org/dendrite/roomserver/api" +) + +// RoomserverQueryAPI is an implemenation of RoomserverQueryAPI +type RoomserverQueryAPI struct { +} + +// QueryLatestEventsAndState implements api.RoomserverQueryAPI +func (r *RoomserverQueryAPI) QueryLatestEventsAndState( + request *api.QueryLatestEventsAndStateRequest, + response *api.QueryLatestEventsAndStateResponse, +) error { + return fmt.Errorf("Not Implemented") +} diff --git a/src/github.com/matrix-org/dendrite/roomserver/roomserver-integration-tests/main.go b/src/github.com/matrix-org/dendrite/roomserver/roomserver-integration-tests/main.go index 06f22a2a2..fb9c9b0af 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/roomserver-integration-tests/main.go +++ b/src/github.com/matrix-org/dendrite/roomserver/roomserver-integration-tests/main.go @@ -2,7 +2,9 @@ package main import ( "fmt" + "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" + "net/rpc" "os" "os/exec" "path/filepath" @@ -17,6 +19,8 @@ var ( zookeeperURI = defaulting(os.Getenv("ZOOKEEPER_URI"), "localhost:2181") // The URI the kafka server is listening on. kafkaURI = defaulting(os.Getenv("KAFKA_URIS"), "localhost:9092") + // The address the roomserver should listen on. + roomserverAddr = defaulting(os.Getenv("ROOMSERVER_URI"), "localhost:9876") // How long to wait for the roomserver to write the expected output messages. timeoutString = defaulting(os.Getenv("TIMEOUT"), "10s") // The name of maintenance database to connect to in order to create the test database. @@ -91,7 +95,7 @@ func writeToTopic(topic string, data []string) error { // messages is reached or after a timeout. It kills the command before it returns. // It returns a list of the messages read from the command on success or an error // on failure. -func runAndReadFromTopic(runCmd *exec.Cmd, topic string, count int) ([]string, error) { +func runAndReadFromTopic(runCmd *exec.Cmd, topic string, count int, checkQueryAPI func()) ([]string, error) { type result struct { // data holds all of stdout on success. data []byte @@ -112,6 +116,16 @@ func runAndReadFromTopic(runCmd *exec.Cmd, topic string, count int) ([]string, e go func() { // Read all of stdout. data, err := readCmd.Output() + defer func() { + if err := recover(); err != nil { + if errv, ok := err.(error); ok { + done <- result{data, errv} + } else { + panic(err) + } + } + }() + checkQueryAPI() done <- result{data, err} }() go func() { @@ -191,10 +205,23 @@ func testRoomServer(input []string, wantOutput []string) { fmt.Sprintf("KAFKA_URIS=%s", kafkaURI), fmt.Sprintf("TOPIC_INPUT_ROOM_EVENT=%s", inputTopic), fmt.Sprintf("TOPIC_OUTPUT_ROOM_EVENT=%s", outputTopic), + fmt.Sprintf("BIND_ADDRESS=%s", roomserverAddr), ) cmd.Stderr = os.Stderr - gotOutput, err := runAndReadFromTopic(cmd, outputTopic, 1) + gotOutput, err := runAndReadFromTopic(cmd, outputTopic, 1, func() { + client, err := rpc.DialHTTP("tcp", roomserverAddr) + if err != nil { + panic(err) + } + queryAPI := api.NewRoomserverQueryAPIFromClient(client) + if err = queryAPI.QueryLatestEventsAndState( + &api.QueryLatestEventsAndStateRequest{}, + &api.QueryLatestEventsAndStateResponse{}, + ); err != nil { + panic(err) + } + }) if err != nil { panic(err) } diff --git a/src/github.com/matrix-org/dendrite/roomserver/roomserver/roomserver.go b/src/github.com/matrix-org/dendrite/roomserver/roomserver/roomserver.go index d2f126bf5..c9a418707 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/roomserver/roomserver.go +++ b/src/github.com/matrix-org/dendrite/roomserver/roomserver/roomserver.go @@ -2,9 +2,13 @@ package main import ( "fmt" + "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/input" + "github.com/matrix-org/dendrite/roomserver/query" "github.com/matrix-org/dendrite/roomserver/storage" sarama "gopkg.in/Shopify/sarama.v1" + "net/http" + "net/rpc" "os" "strings" ) @@ -14,6 +18,7 @@ var ( kafkaURIs = strings.Split(os.Getenv("KAFKA_URIS"), ",") inputRoomEventTopic = os.Getenv("TOPIC_INPUT_ROOM_EVENT") outputRoomEventTopic = os.Getenv("TOPIC_OUTPUT_ROOM_EVENT") + bindAddr = os.Getenv("BIND_ADDRESS") ) func main() { @@ -44,9 +49,16 @@ func main() { panic(err) } + queryAPI := query.RoomserverQueryAPI{} + + if err = api.RegisterRoomserverQueryAPI(rpc.DefaultServer, &queryAPI); err != nil { + panic(err) + } + + rpc.HandleHTTP() + fmt.Println("Started roomserver") - // Wait forever. // TODO: Implement clean shutdown. - select {} + http.ListenAndServe(bindAddr, nil) }