Mirror of https://github.com/matrix-org/dendrite.git
Synced 2025-12-12 09:23:09 -06:00

Commit 40d31b7e73: Merge latest changes from master
@@ -52,9 +52,13 @@ media:
 kafka:
     # Where the kafka servers are running.
     addresses: ["localhost:9092"]
+    # Whether to use naffka instead of kafka.
+    # Naffka can only be used when running dendrite as a single monolithic server.
+    # Kafka can be used both with a monolithic server and when running the
+    # components as separate servers.
+    use_naffka: false
    # The names of the kafka topics to use.
    topics:
-        input_room_event: roomserverInput
        output_room_event: roomserverOutput
        output_client_data: clientapiOutput
        user_updates: userUpdates

@@ -72,6 +76,7 @@ database:

 # The TCP host:port pairs to bind the internal HTTP APIs to.
 # These shouldn't be exposed to the public internet.
+# These aren't needed when running dendrite as a monolithic server.
 listen:
     room_server: "localhost:7770"
     client_api: "localhost:7771"
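The new use_naffka option is the heart of this change: a monolithic deployment can swap the external Kafka cluster for naffka, the in-process queue vendored later in this diff. As a minimal sketch of the wiring this enables (the setup helper below is hypothetical; the real monolith code appears further down), either a naffka instance or real Kafka clients can back the same sarama interfaces:

package main

import (
	log "github.com/Sirupsen/logrus"
	"github.com/matrix-org/naffka"
	sarama "gopkg.in/Shopify/sarama.v1"
)

// setup is a hypothetical helper mirroring the monolith wiring in this
// commit: naffka satisfies both sarama interfaces, so one object can act
// as consumer and producer; otherwise dial the configured kafka brokers.
func setup(useNaffka bool, addresses []string) (sarama.Consumer, sarama.SyncProducer) {
	if useNaffka {
		naff, err := naffka.New(&naffka.MemoryDatabase{})
		if err != nil {
			log.Panic("Failed to setup naffka: ", err)
		}
		return naff, naff
	}
	consumer, err := sarama.NewConsumer(addresses, nil)
	if err != nil {
		log.Panic("Failed to setup kafka consumers: ", err)
	}
	producer, err := sarama.NewSyncProducer(addresses, nil)
	if err != nil {
		log.Panic("Failed to setup kafka producers: ", err)
	}
	return consumer, producer
}

func main() {
	consumer, producer := setup(true, nil)
	_, _ = consumer, producer
}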
@@ -17,12 +17,13 @@ package consumers
 import (
     "encoding/json"

-    log "github.com/Sirupsen/logrus"
     "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
     "github.com/matrix-org/dendrite/common"
     "github.com/matrix-org/dendrite/common/config"
     "github.com/matrix-org/dendrite/roomserver/api"
     "github.com/matrix-org/gomatrixserverlib"

+    log "github.com/Sirupsen/logrus"
     sarama "gopkg.in/Shopify/sarama.v1"
 )

@@ -35,12 +36,12 @@ type OutputRoomEvent struct {
 }

 // NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers.
-func NewOutputRoomEvent(cfg *config.Dendrite, store *accounts.Database) (*OutputRoomEvent, error) {
-    kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil)
-    if err != nil {
-        return nil, err
-    }
-    roomServerURL := cfg.RoomServerURL()
+func NewOutputRoomEvent(
+    cfg *config.Dendrite,
+    kafkaConsumer sarama.Consumer,
+    store *accounts.Database,
+    queryAPI api.RoomserverQueryAPI,
+) *OutputRoomEvent {

     consumer := common.ContinualConsumer{
         Topic: string(cfg.Kafka.Topics.OutputRoomEvent),

@@ -50,12 +51,12 @@ func NewOutputRoomEvent(cfg *config.Dendrite, store *accounts.Database) (*Output
     s := &OutputRoomEvent{
         roomServerConsumer: &consumer,
         db:                 store,
-        query:              api.NewRoomserverQueryAPIHTTP(roomServerURL, nil),
+        query:              queryAPI,
         serverName:         string(cfg.Matrix.ServerName),
     }
     consumer.ProcessMessage = s.onMessage

-    return s, nil
+    return s
 }

 // Start consuming from room servers
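This constructor refactor is repeated for every consumer and producer in the commit: instead of dialling Kafka itself (and so having to return an error), each constructor now accepts a shared sarama.Consumer and a roomserver query API from its caller. A sketch of the resulting call-site shape, condensed from the clientapi main function later in this diff (the startConsumer wrapper is hypothetical):

package main

import (
	log "github.com/Sirupsen/logrus"
	"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
	"github.com/matrix-org/dendrite/clientapi/consumers"
	"github.com/matrix-org/dendrite/common/config"
	"github.com/matrix-org/dendrite/roomserver/api"
	sarama "gopkg.in/Shopify/sarama.v1"
)

// startConsumer shows the new call-site shape: the caller owns the kafka
// connection, so one client can be shared and the constructor cannot fail.
func startConsumer(cfg *config.Dendrite, accountDB *accounts.Database) {
	kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil)
	if err != nil {
		log.WithFields(log.Fields{
			log.ErrorKey: err,
			"addresses":  cfg.Kafka.Addresses,
		}).Panic("Failed to setup kafka consumers")
	}
	queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil)
	consumer := consumers.NewOutputRoomEvent(cfg, kafkaConsumer, accountDB, queryAPI)
	if err = consumer.Start(); err != nil {
		log.Panicf("startup: failed to start room server consumer")
	}
}

func main() {}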
@@ -28,18 +28,6 @@ type SyncAPIProducer struct {
     Producer sarama.SyncProducer
 }

-// NewSyncAPIProducer creates a new SyncAPIProducer
-func NewSyncAPIProducer(kafkaURIs []string, topic string) (*SyncAPIProducer, error) {
-    producer, err := sarama.NewSyncProducer(kafkaURIs, nil)
-    if err != nil {
-        return nil, err
-    }
-    return &SyncAPIProducer{
-        Topic:    topic,
-        Producer: producer,
-    }, nil
-}
-
 // SendData sends account data to the sync API server
 func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string) error {
     var m sarama.ProducerMessage

@@ -34,18 +34,6 @@ type profileUpdate struct {
     NewValue string `json:"new_value"` // The attribute's value after the update
 }

-// NewUserUpdateProducer creates a new UserUpdateProducer
-func NewUserUpdateProducer(kafkaURIs []string, topic string) (*UserUpdateProducer, error) {
-    producer, err := sarama.NewSyncProducer(kafkaURIs, nil)
-    if err != nil {
-        return nil, err
-    }
-    return &UserUpdateProducer{
-        Topic:    topic,
-        Producer: producer,
-    }, nil
-}
-
 // SendUpdate sends an update using kafka to notify the roomserver of the
 // profile update. Returns an error if the update failed to send.
 func (p *UserUpdateProducer) SendUpdate(
@@ -33,6 +33,7 @@ import (
     "github.com/matrix-org/gomatrixserverlib"

     log "github.com/Sirupsen/logrus"
+    sarama "gopkg.in/Shopify/sarama.v1"
 )

 var (

@@ -50,24 +51,28 @@ func main() {
         log.Fatalf("Invalid config file: %s", err)
     }

-    log.Info("config: ", cfg)
-
     queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil)
     aliasAPI := api.NewRoomserverAliasAPIHTTP(cfg.RoomServerURL(), nil)
     inputAPI := api.NewRoomserverInputAPIHTTP(cfg.RoomServerURL(), nil)

     roomserverProducer := producers.NewRoomserverProducer(inputAPI)
-    userUpdateProducer, err := producers.NewUserUpdateProducer(
-        cfg.Kafka.Addresses, string(cfg.Kafka.Topics.UserUpdates),
-    )
+    kafkaProducer, err := sarama.NewSyncProducer(cfg.Kafka.Addresses, nil)
     if err != nil {
-        log.Panicf("Failed to setup kafka producers(%q): %s", cfg.Kafka.Addresses, err)
+        log.WithFields(log.Fields{
+            log.ErrorKey: err,
+            "addresses":  cfg.Kafka.Addresses,
+        }).Panic("Failed to setup kafka producers")
     }
-    syncProducer, err := producers.NewSyncAPIProducer(
-        cfg.Kafka.Addresses, string(cfg.Kafka.Topics.OutputClientData),
-    )
-    if err != nil {
-        log.Panicf("Failed to setup kafka producers(%q): %s", cfg.Kafka.Addresses, err)
+
+    userUpdateProducer := &producers.UserUpdateProducer{
+        Producer: kafkaProducer,
+        Topic:    string(cfg.Kafka.Topics.UserUpdates),
+    }
+
+    syncProducer := &producers.SyncAPIProducer{
+        Producer: kafkaProducer,
+        Topic:    string(cfg.Kafka.Topics.OutputClientData),
     }

     federation := gomatrixserverlib.NewFederationClient(

@@ -90,15 +95,20 @@ func main() {
     keyRing := gomatrixserverlib.KeyRing{
         KeyFetchers: []gomatrixserverlib.KeyFetcher{
             // TODO: Use perspective key fetchers for production.
-            &gomatrixserverlib.DirectKeyFetcher{federation.Client},
+            &gomatrixserverlib.DirectKeyFetcher{Client: federation.Client},
         },
         KeyDatabase: keyDB,
     }

-    consumer, err := consumers.NewOutputRoomEvent(cfg, accountDB)
+    kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil)
     if err != nil {
-        log.Panicf("startup: failed to create room server consumer: %s", err)
+        log.WithFields(log.Fields{
+            log.ErrorKey: err,
+            "addresses":  cfg.Kafka.Addresses,
+        }).Panic("Failed to setup kafka consumers")
     }

+    consumer := consumers.NewOutputRoomEvent(cfg, kafkaConsumer, accountDB, queryAPI)
     if err = consumer.Start(); err != nil {
         log.Panicf("startup: failed to start room server consumer")
     }
@@ -25,9 +25,11 @@ import (
     "github.com/matrix-org/dendrite/federationsender/consumers"
     "github.com/matrix-org/dendrite/federationsender/queue"
     "github.com/matrix-org/dendrite/federationsender/storage"
+    "github.com/matrix-org/dendrite/roomserver/api"
     "github.com/matrix-org/gomatrixserverlib"

     log "github.com/Sirupsen/logrus"
+    sarama "gopkg.in/Shopify/sarama.v1"
 )

 var configPath = flag.String("config", "dendrite.yaml", "The path to the config file. For more information, see the config file in this repository.")

@@ -45,7 +47,15 @@ func main() {
         log.Fatalf("Invalid config file: %s", err)
     }

-    log.Info("config: ", cfg)
+    kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil)
+    if err != nil {
+        log.WithFields(log.Fields{
+            log.ErrorKey: err,
+            "addresses":  cfg.Kafka.Addresses,
+        }).Panic("Failed to setup kafka consumers")
+    }
+
+    queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil)

     db, err := storage.NewDatabase(string(cfg.Database.FederationSender))
     if err != nil {

@@ -58,10 +68,7 @@ func main() {

     queues := queue.NewOutgoingQueues(cfg.Matrix.ServerName, federation)

-    consumer, err := consumers.NewOutputRoomEvent(cfg, queues, db)
-    if err != nil {
-        log.WithError(err).Panicf("startup: failed to create room server consumer")
-    }
+    consumer := consumers.NewOutputRoomEvent(cfg, kafkaConsumer, queues, db, queryAPI)
     if err = consumer.Start(); err != nil {
         log.WithError(err).Panicf("startup: failed to start room server consumer")
     }
@@ -27,6 +27,7 @@ import (
     "github.com/matrix-org/dendrite/common/config"
     "github.com/matrix-org/dendrite/common/keydb"
     "github.com/matrix-org/gomatrixserverlib"
+    "github.com/matrix-org/naffka"

     mediaapi_routing "github.com/matrix-org/dendrite/mediaapi/routing"
     mediaapi_storage "github.com/matrix-org/dendrite/mediaapi/storage"

@@ -72,7 +73,7 @@ func main() {
     if *configPath == "" {
         log.Fatal("--config must be supplied")
     }
-    cfg, err := config.Load(*configPath)
+    cfg, err := config.LoadMonolithic(*configPath)
     if err != nil {
         log.Fatalf("Invalid config file: %s", err)
     }

@@ -80,6 +81,7 @@ func main() {
     m := newMonolith(cfg)
     m.setupDatabases()
     m.setupFederation()
+    m.setupKafka()
     m.setupRoomServer()
     m.setupProducers()
     m.setupNotifiers()

@@ -125,6 +127,9 @@ type monolith struct {
     queryAPI *roomserver_query.RoomserverQueryAPI
     aliasAPI *roomserver_alias.RoomserverAliasAPI

+    kafkaConsumer sarama.Consumer
+    kafkaProducer sarama.SyncProducer
+
     roomServerProducer *producers.RoomserverProducer
     userUpdateProducer *producers.UserUpdateProducer
     syncProducer       *producers.SyncAPIProducer

@@ -182,15 +187,39 @@ func (m *monolith) setupFederation() {
     }
 }

-func (m *monolith) setupRoomServer() {
-    kafkaProducer, err := sarama.NewSyncProducer(m.cfg.Kafka.Addresses, nil)
-    if err != nil {
-        panic(err)
+func (m *monolith) setupKafka() {
+    var err error
+    if m.cfg.Kafka.UseNaffka {
+        naff, err := naffka.New(&naffka.MemoryDatabase{})
+        if err != nil {
+            log.WithFields(log.Fields{
+                log.ErrorKey: err,
+            }).Panic("Failed to setup naffka")
+        }
+        m.kafkaConsumer = naff
+        m.kafkaProducer = naff
+    } else {
+        m.kafkaConsumer, err = sarama.NewConsumer(m.cfg.Kafka.Addresses, nil)
+        if err != nil {
+            log.WithFields(log.Fields{
+                log.ErrorKey: err,
+                "addresses":  m.cfg.Kafka.Addresses,
+            }).Panic("Failed to setup kafka consumers")
+        }
+        m.kafkaProducer, err = sarama.NewSyncProducer(m.cfg.Kafka.Addresses, nil)
+        if err != nil {
+            log.WithFields(log.Fields{
+                log.ErrorKey: err,
+                "addresses":  m.cfg.Kafka.Addresses,
+            }).Panic("Failed to setup kafka producers")
+        }
     }
+}

+func (m *monolith) setupRoomServer() {
     m.inputAPI = &roomserver_input.RoomserverInputAPI{
         DB:                   m.roomServerDB,
-        Producer:             kafkaProducer,
+        Producer:             m.kafkaProducer,
         OutputRoomEventTopic: string(m.cfg.Kafka.Topics.OutputRoomEvent),
     }

@@ -207,19 +236,14 @@ func (m *monolith) setupRoomServer() {
 }

 func (m *monolith) setupProducers() {
-    var err error
     m.roomServerProducer = producers.NewRoomserverProducer(m.inputAPI)
-    m.userUpdateProducer, err = producers.NewUserUpdateProducer(
-        m.cfg.Kafka.Addresses, string(m.cfg.Kafka.Topics.UserUpdates),
-    )
-    if err != nil {
-        log.Panicf("Failed to setup kafka producers(%q): %s", m.cfg.Kafka.Addresses, err)
+    m.userUpdateProducer = &producers.UserUpdateProducer{
+        Producer: m.kafkaProducer,
+        Topic:    string(m.cfg.Kafka.Topics.UserUpdates),
     }
-    m.syncProducer, err = producers.NewSyncAPIProducer(
-        m.cfg.Kafka.Addresses, string(m.cfg.Kafka.Topics.OutputClientData),
-    )
-    if err != nil {
-        log.Panicf("Failed to setup kafka producers(%q): %s", m.cfg.Kafka.Addresses, err)
+    m.syncProducer = &producers.SyncAPIProducer{
+        Producer: m.kafkaProducer,
+        Topic:    string(m.cfg.Kafka.Topics.OutputClientData),
     }
 }

@@ -236,42 +260,34 @@ func (m *monolith) setupNotifiers() {
 }

 func (m *monolith) setupConsumers() {
-    clientAPIConsumer, err := clientapi_consumers.NewOutputRoomEvent(m.cfg, m.accountDB)
-    if err != nil {
-        log.Panicf("startup: failed to create room server consumer: %s", err)
-    }
+    var err error
+
+    clientAPIConsumer := clientapi_consumers.NewOutputRoomEvent(
+        m.cfg, m.kafkaConsumer, m.accountDB, m.queryAPI,
+    )
     if err = clientAPIConsumer.Start(); err != nil {
         log.Panicf("startup: failed to start room server consumer")
     }

-    syncAPIRoomConsumer, err := syncapi_consumers.NewOutputRoomEvent(
-        m.cfg, m.syncAPINotifier, m.syncAPIDB,
+    syncAPIRoomConsumer := syncapi_consumers.NewOutputRoomEvent(
+        m.cfg, m.kafkaConsumer, m.syncAPINotifier, m.syncAPIDB, m.queryAPI,
     )
-    if err != nil {
-        log.Panicf("startup: failed to create room server consumer: %s", err)
-    }
     if err = syncAPIRoomConsumer.Start(); err != nil {
         log.Panicf("startup: failed to start room server consumer: %s", err)
     }

-    syncAPIClientConsumer, err := syncapi_consumers.NewOutputClientData(
-        m.cfg, m.syncAPINotifier, m.syncAPIDB,
+    syncAPIClientConsumer := syncapi_consumers.NewOutputClientData(
+        m.cfg, m.kafkaConsumer, m.syncAPINotifier, m.syncAPIDB,
     )
-    if err != nil {
-        log.Panicf("startup: failed to create client API server consumer: %s", err)
-    }
     if err = syncAPIClientConsumer.Start(); err != nil {
         log.Panicf("startup: failed to start client API server consumer: %s", err)
     }

     federationSenderQueues := queue.NewOutgoingQueues(m.cfg.Matrix.ServerName, m.federation)

-    federationSenderRoomConsumer, err := federationsender_consumers.NewOutputRoomEvent(
-        m.cfg, federationSenderQueues, m.federationSenderDB,
+    federationSenderRoomConsumer := federationsender_consumers.NewOutputRoomEvent(
+        m.cfg, m.kafkaConsumer, federationSenderQueues, m.federationSenderDB, m.queryAPI,
     )
-    if err != nil {
-        log.WithError(err).Panicf("startup: failed to create room server consumer")
-    }
     if err = federationSenderRoomConsumer.Start(); err != nil {
         log.WithError(err).Panicf("startup: failed to start room server consumer")
     }
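A detail worth noting in setupKafka above: the same naff value is assigned to both m.kafkaConsumer and m.kafkaProducer, which only compiles because naffka implements both sarama interfaces. That assumption can be stated as compile-time assertions (a sketch, not part of the commit):

package main

import (
	"github.com/matrix-org/naffka"
	sarama "gopkg.in/Shopify/sarama.v1"
)

// Compile-time checks that *naffka.Naffka satisfies the interfaces the
// monolith's kafkaConsumer and kafkaProducer fields require.
var _ sarama.Consumer = (*naffka.Naffka)(nil)
var _ sarama.SyncProducer = (*naffka.Naffka)(nil)

func main() {}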
|
|
@ -24,6 +24,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
|
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
|
||||||
"github.com/matrix-org/dendrite/common"
|
"github.com/matrix-org/dendrite/common"
|
||||||
"github.com/matrix-org/dendrite/common/config"
|
"github.com/matrix-org/dendrite/common/config"
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/syncapi/consumers"
|
"github.com/matrix-org/dendrite/syncapi/consumers"
|
||||||
"github.com/matrix-org/dendrite/syncapi/routing"
|
"github.com/matrix-org/dendrite/syncapi/routing"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||||
|
|
@ -31,6 +32,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
|
|
||||||
log "github.com/Sirupsen/logrus"
|
log "github.com/Sirupsen/logrus"
|
||||||
|
sarama "gopkg.in/Shopify/sarama.v1"
|
||||||
)
|
)
|
||||||
|
|
||||||
var configPath = flag.String("config", "dendrite.yaml", "The path to the config file. For more information, see the config file in this repository.")
|
var configPath = flag.String("config", "dendrite.yaml", "The path to the config file. For more information, see the config file in this repository.")
|
||||||
|
|
@ -48,7 +50,7 @@ func main() {
|
||||||
log.Fatalf("Invalid config file: %s", err)
|
log.Fatalf("Invalid config file: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info("config: ", cfg)
|
queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil)
|
||||||
|
|
||||||
db, err := storage.NewSyncServerDatabase(string(cfg.Database.SyncAPI))
|
db, err := storage.NewSyncServerDatabase(string(cfg.Database.SyncAPI))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -74,17 +76,20 @@ func main() {
|
||||||
if err = n.Load(db); err != nil {
|
if err = n.Load(db); err != nil {
|
||||||
log.Panicf("startup: failed to set up notifier: %s", err)
|
log.Panicf("startup: failed to set up notifier: %s", err)
|
||||||
}
|
}
|
||||||
roomConsumer, err := consumers.NewOutputRoomEvent(cfg, n, db)
|
|
||||||
|
kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Panicf("startup: failed to create room server consumer: %s", err)
|
log.WithFields(log.Fields{
|
||||||
|
log.ErrorKey: err,
|
||||||
|
"addresses": cfg.Kafka.Addresses,
|
||||||
|
}).Panic("Failed to setup kafka consumers")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
roomConsumer := consumers.NewOutputRoomEvent(cfg, kafkaConsumer, n, db, queryAPI)
|
||||||
if err = roomConsumer.Start(); err != nil {
|
if err = roomConsumer.Start(); err != nil {
|
||||||
log.Panicf("startup: failed to start room server consumer: %s", err)
|
log.Panicf("startup: failed to start room server consumer: %s", err)
|
||||||
}
|
}
|
||||||
clientConsumer, err := consumers.NewOutputClientData(cfg, n, db)
|
clientConsumer := consumers.NewOutputClientData(cfg, kafkaConsumer, n, db)
|
||||||
if err != nil {
|
|
||||||
log.Panicf("startup: failed to create client API server consumer: %s", err)
|
|
||||||
}
|
|
||||||
if err = clientConsumer.Start(); err != nil {
|
if err = clientConsumer.Start(); err != nil {
|
||||||
log.Panicf("startup: failed to start client API server consumer: %s", err)
|
log.Panicf("startup: failed to start client API server consumer: %s", err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@@ -94,6 +94,11 @@ type Dendrite struct {
     Kafka struct {
         // A list of kafka addresses to connect to.
         Addresses []string `yaml:"addresses"`
+        // Whether to use naffka instead of kafka.
+        // Naffka can only be used when running dendrite as a single monolithic server.
+        // Kafka can be used both with a monolithic server and when running the
+        // components as separate servers.
+        UseNaffka bool `yaml:"use_naffka,omitempty"`
         // The names of the topics to use when reading and writing from kafka.
         Topics struct {
             // Topic for roomserver/api.OutputRoomEvent events.

@@ -173,7 +178,10 @@ type ThumbnailSize struct {
     ResizeMethod string `yaml:"method,omitempty"`
 }

-// Load a yaml config file
+// Load a yaml config file for a server run as multiple processes.
+// Checks the config to ensure that it is valid.
+// The checks are different if the server is run as a monolithic process instead
+// of being split into multiple components
 func Load(configPath string) (*Dendrite, error) {
     configData, err := ioutil.ReadFile(configPath)
     if err != nil {

@@ -185,7 +193,27 @@ func Load(configPath string) (*Dendrite, error) {
     }
     // Pass the current working directory and ioutil.ReadFile so that they can
     // be mocked in the tests
-    return loadConfig(basePath, configData, ioutil.ReadFile)
+    monolithic := false
+    return loadConfig(basePath, configData, ioutil.ReadFile, monolithic)
+}
+
+// LoadMonolithic loads a yaml config file for a server run as a single monolith.
+// Checks the config to ensure that it is valid.
+// The checks are different if the server is run as a monolithic process instead
+// of being split into multiple components
+func LoadMonolithic(configPath string) (*Dendrite, error) {
+    configData, err := ioutil.ReadFile(configPath)
+    if err != nil {
+        return nil, err
+    }
+    basePath, err := filepath.Abs(".")
+    if err != nil {
+        return nil, err
+    }
+    // Pass the current working directory and ioutil.ReadFile so that they can
+    // be mocked in the tests
+    monolithic := true
+    return loadConfig(basePath, configData, ioutil.ReadFile, monolithic)
 }

 // An Error indicates a problem parsing the config.

@@ -198,6 +226,7 @@ func loadConfig(
     basePath string,
     configData []byte,
     readFile func(string) ([]byte, error),
+    monolithic bool,
 ) (*Dendrite, error) {
     var config Dendrite
     var err error

@@ -207,7 +236,7 @@ func loadConfig(
     config.setDefaults()

-    if err = config.check(); err != nil {
+    if err = config.check(monolithic); err != nil {
         return nil, err
     }

@@ -263,7 +292,7 @@ func (e Error) Error() string {
     )
 }

-func (config *Dendrite) check() error {
+func (config *Dendrite) check(monolithic bool) error {
     var problems []string

     if config.Version != Version {

@@ -301,21 +330,32 @@ func (config *Dendrite) check() error {
         checkPositive(fmt.Sprintf("media.thumbnail_sizes[%d].width", i), int64(size.Width))
         checkPositive(fmt.Sprintf("media.thumbnail_sizes[%d].height", i), int64(size.Height))
     }
+    if config.Kafka.UseNaffka {
+        if !monolithic {
+            problems = append(problems, fmt.Sprintf("naffka can only be used in a monolithic server"))
+        }
+    } else {
+        // If we aren't using naffka then we need to have at least one kafka
+        // server to talk to.
         checkNotZero("kafka.addresses", int64(len(config.Kafka.Addresses)))
+    }
     checkNotEmpty("kafka.topics.output_room_event", string(config.Kafka.Topics.OutputRoomEvent))
     checkNotEmpty("kafka.topics.output_client_data", string(config.Kafka.Topics.OutputClientData))
+    checkNotEmpty("kafka.topics.user_updates", string(config.Kafka.Topics.UserUpdates))
     checkNotEmpty("database.account", string(config.Database.Account))
     checkNotEmpty("database.device", string(config.Database.Device))
     checkNotEmpty("database.server_key", string(config.Database.ServerKey))
     checkNotEmpty("database.media_api", string(config.Database.MediaAPI))
     checkNotEmpty("database.sync_api", string(config.Database.SyncAPI))
     checkNotEmpty("database.room_server", string(config.Database.RoomServer))

+    if !monolithic {
         checkNotEmpty("listen.media_api", string(config.Listen.MediaAPI))
         checkNotEmpty("listen.client_api", string(config.Listen.ClientAPI))
         checkNotEmpty("listen.federation_api", string(config.Listen.FederationAPI))
         checkNotEmpty("listen.sync_api", string(config.Listen.SyncAPI))
         checkNotEmpty("listen.room_server", string(config.Listen.RoomServer))
+    }

     if problems != nil {
         return Error{problems}
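With the monolithic flag threaded through loadConfig and check, each entry point now opts into the validation rules matching its deployment: for example use_naffka: true passes LoadMonolithic but is rejected by Load, and the listen addresses are only mandatory for split deployments. A minimal sketch of the two call sites, assuming a dendrite.yaml in the working directory:

package main

import (
	log "github.com/Sirupsen/logrus"
	"github.com/matrix-org/dendrite/common/config"
)

func main() {
	// A component run as a separate process: listen addresses and kafka
	// brokers are required, and naffka is rejected.
	cfg, err := config.Load("dendrite.yaml")
	if err != nil {
		log.Fatalf("Invalid config file: %s", err)
	}
	_ = cfg

	// The monolithic server: listen addresses may be omitted and naffka
	// is allowed.
	monoCfg, err := config.LoadMonolithic("dendrite.yaml")
	if err != nil {
		log.Fatalf("Invalid config file: %s", err)
	}
	_ = monoCfg
}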
@@ -25,6 +25,7 @@ func TestLoadConfigRelative(t *testing.T) {
             "/my/config/dir/matrix_key.pem": testKey,
             "/my/config/dir/tls_cert.pem":   testCert,
         }.readFile,
+        false,
     )
     if err != nil {
         t.Error("failed to load config:", err)

@@ -42,9 +43,9 @@ media:
 kafka:
     addresses: ["localhost:9092"]
     topics:
-        input_room_event: input.room
        output_room_event: output.room
        output_client_data: output.client
+        user_updates: output.user
 database:
     media_api: "postgresql:///media_api"
     account: "postgresql:///account"

@@ -83,6 +83,7 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con
     // Make this configurable somehow?
     cfg.Kafka.Topics.OutputRoomEvent = "test.room.output"
     cfg.Kafka.Topics.OutputClientData = "test.clientapi.output"
+    cfg.Kafka.Topics.UserUpdates = "test.user.output"

     // TODO: Use different databases for the different schemas.
     // Using the same database for every schema currently works because
@@ -38,13 +38,13 @@ type OutputRoomEvent struct {
 }

 // NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers.
-func NewOutputRoomEvent(cfg *config.Dendrite, queues *queue.OutgoingQueues, store *storage.Database) (*OutputRoomEvent, error) {
-    kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil)
-    if err != nil {
-        return nil, err
-    }
-    roomServerURL := cfg.RoomServerURL()
+func NewOutputRoomEvent(
+    cfg *config.Dendrite,
+    kafkaConsumer sarama.Consumer,
+    queues *queue.OutgoingQueues,
+    store *storage.Database,
+    queryAPI api.RoomserverQueryAPI,
+) *OutputRoomEvent {
     consumer := common.ContinualConsumer{
         Topic:    string(cfg.Kafka.Topics.OutputRoomEvent),
         Consumer: kafkaConsumer,

@@ -54,11 +54,11 @@ func NewOutputRoomEvent(cfg *config.Dendrite, queues *queue.OutgoingQueues, stor
         roomServerConsumer: &consumer,
         db:                 store,
         queues:             queues,
-        query:              api.NewRoomserverQueryAPIHTTP(roomServerURL, nil),
+        query:              queryAPI,
     }
     consumer.ProcessMessage = s.onMessage

-    return s, nil
+    return s
 }

 // Start consuming from room servers

@@ -33,11 +33,12 @@ type OutputClientData struct {
 }

 // NewOutputClientData creates a new OutputClientData consumer. Call Start() to begin consuming from room servers.
-func NewOutputClientData(cfg *config.Dendrite, n *sync.Notifier, store *storage.SyncServerDatabase) (*OutputClientData, error) {
-    kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil)
-    if err != nil {
-        return nil, err
-    }
+func NewOutputClientData(
+    cfg *config.Dendrite,
+    kafkaConsumer sarama.Consumer,
+    n *sync.Notifier,
+    store *storage.SyncServerDatabase,
+) *OutputClientData {

     consumer := common.ContinualConsumer{
         Topic: string(cfg.Kafka.Topics.OutputClientData),

@@ -51,7 +52,7 @@ func NewOutputClientData(cfg *config.Dendrite, n *sync.Notifier, store *storage.
     }
     consumer.ProcessMessage = s.onMessage

-    return s, nil
+    return s
 }

 // Start consuming from room servers

@@ -44,12 +44,13 @@ type prevEventRef struct {
 }

 // NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers.
-func NewOutputRoomEvent(cfg *config.Dendrite, n *sync.Notifier, store *storage.SyncServerDatabase) (*OutputRoomEvent, error) {
-    kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil)
-    if err != nil {
-        return nil, err
-    }
-    roomServerURL := cfg.RoomServerURL()
+func NewOutputRoomEvent(
+    cfg *config.Dendrite,
+    kafkaConsumer sarama.Consumer,
+    n *sync.Notifier,
+    store *storage.SyncServerDatabase,
+    queryAPI api.RoomserverQueryAPI,
+) *OutputRoomEvent {

     consumer := common.ContinualConsumer{
         Topic: string(cfg.Kafka.Topics.OutputRoomEvent),

@@ -60,11 +61,11 @@ func NewOutputRoomEvent(cfg *config.Dendrite, n *sync.Notifier, store *storage.S
         roomServerConsumer: &consumer,
         db:                 store,
         notifier:           n,
-        query:              api.NewRoomserverQueryAPIHTTP(roomServerURL, nil),
+        query:              queryAPI,
     }
     consumer.ProcessMessage = s.onMessage

-    return s, nil
+    return s
 }

 // Start consuming from room servers
vendor/manifest (vendored)
@@ -101,6 +101,12 @@
             "revision": "768a8767051a4aca7f5e41f912954ae04d5f1efb",
             "branch": "master"
         },
+        {
+            "importpath": "github.com/matrix-org/naffka",
+            "repository": "https://github.com/matrix-org/naffka",
+            "revision": "d28656e34f96a8eeaab53e3b7678c9ce14af5786",
+            "branch": "master"
+        },
         {
             "importpath": "github.com/matrix-org/util",
             "repository": "https://github.com/matrix-org/util",
vendor/src/github.com/matrix-org/naffka/README.md (vendored, new file)
@@ -0,0 +1,5 @@
+# naffka
+
+Single in-process implementation of the [sarama golang kafka](https://github.com/Shopify/sarama) APIs.
+
+It's like Kafka, but a bit [naff](https://www.collinsdictionary.com/dictionary/english/naff).
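As a usage sketch, condensed from naffka_test.go below: a single Naffka instance backed by the in-memory database serves as both the producer and the consumer end of a topic:

package main

import (
	"fmt"

	"github.com/matrix-org/naffka"
	sarama "gopkg.in/Shopify/sarama.v1"
)

func main() {
	naff, err := naffka.New(&naffka.MemoryDatabase{})
	if err != nil {
		panic(err)
	}
	producer := sarama.SyncProducer(naff)
	consumer := sarama.Consumer(naff)

	// Naffka topics have a single partition, 0; OffsetOldest replays the
	// full stored history to the new consumer.
	c, err := consumer.ConsumePartition("testTopic", 0, sarama.OffsetOldest)
	if err != nil {
		panic(err)
	}

	if _, _, err = producer.SendMessage(&sarama.ProducerMessage{
		Topic: "testTopic",
		Value: sarama.StringEncoder("Hello, World"),
	}); err != nil {
		panic(err)
	}

	msg := <-c.Messages()
	fmt.Println(string(msg.Value)) // Hello, World
}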
vendor/src/github.com/matrix-org/naffka/hooks/install.sh (vendored, new file)
@@ -0,0 +1,5 @@
+#! /bin/bash
+
+DOT_GIT="$(dirname $0)/../.git"
+
+ln -s "../../hooks/pre-commit" "$DOT_GIT/hooks/pre-commit"

vendor/src/github.com/matrix-org/naffka/hooks/pre-commit (vendored, new file)
@@ -0,0 +1,24 @@
+#! /bin/bash
+
+set -eu
+
+golint ./...
+misspell -error .
+
+# gofmt doesn't exit with an error code if the files don't match the expected
+# format. So we have to run it and see if it outputs anything.
+if gofmt -l -s . 2>&1 | read
+then
+    echo "Error: not all code had been formatted with gofmt."
+    echo "Fixing the following files"
+    gofmt -s -w -l .
+    echo
+    echo "Please add them to the commit"
+    git status --short
+    exit 1
+fi
+
+ineffassign .
+go tool vet --all --shadow .
+gocyclo -over 16 .
+go test -timeout 5s . ./...
vendor/src/github.com/matrix-org/naffka/memorydatabase.go (vendored, new file)
@@ -0,0 +1,91 @@
+package naffka
+
+import (
+    "fmt"
+    "sync"
+)
+
+// A MemoryDatabase stores the message history as arrays in memory.
+// It can be used to run unit tests.
+// If the process is stopped then any messages that haven't been
+// processed by a consumer are lost forever.
+type MemoryDatabase struct {
+    topicsMutex sync.Mutex
+    topics      map[string]*memoryDatabaseTopic
+}
+
+type memoryDatabaseTopic struct {
+    messagesMutex sync.Mutex
+    messages      []Message
+}
+
+func (t *memoryDatabaseTopic) addMessages(msgs []Message) error {
+    t.messagesMutex.Lock()
+    defer t.messagesMutex.Unlock()
+    if int64(len(t.messages)) != msgs[0].Offset {
+        return fmt.Errorf("message offset %d is not immediately after the previous offset %d", msgs[0].Offset, len(t.messages))
+    }
+    t.messages = append(t.messages, msgs...)
+    return nil
+}
+
+// getMessages returns the current messages as a slice.
+// This slice will have its own copy of the length field so won't be affected
+// by adding more messages in addMessages.
+// The slice will share the same backing array with the slice we append new
+// messages to. It is safe to read the messages in the backing array since we
+// only append to the slice. It is not safe to write or append to the returned
+// slice.
+func (t *memoryDatabaseTopic) getMessages() []Message {
+    t.messagesMutex.Lock()
+    defer t.messagesMutex.Unlock()
+    return t.messages
+}
+
+func (m *MemoryDatabase) getTopic(topicName string) *memoryDatabaseTopic {
+    m.topicsMutex.Lock()
+    defer m.topicsMutex.Unlock()
+    result := m.topics[topicName]
+    if result == nil {
+        result = &memoryDatabaseTopic{}
+        if m.topics == nil {
+            m.topics = map[string]*memoryDatabaseTopic{}
+        }
+        m.topics[topicName] = result
+    }
+    return result
+}
+
+// StoreMessages implements Database
+func (m *MemoryDatabase) StoreMessages(topic string, messages []Message) error {
+    if err := m.getTopic(topic).addMessages(messages); err != nil {
+        return err
+    }
+    return nil
+}
+
+// FetchMessages implements Database
+func (m *MemoryDatabase) FetchMessages(topic string, startOffset, endOffset int64) ([]Message, error) {
+    messages := m.getTopic(topic).getMessages()
+    if endOffset > int64(len(messages)) {
+        return nil, fmt.Errorf("end offset %d out of range %d", endOffset, len(messages))
+    }
+    if startOffset >= endOffset {
+        return nil, fmt.Errorf("start offset %d greater than or equal to end offset %d", startOffset, endOffset)
+    }
+    if startOffset < -1 {
+        return nil, fmt.Errorf("start offset %d less than -1", startOffset)
+    }
+    return messages[startOffset+1 : endOffset], nil
+}
+
+// MaxOffsets implements Database
+func (m *MemoryDatabase) MaxOffsets() (map[string]int64, error) {
+    m.topicsMutex.Lock()
+    defer m.topicsMutex.Unlock()
+    result := map[string]int64{}
+    for name, t := range m.topics {
+        result[name] = int64(len(t.getMessages())) - 1
+    }
+    return result, nil
+}
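The getMessages comment above relies on a property of Go slices that is easy to miss: a copied slice header carries its own length, so later appends through the original slice (which may also reallocate the backing array) never change what the copy observes. A standalone illustration (not part of the commit):

package main

import "fmt"

func main() {
	a := []int{1, 2, 3}
	b := a           // b copies the slice header: same array, own length
	a = append(a, 4) // may grow in place or reallocate; b is unaffected
	fmt.Println(b)   // [1 2 3] either way
}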
vendor/src/github.com/matrix-org/naffka/naffka.go (vendored, new file)
@@ -0,0 +1,360 @@
+package naffka
+
+import (
+    "fmt"
+    "log"
+    "sync"
+    "time"
+
+    sarama "gopkg.in/Shopify/sarama.v1"
+)
+
+// Naffka is an implementation of the sarama kafka API designed to run within a
+// single go process. It implements both the sarama.SyncProducer and the
+// sarama.Consumer interfaces. This means it can act as a drop in replacement
+// for kafka for testing or single instance deployment.
+type Naffka struct {
+    db          Database
+    topicsMutex sync.Mutex
+    topics      map[string]*topic
+}
+
+// New creates a new Naffka instance.
+func New(db Database) (*Naffka, error) {
+    n := &Naffka{db: db, topics: map[string]*topic{}}
+    maxOffsets, err := db.MaxOffsets()
+    if err != nil {
+        return nil, err
+    }
+    for topicName, offset := range maxOffsets {
+        n.topics[topicName] = &topic{
+            topicName:  topicName,
+            nextOffset: offset + 1,
+        }
+    }
+    return n, nil
+}
+
+// A Message is used internally within naffka to store messages.
+// It is converted to a sarama.ConsumerMessage when exposed to the
+// public APIs to maintain API compatibility with sarama.
+type Message struct {
+    Offset    int64
+    Key       []byte
+    Value     []byte
+    Timestamp time.Time
+}
+
+func (m *Message) consumerMessage(topic string) *sarama.ConsumerMessage {
+    return &sarama.ConsumerMessage{
+        Topic:     topic,
+        Offset:    m.Offset,
+        Key:       m.Key,
+        Value:     m.Value,
+        Timestamp: m.Timestamp,
+    }
+}
+
+// A Database is used to store naffka messages.
+// Messages are stored so that new consumers can access the full message history.
+type Database interface {
+    // StoreMessages stores a list of messages.
+    // Every message offset must be unique within each topic.
+    // Messages must be stored monotonically and contiguously for each topic.
+    // So for a given topic the message with offset n+1 is stored after the
+    // message with offset n.
+    StoreMessages(topic string, messages []Message) error
+    // FetchMessages fetches all messages with an offset greater than but not
+    // including startOffset and less than but not including endOffset.
+    // The range of offsets requested must not overlap with those stored by a
+    // concurrent StoreMessages. The message offsets within the requested range
+    // are contiguous. That is FetchMessages("foo", n, m) will only be called
+    // once the messages between n and m have been stored by StoreMessages.
+    // Every call must return at least one message. That is there must be at
+    // least one message between the start and end offsets.
+    FetchMessages(topic string, startOffset, endOffset int64) ([]Message, error)
+    // MaxOffsets returns the maximum offset for each topic.
+    MaxOffsets() (map[string]int64, error)
+}
+
+// SendMessage implements sarama.SyncProducer
+func (n *Naffka) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
+    err = n.SendMessages([]*sarama.ProducerMessage{msg})
+    return msg.Partition, msg.Offset, err
+}
+
+// SendMessages implements sarama.SyncProducer
+func (n *Naffka) SendMessages(msgs []*sarama.ProducerMessage) error {
+    byTopic := map[string][]*sarama.ProducerMessage{}
+    for _, msg := range msgs {
+        byTopic[msg.Topic] = append(byTopic[msg.Topic], msg)
+    }
+    var topicNames []string
+    for topicName := range byTopic {
+        topicNames = append(topicNames, topicName)
+    }
+
+    now := time.Now()
+    topics := n.getTopics(topicNames)
+    for topicName := range byTopic {
+        if err := topics[topicName].send(now, byTopic[topicName]); err != nil {
+            return err
+        }
+    }
+    return nil
+}
+
+func (n *Naffka) getTopics(topicNames []string) map[string]*topic {
+    n.topicsMutex.Lock()
+    defer n.topicsMutex.Unlock()
+    result := map[string]*topic{}
+    for _, topicName := range topicNames {
+        t := n.topics[topicName]
+        if t == nil {
+            // If the topic doesn't already exist then create it.
+            t = &topic{db: n.db, topicName: topicName}
+            n.topics[topicName] = t
+        }
+        result[topicName] = t
+    }
+    return result
+}
+
+// Topics implements sarama.Consumer
+func (n *Naffka) Topics() ([]string, error) {
+    n.topicsMutex.Lock()
+    defer n.topicsMutex.Unlock()
+    var result []string
+    for topic := range n.topics {
+        result = append(result, topic)
+    }
+    return result, nil
+}
+
+// Partitions implements sarama.Consumer
+func (n *Naffka) Partitions(topic string) ([]int32, error) {
+    // Naffka stores a single partition per topic, so this always returns a single partition ID.
+    return []int32{0}, nil
+}
+
+// ConsumePartition implements sarama.Consumer
+func (n *Naffka) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error) {
+    if partition != 0 {
+        return nil, fmt.Errorf("Unknown partition ID %d", partition)
+    }
+    topics := n.getTopics([]string{topic})
+    return topics[topic].consume(offset), nil
+}
+
+// HighWaterMarks implements sarama.Consumer
+func (n *Naffka) HighWaterMarks() map[string]map[int32]int64 {
+    n.topicsMutex.Lock()
+    defer n.topicsMutex.Unlock()
+    result := map[string]map[int32]int64{}
+    for topicName, topic := range n.topics {
+        result[topicName] = map[int32]int64{
+            0: topic.highwaterMark(),
+        }
+    }
+    return result
+}
+
+// Close implements sarama.SyncProducer and sarama.Consumer
+func (n *Naffka) Close() error {
+    return nil
+}
+
+const channelSize = 1024
+
+type partitionConsumer struct {
+    topic    *topic
+    messages chan *sarama.ConsumerMessage
+    // Whether the consumer is ready for new messages or whether it
+    // is catching up on historic messages.
+    // Reads and writes to this field are protected by the topic mutex.
+    ready bool
+}
+
+// AsyncClose implements sarama.PartitionConsumer
+func (c *partitionConsumer) AsyncClose() {
+}
+
+// Close implements sarama.PartitionConsumer
+func (c *partitionConsumer) Close() error {
+    // TODO: Add support for performing a clean shutdown of the consumer.
+    return nil
+}
+
+// Messages implements sarama.PartitionConsumer
+func (c *partitionConsumer) Messages() <-chan *sarama.ConsumerMessage {
+    return c.messages
+}
+
+// Errors implements sarama.PartitionConsumer
+func (c *partitionConsumer) Errors() <-chan *sarama.ConsumerError {
+    // TODO: Add option to pass consumer errors to an errors channel.
+    return nil
+}
+
+// HighWaterMarkOffset implements sarama.PartitionConsumer
+func (c *partitionConsumer) HighWaterMarkOffset() int64 {
+    return c.topic.highwaterMark()
+}
+
+// block writes the message to the consumer blocking until the consumer is ready
+// to add the message to the channel. Once the message is successfully added to
+// the channel it will catch up by pulling historic messages from the database.
+func (c *partitionConsumer) block(cmsg *sarama.ConsumerMessage) {
+    c.messages <- cmsg
+    c.catchup(cmsg.Offset)
+}
+
+// catchup reads historic messages from the database until the consumer has caught
+// up on all the historic messages.
+func (c *partitionConsumer) catchup(fromOffset int64) {
+    for {
+        // First check if we have caught up.
+        caughtUp, nextOffset := c.topic.hasCaughtUp(c, fromOffset)
+        if caughtUp {
+            return
+        }
+        // Limit the number of messages we request from the database to be the
+        // capacity of the channel.
+        if nextOffset > fromOffset+int64(cap(c.messages)) {
+            nextOffset = fromOffset + int64(cap(c.messages))
+        }
+        // Fetch the messages from the database.
+        msgs, err := c.topic.db.FetchMessages(c.topic.topicName, fromOffset, nextOffset)
+        if err != nil {
+            // TODO: Add option to write consumer errors to an errors channel
+            // as an alternative to logging the errors.
+            log.Print("Error reading messages: ", err)
+            // Wait before retrying.
+            // TODO: Maybe use an exponential backoff scheme here.
+            // TODO: This timeout should take account of all the other goroutines
+            // that might be doing the same thing. (If there are 10000 consumers
+            // then we don't want to end up retrying every millisecond)
+            time.Sleep(10 * time.Second)
+            continue
+        }
+        if len(msgs) == 0 {
+            // This should only happen if the database is corrupted and has lost the
+            // messages between the requested offsets.
+            log.Fatalf("Corrupt database returned no messages between %d and %d", fromOffset, nextOffset)
+        }
+
+        // Pass the messages into the consumer channel.
+        // Blocking each write until the channel has enough space for the message.
+        for i := range msgs {
+            c.messages <- msgs[i].consumerMessage(c.topic.topicName)
+        }
+        // Update the offset for the next loop iteration.
+        fromOffset = msgs[len(msgs)-1].Offset
+    }
+}
+
+type topic struct {
+    db         Database
+    topicName  string
+    mutex      sync.Mutex
+    consumers  []*partitionConsumer
+    nextOffset int64
+}
+
+func (t *topic) send(now time.Time, pmsgs []*sarama.ProducerMessage) error {
+    var err error
+    // Encode the message keys and values.
+    msgs := make([]Message, len(pmsgs))
+    for i := range msgs {
+        if pmsgs[i].Key != nil {
+            msgs[i].Key, err = pmsgs[i].Key.Encode()
+            if err != nil {
+                return err
+            }
+        }
+        if pmsgs[i].Value != nil {
+            msgs[i].Value, err = pmsgs[i].Value.Encode()
+            if err != nil {
+                return err
+            }
+        }
+        pmsgs[i].Timestamp = now
+        msgs[i].Timestamp = now
+    }
+    // Take the lock before assigning the offsets.
+    t.mutex.Lock()
+    defer t.mutex.Unlock()
+    offset := t.nextOffset
+    for i := range msgs {
+        pmsgs[i].Offset = offset
+        msgs[i].Offset = offset
+        offset++
+    }
+    // Store the messages while we hold the lock.
+    err = t.db.StoreMessages(t.topicName, msgs)
+    if err != nil {
+        return err
+    }
+    t.nextOffset = offset
+
+    // Now notify the consumers about the messages.
+    for i := range msgs {
+        cmsg := msgs[i].consumerMessage(t.topicName)
+        for _, c := range t.consumers {
+            if c.ready {
+                select {
+                case c.messages <- cmsg:
+                default:
+                    // The consumer wasn't ready to receive a message because
+                    // the channel buffer was full.
+                    // Fork a goroutine to send the message so that we don't
+                    // block sending messages to the other consumers.
+                    c.ready = false
+                    go c.block(cmsg)
+                }
+            }
+        }
+    }
+
+    return nil
+}
+
+func (t *topic) consume(offset int64) *partitionConsumer {
+    t.mutex.Lock()
+    defer t.mutex.Unlock()
+    c := &partitionConsumer{
+        topic: t,
+    }
+    // Handle special offsets.
+    if offset == sarama.OffsetNewest {
+        offset = t.nextOffset
+    }
+    if offset == sarama.OffsetOldest {
+        offset = -1
+    }
+    c.messages = make(chan *sarama.ConsumerMessage, channelSize)
+    t.consumers = append(t.consumers, c)
+    // Start catching up on historic messages in the background.
+    go c.catchup(offset)
+    return c
+}
+
+func (t *topic) hasCaughtUp(c *partitionConsumer, offset int64) (bool, int64) {
+    t.mutex.Lock()
+    defer t.mutex.Unlock()
+    // Check if we have caught up while holding a lock on the topic so there
+    // isn't a way for our check to race with a new message being sent on the topic.
+    if offset+1 == t.nextOffset {
+        // We've caught up, the consumer can now receive messages as they are
+        // sent rather than fetching them from the database.
+        c.ready = true
+        return true, t.nextOffset
+    }
+    return false, t.nextOffset
+}
+
+func (t *topic) highwaterMark() int64 {
+    t.mutex.Lock()
+    defer t.mutex.Unlock()
+    return t.nextOffset
+}
vendor/src/github.com/matrix-org/naffka/naffka_test.go (vendored, new file)
@@ -0,0 +1,86 @@
+package naffka
+
+import (
+    "testing"
+    "time"
+
+    sarama "gopkg.in/Shopify/sarama.v1"
+)
+
+func TestSendAndReceive(t *testing.T) {
+    naffka, err := New(&MemoryDatabase{})
+    if err != nil {
+        t.Fatal(err)
+    }
+    producer := sarama.SyncProducer(naffka)
+    consumer := sarama.Consumer(naffka)
+    const topic = "testTopic"
+    const value = "Hello, World"
+
+    c, err := consumer.ConsumePartition(topic, 0, sarama.OffsetOldest)
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    message := sarama.ProducerMessage{
+        Value: sarama.StringEncoder(value),
+        Topic: topic,
+    }
+
+    if _, _, err = producer.SendMessage(&message); err != nil {
+        t.Fatal(err)
+    }
+
+    var result *sarama.ConsumerMessage
+    select {
+    case result = <-c.Messages():
+    case _ = <-time.NewTimer(10 * time.Second).C:
+        t.Fatal("expected to receive a message")
+    }
+
+    if string(result.Value) != value {
+        t.Fatalf("wrong value: wanted %q got %q", value, string(result.Value))
+    }
+
+    select {
+    case result = <-c.Messages():
+        t.Fatal("expected to only receive one message")
+    default:
+    }
+}
+
+func TestDelayedReceive(t *testing.T) {
+    naffka, err := New(&MemoryDatabase{})
+    if err != nil {
+        t.Fatal(err)
+    }
+    producer := sarama.SyncProducer(naffka)
+    consumer := sarama.Consumer(naffka)
+    const topic = "testTopic"
+    const value = "Hello, World"
+
+    message := sarama.ProducerMessage{
+        Value: sarama.StringEncoder(value),
+        Topic: topic,
+    }
+
+    if _, _, err = producer.SendMessage(&message); err != nil {
+        t.Fatal(err)
+    }
+
+    c, err := consumer.ConsumePartition(topic, 0, sarama.OffsetOldest)
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    var result *sarama.ConsumerMessage
+    select {
+    case result = <-c.Messages():
+    case _ = <-time.NewTimer(10 * time.Second).C:
+        t.Fatal("expected to receive a message")
+    }
+
+    if string(result.Value) != value {
+        t.Fatalf("wrong value: wanted %q got %q", value, string(result.Value))
+    }
+}