mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-13 01:43:09 -06:00
cmd/syncserver-integration-tests: Port to new common/test infra
This commit is contained in:
parent
e3b18f9138
commit
6650246ee8
|
|
@ -17,13 +17,10 @@ package main
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/common/test"
|
"github.com/matrix-org/dendrite/common/test"
|
||||||
|
|
@ -33,23 +30,25 @@ import (
|
||||||
|
|
||||||
var (
|
var (
|
||||||
// Path to where kafka is installed.
|
// Path to where kafka is installed.
|
||||||
kafkaDir = defaulting(os.Getenv("KAFKA_DIR"), "kafka")
|
kafkaDir = test.Defaulting(os.Getenv("KAFKA_DIR"), "kafka")
|
||||||
// The URI the kafka zookeeper is listening on.
|
// The URI the kafka zookeeper is listening on.
|
||||||
zookeeperURI = defaulting(os.Getenv("ZOOKEEPER_URI"), "localhost:2181")
|
zookeeperURI = test.Defaulting(os.Getenv("ZOOKEEPER_URI"), "localhost:2181")
|
||||||
// The URI the kafka server is listening on.
|
// The URI the kafka server is listening on.
|
||||||
kafkaURI = defaulting(os.Getenv("KAFKA_URIS"), "localhost:9092")
|
kafkaURI = test.Defaulting(os.Getenv("KAFKA_URIS"), "localhost:9092")
|
||||||
// The address the syncserver should listen on.
|
// The address the syncserver should listen on.
|
||||||
syncserverAddr = defaulting(os.Getenv("SYNCSERVER_URI"), "localhost:9876")
|
syncserverAddr = test.Defaulting(os.Getenv("SYNCSERVER_URI"), "localhost:9876")
|
||||||
// How long to wait for the syncserver to write the expected output messages.
|
// How long to wait for the syncserver to write the expected output messages.
|
||||||
// This needs to be high enough to account for the time it takes to create
|
// This needs to be high enough to account for the time it takes to create
|
||||||
// the postgres database tables which can take a while on travis.
|
// the postgres database tables which can take a while on travis.
|
||||||
timeoutString = defaulting(os.Getenv("TIMEOUT"), "10s")
|
timeoutString = test.Defaulting(os.Getenv("TIMEOUT"), "10s")
|
||||||
// The name of maintenance database to connect to in order to create the test database.
|
// The name of maintenance database to connect to in order to create the test database.
|
||||||
postgresDatabase = defaulting(os.Getenv("POSTGRES_DATABASE"), "postgres")
|
postgresDatabase = test.Defaulting(os.Getenv("POSTGRES_DATABASE"), "postgres")
|
||||||
|
// Postgres docker container name (for running psql)
|
||||||
|
postgresContainerName = os.Getenv("POSTGRES_CONTAINER")
|
||||||
// The name of the test database to create.
|
// The name of the test database to create.
|
||||||
testDatabaseName = defaulting(os.Getenv("DATABASE_NAME"), "syncserver_test")
|
testDatabaseName = test.Defaulting(os.Getenv("DATABASE_NAME"), "syncserver_test")
|
||||||
// The postgres connection config for connecting to the test database.
|
// The postgres connection config for connecting to the test database.
|
||||||
testDatabase = defaulting(os.Getenv("DATABASE"), fmt.Sprintf("dbname=%s sslmode=disable binary_parameters=yes", testDatabaseName))
|
testDatabase = test.Defaulting(os.Getenv("DATABASE"), fmt.Sprintf("dbname=%s sslmode=disable binary_parameters=yes", testDatabaseName))
|
||||||
)
|
)
|
||||||
|
|
||||||
const inputTopic = "syncserverInput"
|
const inputTopic = "syncserverInput"
|
||||||
|
|
@ -63,36 +62,12 @@ var exe = test.KafkaExecutor{
|
||||||
OutputWriter: os.Stderr,
|
OutputWriter: os.Stderr,
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
|
||||||
lastRequestMutex sync.Mutex
|
|
||||||
lastRequestErr error
|
|
||||||
)
|
|
||||||
|
|
||||||
func setLastRequestError(err error) {
|
|
||||||
lastRequestMutex.Lock()
|
|
||||||
defer lastRequestMutex.Unlock()
|
|
||||||
lastRequestErr = err
|
|
||||||
}
|
|
||||||
|
|
||||||
func getLastRequestError() error {
|
|
||||||
lastRequestMutex.Lock()
|
|
||||||
defer lastRequestMutex.Unlock()
|
|
||||||
return lastRequestErr
|
|
||||||
}
|
|
||||||
|
|
||||||
var syncServerConfigFileContents = (`consumer_uris: ["` + kafkaURI + `"]
|
var syncServerConfigFileContents = (`consumer_uris: ["` + kafkaURI + `"]
|
||||||
roomserver_topic: "` + inputTopic + `"
|
roomserver_topic: "` + inputTopic + `"
|
||||||
database: "` + testDatabase + `"
|
database: "` + testDatabase + `"
|
||||||
server_name: "localhost"
|
server_name: "localhost"
|
||||||
`)
|
`)
|
||||||
|
|
||||||
func defaulting(value, defaultValue string) string {
|
|
||||||
if value == "" {
|
|
||||||
value = defaultValue
|
|
||||||
}
|
|
||||||
return value
|
|
||||||
}
|
|
||||||
|
|
||||||
var timeout time.Duration
|
var timeout time.Duration
|
||||||
var clientEventTestData []string
|
var clientEventTestData []string
|
||||||
|
|
||||||
|
|
@ -108,31 +83,6 @@ func init() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: dupes roomserver integration tests. Factor out.
|
|
||||||
func createDatabase(database string) error {
|
|
||||||
cmd := exec.Command("psql", postgresDatabase)
|
|
||||||
cmd.Stdin = strings.NewReader(
|
|
||||||
fmt.Sprintf("DROP DATABASE IF EXISTS %s; CREATE DATABASE %s;", database, database),
|
|
||||||
)
|
|
||||||
// Send stdout and stderr to our stderr so that we see error messages from
|
|
||||||
// the psql process
|
|
||||||
cmd.Stdout = os.Stderr
|
|
||||||
cmd.Stderr = os.Stderr
|
|
||||||
return cmd.Run()
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: dupes roomserver integration tests. Factor out.
|
|
||||||
func canonicalJSONInput(jsonData []string) []string {
|
|
||||||
for i := range jsonData {
|
|
||||||
jsonBytes, err := gomatrixserverlib.CanonicalJSON([]byte(jsonData[i]))
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
jsonData[i] = string(jsonBytes)
|
|
||||||
}
|
|
||||||
return jsonData
|
|
||||||
}
|
|
||||||
|
|
||||||
func createTestUser(database, username, token string) error {
|
func createTestUser(database, username, token string) error {
|
||||||
cmd := exec.Command(
|
cmd := exec.Command(
|
||||||
filepath.Join(filepath.Dir(os.Args[0]), "create-account"),
|
filepath.Join(filepath.Dir(os.Args[0]), "create-account"),
|
||||||
|
|
@ -172,66 +122,32 @@ func clientEventJSONForOutputRoomEvent(outputRoomEvent string) string {
|
||||||
return string(jsonBytes)
|
return string(jsonBytes)
|
||||||
}
|
}
|
||||||
|
|
||||||
// doSyncRequest does a /sync request and returns an error if it fails or doesn't
|
|
||||||
// return the wanted string.
|
|
||||||
func doSyncRequest(syncServerURL, want string) error {
|
|
||||||
cli := &http.Client{
|
|
||||||
Timeout: 5 * time.Second,
|
|
||||||
}
|
|
||||||
res, err := cli.Get(syncServerURL)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if res.StatusCode != 200 {
|
|
||||||
return fmt.Errorf("/sync returned HTTP status %d", res.StatusCode)
|
|
||||||
}
|
|
||||||
defer res.Body.Close()
|
|
||||||
resBytes, err := ioutil.ReadAll(res.Body)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
jsonBytes, err := gomatrixserverlib.CanonicalJSON(resBytes)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if string(jsonBytes) != want {
|
|
||||||
return fmt.Errorf("/sync returned wrong bytes. Expected:\n%s\n\nGot:\n%s", want, string(jsonBytes))
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// syncRequestUntilSuccess blocks and performs the same /sync request over and over until
|
|
||||||
// the response returns the wanted string, where it will close the given channel and return.
|
|
||||||
// It will keep track of the last error in `lastRequestErr`.
|
|
||||||
func syncRequestUntilSuccess(done chan error, userID, since, want string) {
|
|
||||||
for {
|
|
||||||
sinceQuery := ""
|
|
||||||
if since != "" {
|
|
||||||
sinceQuery = "&since=" + since
|
|
||||||
}
|
|
||||||
err := doSyncRequest(
|
|
||||||
// low value timeout so polling with an up-to-date token returns quickly
|
|
||||||
"http://"+syncserverAddr+"/api/_matrix/client/r0/sync?timeout=100&access_token="+userID+sinceQuery,
|
|
||||||
want,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
setLastRequestError(err)
|
|
||||||
time.Sleep(1 * time.Second) // don't tightloop
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
close(done)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// startSyncServer creates the database and config file needed for the sync server to run and
|
// startSyncServer creates the database and config file needed for the sync server to run and
|
||||||
// then starts the sync server. The Cmd being executed is returned. A channel is also returned,
|
// then starts the sync server. The Cmd being executed is returned. A channel is also returned,
|
||||||
// which will have any termination errors sent down it, followed immediately by the channel being closed.
|
// which will have any termination errors sent down it, followed immediately by the channel being closed.
|
||||||
func startSyncServer() (*exec.Cmd, chan error) {
|
func startSyncServer() (*exec.Cmd, chan error) {
|
||||||
if err := createDatabase(testDatabaseName); err != nil {
|
const configFilename = "sync-api-server-config-test.yaml"
|
||||||
panic(err)
|
|
||||||
|
serverArgs := []string{
|
||||||
|
"--config", configFilename,
|
||||||
|
"--listen", syncserverAddr,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
databases := []string{
|
||||||
|
testDatabaseName,
|
||||||
|
}
|
||||||
|
|
||||||
|
cmd, cmdChan := test.StartServer(
|
||||||
|
"sync-api",
|
||||||
|
serverArgs,
|
||||||
|
"",
|
||||||
|
configFilename,
|
||||||
|
syncServerConfigFileContents,
|
||||||
|
postgresDatabase,
|
||||||
|
postgresContainerName,
|
||||||
|
databases,
|
||||||
|
)
|
||||||
|
|
||||||
if err := createTestUser(testDatabase, "alice", "@alice:localhost"); err != nil {
|
if err := createTestUser(testDatabase, "alice", "@alice:localhost"); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
@ -242,29 +158,7 @@ func startSyncServer() (*exec.Cmd, chan error) {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
const configFileName = "sync-api-server-config-test.yaml"
|
return cmd, cmdChan
|
||||||
err := ioutil.WriteFile(configFileName, []byte(syncServerConfigFileContents), 0644)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
cmd := exec.Command(
|
|
||||||
filepath.Join(filepath.Dir(os.Args[0]), "dendrite-sync-api-server"),
|
|
||||||
"--config", configFileName,
|
|
||||||
"--listen", syncserverAddr,
|
|
||||||
)
|
|
||||||
cmd.Stderr = os.Stderr
|
|
||||||
cmd.Stdout = os.Stderr
|
|
||||||
|
|
||||||
if err := cmd.Start(); err != nil {
|
|
||||||
panic("failed to start sync server: " + err.Error())
|
|
||||||
}
|
|
||||||
syncServerCmdChan := make(chan error, 1)
|
|
||||||
go func() {
|
|
||||||
syncServerCmdChan <- cmd.Wait()
|
|
||||||
close(syncServerCmdChan)
|
|
||||||
}()
|
|
||||||
return cmd, syncServerCmdChan
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// prepareKafka creates the topics which will be written to by the tests.
|
// prepareKafka creates the topics which will be written to by the tests.
|
||||||
|
|
@ -277,49 +171,24 @@ func prepareKafka() {
|
||||||
|
|
||||||
func testSyncServer(syncServerCmdChan chan error, userID, since, want string) {
|
func testSyncServer(syncServerCmdChan chan error, userID, since, want string) {
|
||||||
fmt.Printf("==TESTING== testSyncServer(%s,%s)\n", userID, since)
|
fmt.Printf("==TESTING== testSyncServer(%s,%s)\n", userID, since)
|
||||||
done := make(chan error, 1)
|
sinceQuery := ""
|
||||||
|
if since != "" {
|
||||||
// We need to wait for the sync server to:
|
sinceQuery = "&since=" + since
|
||||||
// - have created the tables
|
|
||||||
// - be listening on the given port
|
|
||||||
// - have consumed the kafka logs
|
|
||||||
// before we begin hitting it with /sync requests. We don't get told when it has done
|
|
||||||
// all these things, so we just continually hit /sync until it returns the right bytes.
|
|
||||||
// We can't even wait for the first valid 200 OK response because it's possible to race
|
|
||||||
// with consuming the kafka logs (so the /sync response will be missing events and
|
|
||||||
// therefore fail the test).
|
|
||||||
go syncRequestUntilSuccess(done, userID, since, canonicalJSONInput([]string{want})[0])
|
|
||||||
|
|
||||||
// wait for one of:
|
|
||||||
// - the test to pass (done channel is closed)
|
|
||||||
// - the sync server to exit with an error (error sent on syncServerCmdChan)
|
|
||||||
// - our test timeout to expire
|
|
||||||
// We don't need to clean up since the main() function handles that in the event we panic
|
|
||||||
var testPassed bool
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-time.After(timeout):
|
|
||||||
if testPassed {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
fmt.Printf("==TESTING== testSyncServer(%s,%s) TIMEOUT\n", userID, since)
|
|
||||||
if reqErr := getLastRequestError(); reqErr != nil {
|
|
||||||
fmt.Println("Last /sync request error:")
|
|
||||||
fmt.Println(reqErr)
|
|
||||||
}
|
|
||||||
panic("dendrite-sync-api-server timed out")
|
|
||||||
case err := <-syncServerCmdChan:
|
|
||||||
if err != nil {
|
|
||||||
fmt.Println("=============================================================================================")
|
|
||||||
fmt.Println("sync server failed to run. If failing with 'pq: password authentication failed for user' try:")
|
|
||||||
fmt.Println(" export PGHOST=/var/run/postgresql")
|
|
||||||
fmt.Println("=============================================================================================")
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
case <-done:
|
|
||||||
testPassed = true
|
|
||||||
fmt.Printf("==TESTING== testSyncServer(%s,%s) PASSED\n", userID, since)
|
|
||||||
}
|
}
|
||||||
|
req, err := http.NewRequest(
|
||||||
|
"GET",
|
||||||
|
"http://"+syncserverAddr+"/api/_matrix/client/r0/sync?timeout=100&access_token="+userID+sinceQuery,
|
||||||
|
nil,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
testReq := &test.Request{
|
||||||
|
Req: req,
|
||||||
|
WantedStatusCode: 200,
|
||||||
|
WantedBody: test.CanonicalJSONInput([]string{want})[0],
|
||||||
|
}
|
||||||
|
testReq.Run("sync-api", timeout, syncServerCmdChan)
|
||||||
}
|
}
|
||||||
|
|
||||||
func writeToRoomServerLog(indexes ...int) {
|
func writeToRoomServerLog(indexes ...int) {
|
||||||
|
|
@ -327,7 +196,7 @@ func writeToRoomServerLog(indexes ...int) {
|
||||||
for _, i := range indexes {
|
for _, i := range indexes {
|
||||||
roomEvents = append(roomEvents, outputRoomEventTestData[i])
|
roomEvents = append(roomEvents, outputRoomEventTestData[i])
|
||||||
}
|
}
|
||||||
if err := exe.WriteToTopic(inputTopic, canonicalJSONInput(roomEvents)); err != nil {
|
if err := exe.WriteToTopic(inputTopic, test.CanonicalJSONInput(roomEvents)); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue