diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go b/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go index 2c50103b4..df9f796f2 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go @@ -3,6 +3,7 @@ package input import ( "encoding/json" + "fmt" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/types" sarama "gopkg.in/Shopify/sarama.v1" @@ -45,6 +46,11 @@ type Consumer struct { // The ErrorLogger for this consumer. // If left as nil then the consumer will panic when it encounters an error ErrorLogger ErrorLogger + // If non-nil then the consumer will stop processing messages after this + // many messages and will shutdown + StopProcessingAfter *int + // If not-nil then the consumer will call this to shutdown the server. + ShutdownCallback func(reason string) } // WriteOutputRoomEvent implements OutputRoomEventWriter @@ -107,7 +113,14 @@ func (c *Consumer) Start() error { // consumePartition consumes the room events for a single partition of the kafkaesque stream. func (c *Consumer) consumePartition(pc sarama.PartitionConsumer) { defer pc.Close() + var processed int for message := range pc.Messages() { + if c.StopProcessingAfter != nil && processed >= *c.StopProcessingAfter { + if c.ShutdownCallback != nil { + c.ShutdownCallback(fmt.Sprintf("Stopping processing after %d messages", processed)) + } + return + } var input api.InputRoomEvent if err := json.Unmarshal(message.Value, &input); err != nil { // If the message is invalid then log it and move onto the next message in the stream. @@ -126,6 +139,7 @@ func (c *Consumer) consumePartition(pc sarama.PartitionConsumer) { if err := c.DB.SetPartitionOffset(c.InputRoomEventTopic, message.Partition, message.Offset); err != nil { c.logError(message, err) } + processed++ } } diff --git a/src/github.com/matrix-org/dendrite/roomserver/roomserver/roomserver.go b/src/github.com/matrix-org/dendrite/roomserver/roomserver/roomserver.go index 689fb48d8..bfcaea146 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/roomserver/roomserver.go +++ b/src/github.com/matrix-org/dendrite/roomserver/roomserver/roomserver.go @@ -7,7 +7,9 @@ import ( "github.com/matrix-org/dendrite/roomserver/storage" sarama "gopkg.in/Shopify/sarama.v1" "net/http" + _ "net/http/pprof" "os" + "strconv" "strings" ) @@ -17,6 +19,7 @@ var ( inputRoomEventTopic = os.Getenv("TOPIC_INPUT_ROOM_EVENT") outputRoomEventTopic = os.Getenv("TOPIC_OUTPUT_ROOM_EVENT") bindAddr = os.Getenv("BIND_ADDRESS") + stopProcessingAfter = os.Getenv("STOP_AFTER") ) func main() { @@ -43,6 +46,18 @@ func main() { OutputRoomEventTopic: outputRoomEventTopic, } + if stopProcessingAfter != "" { + count, err := strconv.Atoi(stopProcessingAfter) + if err != nil { + panic(err) + } + consumer.StopProcessingAfter = &count + consumer.ShutdownCallback = func(message string) { + fmt.Println("Stopping roomserver", message) + os.Exit(0) + } + } + if err = consumer.Start(); err != nil { panic(err) } @@ -56,5 +71,7 @@ func main() { fmt.Println("Started roomserver") // TODO: Implement clean shutdown. - http.ListenAndServe(bindAddr, nil) + if err := http.ListenAndServe(bindAddr, nil); err != nil { + panic(err) + } }