mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-02-25 05:53:09 -06:00
Review comments
This commit is contained in:
parent
0c5bf46382
commit
eda75719b7
|
|
@ -2,30 +2,33 @@
|
||||||
package api
|
package api
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// KindOutlier events fall outside the contiguous event graph.
|
// KindOutlier event fall outside the contiguous event graph.
|
||||||
// We do not have the state for these events.
|
// We do not have the state for these events.
|
||||||
// These events are state events used to authenticate other events.
|
// These events are state events used to authenticate other events.
|
||||||
// They can become part of the contiguous event graph via backfill.
|
// They can become part of the contiguous event graph via backfill.
|
||||||
KindOutlier = 1
|
KindOutlier = 1
|
||||||
// KindJoin events start a new contiguous event graph. The first event
|
// KindJoin event start a new contiguous event graph. The first event
|
||||||
// in the list must be a m.room.memeber event joining this server to
|
// in the list must be a m.room.memeber event joining this server to
|
||||||
// the room. This must come with the state at the event.
|
// the room. This must come with the state at the event.
|
||||||
|
// If the event is contiguous with the existing graph for the room then
|
||||||
|
// it is treated as a normal new event.
|
||||||
KindJoin = 2
|
KindJoin = 2
|
||||||
// KindNew events extend the contiguous graph going forwards.
|
// KindNew event extend the contiguous graph going forwards.
|
||||||
// They usually don't need state, but may include state if the
|
// They usually don't need state, but may include state if the
|
||||||
// there was a new event that references an event that we don't
|
// there was a new event that references an event that we don't
|
||||||
// have a copy of.
|
// have a copy of.
|
||||||
KindNew = 3
|
KindNew = 3
|
||||||
// KindBackfill events extend the contiguous graph going backwards.
|
// KindBackfill event extend the contiguous graph going backwards.
|
||||||
// They always have state.
|
// They always have state.
|
||||||
KindBackfill = 4
|
KindBackfill = 4
|
||||||
)
|
)
|
||||||
|
|
||||||
// InputRoomEvent is a matrix room event to add to the room server database.
|
// InputRoomEvent is a matrix room event to add to the room server database.
|
||||||
type InputRoomEvent struct {
|
type InputRoomEvent struct {
|
||||||
// Whether these events are new, backfilled or outliers.
|
// Whether this event is new, backfilled or an outlier.
|
||||||
|
// This controls how the event is processed.
|
||||||
Kind int
|
Kind int
|
||||||
// The event JSON for each event to add.
|
// The event JSON for the event to add.
|
||||||
Event []byte
|
Event []byte
|
||||||
// Optional list of state events forming the state before this event.
|
// Optional list of state events forming the state before this event.
|
||||||
// These state events must have already been persisted.
|
// These state events must have already been persisted.
|
||||||
|
|
|
||||||
|
|
@ -14,25 +14,36 @@ type ConsumerDatabase interface {
|
||||||
SetPartitionOffset(topic string, partition int32, offset int64) error
|
SetPartitionOffset(topic string, partition int32, offset int64) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// An ErrorHandler handles the errors encountered by the consumer.
|
// An ErrorLogger handles the errors encountered by the consumer.
|
||||||
type ErrorHandler interface {
|
type ErrorLogger interface {
|
||||||
OnError(err error)
|
OnError(message *sarama.ConsumerMessage, err error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// A Consumer consumes a kafkaesque stream of room events.
|
// A Consumer consumes a kafkaesque stream of room events.
|
||||||
|
// The room events are supplied as api.InputRoomEvent structs serialised as JSON.
|
||||||
|
// The events should be valid matrix events.
|
||||||
|
// The events needed to authenticate the event should already be stored on the roomserver.
|
||||||
|
// The events needed to construct the state at the event should already be stored on the roomserver.
|
||||||
|
// If the event is not valid then it will be discarded and an error will be logged.
|
||||||
type Consumer struct {
|
type Consumer struct {
|
||||||
// A kafkaesque stream consumer.
|
// A kafkaesque stream consumer providing the APIs for talking to the event source.
|
||||||
|
// The interface is taken from a client library for Apache Kafka.
|
||||||
|
// But any equivalent event streaming protocol could be made to implement the same interface.
|
||||||
Consumer sarama.Consumer
|
Consumer sarama.Consumer
|
||||||
// The database used to store the room events.
|
// The database used to store the room events.
|
||||||
DB ConsumerDatabase
|
DB ConsumerDatabase
|
||||||
// The kafkaesque topic to consume room events from.
|
// The kafkaesque topic to consume room events from.
|
||||||
|
// This is the name used in kafka to identify the stream to consume events from.
|
||||||
RoomEventTopic string
|
RoomEventTopic string
|
||||||
// The ErrorHandler for this consumer.
|
// The ErrorLogger for this consumer.
|
||||||
// If left as nil then the consumer will panic with that error.
|
// If left as nil then the consumer will panic when it encounters an error
|
||||||
ErrorHandler ErrorHandler
|
ErrorLogger ErrorLogger
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start starts the consumer consuming.
|
// Start starts the consumer consuming.
|
||||||
|
// Starts up a goroutine for each partition in the kafka stream.
|
||||||
|
// Returns nil once all the goroutines are started.
|
||||||
|
// Returns an error if it can't start consuming for any of the partitions.
|
||||||
func (c *Consumer) Start() error {
|
func (c *Consumer) Start() error {
|
||||||
offsets := map[int32]int64{}
|
offsets := map[int32]int64{}
|
||||||
|
|
||||||
|
|
@ -58,8 +69,8 @@ func (c *Consumer) Start() error {
|
||||||
for partition, offset := range offsets {
|
for partition, offset := range offsets {
|
||||||
pc, err := c.Consumer.ConsumePartition(c.RoomEventTopic, partition, offset)
|
pc, err := c.Consumer.ConsumePartition(c.RoomEventTopic, partition, offset)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
for _, pc := range partitionConsumers {
|
for _, p := range partitionConsumers {
|
||||||
pc.Close()
|
p.Close()
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -74,18 +85,19 @@ func (c *Consumer) Start() error {
|
||||||
|
|
||||||
// consumePartition consumes the room events for a single partition of the kafkaesque stream.
|
// consumePartition consumes the room events for a single partition of the kafkaesque stream.
|
||||||
func (c *Consumer) consumePartition(pc sarama.PartitionConsumer) {
|
func (c *Consumer) consumePartition(pc sarama.PartitionConsumer) {
|
||||||
|
defer pc.Close()
|
||||||
for message := range pc.Messages() {
|
for message := range pc.Messages() {
|
||||||
// Do stuff with message.
|
// TODO: Do stuff with message.
|
||||||
if err := c.DB.SetPartitionOffset(c.RoomEventTopic, message.Partition, message.Offset); err != nil {
|
if err := c.DB.SetPartitionOffset(c.RoomEventTopic, message.Partition, message.Offset); err != nil {
|
||||||
c.handleError(message, err)
|
c.logError(message, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleError is a convenience method for handling errors.
|
// logError is a convenience method for logging errors.
|
||||||
func (c *Consumer) handleError(message *sarama.ConsumerMessage, err error) {
|
func (c *Consumer) logError(message *sarama.ConsumerMessage, err error) {
|
||||||
if c.ErrorHandler == nil {
|
if c.ErrorLogger == nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
c.ErrorHandler.OnError(err)
|
c.ErrorLogger.OnError(message, err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -38,5 +38,7 @@ func main() {
|
||||||
|
|
||||||
fmt.Println("Started roomserver")
|
fmt.Println("Started roomserver")
|
||||||
|
|
||||||
|
// Wait forever.
|
||||||
|
// TODO: Implement clean shutdown.
|
||||||
select {}
|
select {}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -5,12 +5,12 @@ import (
|
||||||
"github.com/matrix-org/dendrite/roomserver/types"
|
"github.com/matrix-org/dendrite/roomserver/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
type stmts struct {
|
type statements struct {
|
||||||
selectPartitionOffsetsStmt *sql.Stmt
|
selectPartitionOffsetsStmt *sql.Stmt
|
||||||
upsertPartitionOffsetStmt *sql.Stmt
|
upsertPartitionOffsetStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *stmts) prepare(db *sql.DB) error {
|
func (s *statements) prepare(db *sql.DB) error {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
_, err = db.Exec(partitionOffsetsSchema)
|
_, err = db.Exec(partitionOffsetsSchema)
|
||||||
|
|
@ -48,7 +48,7 @@ const upsertPartitionOffsetsSQL = "" +
|
||||||
" ON CONFLICT ON CONSTRAINT topic_partition_unique" +
|
" ON CONFLICT ON CONSTRAINT topic_partition_unique" +
|
||||||
" DO UPDATE SET partition_offset = $3"
|
" DO UPDATE SET partition_offset = $3"
|
||||||
|
|
||||||
func (s *stmts) selectPartitionOffsets(topic string) ([]types.PartitionOffset, error) {
|
func (s *statements) selectPartitionOffsets(topic string) ([]types.PartitionOffset, error) {
|
||||||
rows, err := s.selectPartitionOffsetsStmt.Query(topic)
|
rows, err := s.selectPartitionOffsetsStmt.Query(topic)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
@ -64,7 +64,7 @@ func (s *stmts) selectPartitionOffsets(topic string) ([]types.PartitionOffset, e
|
||||||
return results, nil
|
return results, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *stmts) upsertPartitionOffset(topic string, partition int32, offset int64) error {
|
func (s *statements) upsertPartitionOffset(topic string, partition int32, offset int64) error {
|
||||||
_, err := s.upsertPartitionOffsetStmt.Exec(topic, partition, offset)
|
_, err := s.upsertPartitionOffsetStmt.Exec(topic, partition, offset)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,7 @@ import (
|
||||||
|
|
||||||
// A Database is used to store room events and stream offsets.
|
// A Database is used to store room events and stream offsets.
|
||||||
type Database struct {
|
type Database struct {
|
||||||
stmts stmts
|
statements statements
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -20,7 +20,7 @@ func Open(dataSourceName string) (*Database, error) {
|
||||||
if d.db, err = sql.Open("postgres", dataSourceName); err != nil {
|
if d.db, err = sql.Open("postgres", dataSourceName); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err = d.stmts.prepare(d.db); err != nil {
|
if err = d.statements.prepare(d.db); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &d, nil
|
return &d, nil
|
||||||
|
|
@ -28,10 +28,10 @@ func Open(dataSourceName string) (*Database, error) {
|
||||||
|
|
||||||
// PartitionOffsets implements input.ConsumerDatabase
|
// PartitionOffsets implements input.ConsumerDatabase
|
||||||
func (d *Database) PartitionOffsets(topic string) ([]types.PartitionOffset, error) {
|
func (d *Database) PartitionOffsets(topic string) ([]types.PartitionOffset, error) {
|
||||||
return d.stmts.selectPartitionOffsets(topic)
|
return d.statements.selectPartitionOffsets(topic)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetPartitionOffset implements input.ConsumerDatabase
|
// SetPartitionOffset implements input.ConsumerDatabase
|
||||||
func (d *Database) SetPartitionOffset(topic string, partition int32, offset int64) error {
|
func (d *Database) SetPartitionOffset(topic string, partition int32, offset int64) error {
|
||||||
return d.stmts.upsertPartitionOffset(topic, partition, offset)
|
return d.statements.upsertPartitionOffset(topic, partition, offset)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue