diff --git a/.gitignore b/.gitignore new file mode 100644 index 000000000..a11dc9318 --- /dev/null +++ b/.gitignore @@ -0,0 +1,28 @@ +.*.swp + +# Compiled Object files, Static and Dynamic libs (Shared Objects) +*.o +*.a +*.so + +# Folders +bin +pkg +_obj +_test + +# Architecture specific extensions/prefixes +*.[568vq] +[568vq].out + +*.cgo1.go +*.cgo2.c +_cgo_defun.c +_cgo_gotypes.go +_cgo_export.* + +_testmain.go + +*.exe +*.test +*.prof diff --git a/src/github.com/matrix-org/dendrite/roomserver/api/input.go b/src/github.com/matrix-org/dendrite/roomserver/api/input.go new file mode 100644 index 000000000..34b355617 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/roomserver/api/input.go @@ -0,0 +1,33 @@ +// Package api provides the types that are used to communicate with the roomserver. +package api + +const ( + // KindOutlier events fall outside the contiguous event graph. + // We do not have the state for these events. + // These events are state events used to authenticate other events. + // They can become part of the contiguous event graph via backfill. + KindOutlier = 1 + // KindJoin events start a new contiguous event graph. The first event + // in the list must be a m.room.memeber event joining this server to + // the room. This must come with the state at the event. + KindJoin = 2 + // KindNew events extend the contiguous graph going forwards. + // They usually don't need state, but may include state if the + // there was a new event that references an event that we don't + // have a copy of. + KindNew = 3 + // KindBackfill events extend the contiguous graph going backwards. + // They always have state. + KindBackfill = 4 +) + +// InputRoomEvent is a matrix room event to add to the room server database. +type InputRoomEvent struct { + // Whether these events are new, backfilled or outliers. + Kind int + // The event JSON for each event to add. + Event []byte + // Optional list of state events forming the state before this event. + // These state events must have already been persisted. + State []string +} diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go b/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go new file mode 100644 index 000000000..a4ed611e9 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go @@ -0,0 +1,91 @@ +// Package input contains the code that writes +package input + +import ( + "github.com/matrix-org/dendrite/roomserver/types" + sarama "gopkg.in/Shopify/sarama.v1" +) + +// A ConsumerDatabase has the storage APIs needed by the consumer. +type ConsumerDatabase interface { + // PartitionOffsets returns the offsets the consumer has reached for each partition. + PartitionOffsets(topic string) ([]types.PartitionOffset, error) + // SetPartitionOffset records where the consumer has reached for a partition. + SetPartitionOffset(topic string, partition int32, offset int64) error +} + +// An ErrorHandler handles the errors encountered by the consumer. +type ErrorHandler interface { + OnError(err error) +} + +// A Consumer consumes a kafkaesque stream of room events. +type Consumer struct { + // A kafkaesque stream consumer. + Consumer sarama.Consumer + // The database used to store the room events. + DB ConsumerDatabase + // The kafkaesque topic to consume room events from. + RoomEventTopic string + // The ErrorHandler for this consumer. + // If left as nil then the consumer will panic with that error. + ErrorHandler ErrorHandler +} + +// Start starts the consumer consuming. +func (c *Consumer) Start() error { + offsets := map[int32]int64{} + + partitions, err := c.Consumer.Partitions(c.RoomEventTopic) + if err != nil { + return err + } + for _, partition := range partitions { + // Default all the offsets to the beginning of the stream. + offsets[partition] = sarama.OffsetOldest + } + + storedOffsets, err := c.DB.PartitionOffsets(c.RoomEventTopic) + if err != nil { + return err + } + for _, offset := range storedOffsets { + // We've already processed events from this partition so advance the offset to where we got to. + offsets[offset.Partition] = offset.Offset + } + + var partitionConsumers []sarama.PartitionConsumer + for partition, offset := range offsets { + pc, err := c.Consumer.ConsumePartition(c.RoomEventTopic, partition, offset) + if err != nil { + for _, pc := range partitionConsumers { + pc.Close() + } + return err + } + partitionConsumers = append(partitionConsumers, pc) + } + for _, pc := range partitionConsumers { + go c.consumePartition(pc) + } + + return nil +} + +// consumePartition consumes the room events for a single partition of the kafkaesque stream. +func (c *Consumer) consumePartition(pc sarama.PartitionConsumer) { + for message := range pc.Messages() { + // Do stuff with message. + if err := c.DB.SetPartitionOffset(c.RoomEventTopic, message.Partition, message.Offset); err != nil { + c.handleError(message, err) + } + } +} + +// handleError is a convenience method for handling errors. +func (c *Consumer) handleError(message *sarama.ConsumerMessage, err error) { + if c.ErrorHandler == nil { + panic(err) + } + c.ErrorHandler.OnError(err) +} diff --git a/src/github.com/matrix-org/dendrite/roomserver/roomserver/roomserver.go b/src/github.com/matrix-org/dendrite/roomserver/roomserver/roomserver.go new file mode 100644 index 000000000..c37a24ffd --- /dev/null +++ b/src/github.com/matrix-org/dendrite/roomserver/roomserver/roomserver.go @@ -0,0 +1,42 @@ +package main + +import ( + "fmt" + "github.com/matrix-org/dendrite/roomserver/input" + "github.com/matrix-org/dendrite/roomserver/storage" + sarama "gopkg.in/Shopify/sarama.v1" + "os" + "strings" +) + +var ( + database = os.Getenv("DATABASE") + kafkaURIs = strings.Split(os.Getenv("KAFKA_URIS"), ",") + roomEventTopic = os.Getenv("TOPIC_ROOM_EVENT") +) + +func main() { + db, err := storage.Open(database) + if err != nil { + panic(err) + } + + kafkaConsumer, err := sarama.NewConsumer(kafkaURIs, nil) + if err != nil { + panic(err) + } + + consumer := input.Consumer{ + Consumer: kafkaConsumer, + DB: db, + RoomEventTopic: roomEventTopic, + } + + if err = consumer.Start(); err != nil { + panic(err) + } + + fmt.Println("Started roomserver") + + select {} +} diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go b/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go new file mode 100644 index 000000000..6d28facbe --- /dev/null +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go @@ -0,0 +1,70 @@ +package storage + +import ( + "database/sql" + "github.com/matrix-org/dendrite/roomserver/types" +) + +type stmts struct { + selectPartitionOffsetsStmt *sql.Stmt + upsertPartitionOffsetStmt *sql.Stmt +} + +func (s *stmts) prepare(db *sql.DB) error { + var err error + + _, err = db.Exec(partitionOffsetsSchema) + if err != nil { + return err + } + + if s.selectPartitionOffsetsStmt, err = db.Prepare(selectPartitionOffsetsSQL); err != nil { + return err + } + if s.upsertPartitionOffsetStmt, err = db.Prepare(upsertPartitionOffsetsSQL); err != nil { + return err + } + return nil +} + +const partitionOffsetsSchema = ` +-- The offsets that the server has processed up to. +CREATE TABLE IF NOT EXISTS partition_offsets ( + -- The name of the topic. + topic TEXT NOT NULL, + -- The 32-bit partition ID + partition INTEGER NOT NULL, + -- The 64-bit offset. + partition_offset BIGINT NOT NULL, + CONSTRAINT topic_partition_unique UNIQUE (topic, partition) +); +` + +const selectPartitionOffsetsSQL = "" + + "SELECT partition, partition_offset FROM partition_offsets WHERE topic = $1" + +const upsertPartitionOffsetsSQL = "" + + "INSERT INTO partition_offsets (topic, partition, partition_offset) VALUES ($1, $2, $3)" + + " ON CONFLICT ON CONSTRAINT topic_partition_unique" + + " DO UPDATE SET partition_offset = $3" + +func (s *stmts) selectPartitionOffsets(topic string) ([]types.PartitionOffset, error) { + rows, err := s.selectPartitionOffsetsStmt.Query(topic) + if err != nil { + return nil, err + } + defer rows.Close() + var results []types.PartitionOffset + for rows.Next() { + var offset types.PartitionOffset + if err := rows.Scan(&offset.Partition, &offset.Offset); err != nil { + return nil, err + } + } + return results, nil +} + +func (s *stmts) upsertPartitionOffset(topic string, partition int32, offset int64) error { + _, err := s.upsertPartitionOffsetStmt.Exec(topic, partition, offset) + return err +} diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go new file mode 100644 index 000000000..cdf173bba --- /dev/null +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go @@ -0,0 +1,37 @@ +package storage + +import ( + "database/sql" + // Import the postgres database driver. + _ "github.com/lib/pq" + "github.com/matrix-org/dendrite/roomserver/types" +) + +// A Database is used to store room events and stream offsets. +type Database struct { + stmts stmts + db *sql.DB +} + +// Open a postgres database. +func Open(dataSourceName string) (*Database, error) { + var d Database + var err error + if d.db, err = sql.Open("postgres", dataSourceName); err != nil { + return nil, err + } + if err = d.stmts.prepare(d.db); err != nil { + return nil, err + } + return &d, nil +} + +// PartitionOffsets implements input.ConsumerDatabase +func (d *Database) PartitionOffsets(topic string) ([]types.PartitionOffset, error) { + return d.stmts.selectPartitionOffsets(topic) +} + +// SetPartitionOffset implements input.ConsumerDatabase +func (d *Database) SetPartitionOffset(topic string, partition int32, offset int64) error { + return d.stmts.upsertPartitionOffset(topic, partition, offset) +} diff --git a/src/github.com/matrix-org/dendrite/roomserver/types/types.go b/src/github.com/matrix-org/dendrite/roomserver/types/types.go new file mode 100644 index 000000000..6ab4092a8 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/roomserver/types/types.go @@ -0,0 +1,24 @@ +// Package types provides the types that are used internally within the roomserver. +package types + +// A PartitionOffset is the offset into a partition of the input log. +type PartitionOffset struct { + // The ID of the partition. + Partition int32 + // The offset into the partition. + Offset int64 +} + +// An InvalidInput is a problem that was encountered when processing input. +type InvalidInput struct { + // The topic the input was read from. + Topic string + // The partition the input was read from. + Partition int32 + // The offset in the partition the input was read from. + Offset int64 + // The value that errored. + Value []byte + // The error itself + Error string +}