Implement the query API and add it to the integration tests

This commit is contained in:
Mark Haines 2017-03-03 16:47:46 +00:00
parent 0448ffc891
commit 505055a9ca
7 changed files with 128 additions and 20 deletions

View file

@ -27,8 +27,11 @@ type QueryLatestEventsAndStateRequest struct {
type QueryLatestEventsAndStateResponse struct { type QueryLatestEventsAndStateResponse struct {
// Copy of the request for debugging. // Copy of the request for debugging.
QueryLatestEventsAndStateRequest QueryLatestEventsAndStateRequest
// Does the room exist?
// If the room doesn't exist this will be false and LatestEvents will be empty.
RoomExists bool
// The latest events in the room. // The latest events in the room.
LatestEvents gomatrixserverlib.EventReference LatestEvents []gomatrixserverlib.EventReference
// The state events requested. // The state events requested.
StateEvents []gomatrixserverlib.Event StateEvents []gomatrixserverlib.Event
} }
@ -86,9 +89,8 @@ func postJSON(httpClient http.Client, apiURL string, request, response interface
} }
if err = json.NewDecoder(res.Body).Decode(&errorBody); err != nil { if err = json.NewDecoder(res.Body).Decode(&errorBody); err != nil {
return err return err
} else {
return fmt.Errorf("api: %d: %s", res.StatusCode, errorBody.Message)
} }
return fmt.Errorf("api: %d: %s", res.StatusCode, errorBody.Message)
} }
return json.NewDecoder(res.Body).Decode(response) return json.NewDecoder(res.Body).Decode(response)
} }

View file

@ -2,25 +2,50 @@ package query
import ( import (
"encoding/json" "encoding/json"
"fmt"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"net/http" "net/http"
) )
// RoomserverQueryAPIDatabase has the storage APIs needed to implement the query API.
type RoomserverQueryAPIDatabase interface {
// Lookup the numeric ID for the room.
// Returns 0 if the room doesn't exists.
// Returns an error if there was a problem talking to the database.
RoomNID(roomID string) (types.RoomNID, error)
// Lookup event references for the latest events in the room.
// Returns an error if there was a problem talking to the database.
LatestEventIDs(roomNID types.RoomNID) ([]gomatrixserverlib.EventReference, error)
}
// RoomserverQueryAPI is an implemenation of RoomserverQueryAPI // RoomserverQueryAPI is an implemenation of RoomserverQueryAPI
type RoomserverQueryAPI struct { type RoomserverQueryAPI struct {
DB RoomserverQueryAPIDatabase
} }
// QueryLatestEventsAndState implements api.RoomserverQueryAPI // QueryLatestEventsAndState implements api.RoomserverQueryAPI
func (r *RoomserverQueryAPI) QueryLatestEventsAndState( func (r *RoomserverQueryAPI) QueryLatestEventsAndState(
request *api.QueryLatestEventsAndStateRequest, request *api.QueryLatestEventsAndStateRequest,
response *api.QueryLatestEventsAndStateResponse, response *api.QueryLatestEventsAndStateResponse,
) error { ) (err error) {
return fmt.Errorf("Not Implemented") response.QueryLatestEventsAndStateRequest = *request
roomNID, err := r.DB.RoomNID(request.RoomID)
if err != nil {
return err
}
if roomNID == 0 {
return nil
}
response.RoomExists = true
response.LatestEvents, err = r.DB.LatestEventIDs(roomNID)
// TODO: look up the current state.
return err
} }
// SetupHTTP adds the RoomserverQueryAPI handlers to the http.ServeMux.
func (r *RoomserverQueryAPI) SetupHTTP(servMux *http.ServeMux) { func (r *RoomserverQueryAPI) SetupHTTP(servMux *http.ServeMux) {
servMux.Handle( servMux.Handle(
api.RoomserverQueryLatestEventsAndStatePath, api.RoomserverQueryLatestEventsAndStatePath,

View file

@ -170,7 +170,7 @@ func deleteTopic(topic string) error {
return cmd.Run() return cmd.Run()
} }
func testRoomServer(input []string, wantOutput []string) { func testRoomServer(input []string, wantOutput []string, checkQueries func(api.RoomserverQueryAPI)) {
const ( const (
inputTopic = "roomserverInput" inputTopic = "roomserverInput"
outputTopic = "roomserverOutput" outputTopic = "roomserverOutput"
@ -210,12 +210,7 @@ func testRoomServer(input []string, wantOutput []string) {
gotOutput, err := runAndReadFromTopic(cmd, outputTopic, 1, func() { gotOutput, err := runAndReadFromTopic(cmd, outputTopic, 1, func() {
queryAPI := api.NewRoomserverQueryAPIHTTP("http://"+roomserverAddr, nil) queryAPI := api.NewRoomserverQueryAPIHTTP("http://"+roomserverAddr, nil)
if err := queryAPI.QueryLatestEventsAndState( checkQueries(queryAPI)
&api.QueryLatestEventsAndStateRequest{},
&api.QueryLatestEventsAndStateResponse{},
); err != nil {
panic(err)
}
}) })
if err != nil { if err != nil {
panic(err) panic(err)
@ -356,7 +351,21 @@ func main() {
}`, }`,
} }
testRoomServer(input, want) testRoomServer(input, want, func(q api.RoomserverQueryAPI) {
var response api.QueryLatestEventsAndStateResponse
if err := q.QueryLatestEventsAndState(
&api.QueryLatestEventsAndStateRequest{RoomID: "!HCXfdvrfksxuYnIFiJ:matrix.org"},
&response,
); err != nil {
panic(err)
}
if !response.RoomExists {
panic(fmt.Errorf("Wanted room \"!HCXfdvrfksxuYnIFiJ:matrix.org\" to exist"))
}
if len(response.LatestEvents) != 1 || response.LatestEvents[0].EventID != "$1463671339126270PnVwC:matrix.org" {
panic(fmt.Errorf("Wanted \"$1463671339126270PnVwC:matrix.org\" to be the latest event got %#v", response.LatestEvents))
}
})
fmt.Println("==PASSED==", os.Args[0]) fmt.Println("==PASSED==", os.Args[0])
} }

View file

@ -47,7 +47,9 @@ func main() {
panic(err) panic(err)
} }
queryAPI := query.RoomserverQueryAPI{} queryAPI := query.RoomserverQueryAPI{
DB: db,
}
queryAPI.SetupHTTP(http.DefaultServeMux) queryAPI.SetupHTTP(http.DefaultServeMux)

View file

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"github.com/lib/pq" "github.com/lib/pq"
"github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
) )
const eventsSchema = ` const eventsSchema = `
@ -83,6 +84,9 @@ const bulkSelectStateAtEventAndReferenceSQL = "" +
"SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid, event_id, reference_sha256" + "SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid, event_id, reference_sha256" +
" FROM events WHERE event_nid = ANY($1)" " FROM events WHERE event_nid = ANY($1)"
const bulkSelectEventReferenceSQL = "" +
"SELECT event_id, reference_sha256 FROM events WHERE event_nid = ANY($1)"
type eventStatements struct { type eventStatements struct {
insertEventStmt *sql.Stmt insertEventStmt *sql.Stmt
selectEventStmt *sql.Stmt selectEventStmt *sql.Stmt
@ -93,6 +97,7 @@ type eventStatements struct {
updateEventSentToOutputStmt *sql.Stmt updateEventSentToOutputStmt *sql.Stmt
selectEventIDStmt *sql.Stmt selectEventIDStmt *sql.Stmt
bulkSelectStateAtEventAndReferenceStmt *sql.Stmt bulkSelectStateAtEventAndReferenceStmt *sql.Stmt
bulkSelectEventReferenceStmt *sql.Stmt
} }
func (s *eventStatements) prepare(db *sql.DB) (err error) { func (s *eventStatements) prepare(db *sql.DB) (err error) {
@ -127,6 +132,9 @@ func (s *eventStatements) prepare(db *sql.DB) (err error) {
if s.bulkSelectStateAtEventAndReferenceStmt, err = db.Prepare(bulkSelectStateAtEventAndReferenceSQL); err != nil { if s.bulkSelectStateAtEventAndReferenceStmt, err = db.Prepare(bulkSelectStateAtEventAndReferenceSQL); err != nil {
return return
} }
if s.bulkSelectEventReferenceStmt, err = db.Prepare(bulkSelectEventReferenceSQL); err != nil {
return
}
return return
} }
@ -276,3 +284,27 @@ func (s *eventStatements) bulkSelectStateAtEventAndReference(txn *sql.Tx, eventN
} }
return results, nil return results, nil
} }
func (s *eventStatements) bulkSelectEventReference(eventNIDs []types.EventNID) ([]gomatrixserverlib.EventReference, error) {
nids := make([]int64, len(eventNIDs))
for i := range eventNIDs {
nids[i] = int64(eventNIDs[i])
}
rows, err := s.bulkSelectEventReferenceStmt.Query(pq.Int64Array(nids))
if err != nil {
return nil, err
}
defer rows.Close()
results := make([]gomatrixserverlib.EventReference, len(eventNIDs))
i := 0
for ; rows.Next(); i++ {
result := &results[i]
if err = rows.Scan(&result.EventID, &result.EventSHA256); err != nil {
return nil, err
}
}
if i != len(eventNIDs) {
return nil, fmt.Errorf("storage: event NIDs missing from the database (%d != %d)", i, len(eventNIDs))
}
return results, nil
}

View file

@ -32,6 +32,9 @@ const selectRoomNIDSQL = "" +
"SELECT room_nid FROM rooms WHERE room_id = $1" "SELECT room_nid FROM rooms WHERE room_id = $1"
const selectLatestEventNIDsSQL = "" + const selectLatestEventNIDsSQL = "" +
"SELECT latest_event_nids FROM rooms WHERE room_nid = $1"
const selectLatestEventNIDsForUpdateSQL = "" +
"SELECT latest_event_nids, last_event_sent_nid FROM rooms WHERE room_nid = $1 FOR UPDATE" "SELECT latest_event_nids, last_event_sent_nid FROM rooms WHERE room_nid = $1 FOR UPDATE"
const updateLatestEventNIDsSQL = "" + const updateLatestEventNIDsSQL = "" +
@ -41,6 +44,7 @@ type roomStatements struct {
insertRoomNIDStmt *sql.Stmt insertRoomNIDStmt *sql.Stmt
selectRoomNIDStmt *sql.Stmt selectRoomNIDStmt *sql.Stmt
selectLatestEventNIDsStmt *sql.Stmt selectLatestEventNIDsStmt *sql.Stmt
selectLatestEventNIDsForUpdateStmt *sql.Stmt
updateLatestEventNIDsStmt *sql.Stmt updateLatestEventNIDsStmt *sql.Stmt
} }
@ -58,6 +62,9 @@ func (s *roomStatements) prepare(db *sql.DB) (err error) {
if s.selectLatestEventNIDsStmt, err = db.Prepare(selectLatestEventNIDsSQL); err != nil { if s.selectLatestEventNIDsStmt, err = db.Prepare(selectLatestEventNIDsSQL); err != nil {
return return
} }
if s.selectLatestEventNIDsForUpdateStmt, err = db.Prepare(selectLatestEventNIDsForUpdateSQL); err != nil {
return
}
if s.updateLatestEventNIDsStmt, err = db.Prepare(updateLatestEventNIDsSQL); err != nil { if s.updateLatestEventNIDsStmt, err = db.Prepare(updateLatestEventNIDsSQL); err != nil {
return return
} }
@ -76,10 +83,23 @@ func (s *roomStatements) selectRoomNID(roomID string) (types.RoomNID, error) {
return types.RoomNID(roomNID), err return types.RoomNID(roomNID), err
} }
func (s *roomStatements) selectLatestEventNIDs(roomNID types.RoomNID) ([]types.EventNID, error) {
var nids pq.Int64Array
err := s.selectLatestEventNIDsStmt.QueryRow(int64(roomNID)).Scan(&nids)
if err != nil {
return nil, err
}
eventNIDs := make([]types.EventNID, len(nids))
for i := range nids {
eventNIDs[i] = types.EventNID(nids[i])
}
return eventNIDs, nil
}
func (s *roomStatements) selectLatestEventsNIDsForUpdate(txn *sql.Tx, roomNID types.RoomNID) ([]types.EventNID, types.EventNID, error) { func (s *roomStatements) selectLatestEventsNIDsForUpdate(txn *sql.Tx, roomNID types.RoomNID) ([]types.EventNID, types.EventNID, error) {
var nids pq.Int64Array var nids pq.Int64Array
var lastEventSentNID int64 var lastEventSentNID int64
err := txn.Stmt(s.selectLatestEventNIDsStmt).QueryRow(int64(roomNID)).Scan(&nids, &lastEventSentNID) err := txn.Stmt(s.selectLatestEventNIDsForUpdateStmt).QueryRow(int64(roomNID)).Scan(&nids, &lastEventSentNID)
if err != nil { if err != nil {
return nil, 0, err return nil, 0, err
} }

View file

@ -280,3 +280,21 @@ func (u *roomRecentEventsUpdater) Commit() error {
func (u *roomRecentEventsUpdater) Rollback() error { func (u *roomRecentEventsUpdater) Rollback() error {
return u.txn.Rollback() return u.txn.Rollback()
} }
// RoomNID implements query.RoomserverQueryAPIDB
func (d *Database) RoomNID(roomID string) (types.RoomNID, error) {
roomNID, err := d.statements.selectRoomNID(roomID)
if err == sql.ErrNoRows {
return 0, nil
}
return roomNID, err
}
// LatestEventIDs implements query.RoomserverQueryAPIDB
func (d *Database) LatestEventIDs(roomNID types.RoomNID) ([]gomatrixserverlib.EventReference, error) {
eventNIDs, err := d.statements.selectLatestEventNIDs(roomNID)
if err != nil {
return nil, err
}
return d.statements.bulkSelectEventReference(eventNIDs)
}