Consume outlier room events

This commit is contained in:
Mark Haines 2017-02-06 18:03:27 +00:00
parent 51b7b29c50
commit 9d0b3dc0ee
4 changed files with 121 additions and 9 deletions

View file

@ -2,12 +2,15 @@
package input
import (
"encoding/json"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/types"
sarama "gopkg.in/Shopify/sarama.v1"
)
// A ConsumerDatabase has the storage APIs needed by the consumer.
type ConsumerDatabase interface {
RoomEventDatabase
// PartitionOffsets returns the offsets the consumer has reached for each partition.
PartitionOffsets(topic string) ([]types.PartitionOffset, error)
// SetPartitionOffset records where the consumer has reached for a partition.
@ -87,7 +90,14 @@ func (c *Consumer) Start() error {
func (c *Consumer) consumePartition(pc sarama.PartitionConsumer) {
defer pc.Close()
for message := range pc.Messages() {
// TODO: Do stuff with message.
var input api.InputRoomEvent
if err := json.Unmarshal(message.Value, &message.Value); err != nil {
c.logError(message, err)
} else {
if err := processRoomEvent(c.DB, input); err != nil {
c.logError(message, err)
}
}
if err := c.DB.SetPartitionOffset(c.RoomEventTopic, message.Partition, message.Offset); err != nil {
c.logError(message, err)
}

View file

@ -0,0 +1,30 @@
package input
import (
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
)
// A RoomEventDatabase has the storage APIs needed to store a room event.
type RoomEventDatabase interface {
StoreEvent(event gomatrixserverlib.Event) error
}
func processRoomEvent(db RoomEventDatabase, input api.InputRoomEvent) error {
event, err := gomatrixserverlib.NewEventFromUntrustedJSON(input.Event)
if err != nil {
return err
}
if err := db.StoreEvent(event); err != nil {
return err
}
if input.Kind == api.KindOutlier {
// For outlier events we only need to store the event JSON.
return nil
}
// TODO: Handle the other kinds of input.
panic("Not implemented")
}

View file

@ -37,6 +37,14 @@ func (s *statements) prepare(db *sql.DB) error {
return err
}
if err = s.prepareEvents(db); err != nil {
return err
}
if err = s.prepareEventJSON(db); err != nil {
return err
}
return nil
}
@ -137,15 +145,14 @@ CREATE TABLE IF NOT EXISTS event_types (
-- The string event_type.
event_type TEXT NOT NULL CONSTRAINT event_type_unique UNIQUE
);
INSERT INTO event_types (event_type_nid, event_type) VALUES (
INSERT INTO event_types (event_type_nid, event_type) VALUES
(1, 'm.room.create'),
(2, 'm.room.power_levels'),
(3, 'm.room.join_rules'),
(4, 'm.room.third_party_invite'),
(5, 'm.room.member'),
(6, 'm.room.redaction'),
(7, 'm.room.history_visibility'),
) ON CONFLICT DO NOTHING;
(7, 'm.room.history_visibility') ON CONFLICT DO NOTHING;
`
const insertEventTypeNIDSQL = "" +
@ -199,9 +206,8 @@ CREATE TABLE IF NOT EXISTS event_state_keys (
event_state_key_nid BIGINT PRIMARY KEY DEFAULT nextval('event_state_key_nid_seq'),
event_state_key TEXT NOT NULL CONSTRAINT event_state_key_unique UNIQUE
);
INSERT INTO event_state_keys (event_state_key_nid, event_state_key) VALUES (
(1, '')
) ON CONFLICT DO NOTHING;
INSERT INTO event_state_keys (event_state_key_nid, event_state_key) VALUES
(1, '') ON CONFLICT DO NOTHING;
`
const insertEventStateKeyNIDSQL = "" +
@ -243,9 +249,9 @@ func (s *statements) prepareRooms(db *sql.DB) (err error) {
const roomsSchema = `
CREATE SEQUENCE IF NOT EXISTS room_nid_seq;
CREATE TABLE rooms (
CREATE TABLE IF NOT EXISTS rooms (
-- Local numeric ID for the room.
room_nid BIGINT PRIMARY KEY DEFAULT nextvalue('room_nid_seq'),
room_nid BIGINT PRIMARY KEY DEFAULT nextval('room_nid_seq'),
-- Textual ID for the room.
room_id TEXT NOT NULL CONSTRAINT room_id_unique UNIQUE
);

View file

@ -5,6 +5,7 @@ import (
// Import the postgres database driver.
_ "github.com/lib/pq"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
)
// A Database is used to store room events and stream offsets.
@ -35,3 +36,68 @@ func (d *Database) PartitionOffsets(topic string) ([]types.PartitionOffset, erro
func (d *Database) SetPartitionOffset(topic string, partition int32, offset int64) error {
return d.statements.upsertPartitionOffset(topic, partition, offset)
}
// StoreEvent implements input.EventDatabase
func (d *Database) StoreEvent(event gomatrixserverlib.Event) error {
var (
roomNID int64
eventTypeNID int64
eventStateKeyNID int64
eventNID int64
err error
)
if roomNID, err = d.assignRoomNID(event.RoomID()); err != nil {
return err
}
if eventTypeNID, err = d.assignEventTypeNID(event.Type()); err != nil {
return err
}
eventStateKey := event.StateKey()
if eventStateKey != nil {
if eventStateKeyNID, err = d.assignStateKeyNID(*eventStateKey); err != nil {
return err
}
}
eventID := event.EventID()
referenceSHA256 := event.EventReference().EventSHA256
if eventNID, err = d.statements.insertEvent(roomNID, eventTypeNID, eventStateKeyNID, eventID, referenceSHA256); err != nil {
return err
}
return d.statements.insertEventJSON(eventNID, event.JSON())
}
func (d *Database) assignRoomNID(roomID string) (roomNID int64, err error) {
if roomNID, err = d.statements.selectRoomNID(roomID); err != nil {
return
}
if roomNID == 0 {
return d.statements.insertRoomNID(roomID)
}
return
}
func (d *Database) assignEventTypeNID(eventType string) (eventTypeNID int64, err error) {
if eventTypeNID, err = d.statements.selectEventTypeNID(eventType); err != nil {
return
}
if eventTypeNID == 0 {
return d.statements.insertEventTypeNID(eventType)
}
return
}
func (d *Database) assignStateKeyNID(eventStateKey string) (eventStateKeyNID int64, err error) {
if eventStateKeyNID, err = d.statements.selectEventStateKeyNID(eventStateKey); err != nil {
return
}
if eventStateKeyNID == 0 {
return d.statements.insertEventTypeNID(eventStateKey)
}
return
}