mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-06 14:33:10 -06:00
A kafkaesque room event consumer for the roomserver.
Implement the main input loop for the roomserver. It will receive events from a kafkaesque event source and track where it is in the stream. It currently does nothing with the events it consumes.
This commit is contained in:
parent
a78e0cba8e
commit
254fe7b683
28
.gitignore
vendored
Normal file
28
.gitignore
vendored
Normal file
|
|
@ -0,0 +1,28 @@
|
||||||
|
.*.swp
|
||||||
|
|
||||||
|
# Compiled Object files, Static and Dynamic libs (Shared Objects)
|
||||||
|
*.o
|
||||||
|
*.a
|
||||||
|
*.so
|
||||||
|
|
||||||
|
# Folders
|
||||||
|
bin
|
||||||
|
pkg
|
||||||
|
_obj
|
||||||
|
_test
|
||||||
|
|
||||||
|
# Architecture specific extensions/prefixes
|
||||||
|
*.[568vq]
|
||||||
|
[568vq].out
|
||||||
|
|
||||||
|
*.cgo1.go
|
||||||
|
*.cgo2.c
|
||||||
|
_cgo_defun.c
|
||||||
|
_cgo_gotypes.go
|
||||||
|
_cgo_export.*
|
||||||
|
|
||||||
|
_testmain.go
|
||||||
|
|
||||||
|
*.exe
|
||||||
|
*.test
|
||||||
|
*.prof
|
||||||
33
src/github.com/matrix-org/dendrite/roomserver/api/input.go
Normal file
33
src/github.com/matrix-org/dendrite/roomserver/api/input.go
Normal file
|
|
@ -0,0 +1,33 @@
|
||||||
|
// Package api provides the types that are used to communicate with the roomserver.
|
||||||
|
package api
|
||||||
|
|
||||||
|
const (
|
||||||
|
// KindOutlier events fall outside the contiguous event graph.
|
||||||
|
// We do not have the state for these events.
|
||||||
|
// These events are state events used to authenticate other events.
|
||||||
|
// They can become part of the contiguous event graph via backfill.
|
||||||
|
KindOutlier = 1
|
||||||
|
// KindJoin events start a new contiguous event graph. The first event
|
||||||
|
// in the list must be a m.room.memeber event joining this server to
|
||||||
|
// the room. This must come with the state at the event.
|
||||||
|
KindJoin = 2
|
||||||
|
// KindNew events extend the contiguous graph going forwards.
|
||||||
|
// They usually don't need state, but may include state if the
|
||||||
|
// there was a new event that references an event that we don't
|
||||||
|
// have a copy of.
|
||||||
|
KindNew = 3
|
||||||
|
// KindBackfill events extend the contiguous graph going backwards.
|
||||||
|
// They always have state.
|
||||||
|
KindBackfill = 4
|
||||||
|
)
|
||||||
|
|
||||||
|
// InputRoomEvent is a matrix room event to add to the room server database.
|
||||||
|
type InputRoomEvent struct {
|
||||||
|
// Whether these events are new, backfilled or outliers.
|
||||||
|
Kind int
|
||||||
|
// The event JSON for each event to add.
|
||||||
|
Event []byte
|
||||||
|
// Optional list of state events forming the state before this event.
|
||||||
|
// These state events must have already been persisted.
|
||||||
|
State []string
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,91 @@
|
||||||
|
// Package input contains the code that writes
|
||||||
|
package input
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/types"
|
||||||
|
sarama "gopkg.in/Shopify/sarama.v1"
|
||||||
|
)
|
||||||
|
|
||||||
|
// A ConsumerDatabase has the storage APIs needed by the consumer.
|
||||||
|
type ConsumerDatabase interface {
|
||||||
|
// PartitionOffsets returns the offsets the consumer has reached for each partition.
|
||||||
|
PartitionOffsets(topic string) ([]types.PartitionOffset, error)
|
||||||
|
// SetPartitionOffset records where the consumer has reached for a partition.
|
||||||
|
SetPartitionOffset(topic string, partition int32, offset int64) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// An ErrorHandler handles the errors encountered by the consumer.
|
||||||
|
type ErrorHandler interface {
|
||||||
|
OnError(err error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// A Consumer consumes a kafkaesque stream of room events.
|
||||||
|
type Consumer struct {
|
||||||
|
// A kafkaesque stream consumer.
|
||||||
|
Consumer sarama.Consumer
|
||||||
|
// The database used to store the room events.
|
||||||
|
DB ConsumerDatabase
|
||||||
|
// The kafkaesque topic to consume room events from.
|
||||||
|
RoomEventTopic string
|
||||||
|
// The ErrorHandler for this consumer.
|
||||||
|
// If left as nil then the consumer will panic with that error.
|
||||||
|
ErrorHandler ErrorHandler
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start starts the consumer consuming.
|
||||||
|
func (c *Consumer) Start() error {
|
||||||
|
offsets := map[int32]int64{}
|
||||||
|
|
||||||
|
partitions, err := c.Consumer.Partitions(c.RoomEventTopic)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, partition := range partitions {
|
||||||
|
// Default all the offsets to the beginning of the stream.
|
||||||
|
offsets[partition] = sarama.OffsetOldest
|
||||||
|
}
|
||||||
|
|
||||||
|
storedOffsets, err := c.DB.PartitionOffsets(c.RoomEventTopic)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, offset := range storedOffsets {
|
||||||
|
// We've already processed events from this partition so advance the offset to where we got to.
|
||||||
|
offsets[offset.Partition] = offset.Offset
|
||||||
|
}
|
||||||
|
|
||||||
|
var partitionConsumers []sarama.PartitionConsumer
|
||||||
|
for partition, offset := range offsets {
|
||||||
|
pc, err := c.Consumer.ConsumePartition(c.RoomEventTopic, partition, offset)
|
||||||
|
if err != nil {
|
||||||
|
for _, pc := range partitionConsumers {
|
||||||
|
pc.Close()
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
partitionConsumers = append(partitionConsumers, pc)
|
||||||
|
}
|
||||||
|
for _, pc := range partitionConsumers {
|
||||||
|
go c.consumePartition(pc)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// consumePartition consumes the room events for a single partition of the kafkaesque stream.
|
||||||
|
func (c *Consumer) consumePartition(pc sarama.PartitionConsumer) {
|
||||||
|
for message := range pc.Messages() {
|
||||||
|
// Do stuff with message.
|
||||||
|
if err := c.DB.SetPartitionOffset(c.RoomEventTopic, message.Partition, message.Offset); err != nil {
|
||||||
|
c.handleError(message, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleError is a convenience method for handling errors.
|
||||||
|
func (c *Consumer) handleError(message *sarama.ConsumerMessage, err error) {
|
||||||
|
if c.ErrorHandler == nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
c.ErrorHandler.OnError(err)
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,42 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/input"
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/storage"
|
||||||
|
sarama "gopkg.in/Shopify/sarama.v1"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
database = os.Getenv("DATABASE")
|
||||||
|
kafkaURIs = strings.Split(os.Getenv("KAFKA_URIS"), ",")
|
||||||
|
roomEventTopic = os.Getenv("TOPIC_ROOM_EVENT")
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
db, err := storage.Open(database)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
kafkaConsumer, err := sarama.NewConsumer(kafkaURIs, nil)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
consumer := input.Consumer{
|
||||||
|
Consumer: kafkaConsumer,
|
||||||
|
DB: db,
|
||||||
|
RoomEventTopic: roomEventTopic,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = consumer.Start(); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Println("Started roomserver")
|
||||||
|
|
||||||
|
select {}
|
||||||
|
}
|
||||||
70
src/github.com/matrix-org/dendrite/roomserver/storage/sql.go
Normal file
70
src/github.com/matrix-org/dendrite/roomserver/storage/sql.go
Normal file
|
|
@ -0,0 +1,70 @@
|
||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql"
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
type stmts struct {
|
||||||
|
selectPartitionOffsetsStmt *sql.Stmt
|
||||||
|
upsertPartitionOffsetStmt *sql.Stmt
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *stmts) prepare(db *sql.DB) error {
|
||||||
|
var err error
|
||||||
|
|
||||||
|
_, err = db.Exec(partitionOffsetsSchema)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if s.selectPartitionOffsetsStmt, err = db.Prepare(selectPartitionOffsetsSQL); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if s.upsertPartitionOffsetStmt, err = db.Prepare(upsertPartitionOffsetsSQL); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
const partitionOffsetsSchema = `
|
||||||
|
-- The offsets that the server has processed up to.
|
||||||
|
CREATE TABLE IF NOT EXISTS partition_offsets (
|
||||||
|
-- The name of the topic.
|
||||||
|
topic TEXT NOT NULL,
|
||||||
|
-- The 32-bit partition ID
|
||||||
|
partition INTEGER NOT NULL,
|
||||||
|
-- The 64-bit offset.
|
||||||
|
partition_offset BIGINT NOT NULL,
|
||||||
|
CONSTRAINT topic_partition_unique UNIQUE (topic, partition)
|
||||||
|
);
|
||||||
|
`
|
||||||
|
|
||||||
|
const selectPartitionOffsetsSQL = "" +
|
||||||
|
"SELECT partition, partition_offset FROM partition_offsets WHERE topic = $1"
|
||||||
|
|
||||||
|
const upsertPartitionOffsetsSQL = "" +
|
||||||
|
"INSERT INTO partition_offsets (topic, partition, partition_offset) VALUES ($1, $2, $3)" +
|
||||||
|
" ON CONFLICT ON CONSTRAINT topic_partition_unique" +
|
||||||
|
" DO UPDATE SET partition_offset = $3"
|
||||||
|
|
||||||
|
func (s *stmts) selectPartitionOffsets(topic string) ([]types.PartitionOffset, error) {
|
||||||
|
rows, err := s.selectPartitionOffsetsStmt.Query(topic)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
var results []types.PartitionOffset
|
||||||
|
for rows.Next() {
|
||||||
|
var offset types.PartitionOffset
|
||||||
|
if err := rows.Scan(&offset.Partition, &offset.Offset); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return results, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *stmts) upsertPartitionOffset(topic string, partition int32, offset int64) error {
|
||||||
|
_, err := s.upsertPartitionOffsetStmt.Exec(topic, partition, offset)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,37 @@
|
||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql"
|
||||||
|
// Import the postgres database driver.
|
||||||
|
_ "github.com/lib/pq"
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
// A Database is used to store room events and stream offsets.
|
||||||
|
type Database struct {
|
||||||
|
stmts stmts
|
||||||
|
db *sql.DB
|
||||||
|
}
|
||||||
|
|
||||||
|
// Open a postgres database.
|
||||||
|
func Open(dataSourceName string) (*Database, error) {
|
||||||
|
var d Database
|
||||||
|
var err error
|
||||||
|
if d.db, err = sql.Open("postgres", dataSourceName); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err = d.stmts.prepare(d.db); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &d, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// PartitionOffsets implements input.ConsumerDatabase
|
||||||
|
func (d *Database) PartitionOffsets(topic string) ([]types.PartitionOffset, error) {
|
||||||
|
return d.stmts.selectPartitionOffsets(topic)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetPartitionOffset implements input.ConsumerDatabase
|
||||||
|
func (d *Database) SetPartitionOffset(topic string, partition int32, offset int64) error {
|
||||||
|
return d.stmts.upsertPartitionOffset(topic, partition, offset)
|
||||||
|
}
|
||||||
24
src/github.com/matrix-org/dendrite/roomserver/types/types.go
Normal file
24
src/github.com/matrix-org/dendrite/roomserver/types/types.go
Normal file
|
|
@ -0,0 +1,24 @@
|
||||||
|
// Package types provides the types that are used internally within the roomserver.
|
||||||
|
package types
|
||||||
|
|
||||||
|
// A PartitionOffset is the offset into a partition of the input log.
|
||||||
|
type PartitionOffset struct {
|
||||||
|
// The ID of the partition.
|
||||||
|
Partition int32
|
||||||
|
// The offset into the partition.
|
||||||
|
Offset int64
|
||||||
|
}
|
||||||
|
|
||||||
|
// An InvalidInput is a problem that was encountered when processing input.
|
||||||
|
type InvalidInput struct {
|
||||||
|
// The topic the input was read from.
|
||||||
|
Topic string
|
||||||
|
// The partition the input was read from.
|
||||||
|
Partition int32
|
||||||
|
// The offset in the partition the input was read from.
|
||||||
|
Offset int64
|
||||||
|
// The value that errored.
|
||||||
|
Value []byte
|
||||||
|
// The error itself
|
||||||
|
Error string
|
||||||
|
}
|
||||||
Loading…
Reference in a new issue