Make roomserver Consumer use the factored out Consumer

Kegan Dougal 2017-03-24 11:55:38 +00:00
parent f04769efb3
commit fb20ec09da
2 changed files with 43 additions and 84 deletions
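
Note: the common.ContinualConsumer and common.PartitionStorer used in this diff were factored out into the shared common package in an earlier commit and are not shown here. As a rough orientation only, below is a minimal sketch of what that package presumably provides, inferred from the identifiers this diff relies on (Topic, Consumer, PartitionStore, ProcessMessage, ShutdownCallback, Start, ErrShutdown); the real code in github.com/matrix-org/dendrite/common may differ in detail.

// A minimal sketch, for orientation only, of the factored-out common package this
// commit starts using; inferred from the identifiers the diff relies on, so the real
// github.com/matrix-org/dendrite/common code may differ in detail.
package common

import (
	"errors"

	sarama "gopkg.in/Shopify/sarama.v1"
)

// ErrShutdown can be returned from ProcessMessage to stop the ContinualConsumer.
var ErrShutdown = errors.New("shutdown")

// A PartitionOffset is how far a consumer has got through one partition of a topic.
type PartitionOffset struct {
	Partition int32
	Offset    int64
}

// A PartitionStorer can load and save partition offsets for a topic.
type PartitionStorer interface {
	PartitionOffsets(topic string) ([]PartitionOffset, error)
	SetPartitionOffset(topic string, partition int32, offset int64) error
}

// A ContinualConsumer continually consumes a kafkaesque topic, resuming from the
// offsets recorded in its PartitionStore after a restart.
type ContinualConsumer struct {
	// The kafkaesque topic to consume events from.
	Topic string
	// A kafkaesque stream consumer providing the APIs for talking to the event source.
	Consumer sarama.Consumer
	// Where to load and save how far we have got through each partition.
	PartitionStore PartitionStorer
	// ProcessMessage is called for each message; returning ErrShutdown stops the consumer.
	ProcessMessage func(message *sarama.ConsumerMessage) error
	// ShutdownCallback is called once ProcessMessage has returned ErrShutdown.
	ShutdownCallback func()
}

// Start starts one goroutine per partition and returns once they are all running.
func (c *ContinualConsumer) Start() error {
	// Default every partition to the beginning of the stream.
	offsets := map[int32]int64{}
	partitions, err := c.Consumer.Partitions(c.Topic)
	if err != nil {
		return err
	}
	for _, partition := range partitions {
		offsets[partition] = sarama.OffsetOldest
	}

	// Advance to wherever we had got to before the last restart.
	storedOffsets, err := c.PartitionStore.PartitionOffsets(c.Topic)
	if err != nil {
		return err
	}
	for _, offset := range storedOffsets {
		offsets[offset.Partition] = offset.Offset
	}

	var partitionConsumers []sarama.PartitionConsumer
	for partition, offset := range offsets {
		pc, err := c.Consumer.ConsumePartition(c.Topic, partition, offset)
		if err != nil {
			for _, p := range partitionConsumers {
				p.Close()
			}
			return err
		}
		partitionConsumers = append(partitionConsumers, pc)
	}
	for _, pc := range partitionConsumers {
		go c.consumePartition(pc)
	}
	return nil
}

// consumePartition processes a single partition, saving the offset after each message.
func (c *ContinualConsumer) consumePartition(pc sarama.PartitionConsumer) {
	defer pc.Close()
	for message := range pc.Messages() {
		msgErr := c.ProcessMessage(message)
		// Record our position so that a restart picks up from the right place.
		if err := c.PartitionStore.SetPartitionOffset(c.Topic, message.Partition, message.Offset); err != nil {
			panic(err) // the sketch has no logger to hand, so treat storage failures as fatal
		}
		// Stop if the ProcessMessage callback asked us to shut down.
		if msgErr == ErrShutdown {
			if c.ShutdownCallback != nil {
				c.ShutdownCallback()
			}
			return
		}
	}
}

The point of the refactor is that the offset bookkeeping and the per-partition goroutines live in one place, so each component (here the roomserver) only has to supply a ProcessMessage callback and somewhere to store its offsets.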

View file

@@ -2,16 +2,18 @@ package main
 
 import (
 	"fmt"
-	"github.com/matrix-org/dendrite/roomserver/input"
-	"github.com/matrix-org/dendrite/roomserver/query"
-	"github.com/matrix-org/dendrite/roomserver/storage"
-	"github.com/prometheus/client_golang/prometheus"
-	sarama "gopkg.in/Shopify/sarama.v1"
 	"net/http"
 	_ "net/http/pprof"
 	"os"
 	"strconv"
 	"strings"
+
+	"github.com/matrix-org/dendrite/common"
+	"github.com/matrix-org/dendrite/roomserver/input"
+	"github.com/matrix-org/dendrite/roomserver/query"
+	"github.com/matrix-org/dendrite/roomserver/storage"
+	"github.com/prometheus/client_golang/prometheus"
+	sarama "gopkg.in/Shopify/sarama.v1"
 )
 
 var (
@@ -43,10 +45,13 @@ func main() {
 	}
 
 	consumer := input.Consumer{
-		Consumer:             kafkaConsumer,
+		ContinualConsumer: common.ContinualConsumer{
+			Topic:          inputRoomEventTopic,
+			Consumer:       kafkaConsumer,
+			PartitionStore: db,
+		},
 		DB:                   db,
 		Producer:             kafkaProducer,
-		InputRoomEventTopic:  inputRoomEventTopic,
 		OutputRoomEventTopic: outputRoomEventTopic,
 	}
 

View file

@@ -14,10 +14,7 @@ import (
 // A ConsumerDatabase has the storage APIs needed by the consumer.
 type ConsumerDatabase interface {
 	RoomEventDatabase
-	// PartitionOffsets returns the offsets the consumer has reached for each partition.
-	PartitionOffsets(topic string) ([]common.PartitionOffset, error)
-	// SetPartitionOffset records where the consumer has reached for a partition.
-	SetPartitionOffset(topic string, partition int32, offset int64) error
+	common.PartitionStorer
 }
 
 // An ErrorLogger handles the errors encountered by the consumer.
@@ -32,16 +29,10 @@ type ErrorLogger interface {
 // The events needed to construct the state at the event should already be stored on the roomserver.
 // If the event is not valid then it will be discarded and an error will be logged.
 type Consumer struct {
-	// A kafkaesque stream consumer providing the APIs for talking to the event source.
-	// The interface is taken from a client library for Apache Kafka.
-	// But any equivalent event streaming protocol could be made to implement the same interface.
-	Consumer sarama.Consumer
+	ContinualConsumer common.ContinualConsumer
 	// The database used to store the room events.
 	DB ConsumerDatabase
 	Producer sarama.SyncProducer
-	// The kafkaesque topic to consume room events from.
-	// This is the name used in kafka to identify the stream to consume events from.
-	InputRoomEventTopic string
 	// The kafkaesque topic to output new room events to.
 	// This is the name used in kafka to identify the stream to write events to.
 	OutputRoomEventTopic string
@@ -76,79 +67,42 @@ func (c *Consumer) WriteOutputRoomEvent(output api.OutputRoomEvent) error {
 // Returns nil once all the goroutines are started.
 // Returns an error if it can't start consuming for any of the partitions.
 func (c *Consumer) Start() error {
-	offsets := map[int32]int64{}
-
-	partitions, err := c.Consumer.Partitions(c.InputRoomEventTopic)
-	if err != nil {
-		return err
-	}
-	for _, partition := range partitions {
-		// Default all the offsets to the beginning of the stream.
-		offsets[partition] = sarama.OffsetOldest
-	}
-
-	storedOffsets, err := c.DB.PartitionOffsets(c.InputRoomEventTopic)
-	if err != nil {
-		return err
-	}
-	for _, offset := range storedOffsets {
-		// We've already processed events from this partition so advance the offset to where we got to.
-		offsets[offset.Partition] = offset.Offset
-	}
-
-	var partitionConsumers []sarama.PartitionConsumer
-	for partition, offset := range offsets {
-		pc, err := c.Consumer.ConsumePartition(c.InputRoomEventTopic, partition, offset)
-		if err != nil {
-			for _, p := range partitionConsumers {
-				p.Close()
-			}
-			return err
-		}
-		partitionConsumers = append(partitionConsumers, pc)
-	}
-
-	for _, pc := range partitionConsumers {
-		go c.consumePartition(pc)
-	}
-
+	c.ContinualConsumer.ProcessMessage = c.processMessage
+	c.ContinualConsumer.ShutdownCallback = c.shutdown
+	return c.ContinualConsumer.Start()
+}
+
+func (c *Consumer) processMessage(message *sarama.ConsumerMessage) error {
+	var input api.InputRoomEvent
+	if err := json.Unmarshal(message.Value, &input); err != nil {
+		// If the message is invalid then log it and move onto the next message in the stream.
+		c.logError(message, err)
+	} else {
+		if err := processRoomEvent(c.DB, c, input); err != nil {
+			// If there was an error processing the message then log it and
+			// move onto the next message in the stream.
+			// TODO: If the error was due to a problem talking to the database
+			// then we shouldn't move onto the next message and we should either
+			// retry processing the message, or panic and kill ourselves.
+			c.logError(message, err)
+		}
+	}
+
+	// Update the number of processed messages using atomic addition because it is accessed from multiple goroutines.
+	processed := atomic.AddInt64(&c.processed, 1)
+	// Check if we should stop processing.
+	// Note that since we have multiple goroutines it's quite likely that we'll overshoot by a few messages.
+	// If we try to stop processing after M message and we have N goroutines then we will process somewhere
+	// between M and (N + M) messages because the N goroutines could all try to process what they think will be the
+	// last message. We could be more careful here but this is good enough for getting rough benchmarks.
+	if c.StopProcessingAfter != nil && processed >= int64(*c.StopProcessingAfter) {
+		return common.ErrShutdown
+	}
 	return nil
 }
 
-// consumePartition consumes the room events for a single partition of the kafkaesque stream.
-func (c *Consumer) consumePartition(pc sarama.PartitionConsumer) {
-	defer pc.Close()
-	for message := range pc.Messages() {
-		var input api.InputRoomEvent
-		if err := json.Unmarshal(message.Value, &input); err != nil {
-			// If the message is invalid then log it and move onto the next message in the stream.
-			c.logError(message, err)
-		} else {
-			if err := processRoomEvent(c.DB, c, input); err != nil {
-				// If there was an error processing the message then log it and
-				// move onto the next message in the stream.
-				// TODO: If the error was due to a problem talking to the database
-				// then we shouldn't move onto the next message and we should either
-				// retry processing the message, or panic and kill ourselves.
-				c.logError(message, err)
-			}
-		}
-		// Advance our position in the stream so that we will start at the right position after a restart.
-		if err := c.DB.SetPartitionOffset(c.InputRoomEventTopic, message.Partition, message.Offset); err != nil {
-			c.logError(message, err)
-		}
-		// Update the number of processed messages using atomic addition because it is accessed from multiple goroutines.
-		processed := atomic.AddInt64(&c.processed, 1)
-		// Check if we should stop processing.
-		// Note that since we have multiple goroutines it's quite likely that we'll overshoot by a few messages.
-		// If we try to stop processing after M message and we have N goroutines then we will process somewhere
-		// between M and (N + M) messages because the N goroutines could all try to process what they think will be the
-		// last message. We could be more careful here but this is good enough for getting rough benchmarks.
-		if c.StopProcessingAfter != nil && processed >= int64(*c.StopProcessingAfter) {
-			if c.ShutdownCallback != nil {
-				c.ShutdownCallback(fmt.Sprintf("Stopping processing after %d messages", c.processed))
-			}
-			return
-		}
+func (c *Consumer) shutdown() {
+	if c.ShutdownCallback != nil {
+		c.ShutdownCallback(fmt.Sprintf("Stopping processing after %d messages", c.processed))
 	}
 }
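
The rough-benchmark shutdown path survives the refactor, but now takes two hops: processMessage returns common.ErrShutdown once StopProcessingAfter messages have been handled, the ContinualConsumer then invokes the shutdown method it was given as its ShutdownCallback, and shutdown finally calls the Consumer's own ShutdownCallback with a human-readable reason. Below is a hypothetical wiring example, not part of this commit, assuming the Consumer keeps StopProcessingAfter (*int64) and ShutdownCallback (func(string)) fields, which this diff leaves untouched; the package and helper names are invented for illustration.

// Hypothetical example, not part of this commit: startBenchmarkConsumer consumes room
// events until stopAfter messages have been processed, then shuts the process down via
// the chain processMessage -> common.ErrShutdown -> ContinualConsumer -> shutdown ->
// ShutdownCallback. The package name and function are invented for illustration.
package roomserverbench

import (
	"fmt"
	"os"

	"github.com/matrix-org/dendrite/common"
	"github.com/matrix-org/dendrite/roomserver/input"
	sarama "gopkg.in/Shopify/sarama.v1"
)

func startBenchmarkConsumer(
	kafkaConsumer sarama.Consumer,
	kafkaProducer sarama.SyncProducer,
	db input.ConsumerDatabase,
	inputRoomEventTopic, outputRoomEventTopic string,
	stopAfter int64,
) error {
	consumer := input.Consumer{
		// The factored-out consumer owns the topic, the kafka client and the offset storage.
		ContinualConsumer: common.ContinualConsumer{
			Topic:          inputRoomEventTopic,
			Consumer:       kafkaConsumer,
			PartitionStore: db,
		},
		DB:                   db,
		Producer:             kafkaProducer,
		OutputRoomEventTopic: outputRoomEventTopic,
		// Assumed fields (untouched by this diff): stop after N messages and report why.
		StopProcessingAfter: &stopAfter,
		ShutdownCallback: func(reason string) {
			fmt.Println(reason)
			os.Exit(0)
		},
	}
	return consumer.Start()
}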