mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-10 16:33:11 -06:00
Use synchronous HTTP API for writing events to the roomserver
This commit is contained in:
parent
926600c1d0
commit
5a536aaf18
|
|
@ -16,11 +16,9 @@ package main
|
|||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"net/http"
|
||||
_ "net/http/pprof"
|
||||
"os"
|
||||
"strconv"
|
||||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
|
|
@ -33,9 +31,8 @@ import (
|
|||
)
|
||||
|
||||
var (
|
||||
logDir = os.Getenv("LOG_DIR")
|
||||
configPath = flag.String("config", "", "The path to the config file. For more information, see the config file in this repository.")
|
||||
stopProcessingAfter = os.Getenv("STOP_AFTER")
|
||||
logDir = os.Getenv("LOG_DIR")
|
||||
configPath = flag.String("config", "", "The path to the config file. For more information, see the config file in this repository.")
|
||||
)
|
||||
|
||||
func main() {
|
||||
|
|
@ -56,49 +53,25 @@ func main() {
|
|||
panic(err)
|
||||
}
|
||||
|
||||
kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
kafkaProducer, err := sarama.NewSyncProducer(cfg.Kafka.Addresses, nil)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
consumer := input.Consumer{
|
||||
ContinualConsumer: common.ContinualConsumer{
|
||||
Topic: string(cfg.Kafka.Topics.InputRoomEvent),
|
||||
Consumer: kafkaConsumer,
|
||||
PartitionStore: db,
|
||||
},
|
||||
DB: db,
|
||||
Producer: kafkaProducer,
|
||||
OutputRoomEventTopic: string(cfg.Kafka.Topics.OutputRoomEvent),
|
||||
}
|
||||
|
||||
if stopProcessingAfter != "" {
|
||||
count, err := strconv.ParseInt(stopProcessingAfter, 10, 64)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
consumer.StopProcessingAfter = &count
|
||||
consumer.ShutdownCallback = func(message string) {
|
||||
fmt.Println("Stopping roomserver", message)
|
||||
os.Exit(0)
|
||||
}
|
||||
}
|
||||
|
||||
if err = consumer.Start(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
queryAPI := query.RoomserverQueryAPI{
|
||||
DB: db,
|
||||
}
|
||||
|
||||
queryAPI.SetupHTTP(http.DefaultServeMux)
|
||||
|
||||
inputAPI := input.RoomserverInputAPI{
|
||||
DB: db,
|
||||
Producer: kafkaProducer,
|
||||
OutputRoomEventTopic: string(cfg.Kafka.Topics.OutputRoomEvent),
|
||||
}
|
||||
|
||||
inputAPI.SetupHTTP(http.DefaultServeMux)
|
||||
|
||||
http.DefaultServeMux.Handle("/metrics", prometheus.Handler())
|
||||
|
||||
log.Info("Started room server on ", cfg.Listen.RoomServer)
|
||||
|
|
|
|||
|
|
@ -23,6 +23,10 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"encoding/json"
|
||||
|
||||
"net/http"
|
||||
|
||||
"github.com/matrix-org/dendrite/common/test"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
|
|
@ -90,7 +94,7 @@ func createDatabase(database string) error {
|
|||
// messages is reached or after a timeout. It kills the command before it returns.
|
||||
// It returns a list of the messages read from the command on success or an error
|
||||
// on failure.
|
||||
func runAndReadFromTopic(runCmd *exec.Cmd, topic string, count int, checkQueryAPI func()) ([]string, error) {
|
||||
func runAndReadFromTopic(runCmd *exec.Cmd, readyURL string, doInput func(), topic string, count int, checkQueryAPI func()) ([]string, error) {
|
||||
type result struct {
|
||||
// data holds all of stdout on success.
|
||||
data []byte
|
||||
|
|
@ -107,6 +111,11 @@ func runAndReadFromTopic(runCmd *exec.Cmd, topic string, count int, checkQueryAP
|
|||
)
|
||||
// Send stderr to our stderr so the user can see any error messages.
|
||||
readCmd.Stderr = os.Stderr
|
||||
|
||||
// Kill both processes before we exit.
|
||||
defer func() { runCmd.Process.Kill() }()
|
||||
defer func() { readCmd.Process.Kill() }()
|
||||
|
||||
// Run the command, read the messages and wait for a timeout in parallel.
|
||||
go func() {
|
||||
// Read all of stdout.
|
||||
|
|
@ -131,14 +140,42 @@ func runAndReadFromTopic(runCmd *exec.Cmd, topic string, count int, checkQueryAP
|
|||
time.Sleep(timeout)
|
||||
done <- result{nil, fmt.Errorf("Timeout reading %d messages from topic %q", count, topic)}
|
||||
}()
|
||||
|
||||
ready := make(chan struct{})
|
||||
|
||||
go func() {
|
||||
delay := 10 * time.Millisecond
|
||||
for {
|
||||
time.Sleep(delay)
|
||||
if delay < 100*time.Millisecond {
|
||||
delay *= 2
|
||||
}
|
||||
fmt.Printf("Checking %s\n", readyURL)
|
||||
resp, err := http.Get(readyURL)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if resp.StatusCode == 200 {
|
||||
break
|
||||
}
|
||||
}
|
||||
ready <- struct{}{}
|
||||
}()
|
||||
|
||||
// Wait for the roomserver to either be read to receive input or for it to
|
||||
// crash.
|
||||
select {
|
||||
case <-ready:
|
||||
case r := <-done:
|
||||
return nil, r.err
|
||||
}
|
||||
|
||||
// Write the input now that the server is running.
|
||||
doInput()
|
||||
|
||||
// Wait for one of the tasks to finsh.
|
||||
r := <-done
|
||||
|
||||
// Kill both processes. We don't check if the processes are running and
|
||||
// we ignore failures since we are just trying to clean up before returning.
|
||||
runCmd.Process.Kill()
|
||||
readCmd.Process.Kill()
|
||||
|
||||
if r.err != nil {
|
||||
return nil, r.err
|
||||
}
|
||||
|
|
@ -153,6 +190,20 @@ func runAndReadFromTopic(runCmd *exec.Cmd, topic string, count int, checkQueryAP
|
|||
return lines, nil
|
||||
}
|
||||
|
||||
func writeToRoomServer(input []string, roomserverURL string) error {
|
||||
var request api.InputRoomEventsRequest
|
||||
var response api.InputRoomEventsResponse
|
||||
var err error
|
||||
request.InputRoomEvents = make([]api.InputRoomEvent, len(input))
|
||||
for i := range input {
|
||||
if err = json.Unmarshal([]byte(input[i]), &request.InputRoomEvents[i]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
x := api.NewRoomserverInputAPIHTTP(roomserverURL, nil)
|
||||
return x.InputRoomEvents(&request, &response)
|
||||
}
|
||||
|
||||
// testRoomserver is used to run integration tests against a single roomserver.
|
||||
// It creates new kafka topics for the input and output of the roomserver.
|
||||
// It writes the input messages to the input kafka topic, formatting each message
|
||||
|
|
@ -176,24 +227,21 @@ func testRoomserver(input []string, wantOutput []string, checkQueries func(api.R
|
|||
panic(err)
|
||||
}
|
||||
|
||||
inputTopic := string(cfg.Kafka.Topics.InputRoomEvent)
|
||||
outputTopic := string(cfg.Kafka.Topics.OutputRoomEvent)
|
||||
|
||||
exe.DeleteTopic(inputTopic)
|
||||
if err := exe.CreateTopic(inputTopic); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
exe.DeleteTopic(outputTopic)
|
||||
if err := exe.CreateTopic(outputTopic); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if err := exe.WriteToTopic(inputTopic, canonicalJSONInput(input)); err != nil {
|
||||
if err = createDatabase(testDatabaseName); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if err = createDatabase(testDatabaseName); err != nil {
|
||||
panic(err)
|
||||
doInput := func() {
|
||||
if err = writeToRoomServer(input, cfg.RoomServerURL()); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
cmd := exec.Command(filepath.Join(filepath.Dir(os.Args[0]), "dendrite-room-server"))
|
||||
|
|
@ -205,7 +253,7 @@ func testRoomserver(input []string, wantOutput []string, checkQueries func(api.R
|
|||
cmd.Stderr = os.Stderr
|
||||
cmd.Args = []string{"dendrite-room-server", "--config", filepath.Join(dir, test.ConfigFile)}
|
||||
|
||||
gotOutput, err := runAndReadFromTopic(cmd, outputTopic, len(wantOutput), func() {
|
||||
gotOutput, err := runAndReadFromTopic(cmd, cfg.RoomServerURL()+"/metrics", doInput, outputTopic, len(wantOutput), func() {
|
||||
queryAPI := api.NewRoomserverQueryAPIHTTP("http://"+string(cfg.Listen.RoomServer), nil)
|
||||
checkQueries(queryAPI)
|
||||
})
|
||||
|
|
|
|||
|
|
@ -1,133 +0,0 @@
|
|||
// Copyright 2017 Vector Creations Ltd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// Package input contains the code that writes
|
||||
package input
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
sarama "gopkg.in/Shopify/sarama.v1"
|
||||
)
|
||||
|
||||
// A ConsumerDatabase has the storage APIs needed by the consumer.
|
||||
type ConsumerDatabase interface {
|
||||
RoomEventDatabase
|
||||
common.PartitionStorer
|
||||
}
|
||||
|
||||
// An ErrorLogger handles the errors encountered by the consumer.
|
||||
type ErrorLogger interface {
|
||||
OnError(message *sarama.ConsumerMessage, err error)
|
||||
}
|
||||
|
||||
// A Consumer consumes a kafkaesque stream of room events.
|
||||
// The room events are supplied as api.InputRoomEvent structs serialised as JSON.
|
||||
// The events should be valid matrix events.
|
||||
// The events needed to authenticate the event should already be stored on the roomserver.
|
||||
// The events needed to construct the state at the event should already be stored on the roomserver.
|
||||
// If the event is not valid then it will be discarded and an error will be logged.
|
||||
type Consumer struct {
|
||||
ContinualConsumer common.ContinualConsumer
|
||||
// The database used to store the room events.
|
||||
DB ConsumerDatabase
|
||||
Producer sarama.SyncProducer
|
||||
// The kafkaesque topic to output new room events to.
|
||||
// This is the name used in kafka to identify the stream to write events to.
|
||||
OutputRoomEventTopic string
|
||||
// The ErrorLogger for this consumer.
|
||||
// If left as nil then the consumer will panic when it encounters an error
|
||||
ErrorLogger ErrorLogger
|
||||
// If non-nil then the consumer will stop processing messages after this
|
||||
// many messages and will shutdown. Malformed messages are included in the count.
|
||||
StopProcessingAfter *int64
|
||||
// If not-nil then the consumer will call this to shutdown the server.
|
||||
ShutdownCallback func(reason string)
|
||||
// How many messages the consumer has processed.
|
||||
processed int64
|
||||
}
|
||||
|
||||
// WriteOutputRoomEvent implements OutputRoomEventWriter
|
||||
func (c *Consumer) WriteOutputRoomEvent(output api.OutputNewRoomEvent) error {
|
||||
var m sarama.ProducerMessage
|
||||
oe := api.OutputEvent{
|
||||
Type: api.OutputTypeNewRoomEvent,
|
||||
NewRoomEvent: &output,
|
||||
}
|
||||
value, err := json.Marshal(oe)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
m.Topic = c.OutputRoomEventTopic
|
||||
m.Key = sarama.StringEncoder("")
|
||||
m.Value = sarama.ByteEncoder(value)
|
||||
_, _, err = c.Producer.SendMessage(&m)
|
||||
return err
|
||||
}
|
||||
|
||||
// Start starts the consumer consuming.
|
||||
// Starts up a goroutine for each partition in the kafka stream.
|
||||
// Returns nil once all the goroutines are started.
|
||||
// Returns an error if it can't start consuming for any of the partitions.
|
||||
func (c *Consumer) Start() error {
|
||||
c.ContinualConsumer.ProcessMessage = c.processMessage
|
||||
c.ContinualConsumer.ShutdownCallback = c.shutdown
|
||||
return c.ContinualConsumer.Start()
|
||||
}
|
||||
|
||||
func (c *Consumer) processMessage(message *sarama.ConsumerMessage) error {
|
||||
var input api.InputRoomEvent
|
||||
if err := json.Unmarshal(message.Value, &input); err != nil {
|
||||
// If the message is invalid then log it and move onto the next message in the stream.
|
||||
c.logError(message, err)
|
||||
} else {
|
||||
if err := processRoomEvent(c.DB, c, input); err != nil {
|
||||
// If there was an error processing the message then log it and
|
||||
// move onto the next message in the stream.
|
||||
// TODO: If the error was due to a problem talking to the database
|
||||
// then we shouldn't move onto the next message and we should either
|
||||
// retry processing the message, or panic and kill ourselves.
|
||||
c.logError(message, err)
|
||||
}
|
||||
}
|
||||
// Update the number of processed messages using atomic addition because it is accessed from multiple goroutines.
|
||||
processed := atomic.AddInt64(&c.processed, 1)
|
||||
// Check if we should stop processing.
|
||||
// Note that since we have multiple goroutines it's quite likely that we'll overshoot by a few messages.
|
||||
// If we try to stop processing after M message and we have N goroutines then we will process somewhere
|
||||
// between M and (N + M) messages because the N goroutines could all try to process what they think will be the
|
||||
// last message. We could be more careful here but this is good enough for getting rough benchmarks.
|
||||
if c.StopProcessingAfter != nil && processed >= int64(*c.StopProcessingAfter) {
|
||||
return common.ErrShutdown
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Consumer) shutdown() {
|
||||
if c.ShutdownCallback != nil {
|
||||
c.ShutdownCallback(fmt.Sprintf("Stopping processing after %d messages", c.processed))
|
||||
}
|
||||
}
|
||||
|
||||
// logError is a convenience method for logging errors.
|
||||
func (c *Consumer) logError(message *sarama.ConsumerMessage, err error) {
|
||||
if c.ErrorLogger == nil {
|
||||
panic(err)
|
||||
}
|
||||
c.ErrorLogger.OnError(message, err)
|
||||
}
|
||||
Loading…
Reference in a new issue