Factor out creating/deleting/writing to kafka topics (#94)

This commit is contained in:
Kegsay 2017-05-09 09:05:05 +01:00 committed by GitHub
parent 801b9246ce
commit 42564e8ed6
3 changed files with 107 additions and 99 deletions

View file

@ -16,13 +16,15 @@ package main
import ( import (
"fmt" "fmt"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
"os" "os"
"os/exec" "os/exec"
"path/filepath" "path/filepath"
"strings" "strings"
"time" "time"
"github.com/matrix-org/dendrite/common/test"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
) )
var ( var (
@ -46,6 +48,15 @@ var (
testDatabase = defaulting(os.Getenv("DATABASE"), fmt.Sprintf("dbname=%s binary_parameters=yes", testDatabaseName)) testDatabase = defaulting(os.Getenv("DATABASE"), fmt.Sprintf("dbname=%s binary_parameters=yes", testDatabaseName))
) )
var exe = test.KafkaExecutor{
ZookeeperURI: zookeeperURI,
KafkaDirectory: kafkaDir,
KafkaURI: kafkaURI,
// Send stdout and stderr to our stderr so that we see error messages from
// the kafka process.
OutputWriter: os.Stderr,
}
func defaulting(value, defaultValue string) string { func defaulting(value, defaultValue string) string {
if value == "" { if value == "" {
value = defaultValue value = defaultValue
@ -75,36 +86,6 @@ func createDatabase(database string) error {
return cmd.Run() return cmd.Run()
} }
func createTopic(topic string) error {
cmd := exec.Command(
filepath.Join(kafkaDir, "bin", "kafka-topics.sh"),
"--create",
"--zookeeper", zookeeperURI,
"--replication-factor", "1",
"--partitions", "1",
"--topic", topic,
)
// Send stdout and stderr to our stderr so that we see error messages from
// the kafka process.
cmd.Stdout = os.Stderr
cmd.Stderr = os.Stderr
return cmd.Run()
}
func writeToTopic(topic string, data []string) error {
cmd := exec.Command(
filepath.Join(kafkaDir, "bin", "kafka-console-producer.sh"),
"--broker-list", kafkaURI,
"--topic", topic,
)
// Send stdout and stderr to our stderr so that we see error messages from
// the kafka process.
cmd.Stdout = os.Stderr
cmd.Stderr = os.Stderr
cmd.Stdin = strings.NewReader(strings.Join(data, "\n"))
return cmd.Run()
}
// runAndReadFromTopic runs a command and waits for a number of messages to be // runAndReadFromTopic runs a command and waits for a number of messages to be
// written to a kafka topic. It returns if the command exits, the number of // written to a kafka topic. It returns if the command exits, the number of
// messages is reached or after a timeout. It kills the command before it returns. // messages is reached or after a timeout. It kills the command before it returns.
@ -173,19 +154,6 @@ func runAndReadFromTopic(runCmd *exec.Cmd, topic string, count int, checkQueryAP
return lines, nil return lines, nil
} }
func deleteTopic(topic string) error {
cmd := exec.Command(
filepath.Join(kafkaDir, "bin", "kafka-topics.sh"),
"--delete",
"--if-exists",
"--zookeeper", zookeeperURI,
"--topic", topic,
)
cmd.Stderr = os.Stderr
cmd.Stdout = os.Stderr
return cmd.Run()
}
// testRoomserver is used to run integration tests against a single roomserver. // testRoomserver is used to run integration tests against a single roomserver.
// It creates new kafka topics for the input and output of the roomserver. // It creates new kafka topics for the input and output of the roomserver.
// It writes the input messages to the input kafka topic, formatting each message // It writes the input messages to the input kafka topic, formatting each message
@ -200,16 +168,16 @@ func testRoomserver(input []string, wantOutput []string, checkQueries func(api.R
inputTopic = "roomserverInput" inputTopic = "roomserverInput"
outputTopic = "roomserverOutput" outputTopic = "roomserverOutput"
) )
deleteTopic(inputTopic) exe.DeleteTopic(inputTopic)
if err := createTopic(inputTopic); err != nil { if err := exe.CreateTopic(inputTopic); err != nil {
panic(err) panic(err)
} }
deleteTopic(outputTopic) exe.DeleteTopic(outputTopic)
if err := createTopic(outputTopic); err != nil { if err := exe.CreateTopic(outputTopic); err != nil {
panic(err) panic(err)
} }
if err := writeToTopic(inputTopic, canonicalJSONInput(input)); err != nil { if err := exe.WriteToTopic(inputTopic, canonicalJSONInput(input)); err != nil {
panic(err) panic(err)
} }

View file

@ -24,6 +24,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/matrix-org/dendrite/common/test"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
@ -50,6 +51,15 @@ var (
const inputTopic = "syncserverInput" const inputTopic = "syncserverInput"
var exe = test.KafkaExecutor{
ZookeeperURI: zookeeperURI,
KafkaDirectory: kafkaDir,
KafkaURI: kafkaURI,
// Send stdout and stderr to our stderr so that we see error messages from
// the kafka process.
OutputWriter: os.Stderr,
}
var syncServerConfigFileContents = (`consumer_uris: ["` + kafkaURI + `"] var syncServerConfigFileContents = (`consumer_uris: ["` + kafkaURI + `"]
roomserver_topic: "` + inputTopic + `" roomserver_topic: "` + inputTopic + `"
database: "` + testDatabase + `" database: "` + testDatabase + `"
@ -85,52 +95,6 @@ func createDatabase(database string) error {
return cmd.Run() return cmd.Run()
} }
// TODO: dupes roomserver integration tests. Factor out.
func createTopic(topic string) error {
cmd := exec.Command(
filepath.Join(kafkaDir, "bin", "kafka-topics.sh"),
"--create",
"--zookeeper", zookeeperURI,
"--replication-factor", "1",
"--partitions", "1",
"--topic", topic,
)
// Send stdout and stderr to our stderr so that we see error messages from
// the kafka process.
cmd.Stdout = os.Stderr
cmd.Stderr = os.Stderr
return cmd.Run()
}
// TODO: dupes roomserver integration tests. Factor out.
func writeToTopic(topic string, data []string) error {
cmd := exec.Command(
filepath.Join(kafkaDir, "bin", "kafka-console-producer.sh"),
"--broker-list", kafkaURI,
"--topic", topic,
)
// Send stdout and stderr to our stderr so that we see error messages from
// the kafka process.
cmd.Stdout = os.Stderr
cmd.Stderr = os.Stderr
cmd.Stdin = strings.NewReader(strings.Join(data, "\n"))
return cmd.Run()
}
// TODO: dupes roomserver integration tests. Factor out.
func deleteTopic(topic string) error {
cmd := exec.Command(
filepath.Join(kafkaDir, "bin", "kafka-topics.sh"),
"--delete",
"--if-exists",
"--zookeeper", zookeeperURI,
"--topic", topic,
)
cmd.Stderr = os.Stderr
cmd.Stdout = os.Stderr
return cmd.Run()
}
// TODO: dupes roomserver integration tests. Factor out. // TODO: dupes roomserver integration tests. Factor out.
func canonicalJSONInput(jsonData []string) []string { func canonicalJSONInput(jsonData []string) []string {
for i := range jsonData { for i := range jsonData {
@ -182,11 +146,11 @@ func doSyncRequest(done chan error, want []string, since string) func() {
} }
func testSyncServer(input, want []string, since string) { func testSyncServer(input, want []string, since string) {
deleteTopic(inputTopic) exe.DeleteTopic(inputTopic)
if err := createTopic(inputTopic); err != nil { if err := exe.CreateTopic(inputTopic); err != nil {
panic(err) panic(err)
} }
if err := writeToTopic(inputTopic, canonicalJSONInput(input)); err != nil { if err := exe.WriteToTopic(inputTopic, canonicalJSONInput(input)); err != nil {
panic(err) panic(err)
} }

View file

@ -0,0 +1,76 @@
// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package test
import (
"io"
"os/exec"
"path/filepath"
"strings"
)
// KafkaExecutor executes kafka scripts.
type KafkaExecutor struct {
// The location of Zookeeper. Typically this is `localhost:2181`.
ZookeeperURI string
// The directory where Kafka is installed to. Used to locate kafka scripts.
KafkaDirectory string
// The location of the Kafka logs. Typically this is `localhost:9092`.
KafkaURI string
// Where stdout and stderr should be written to. Typically this is `os.Stderr`.
OutputWriter io.Writer
}
// CreateTopic creates a new kafka topic. This is created with a single partition.
func (e *KafkaExecutor) CreateTopic(topic string) error {
cmd := exec.Command(
filepath.Join(e.KafkaDirectory, "bin", "kafka-topics.sh"),
"--create",
"--zookeeper", e.ZookeeperURI,
"--replication-factor", "1",
"--partitions", "1",
"--topic", topic,
)
cmd.Stdout = e.OutputWriter
cmd.Stderr = e.OutputWriter
return cmd.Run()
}
// WriteToTopic writes data to a kafka topic.
func (e *KafkaExecutor) WriteToTopic(topic string, data []string) error {
cmd := exec.Command(
filepath.Join(e.KafkaDirectory, "bin", "kafka-console-producer.sh"),
"--broker-list", e.KafkaURI,
"--topic", topic,
)
cmd.Stdout = e.OutputWriter
cmd.Stderr = e.OutputWriter
cmd.Stdin = strings.NewReader(strings.Join(data, "\n"))
return cmd.Run()
}
// DeleteTopic deletes a given kafka topic if it exists.
func (e *KafkaExecutor) DeleteTopic(topic string) error {
cmd := exec.Command(
filepath.Join(e.KafkaDirectory, "bin", "kafka-topics.sh"),
"--delete",
"--if-exists",
"--zookeeper", e.ZookeeperURI,
"--topic", topic,
)
cmd.Stderr = e.OutputWriter
cmd.Stdout = e.OutputWriter
return cmd.Run()
}