From 9d0b3dc0ee8738ddce2213a3741c293dccdfe15e Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 6 Feb 2017 18:03:27 +0000 Subject: [PATCH] Consume outlier room events --- .../dendrite/roomserver/input/consumer.go | 12 +++- .../dendrite/roomserver/input/events.go | 30 +++++++++ .../dendrite/roomserver/storage/sql.go | 22 ++++--- .../dendrite/roomserver/storage/storage.go | 66 +++++++++++++++++++ 4 files changed, 121 insertions(+), 9 deletions(-) create mode 100644 src/github.com/matrix-org/dendrite/roomserver/input/events.go diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go b/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go index 6d2783f73..9a5cb668d 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go @@ -2,12 +2,15 @@ package input import ( + "encoding/json" + "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/types" sarama "gopkg.in/Shopify/sarama.v1" ) // A ConsumerDatabase has the storage APIs needed by the consumer. type ConsumerDatabase interface { + RoomEventDatabase // PartitionOffsets returns the offsets the consumer has reached for each partition. PartitionOffsets(topic string) ([]types.PartitionOffset, error) // SetPartitionOffset records where the consumer has reached for a partition. @@ -87,7 +90,14 @@ func (c *Consumer) Start() error { func (c *Consumer) consumePartition(pc sarama.PartitionConsumer) { defer pc.Close() for message := range pc.Messages() { - // TODO: Do stuff with message. + var input api.InputRoomEvent + if err := json.Unmarshal(message.Value, &message.Value); err != nil { + c.logError(message, err) + } else { + if err := processRoomEvent(c.DB, input); err != nil { + c.logError(message, err) + } + } if err := c.DB.SetPartitionOffset(c.RoomEventTopic, message.Partition, message.Offset); err != nil { c.logError(message, err) } diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/events.go b/src/github.com/matrix-org/dendrite/roomserver/input/events.go new file mode 100644 index 000000000..124bb5478 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/roomserver/input/events.go @@ -0,0 +1,30 @@ +package input + +import ( + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrixserverlib" +) + +// A RoomEventDatabase has the storage APIs needed to store a room event. +type RoomEventDatabase interface { + StoreEvent(event gomatrixserverlib.Event) error +} + +func processRoomEvent(db RoomEventDatabase, input api.InputRoomEvent) error { + event, err := gomatrixserverlib.NewEventFromUntrustedJSON(input.Event) + if err != nil { + return err + } + + if err := db.StoreEvent(event); err != nil { + return err + } + + if input.Kind == api.KindOutlier { + // For outlier events we only need to store the event JSON. + return nil + } + + // TODO: Handle the other kinds of input. + panic("Not implemented") +} diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go b/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go index f67914593..4efafb02f 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go @@ -37,6 +37,14 @@ func (s *statements) prepare(db *sql.DB) error { return err } + if err = s.prepareEvents(db); err != nil { + return err + } + + if err = s.prepareEventJSON(db); err != nil { + return err + } + return nil } @@ -137,15 +145,14 @@ CREATE TABLE IF NOT EXISTS event_types ( -- The string event_type. event_type TEXT NOT NULL CONSTRAINT event_type_unique UNIQUE ); -INSERT INTO event_types (event_type_nid, event_type) VALUES ( +INSERT INTO event_types (event_type_nid, event_type) VALUES (1, 'm.room.create'), (2, 'm.room.power_levels'), (3, 'm.room.join_rules'), (4, 'm.room.third_party_invite'), (5, 'm.room.member'), (6, 'm.room.redaction'), - (7, 'm.room.history_visibility'), -) ON CONFLICT DO NOTHING; + (7, 'm.room.history_visibility') ON CONFLICT DO NOTHING; ` const insertEventTypeNIDSQL = "" + @@ -199,9 +206,8 @@ CREATE TABLE IF NOT EXISTS event_state_keys ( event_state_key_nid BIGINT PRIMARY KEY DEFAULT nextval('event_state_key_nid_seq'), event_state_key TEXT NOT NULL CONSTRAINT event_state_key_unique UNIQUE ); -INSERT INTO event_state_keys (event_state_key_nid, event_state_key) VALUES ( - (1, '') -) ON CONFLICT DO NOTHING; +INSERT INTO event_state_keys (event_state_key_nid, event_state_key) VALUES + (1, '') ON CONFLICT DO NOTHING; ` const insertEventStateKeyNIDSQL = "" + @@ -243,9 +249,9 @@ func (s *statements) prepareRooms(db *sql.DB) (err error) { const roomsSchema = ` CREATE SEQUENCE IF NOT EXISTS room_nid_seq; -CREATE TABLE rooms ( +CREATE TABLE IF NOT EXISTS rooms ( -- Local numeric ID for the room. - room_nid BIGINT PRIMARY KEY DEFAULT nextvalue('room_nid_seq'), + room_nid BIGINT PRIMARY KEY DEFAULT nextval('room_nid_seq'), -- Textual ID for the room. room_id TEXT NOT NULL CONSTRAINT room_id_unique UNIQUE ); diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go index 2b162a81f..3964f7064 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go @@ -5,6 +5,7 @@ import ( // Import the postgres database driver. _ "github.com/lib/pq" "github.com/matrix-org/dendrite/roomserver/types" + "github.com/matrix-org/gomatrixserverlib" ) // A Database is used to store room events and stream offsets. @@ -35,3 +36,68 @@ func (d *Database) PartitionOffsets(topic string) ([]types.PartitionOffset, erro func (d *Database) SetPartitionOffset(topic string, partition int32, offset int64) error { return d.statements.upsertPartitionOffset(topic, partition, offset) } + +// StoreEvent implements input.EventDatabase +func (d *Database) StoreEvent(event gomatrixserverlib.Event) error { + var ( + roomNID int64 + eventTypeNID int64 + eventStateKeyNID int64 + eventNID int64 + err error + ) + + if roomNID, err = d.assignRoomNID(event.RoomID()); err != nil { + return err + } + + if eventTypeNID, err = d.assignEventTypeNID(event.Type()); err != nil { + return err + } + + eventStateKey := event.StateKey() + if eventStateKey != nil { + if eventStateKeyNID, err = d.assignStateKeyNID(*eventStateKey); err != nil { + return err + } + } + + eventID := event.EventID() + referenceSHA256 := event.EventReference().EventSHA256 + + if eventNID, err = d.statements.insertEvent(roomNID, eventTypeNID, eventStateKeyNID, eventID, referenceSHA256); err != nil { + return err + } + + return d.statements.insertEventJSON(eventNID, event.JSON()) +} + +func (d *Database) assignRoomNID(roomID string) (roomNID int64, err error) { + if roomNID, err = d.statements.selectRoomNID(roomID); err != nil { + return + } + if roomNID == 0 { + return d.statements.insertRoomNID(roomID) + } + return +} + +func (d *Database) assignEventTypeNID(eventType string) (eventTypeNID int64, err error) { + if eventTypeNID, err = d.statements.selectEventTypeNID(eventType); err != nil { + return + } + if eventTypeNID == 0 { + return d.statements.insertEventTypeNID(eventType) + } + return +} + +func (d *Database) assignStateKeyNID(eventStateKey string) (eventStateKeyNID int64, err error) { + if eventStateKeyNID, err = d.statements.selectEventStateKeyNID(eventStateKey); err != nil { + return + } + if eventStateKeyNID == 0 { + return d.statements.insertEventTypeNID(eventStateKey) + } + return +}