diff --git a/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go b/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go index f8f6ff91a..4f1786fab 100644 --- a/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go @@ -17,13 +17,10 @@ package main import ( "encoding/json" "fmt" - "io/ioutil" "net/http" "os" "os/exec" "path/filepath" - "strings" - "sync" "time" "github.com/matrix-org/dendrite/common/test" @@ -33,23 +30,25 @@ import ( var ( // Path to where kafka is installed. - kafkaDir = defaulting(os.Getenv("KAFKA_DIR"), "kafka") + kafkaDir = test.Defaulting(os.Getenv("KAFKA_DIR"), "kafka") // The URI the kafka zookeeper is listening on. - zookeeperURI = defaulting(os.Getenv("ZOOKEEPER_URI"), "localhost:2181") + zookeeperURI = test.Defaulting(os.Getenv("ZOOKEEPER_URI"), "localhost:2181") // The URI the kafka server is listening on. - kafkaURI = defaulting(os.Getenv("KAFKA_URIS"), "localhost:9092") + kafkaURI = test.Defaulting(os.Getenv("KAFKA_URIS"), "localhost:9092") // The address the syncserver should listen on. - syncserverAddr = defaulting(os.Getenv("SYNCSERVER_URI"), "localhost:9876") + syncserverAddr = test.Defaulting(os.Getenv("SYNCSERVER_URI"), "localhost:9876") // How long to wait for the syncserver to write the expected output messages. // This needs to be high enough to account for the time it takes to create // the postgres database tables which can take a while on travis. - timeoutString = defaulting(os.Getenv("TIMEOUT"), "10s") + timeoutString = test.Defaulting(os.Getenv("TIMEOUT"), "10s") // The name of maintenance database to connect to in order to create the test database. - postgresDatabase = defaulting(os.Getenv("POSTGRES_DATABASE"), "postgres") + postgresDatabase = test.Defaulting(os.Getenv("POSTGRES_DATABASE"), "postgres") + // Postgres docker container name (for running psql) + postgresContainerName = os.Getenv("POSTGRES_CONTAINER") // The name of the test database to create. - testDatabaseName = defaulting(os.Getenv("DATABASE_NAME"), "syncserver_test") + testDatabaseName = test.Defaulting(os.Getenv("DATABASE_NAME"), "syncserver_test") // The postgres connection config for connecting to the test database. - testDatabase = defaulting(os.Getenv("DATABASE"), fmt.Sprintf("dbname=%s sslmode=disable binary_parameters=yes", testDatabaseName)) + testDatabase = test.Defaulting(os.Getenv("DATABASE"), fmt.Sprintf("dbname=%s sslmode=disable binary_parameters=yes", testDatabaseName)) ) const inputTopic = "syncserverInput" @@ -63,36 +62,12 @@ var exe = test.KafkaExecutor{ OutputWriter: os.Stderr, } -var ( - lastRequestMutex sync.Mutex - lastRequestErr error -) - -func setLastRequestError(err error) { - lastRequestMutex.Lock() - defer lastRequestMutex.Unlock() - lastRequestErr = err -} - -func getLastRequestError() error { - lastRequestMutex.Lock() - defer lastRequestMutex.Unlock() - return lastRequestErr -} - var syncServerConfigFileContents = (`consumer_uris: ["` + kafkaURI + `"] roomserver_topic: "` + inputTopic + `" database: "` + testDatabase + `" server_name: "localhost" `) -func defaulting(value, defaultValue string) string { - if value == "" { - value = defaultValue - } - return value -} - var timeout time.Duration var clientEventTestData []string @@ -108,31 +83,6 @@ func init() { } } -// TODO: dupes roomserver integration tests. Factor out. -func createDatabase(database string) error { - cmd := exec.Command("psql", postgresDatabase) - cmd.Stdin = strings.NewReader( - fmt.Sprintf("DROP DATABASE IF EXISTS %s; CREATE DATABASE %s;", database, database), - ) - // Send stdout and stderr to our stderr so that we see error messages from - // the psql process - cmd.Stdout = os.Stderr - cmd.Stderr = os.Stderr - return cmd.Run() -} - -// TODO: dupes roomserver integration tests. Factor out. -func canonicalJSONInput(jsonData []string) []string { - for i := range jsonData { - jsonBytes, err := gomatrixserverlib.CanonicalJSON([]byte(jsonData[i])) - if err != nil { - panic(err) - } - jsonData[i] = string(jsonBytes) - } - return jsonData -} - func createTestUser(database, username, token string) error { cmd := exec.Command( filepath.Join(filepath.Dir(os.Args[0]), "create-account"), @@ -172,66 +122,32 @@ func clientEventJSONForOutputRoomEvent(outputRoomEvent string) string { return string(jsonBytes) } -// doSyncRequest does a /sync request and returns an error if it fails or doesn't -// return the wanted string. -func doSyncRequest(syncServerURL, want string) error { - cli := &http.Client{ - Timeout: 5 * time.Second, - } - res, err := cli.Get(syncServerURL) - if err != nil { - return err - } - if res.StatusCode != 200 { - return fmt.Errorf("/sync returned HTTP status %d", res.StatusCode) - } - defer res.Body.Close() - resBytes, err := ioutil.ReadAll(res.Body) - if err != nil { - return err - } - jsonBytes, err := gomatrixserverlib.CanonicalJSON(resBytes) - if err != nil { - return err - } - if string(jsonBytes) != want { - return fmt.Errorf("/sync returned wrong bytes. Expected:\n%s\n\nGot:\n%s", want, string(jsonBytes)) - } - return nil -} - -// syncRequestUntilSuccess blocks and performs the same /sync request over and over until -// the response returns the wanted string, where it will close the given channel and return. -// It will keep track of the last error in `lastRequestErr`. -func syncRequestUntilSuccess(done chan error, userID, since, want string) { - for { - sinceQuery := "" - if since != "" { - sinceQuery = "&since=" + since - } - err := doSyncRequest( - // low value timeout so polling with an up-to-date token returns quickly - "http://"+syncserverAddr+"/api/_matrix/client/r0/sync?timeout=100&access_token="+userID+sinceQuery, - want, - ) - if err != nil { - setLastRequestError(err) - time.Sleep(1 * time.Second) // don't tightloop - continue - } - close(done) - return - } -} - // startSyncServer creates the database and config file needed for the sync server to run and // then starts the sync server. The Cmd being executed is returned. A channel is also returned, // which will have any termination errors sent down it, followed immediately by the channel being closed. func startSyncServer() (*exec.Cmd, chan error) { - if err := createDatabase(testDatabaseName); err != nil { - panic(err) + const configFilename = "sync-api-server-config-test.yaml" + + serverArgs := []string{ + "--config", configFilename, + "--listen", syncserverAddr, } + databases := []string{ + testDatabaseName, + } + + cmd, cmdChan := test.StartServer( + "sync-api", + serverArgs, + "", + configFilename, + syncServerConfigFileContents, + postgresDatabase, + postgresContainerName, + databases, + ) + if err := createTestUser(testDatabase, "alice", "@alice:localhost"); err != nil { panic(err) } @@ -242,29 +158,7 @@ func startSyncServer() (*exec.Cmd, chan error) { panic(err) } - const configFileName = "sync-api-server-config-test.yaml" - err := ioutil.WriteFile(configFileName, []byte(syncServerConfigFileContents), 0644) - if err != nil { - panic(err) - } - - cmd := exec.Command( - filepath.Join(filepath.Dir(os.Args[0]), "dendrite-sync-api-server"), - "--config", configFileName, - "--listen", syncserverAddr, - ) - cmd.Stderr = os.Stderr - cmd.Stdout = os.Stderr - - if err := cmd.Start(); err != nil { - panic("failed to start sync server: " + err.Error()) - } - syncServerCmdChan := make(chan error, 1) - go func() { - syncServerCmdChan <- cmd.Wait() - close(syncServerCmdChan) - }() - return cmd, syncServerCmdChan + return cmd, cmdChan } // prepareKafka creates the topics which will be written to by the tests. @@ -277,49 +171,24 @@ func prepareKafka() { func testSyncServer(syncServerCmdChan chan error, userID, since, want string) { fmt.Printf("==TESTING== testSyncServer(%s,%s)\n", userID, since) - done := make(chan error, 1) - - // We need to wait for the sync server to: - // - have created the tables - // - be listening on the given port - // - have consumed the kafka logs - // before we begin hitting it with /sync requests. We don't get told when it has done - // all these things, so we just continually hit /sync until it returns the right bytes. - // We can't even wait for the first valid 200 OK response because it's possible to race - // with consuming the kafka logs (so the /sync response will be missing events and - // therefore fail the test). - go syncRequestUntilSuccess(done, userID, since, canonicalJSONInput([]string{want})[0]) - - // wait for one of: - // - the test to pass (done channel is closed) - // - the sync server to exit with an error (error sent on syncServerCmdChan) - // - our test timeout to expire - // We don't need to clean up since the main() function handles that in the event we panic - var testPassed bool - - select { - case <-time.After(timeout): - if testPassed { - break - } - fmt.Printf("==TESTING== testSyncServer(%s,%s) TIMEOUT\n", userID, since) - if reqErr := getLastRequestError(); reqErr != nil { - fmt.Println("Last /sync request error:") - fmt.Println(reqErr) - } - panic("dendrite-sync-api-server timed out") - case err := <-syncServerCmdChan: - if err != nil { - fmt.Println("=============================================================================================") - fmt.Println("sync server failed to run. If failing with 'pq: password authentication failed for user' try:") - fmt.Println(" export PGHOST=/var/run/postgresql") - fmt.Println("=============================================================================================") - panic(err) - } - case <-done: - testPassed = true - fmt.Printf("==TESTING== testSyncServer(%s,%s) PASSED\n", userID, since) + sinceQuery := "" + if since != "" { + sinceQuery = "&since=" + since } + req, err := http.NewRequest( + "GET", + "http://"+syncserverAddr+"/api/_matrix/client/r0/sync?timeout=100&access_token="+userID+sinceQuery, + nil, + ) + if err != nil { + panic(err) + } + testReq := &test.Request{ + Req: req, + WantedStatusCode: 200, + WantedBody: test.CanonicalJSONInput([]string{want})[0], + } + testReq.Run("sync-api", timeout, syncServerCmdChan) } func writeToRoomServerLog(indexes ...int) { @@ -327,7 +196,7 @@ func writeToRoomServerLog(indexes ...int) { for _, i := range indexes { roomEvents = append(roomEvents, outputRoomEventTestData[i]) } - if err := exe.WriteToTopic(inputTopic, canonicalJSONInput(roomEvents)); err != nil { + if err := exe.WriteToTopic(inputTopic, test.CanonicalJSONInput(roomEvents)); err != nil { panic(err) } }