mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-10 08:23:11 -06:00
Name change
This commit is contained in:
parent
7b56d66a2f
commit
982431e53d
|
|
@ -48,7 +48,7 @@ var (
|
||||||
testDatabase = defaulting(os.Getenv("DATABASE"), fmt.Sprintf("dbname=%s binary_parameters=yes", testDatabaseName))
|
testDatabase = defaulting(os.Getenv("DATABASE"), fmt.Sprintf("dbname=%s binary_parameters=yes", testDatabaseName))
|
||||||
)
|
)
|
||||||
|
|
||||||
var env = test.KafkaEnv{
|
var exe = test.KafkaExecutor{
|
||||||
ZookeeperURI: zookeeperURI,
|
ZookeeperURI: zookeeperURI,
|
||||||
KafkaDirectory: kafkaDir,
|
KafkaDirectory: kafkaDir,
|
||||||
KafkaURI: kafkaURI,
|
KafkaURI: kafkaURI,
|
||||||
|
|
@ -168,16 +168,16 @@ func testRoomserver(input []string, wantOutput []string, checkQueries func(api.R
|
||||||
inputTopic = "roomserverInput"
|
inputTopic = "roomserverInput"
|
||||||
outputTopic = "roomserverOutput"
|
outputTopic = "roomserverOutput"
|
||||||
)
|
)
|
||||||
env.DeleteTopic(inputTopic)
|
exe.DeleteTopic(inputTopic)
|
||||||
if err := env.CreateTopic(inputTopic); err != nil {
|
if err := exe.CreateTopic(inputTopic); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
env.DeleteTopic(outputTopic)
|
exe.DeleteTopic(outputTopic)
|
||||||
if err := env.CreateTopic(outputTopic); err != nil {
|
if err := exe.CreateTopic(outputTopic); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := env.WriteToTopic(inputTopic, canonicalJSONInput(input)); err != nil {
|
if err := exe.WriteToTopic(inputTopic, canonicalJSONInput(input)); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -51,7 +51,7 @@ var (
|
||||||
|
|
||||||
const inputTopic = "syncserverInput"
|
const inputTopic = "syncserverInput"
|
||||||
|
|
||||||
var env = test.KafkaEnv{
|
var exe = test.KafkaExecutor{
|
||||||
ZookeeperURI: zookeeperURI,
|
ZookeeperURI: zookeeperURI,
|
||||||
KafkaDirectory: kafkaDir,
|
KafkaDirectory: kafkaDir,
|
||||||
KafkaURI: kafkaURI,
|
KafkaURI: kafkaURI,
|
||||||
|
|
@ -146,11 +146,11 @@ func doSyncRequest(done chan error, want []string, since string) func() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func testSyncServer(input, want []string, since string) {
|
func testSyncServer(input, want []string, since string) {
|
||||||
env.DeleteTopic(inputTopic)
|
exe.DeleteTopic(inputTopic)
|
||||||
if err := env.CreateTopic(inputTopic); err != nil {
|
if err := exe.CreateTopic(inputTopic); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
if err := env.WriteToTopic(inputTopic, canonicalJSONInput(input)); err != nil {
|
if err := exe.WriteToTopic(inputTopic, canonicalJSONInput(input)); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -21,8 +21,8 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
// KafkaEnv contains the kafka environment information
|
// KafkaExecutor executes kafka scripts.
|
||||||
type KafkaEnv struct {
|
type KafkaExecutor struct {
|
||||||
// The location of Zookeeper. Typically this is `localhost:2181`.
|
// The location of Zookeeper. Typically this is `localhost:2181`.
|
||||||
ZookeeperURI string
|
ZookeeperURI string
|
||||||
// The directory where Kafka is installed to. Used to locate kafka scripts.
|
// The directory where Kafka is installed to. Used to locate kafka scripts.
|
||||||
|
|
@ -34,7 +34,7 @@ type KafkaEnv struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreateTopic creates a new kafka topic. This is created with a single partition.
|
// CreateTopic creates a new kafka topic. This is created with a single partition.
|
||||||
func (e *KafkaEnv) CreateTopic(topic string) error {
|
func (e *KafkaExecutor) CreateTopic(topic string) error {
|
||||||
cmd := exec.Command(
|
cmd := exec.Command(
|
||||||
filepath.Join(e.KafkaDirectory, "bin", "kafka-topics.sh"),
|
filepath.Join(e.KafkaDirectory, "bin", "kafka-topics.sh"),
|
||||||
"--create",
|
"--create",
|
||||||
|
|
@ -49,7 +49,7 @@ func (e *KafkaEnv) CreateTopic(topic string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteToTopic writes data to a kafka topic.
|
// WriteToTopic writes data to a kafka topic.
|
||||||
func (e *KafkaEnv) WriteToTopic(topic string, data []string) error {
|
func (e *KafkaExecutor) WriteToTopic(topic string, data []string) error {
|
||||||
cmd := exec.Command(
|
cmd := exec.Command(
|
||||||
filepath.Join(e.KafkaDirectory, "bin", "kafka-console-producer.sh"),
|
filepath.Join(e.KafkaDirectory, "bin", "kafka-console-producer.sh"),
|
||||||
"--broker-list", e.KafkaURI,
|
"--broker-list", e.KafkaURI,
|
||||||
|
|
@ -62,7 +62,7 @@ func (e *KafkaEnv) WriteToTopic(topic string, data []string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteTopic deletes a given kafka topic if it exists.
|
// DeleteTopic deletes a given kafka topic if it exists.
|
||||||
func (e *KafkaEnv) DeleteTopic(topic string) error {
|
func (e *KafkaExecutor) DeleteTopic(topic string) error {
|
||||||
cmd := exec.Command(
|
cmd := exec.Command(
|
||||||
filepath.Join(e.KafkaDirectory, "bin", "kafka-topics.sh"),
|
filepath.Join(e.KafkaDirectory, "bin", "kafka-topics.sh"),
|
||||||
"--delete",
|
"--delete",
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue