From 5a536aaf18eaabf83aed35edc483112071220aee Mon Sep 17 00:00:00 2001
From: Mark Haines
Date: Mon, 17 Jul 2017 14:10:55 +0100
Subject: [PATCH] Use synchronous HTTP API for writing events to the roomserver

---
 .../dendrite/cmd/dendrite-room-server/main.go |  47 ++-----
 .../cmd/roomserver-integration-tests/main.go  |  78 ++++++++--
 .../dendrite/roomserver/input/consumer.go     | 133 ------------------
 3 files changed, 73 insertions(+), 185 deletions(-)
 delete mode 100644 src/github.com/matrix-org/dendrite/roomserver/input/consumer.go

diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-room-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-room-server/main.go
index 0a1686791..715a40740 100644
--- a/src/github.com/matrix-org/dendrite/cmd/dendrite-room-server/main.go
+++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-room-server/main.go
@@ -16,11 +16,9 @@ package main
 
 import (
 	"flag"
-	"fmt"
 	"net/http"
 	_ "net/http/pprof"
 	"os"
-	"strconv"
 
 	log "github.com/Sirupsen/logrus"
 	"github.com/matrix-org/dendrite/common"
@@ -33,9 +31,8 @@ import (
 )
 
 var (
-	logDir              = os.Getenv("LOG_DIR")
-	configPath          = flag.String("config", "", "The path to the config file. For more information, see the config file in this repository.")
-	stopProcessingAfter = os.Getenv("STOP_AFTER")
+	logDir     = os.Getenv("LOG_DIR")
+	configPath = flag.String("config", "", "The path to the config file. For more information, see the config file in this repository.")
 )
 
 func main() {
@@ -56,49 +53,25 @@ func main() {
 		panic(err)
 	}
 
-	kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil)
-	if err != nil {
-		panic(err)
-	}
-
 	kafkaProducer, err := sarama.NewSyncProducer(cfg.Kafka.Addresses, nil)
 	if err != nil {
 		panic(err)
 	}
 
-	consumer := input.Consumer{
-		ContinualConsumer: common.ContinualConsumer{
-			Topic:          string(cfg.Kafka.Topics.InputRoomEvent),
-			Consumer:       kafkaConsumer,
-			PartitionStore: db,
-		},
-		DB:                   db,
-		Producer:             kafkaProducer,
-		OutputRoomEventTopic: string(cfg.Kafka.Topics.OutputRoomEvent),
-	}
-
-	if stopProcessingAfter != "" {
-		count, err := strconv.ParseInt(stopProcessingAfter, 10, 64)
-		if err != nil {
-			panic(err)
-		}
-		consumer.StopProcessingAfter = &count
-		consumer.ShutdownCallback = func(message string) {
-			fmt.Println("Stopping roomserver", message)
-			os.Exit(0)
-		}
-	}
-
-	if err = consumer.Start(); err != nil {
-		panic(err)
-	}
-
 	queryAPI := query.RoomserverQueryAPI{
 		DB: db,
 	}
 
 	queryAPI.SetupHTTP(http.DefaultServeMux)
 
+	inputAPI := input.RoomserverInputAPI{
+		DB:                   db,
+		Producer:             kafkaProducer,
+		OutputRoomEventTopic: string(cfg.Kafka.Topics.OutputRoomEvent),
+	}
+
+	inputAPI.SetupHTTP(http.DefaultServeMux)
+
 	http.DefaultServeMux.Handle("/metrics", prometheus.Handler())
 
 	log.Info("Started room server on ", cfg.Listen.RoomServer)
diff --git a/src/github.com/matrix-org/dendrite/cmd/roomserver-integration-tests/main.go b/src/github.com/matrix-org/dendrite/cmd/roomserver-integration-tests/main.go
index c4bea7f3e..820d1cf47 100644
--- a/src/github.com/matrix-org/dendrite/cmd/roomserver-integration-tests/main.go
+++ b/src/github.com/matrix-org/dendrite/cmd/roomserver-integration-tests/main.go
@@ -23,6 +23,10 @@ import (
 	"strings"
 	"time"
 
+	"encoding/json"
+
+	"net/http"
+
 	"github.com/matrix-org/dendrite/common/test"
 	"github.com/matrix-org/dendrite/roomserver/api"
 	"github.com/matrix-org/gomatrixserverlib"
@@ -90,7 +94,7 @@ func createDatabase(database string) error {
 // messages is reached or after a timeout. It kills the command before it returns.
 // It returns a list of the messages read from the command on success or an error
 // on failure.
-func runAndReadFromTopic(runCmd *exec.Cmd, topic string, count int, checkQueryAPI func()) ([]string, error) {
+func runAndReadFromTopic(runCmd *exec.Cmd, readyURL string, doInput func(), topic string, count int, checkQueryAPI func()) ([]string, error) {
 	type result struct {
 		// data holds all of stdout on success.
 		data []byte
@@ -107,6 +111,11 @@ func runAndReadFromTopic(runCmd *exec.Cmd, topic string, count int, checkQueryAP
 	)
 	// Send stderr to our stderr so the user can see any error messages.
 	readCmd.Stderr = os.Stderr
+
+	// Kill both processes before we exit.
+	defer func() { runCmd.Process.Kill() }()
+	defer func() { readCmd.Process.Kill() }()
+
 	// Run the command, read the messages and wait for a timeout in parallel.
 	go func() {
 		// Read all of stdout.
@@ -131,14 +140,42 @@ func runAndReadFromTopic(runCmd *exec.Cmd, topic string, count int, checkQueryAP
 		time.Sleep(timeout)
 		done <- result{nil, fmt.Errorf("Timeout reading %d messages from topic %q", count, topic)}
 	}()
+
+	ready := make(chan struct{})
+
+	go func() {
+		delay := 10 * time.Millisecond
+		for {
+			time.Sleep(delay)
+			if delay < 100*time.Millisecond {
+				delay *= 2
+			}
+			fmt.Printf("Checking %s\n", readyURL)
+			resp, err := http.Get(readyURL)
+			if err != nil {
+				continue
+			}
+			if resp.StatusCode == 200 {
+				break
+			}
+		}
+		ready <- struct{}{}
+	}()
+
+	// Wait for the roomserver to either be ready to receive input or for it to
+	// crash.
+	select {
+	case <-ready:
+	case r := <-done:
+		return nil, r.err
+	}
+
+	// Write the input now that the server is running.
+	doInput()
+
 	// Wait for one of the tasks to finsh.
 	r := <-done
 
-	// Kill both processes. We don't check if the processes are running and
-	// we ignore failures since we are just trying to clean up before returning.
-	runCmd.Process.Kill()
-	readCmd.Process.Kill()
-
 	if r.err != nil {
 		return nil, r.err
 	}
@@ -153,6 +190,20 @@ func runAndReadFromTopic(runCmd *exec.Cmd, topic string, count int, checkQueryAP
 	return lines, nil
 }
 
+func writeToRoomServer(input []string, roomserverURL string) error {
+	var request api.InputRoomEventsRequest
+	var response api.InputRoomEventsResponse
+	var err error
+	request.InputRoomEvents = make([]api.InputRoomEvent, len(input))
+	for i := range input {
+		if err = json.Unmarshal([]byte(input[i]), &request.InputRoomEvents[i]); err != nil {
+			return err
+		}
+	}
+	x := api.NewRoomserverInputAPIHTTP(roomserverURL, nil)
+	return x.InputRoomEvents(&request, &response)
+}
+
 // testRoomserver is used to run integration tests against a single roomserver.
 // It creates new kafka topics for the input and output of the roomserver.
 // It writes the input messages to the input kafka topic, formatting each message
@@ -176,24 +227,21 @@ func testRoomserver(input []string, wantOutput []string, checkQueries func(api.R
 		panic(err)
 	}
 
-	inputTopic := string(cfg.Kafka.Topics.InputRoomEvent)
 	outputTopic := string(cfg.Kafka.Topics.OutputRoomEvent)
 
-	exe.DeleteTopic(inputTopic)
-	if err := exe.CreateTopic(inputTopic); err != nil {
-		panic(err)
-	}
 	exe.DeleteTopic(outputTopic)
 	if err := exe.CreateTopic(outputTopic); err != nil {
 		panic(err)
 	}
 
-	if err := exe.WriteToTopic(inputTopic, canonicalJSONInput(input)); err != nil {
+	if err = createDatabase(testDatabaseName); err != nil {
 		panic(err)
 	}
 
-	if err = createDatabase(testDatabaseName); err != nil {
-		panic(err)
+	doInput := func() {
+		if err = writeToRoomServer(input, cfg.RoomServerURL()); err != nil {
+			panic(err)
+		}
 	}
 
 	cmd := exec.Command(filepath.Join(filepath.Dir(os.Args[0]), "dendrite-room-server"))
@@ -205,7 +253,7 @@ func testRoomserver(input []string, wantOutput []string, checkQueries func(api.R
 	cmd.Stderr = os.Stderr
 	cmd.Args = []string{"dendrite-room-server", "--config", filepath.Join(dir, test.ConfigFile)}
 
-	gotOutput, err := runAndReadFromTopic(cmd, outputTopic, len(wantOutput), func() {
+	gotOutput, err := runAndReadFromTopic(cmd, cfg.RoomServerURL()+"/metrics", doInput, outputTopic, len(wantOutput), func() {
 		queryAPI := api.NewRoomserverQueryAPIHTTP("http://"+string(cfg.Listen.RoomServer), nil)
 		checkQueries(queryAPI)
 	})
diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go b/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go
deleted file mode 100644
index efe450381..000000000
--- a/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go
+++ /dev/null
@@ -1,133 +0,0 @@
-// Copyright 2017 Vector Creations Ltd
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// Package input contains the code that writes
-package input
-
-import (
-	"encoding/json"
-	"fmt"
-	"sync/atomic"
-
-	"github.com/matrix-org/dendrite/common"
-	"github.com/matrix-org/dendrite/roomserver/api"
-	sarama "gopkg.in/Shopify/sarama.v1"
-)
-
-// A ConsumerDatabase has the storage APIs needed by the consumer.
-type ConsumerDatabase interface {
-	RoomEventDatabase
-	common.PartitionStorer
-}
-
-// An ErrorLogger handles the errors encountered by the consumer.
-type ErrorLogger interface {
-	OnError(message *sarama.ConsumerMessage, err error)
-}
-
-// A Consumer consumes a kafkaesque stream of room events.
-// The room events are supplied as api.InputRoomEvent structs serialised as JSON.
-// The events should be valid matrix events.
-// The events needed to authenticate the event should already be stored on the roomserver.
-// The events needed to construct the state at the event should already be stored on the roomserver.
-// If the event is not valid then it will be discarded and an error will be logged.
-type Consumer struct {
-	ContinualConsumer common.ContinualConsumer
-	// The database used to store the room events.
-	DB       ConsumerDatabase
-	Producer sarama.SyncProducer
-	// The kafkaesque topic to output new room events to.
-	// This is the name used in kafka to identify the stream to write events to.
-	OutputRoomEventTopic string
-	// The ErrorLogger for this consumer.
-	// If left as nil then the consumer will panic when it encounters an error
-	ErrorLogger ErrorLogger
-	// If non-nil then the consumer will stop processing messages after this
-	// many messages and will shutdown. Malformed messages are included in the count.
-	StopProcessingAfter *int64
-	// If not-nil then the consumer will call this to shutdown the server.
-	ShutdownCallback func(reason string)
-	// How many messages the consumer has processed.
-	processed int64
-}
-
-// WriteOutputRoomEvent implements OutputRoomEventWriter
-func (c *Consumer) WriteOutputRoomEvent(output api.OutputNewRoomEvent) error {
-	var m sarama.ProducerMessage
-	oe := api.OutputEvent{
-		Type:         api.OutputTypeNewRoomEvent,
-		NewRoomEvent: &output,
-	}
-	value, err := json.Marshal(oe)
-	if err != nil {
-		return err
-	}
-	m.Topic = c.OutputRoomEventTopic
-	m.Key = sarama.StringEncoder("")
-	m.Value = sarama.ByteEncoder(value)
-	_, _, err = c.Producer.SendMessage(&m)
-	return err
-}
-
-// Start starts the consumer consuming.
-// Starts up a goroutine for each partition in the kafka stream.
-// Returns nil once all the goroutines are started.
-// Returns an error if it can't start consuming for any of the partitions.
-func (c *Consumer) Start() error {
-	c.ContinualConsumer.ProcessMessage = c.processMessage
-	c.ContinualConsumer.ShutdownCallback = c.shutdown
-	return c.ContinualConsumer.Start()
-}
-
-func (c *Consumer) processMessage(message *sarama.ConsumerMessage) error {
-	var input api.InputRoomEvent
-	if err := json.Unmarshal(message.Value, &input); err != nil {
-		// If the message is invalid then log it and move onto the next message in the stream.
-		c.logError(message, err)
-	} else {
-		if err := processRoomEvent(c.DB, c, input); err != nil {
-			// If there was an error processing the message then log it and
-			// move onto the next message in the stream.
-			// TODO: If the error was due to a problem talking to the database
-			// then we shouldn't move onto the next message and we should either
-			// retry processing the message, or panic and kill ourselves.
-			c.logError(message, err)
-		}
-	}
-	// Update the number of processed messages using atomic addition because it is accessed from multiple goroutines.
-	processed := atomic.AddInt64(&c.processed, 1)
-	// Check if we should stop processing.
-	// Note that since we have multiple goroutines it's quite likely that we'll overshoot by a few messages.
-	// If we try to stop processing after M message and we have N goroutines then we will process somewhere
-	// between M and (N + M) messages because the N goroutines could all try to process what they think will be the
-	// last message. We could be more careful here but this is good enough for getting rough benchmarks.
-	if c.StopProcessingAfter != nil && processed >= int64(*c.StopProcessingAfter) {
-		return common.ErrShutdown
-	}
-	return nil
-}
-
-func (c *Consumer) shutdown() {
-	if c.ShutdownCallback != nil {
-		c.ShutdownCallback(fmt.Sprintf("Stopping processing after %d messages", c.processed))
-	}
-}
-
-// logError is a convenience method for logging errors.
-func (c *Consumer) logError(message *sarama.ConsumerMessage, err error) {
-	if c.ErrorLogger == nil {
-		panic(err)
-	}
-	c.ErrorLogger.OnError(message, err)
-}
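
For reference, the sketch below shows how a caller outside the integration test might drive the new synchronous input API, mirroring writeToRoomServer above. It uses only the calls visible in this patch (api.NewRoomserverInputAPIHTTP and InputRoomEvents); the roomserver URL and the event JSON are placeholders for this sketch, not values taken from the patch.

package main

import (
	"encoding/json"
	"log"

	"github.com/matrix-org/dendrite/roomserver/api"
)

func main() {
	// JSON-encoded api.InputRoomEvent values; a real caller would supply
	// fully-formed events here. The empty object is only a placeholder.
	inputJSON := []string{`{}`}

	var request api.InputRoomEventsRequest
	var response api.InputRoomEventsResponse
	request.InputRoomEvents = make([]api.InputRoomEvent, len(inputJSON))
	for i := range inputJSON {
		if err := json.Unmarshal([]byte(inputJSON[i]), &request.InputRoomEvents[i]); err != nil {
			log.Fatal(err)
		}
	}

	// The URL would normally come from the config (cfg.RoomServerURL() in the
	// test above); localhost:7777 is a placeholder for this sketch.
	roomserver := api.NewRoomserverInputAPIHTTP("http://localhost:7777", nil)

	// Because the API is synchronous HTTP, the call returns once the
	// roomserver has handled the request, which is why the test can write its
	// input before reading the output topic.
	if err := roomserver.InputRoomEvents(&request, &response); err != nil {
		log.Fatal(err)
	}
}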
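
The readiness check in runAndReadFromTopic is also a small reusable pattern: poll an HTTP endpoint (here the roomserver's /metrics handler) with a growing delay until it answers 200, and only then start sending input. Below is a standalone sketch of that pattern; the waitUntilReady helper and its timeout are additions for this example, not part of the patch.

package main

import (
	"fmt"
	"net/http"
	"time"
)

// waitUntilReady polls readyURL until it returns HTTP 200 or the timeout expires.
func waitUntilReady(readyURL string, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	delay := 10 * time.Millisecond
	for time.Now().Before(deadline) {
		time.Sleep(delay)
		// Back off between attempts, capped at 100ms as in the patch.
		if delay < 100*time.Millisecond {
			delay *= 2
		}
		resp, err := http.Get(readyURL)
		if err != nil {
			// The server is not accepting connections yet; try again.
			continue
		}
		resp.Body.Close()
		if resp.StatusCode == 200 {
			return nil
		}
	}
	return fmt.Errorf("%s did not become ready within %s", readyURL, timeout)
}

func main() {
	// Mirrors the URL passed by testRoomserver; the port is a placeholder.
	if err := waitUntilReady("http://localhost:7777/metrics", 10*time.Second); err != nil {
		panic(err)
	}
	fmt.Println("roomserver is ready to receive input")
}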