mirror of
https://github.com/matrix-org/dendrite.git
synced 2024-11-26 08:11:55 -06:00
gb vendor fetch github.com/matrix-org/naffka
This commit is contained in:
parent
2071387f3c
commit
0d894e3da5
6
vendor/manifest
vendored
6
vendor/manifest
vendored
|
@ -101,6 +101,12 @@
|
|||
"revision": "768a8767051a4aca7f5e41f912954ae04d5f1efb",
|
||||
"branch": "master"
|
||||
},
|
||||
{
|
||||
"importpath": "github.com/matrix-org/naffka",
|
||||
"repository": "https://github.com/matrix-org/naffka",
|
||||
"revision": "d28656e34f96a8eeaab53e3b7678c9ce14af5786",
|
||||
"branch": "master"
|
||||
},
|
||||
{
|
||||
"importpath": "github.com/matrix-org/util",
|
||||
"repository": "https://github.com/matrix-org/util",
|
||||
|
|
5
vendor/src/github.com/matrix-org/naffka/README.md
vendored
Normal file
5
vendor/src/github.com/matrix-org/naffka/README.md
vendored
Normal file
|
@ -0,0 +1,5 @@
|
|||
# naffka
|
||||
|
||||
Single in-process implementation of the [sarama golang kafka](https://github.com/Shopify/sarama) APIs.
|
||||
|
||||
It's like Kafka, but a bit [naff](https://www.collinsdictionary.com/dictionary/english/naff).
|
5
vendor/src/github.com/matrix-org/naffka/hooks/install.sh
vendored
Normal file
5
vendor/src/github.com/matrix-org/naffka/hooks/install.sh
vendored
Normal file
|
@ -0,0 +1,5 @@
|
|||
#! /bin/bash
|
||||
|
||||
DOT_GIT="$(dirname $0)/../.git"
|
||||
|
||||
ln -s "../../hooks/pre-commit" "$DOT_GIT/hooks/pre-commit"
|
24
vendor/src/github.com/matrix-org/naffka/hooks/pre-commit
vendored
Normal file
24
vendor/src/github.com/matrix-org/naffka/hooks/pre-commit
vendored
Normal file
|
@ -0,0 +1,24 @@
|
|||
#! /bin/bash
|
||||
|
||||
set -eu
|
||||
|
||||
golint ./...
|
||||
misspell -error .
|
||||
|
||||
# gofmt doesn't exit with an error code if the files don't match the expected
|
||||
# format. So we have to run it and see if it outputs anything.
|
||||
if gofmt -l -s . 2>&1 | read
|
||||
then
|
||||
echo "Error: not all code had been formatted with gofmt."
|
||||
echo "Fixing the following files"
|
||||
gofmt -s -w -l .
|
||||
echo
|
||||
echo "Please add them to the commit"
|
||||
git status --short
|
||||
exit 1
|
||||
fi
|
||||
|
||||
ineffassign .
|
||||
go tool vet --all --shadow .
|
||||
gocyclo -over 16 .
|
||||
go test -timeout 5s . ./...
|
91
vendor/src/github.com/matrix-org/naffka/memorydatabase.go
vendored
Normal file
91
vendor/src/github.com/matrix-org/naffka/memorydatabase.go
vendored
Normal file
|
@ -0,0 +1,91 @@
|
|||
package naffka
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// A MemoryDatabase stores the message history as arrays in memory.
|
||||
// It can be used to run unit tests.
|
||||
// If the process is stopped then any messages that haven't been
|
||||
// processed by a consumer are lost forever.
|
||||
type MemoryDatabase struct {
|
||||
topicsMutex sync.Mutex
|
||||
topics map[string]*memoryDatabaseTopic
|
||||
}
|
||||
|
||||
type memoryDatabaseTopic struct {
|
||||
messagesMutex sync.Mutex
|
||||
messages []Message
|
||||
}
|
||||
|
||||
func (t *memoryDatabaseTopic) addMessages(msgs []Message) error {
|
||||
t.messagesMutex.Lock()
|
||||
defer t.messagesMutex.Unlock()
|
||||
if int64(len(t.messages)) != msgs[0].Offset {
|
||||
return fmt.Errorf("message offset %d is not immediately after the previous offset %d", msgs[0].Offset, len(t.messages))
|
||||
}
|
||||
t.messages = append(t.messages, msgs...)
|
||||
return nil
|
||||
}
|
||||
|
||||
// getMessages returns the current messages as a slice.
|
||||
// This slice will have it's own copy of the length field so won't be affected
|
||||
// by adding more messages in addMessages.
|
||||
// The slice will share the same backing array with the slice we append new
|
||||
// messages to. It is safe to read the messages in the backing array since we
|
||||
// only append to the slice. It is not safe to write or append to the returned
|
||||
// slice.
|
||||
func (t *memoryDatabaseTopic) getMessages() []Message {
|
||||
t.messagesMutex.Lock()
|
||||
defer t.messagesMutex.Unlock()
|
||||
return t.messages
|
||||
}
|
||||
|
||||
func (m *MemoryDatabase) getTopic(topicName string) *memoryDatabaseTopic {
|
||||
m.topicsMutex.Lock()
|
||||
defer m.topicsMutex.Unlock()
|
||||
result := m.topics[topicName]
|
||||
if result == nil {
|
||||
result = &memoryDatabaseTopic{}
|
||||
if m.topics == nil {
|
||||
m.topics = map[string]*memoryDatabaseTopic{}
|
||||
}
|
||||
m.topics[topicName] = result
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// StoreMessages implements Database
|
||||
func (m *MemoryDatabase) StoreMessages(topic string, messages []Message) error {
|
||||
if err := m.getTopic(topic).addMessages(messages); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// FetchMessages implements Database
|
||||
func (m *MemoryDatabase) FetchMessages(topic string, startOffset, endOffset int64) ([]Message, error) {
|
||||
messages := m.getTopic(topic).getMessages()
|
||||
if endOffset > int64(len(messages)) {
|
||||
return nil, fmt.Errorf("end offset %d out of range %d", endOffset, len(messages))
|
||||
}
|
||||
if startOffset >= endOffset {
|
||||
return nil, fmt.Errorf("start offset %d greater than or equal to end offset %d", startOffset, endOffset)
|
||||
}
|
||||
if startOffset < -1 {
|
||||
return nil, fmt.Errorf("start offset %d less than -1", startOffset)
|
||||
}
|
||||
return messages[startOffset+1 : endOffset], nil
|
||||
}
|
||||
|
||||
// MaxOffsets implements Database
|
||||
func (m *MemoryDatabase) MaxOffsets() (map[string]int64, error) {
|
||||
m.topicsMutex.Lock()
|
||||
defer m.topicsMutex.Unlock()
|
||||
result := map[string]int64{}
|
||||
for name, t := range m.topics {
|
||||
result[name] = int64(len(t.getMessages())) - 1
|
||||
}
|
||||
return result, nil
|
||||
}
|
360
vendor/src/github.com/matrix-org/naffka/naffka.go
vendored
Normal file
360
vendor/src/github.com/matrix-org/naffka/naffka.go
vendored
Normal file
|
@ -0,0 +1,360 @@
|
|||
package naffka
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
sarama "gopkg.in/Shopify/sarama.v1"
|
||||
)
|
||||
|
||||
// Naffka is an implementation of the sarama kafka API designed to run within a
|
||||
// single go process. It implements both the sarama.SyncProducer and the
|
||||
// sarama.Consumer interfaces. This means it can act as a drop in replacement
|
||||
// for kafka for testing or single instance deployment.
|
||||
type Naffka struct {
|
||||
db Database
|
||||
topicsMutex sync.Mutex
|
||||
topics map[string]*topic
|
||||
}
|
||||
|
||||
// New creates a new Naffka instance.
|
||||
func New(db Database) (*Naffka, error) {
|
||||
n := &Naffka{db: db, topics: map[string]*topic{}}
|
||||
maxOffsets, err := db.MaxOffsets()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for topicName, offset := range maxOffsets {
|
||||
n.topics[topicName] = &topic{
|
||||
topicName: topicName,
|
||||
nextOffset: offset + 1,
|
||||
}
|
||||
}
|
||||
return n, nil
|
||||
}
|
||||
|
||||
// A Message is used internally within naffka to store messages.
|
||||
// It is converted to a sarama.ConsumerMessage when exposed to the
|
||||
// public APIs to maintain API compatibility with sarama.
|
||||
type Message struct {
|
||||
Offset int64
|
||||
Key []byte
|
||||
Value []byte
|
||||
Timestamp time.Time
|
||||
}
|
||||
|
||||
func (m *Message) consumerMessage(topic string) *sarama.ConsumerMessage {
|
||||
return &sarama.ConsumerMessage{
|
||||
Topic: topic,
|
||||
Offset: m.Offset,
|
||||
Key: m.Key,
|
||||
Value: m.Value,
|
||||
Timestamp: m.Timestamp,
|
||||
}
|
||||
}
|
||||
|
||||
// A Database is used to store naffka messages.
|
||||
// Messages are stored so that new consumers can access the full message history.
|
||||
type Database interface {
|
||||
// StoreMessages stores a list of messages.
|
||||
// Every message offset must be unique within each topic.
|
||||
// Messages must be stored monotonically and contiguously for each topic.
|
||||
// So for a given topic the message with offset n+1 is stored after the
|
||||
// the message with offset n.
|
||||
StoreMessages(topic string, messages []Message) error
|
||||
// FetchMessages fetches all messages with an offset greater than but not
|
||||
// including startOffset and less than but not including endOffset.
|
||||
// The range of offsets requested must not overlap with those stored by a
|
||||
// concurrent StoreMessages. The message offsets within the requested range
|
||||
// are contigous. That is FetchMessage("foo", n, m) will only be called
|
||||
// once the messages between n and m have been stored by StoreMessages.
|
||||
// Every call must return at least one message. That is there must be at
|
||||
// least one message between the start and offset.
|
||||
FetchMessages(topic string, startOffset, endOffset int64) ([]Message, error)
|
||||
// MaxOffsets returns the maximum offset for each topic.
|
||||
MaxOffsets() (map[string]int64, error)
|
||||
}
|
||||
|
||||
// SendMessage implements sarama.SyncProducer
|
||||
func (n *Naffka) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
|
||||
err = n.SendMessages([]*sarama.ProducerMessage{msg})
|
||||
return msg.Partition, msg.Offset, err
|
||||
}
|
||||
|
||||
// SendMessages implements sarama.SyncProducer
|
||||
func (n *Naffka) SendMessages(msgs []*sarama.ProducerMessage) error {
|
||||
byTopic := map[string][]*sarama.ProducerMessage{}
|
||||
for _, msg := range msgs {
|
||||
byTopic[msg.Topic] = append(byTopic[msg.Topic], msg)
|
||||
}
|
||||
var topicNames []string
|
||||
for topicName := range byTopic {
|
||||
topicNames = append(topicNames, topicName)
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
topics := n.getTopics(topicNames)
|
||||
for topicName := range byTopic {
|
||||
if err := topics[topicName].send(now, byTopic[topicName]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *Naffka) getTopics(topicNames []string) map[string]*topic {
|
||||
n.topicsMutex.Lock()
|
||||
defer n.topicsMutex.Unlock()
|
||||
result := map[string]*topic{}
|
||||
for _, topicName := range topicNames {
|
||||
t := n.topics[topicName]
|
||||
if t == nil {
|
||||
// If the topic doesn't already exist then create it.
|
||||
t = &topic{db: n.db, topicName: topicName}
|
||||
n.topics[topicName] = t
|
||||
}
|
||||
result[topicName] = t
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// Topics implements sarama.Consumer
|
||||
func (n *Naffka) Topics() ([]string, error) {
|
||||
n.topicsMutex.Lock()
|
||||
defer n.topicsMutex.Unlock()
|
||||
var result []string
|
||||
for topic := range n.topics {
|
||||
result = append(result, topic)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// Partitions implements sarama.Consumer
|
||||
func (n *Naffka) Partitions(topic string) ([]int32, error) {
|
||||
// Naffka stores a single partition per topic, so this always returns a single partition ID.
|
||||
return []int32{0}, nil
|
||||
}
|
||||
|
||||
// ConsumePartition implements sarama.Consumer
|
||||
func (n *Naffka) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error) {
|
||||
if partition != 0 {
|
||||
return nil, fmt.Errorf("Unknown partition ID %d", partition)
|
||||
}
|
||||
topics := n.getTopics([]string{topic})
|
||||
return topics[topic].consume(offset), nil
|
||||
}
|
||||
|
||||
// HighWaterMarks implements sarama.Consumer
|
||||
func (n *Naffka) HighWaterMarks() map[string]map[int32]int64 {
|
||||
n.topicsMutex.Lock()
|
||||
defer n.topicsMutex.Unlock()
|
||||
result := map[string]map[int32]int64{}
|
||||
for topicName, topic := range n.topics {
|
||||
result[topicName] = map[int32]int64{
|
||||
0: topic.highwaterMark(),
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// Close implements sarama.SyncProducer and sarama.Consumer
|
||||
func (n *Naffka) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
const channelSize = 1024
|
||||
|
||||
type partitionConsumer struct {
|
||||
topic *topic
|
||||
messages chan *sarama.ConsumerMessage
|
||||
// Whether the consumer is ready for new messages or whether it
|
||||
// is catching up on historic messages.
|
||||
// Reads and writes to this field are proctected by the topic mutex.
|
||||
ready bool
|
||||
}
|
||||
|
||||
// AsyncClose implements sarama.PartitionConsumer
|
||||
func (c *partitionConsumer) AsyncClose() {
|
||||
}
|
||||
|
||||
// Close implements sarama.PartitionConsumer
|
||||
func (c *partitionConsumer) Close() error {
|
||||
// TODO: Add support for performing a clean shutdown of the consumer.
|
||||
return nil
|
||||
}
|
||||
|
||||
// Messages implements sarama.PartitionConsumer
|
||||
func (c *partitionConsumer) Messages() <-chan *sarama.ConsumerMessage {
|
||||
return c.messages
|
||||
}
|
||||
|
||||
// Errors implements sarama.PartitionConsumer
|
||||
func (c *partitionConsumer) Errors() <-chan *sarama.ConsumerError {
|
||||
// TODO: Add option to pass consumer errors to an errors channel.
|
||||
return nil
|
||||
}
|
||||
|
||||
// HighWaterMarkOffset implements sarama.PartitionConsumer
|
||||
func (c *partitionConsumer) HighWaterMarkOffset() int64 {
|
||||
return c.topic.highwaterMark()
|
||||
}
|
||||
|
||||
// block writes the message to the consumer blocking until the consumer is ready
|
||||
// to add the message to the channel. Once the message is successfully added to
|
||||
// the channel it will catch up by pulling historic messsages from the database.
|
||||
func (c *partitionConsumer) block(cmsg *sarama.ConsumerMessage) {
|
||||
c.messages <- cmsg
|
||||
c.catchup(cmsg.Offset)
|
||||
}
|
||||
|
||||
// catchup reads historic messages from the database until the consumer has caught
|
||||
// up on all the historic messages.
|
||||
func (c *partitionConsumer) catchup(fromOffset int64) {
|
||||
for {
|
||||
// First check if we have caught up.
|
||||
caughtUp, nextOffset := c.topic.hasCaughtUp(c, fromOffset)
|
||||
if caughtUp {
|
||||
return
|
||||
}
|
||||
// Limit the number of messages we request from the database to be the
|
||||
// capacity of the channel.
|
||||
if nextOffset > fromOffset+int64(cap(c.messages)) {
|
||||
nextOffset = fromOffset + int64(cap(c.messages))
|
||||
}
|
||||
// Fetch the messages from the database.
|
||||
msgs, err := c.topic.db.FetchMessages(c.topic.topicName, fromOffset, nextOffset)
|
||||
if err != nil {
|
||||
// TODO: Add option to write consumer errors to an errors channel
|
||||
// as an alternative to logging the errors.
|
||||
log.Print("Error reading messages: ", err)
|
||||
// Wait before retrying.
|
||||
// TODO: Maybe use an exponentional backoff scheme here.
|
||||
// TODO: This timeout should take account of all the other goroutines
|
||||
// that might be doing the same thing. (If there are a 10000 consumers
|
||||
// then we don't want to end up retrying every millisecond)
|
||||
time.Sleep(10 * time.Second)
|
||||
continue
|
||||
}
|
||||
if len(msgs) == 0 {
|
||||
// This should only happen if the database is corrupted and has lost the
|
||||
// messages between the requested offsets.
|
||||
log.Fatalf("Corrupt database returned no messages between %d and %d", fromOffset, nextOffset)
|
||||
}
|
||||
|
||||
// Pass the messages into the consumer channel.
|
||||
// Blocking each write until the channel has enough space for the message.
|
||||
for i := range msgs {
|
||||
c.messages <- msgs[i].consumerMessage(c.topic.topicName)
|
||||
}
|
||||
// Update our the offset for the next loop iteration.
|
||||
fromOffset = msgs[len(msgs)-1].Offset
|
||||
}
|
||||
}
|
||||
|
||||
type topic struct {
|
||||
db Database
|
||||
topicName string
|
||||
mutex sync.Mutex
|
||||
consumers []*partitionConsumer
|
||||
nextOffset int64
|
||||
}
|
||||
|
||||
func (t *topic) send(now time.Time, pmsgs []*sarama.ProducerMessage) error {
|
||||
var err error
|
||||
// Encode the message keys and values.
|
||||
msgs := make([]Message, len(pmsgs))
|
||||
for i := range msgs {
|
||||
if pmsgs[i].Key != nil {
|
||||
msgs[i].Key, err = pmsgs[i].Key.Encode()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if pmsgs[i].Value != nil {
|
||||
msgs[i].Value, err = pmsgs[i].Value.Encode()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
pmsgs[i].Timestamp = now
|
||||
msgs[i].Timestamp = now
|
||||
}
|
||||
// Take the lock before assigning the offsets.
|
||||
t.mutex.Lock()
|
||||
defer t.mutex.Unlock()
|
||||
offset := t.nextOffset
|
||||
for i := range msgs {
|
||||
pmsgs[i].Offset = offset
|
||||
msgs[i].Offset = offset
|
||||
offset++
|
||||
}
|
||||
// Store the messages while we hold the lock.
|
||||
err = t.db.StoreMessages(t.topicName, msgs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
t.nextOffset = offset
|
||||
|
||||
// Now notify the consumers about the messages.
|
||||
for i := range msgs {
|
||||
cmsg := msgs[i].consumerMessage(t.topicName)
|
||||
for _, c := range t.consumers {
|
||||
if c.ready {
|
||||
select {
|
||||
case c.messages <- cmsg:
|
||||
default:
|
||||
// The consumer wasn't ready to receive a message because
|
||||
// the channel buffer was full.
|
||||
// Fork a goroutine to send the message so that we don't
|
||||
// block sending messages to the other consumers.
|
||||
c.ready = false
|
||||
go c.block(cmsg)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *topic) consume(offset int64) *partitionConsumer {
|
||||
t.mutex.Lock()
|
||||
defer t.mutex.Unlock()
|
||||
c := &partitionConsumer{
|
||||
topic: t,
|
||||
}
|
||||
// Handle special offsets.
|
||||
if offset == sarama.OffsetNewest {
|
||||
offset = t.nextOffset
|
||||
}
|
||||
if offset == sarama.OffsetOldest {
|
||||
offset = -1
|
||||
}
|
||||
c.messages = make(chan *sarama.ConsumerMessage, channelSize)
|
||||
t.consumers = append(t.consumers, c)
|
||||
// Start catching up on historic messages in the background.
|
||||
go c.catchup(offset)
|
||||
return c
|
||||
}
|
||||
|
||||
func (t *topic) hasCaughtUp(c *partitionConsumer, offset int64) (bool, int64) {
|
||||
t.mutex.Lock()
|
||||
defer t.mutex.Unlock()
|
||||
// Check if we have caught up while holding a lock on the topic so there
|
||||
// isn't a way for our check to race with a new message being sent on the topic.
|
||||
if offset+1 == t.nextOffset {
|
||||
// We've caught up, the consumer can now receive messages as they are
|
||||
// sent rather than fetching them from the database.
|
||||
c.ready = true
|
||||
return true, t.nextOffset
|
||||
}
|
||||
return false, t.nextOffset
|
||||
}
|
||||
|
||||
func (t *topic) highwaterMark() int64 {
|
||||
t.mutex.Lock()
|
||||
defer t.mutex.Unlock()
|
||||
return t.nextOffset
|
||||
}
|
86
vendor/src/github.com/matrix-org/naffka/naffka_test.go
vendored
Normal file
86
vendor/src/github.com/matrix-org/naffka/naffka_test.go
vendored
Normal file
|
@ -0,0 +1,86 @@
|
|||
package naffka
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
sarama "gopkg.in/Shopify/sarama.v1"
|
||||
)
|
||||
|
||||
func TestSendAndReceive(t *testing.T) {
|
||||
naffka, err := New(&MemoryDatabase{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
producer := sarama.SyncProducer(naffka)
|
||||
consumer := sarama.Consumer(naffka)
|
||||
const topic = "testTopic"
|
||||
const value = "Hello, World"
|
||||
|
||||
c, err := consumer.ConsumePartition(topic, 0, sarama.OffsetOldest)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
message := sarama.ProducerMessage{
|
||||
Value: sarama.StringEncoder(value),
|
||||
Topic: topic,
|
||||
}
|
||||
|
||||
if _, _, err = producer.SendMessage(&message); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
var result *sarama.ConsumerMessage
|
||||
select {
|
||||
case result = <-c.Messages():
|
||||
case _ = <-time.NewTimer(10 * time.Second).C:
|
||||
t.Fatal("expected to receive a message")
|
||||
}
|
||||
|
||||
if string(result.Value) != value {
|
||||
t.Fatalf("wrong value: wanted %q got %q", value, string(result.Value))
|
||||
}
|
||||
|
||||
select {
|
||||
case result = <-c.Messages():
|
||||
t.Fatal("expected to only receive one message")
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func TestDelayedReceive(t *testing.T) {
|
||||
naffka, err := New(&MemoryDatabase{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
producer := sarama.SyncProducer(naffka)
|
||||
consumer := sarama.Consumer(naffka)
|
||||
const topic = "testTopic"
|
||||
const value = "Hello, World"
|
||||
|
||||
message := sarama.ProducerMessage{
|
||||
Value: sarama.StringEncoder(value),
|
||||
Topic: topic,
|
||||
}
|
||||
|
||||
if _, _, err = producer.SendMessage(&message); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
c, err := consumer.ConsumePartition(topic, 0, sarama.OffsetOldest)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
var result *sarama.ConsumerMessage
|
||||
select {
|
||||
case result = <-c.Messages():
|
||||
case _ = <-time.NewTimer(10 * time.Second).C:
|
||||
t.Fatal("expected to receive a message")
|
||||
}
|
||||
|
||||
if string(result.Value) != value {
|
||||
t.Fatalf("wrong value: wanted %q got %q", value, string(result.Value))
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue