mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-07 15:03:09 -06:00
s/Roomserver/NewOutputRoomEvent/
This commit is contained in:
parent
3c6a8e80c2
commit
5b468ec165
|
|
@ -81,7 +81,7 @@ func main() {
|
||||||
if err := n.Load(db); err != nil {
|
if err := n.Load(db); err != nil {
|
||||||
log.Panicf("startup: failed to set up notifier: %s", err)
|
log.Panicf("startup: failed to set up notifier: %s", err)
|
||||||
}
|
}
|
||||||
consumer, err := consumers.NewRoomserver(cfg, n, db)
|
consumer, err := consumers.NewOutputRoomEvent(cfg, n, db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Panicf("startup: failed to create room server consumer: %s", err)
|
log.Panicf("startup: failed to create room server consumer: %s", err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -28,15 +28,15 @@ import (
|
||||||
sarama "gopkg.in/Shopify/sarama.v1"
|
sarama "gopkg.in/Shopify/sarama.v1"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Roomserver consumes events that originated in the room server.
|
// OutputRoomEvent consumes events that originated in the room server.
|
||||||
type Roomserver struct {
|
type OutputRoomEvent struct {
|
||||||
roomServerConsumer *common.ContinualConsumer
|
roomServerConsumer *common.ContinualConsumer
|
||||||
db *storage.SyncServerDatabase
|
db *storage.SyncServerDatabase
|
||||||
notifier *sync.Notifier
|
notifier *sync.Notifier
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewRoomserver creates a new room server consumer. Call Start() to begin consuming from room servers.
|
// NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers.
|
||||||
func NewRoomserver(cfg *config.Sync, n *sync.Notifier, store *storage.SyncServerDatabase) (*Roomserver, error) {
|
func NewOutputRoomEvent(cfg *config.Sync, n *sync.Notifier, store *storage.SyncServerDatabase) (*OutputRoomEvent, error) {
|
||||||
kafkaConsumer, err := sarama.NewConsumer(cfg.KafkaConsumerURIs, nil)
|
kafkaConsumer, err := sarama.NewConsumer(cfg.KafkaConsumerURIs, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
@ -47,7 +47,7 @@ func NewRoomserver(cfg *config.Sync, n *sync.Notifier, store *storage.SyncServer
|
||||||
Consumer: kafkaConsumer,
|
Consumer: kafkaConsumer,
|
||||||
PartitionStore: store,
|
PartitionStore: store,
|
||||||
}
|
}
|
||||||
s := &Roomserver{
|
s := &OutputRoomEvent{
|
||||||
roomServerConsumer: &consumer,
|
roomServerConsumer: &consumer,
|
||||||
db: store,
|
db: store,
|
||||||
notifier: n,
|
notifier: n,
|
||||||
|
|
@ -58,14 +58,14 @@ func NewRoomserver(cfg *config.Sync, n *sync.Notifier, store *storage.SyncServer
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start consuming from room servers
|
// Start consuming from room servers
|
||||||
func (s *Roomserver) Start() error {
|
func (s *OutputRoomEvent) Start() error {
|
||||||
return s.roomServerConsumer.Start()
|
return s.roomServerConsumer.Start()
|
||||||
}
|
}
|
||||||
|
|
||||||
// onMessage is called when the sync server receives a new event from the room server output log.
|
// onMessage is called when the sync server receives a new event from the room server output log.
|
||||||
// It is not safe for this function to be called from multiple goroutines, or else the
|
// It is not safe for this function to be called from multiple goroutines, or else the
|
||||||
// sync stream position may race and be incorrectly calculated.
|
// sync stream position may race and be incorrectly calculated.
|
||||||
func (s *Roomserver) onMessage(msg *sarama.ConsumerMessage) error {
|
func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
|
||||||
// Parse out the event JSON
|
// Parse out the event JSON
|
||||||
var output api.OutputRoomEvent
|
var output api.OutputRoomEvent
|
||||||
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue