Store outlier room events. (#3)
* Storage functions for event types * Consume outlier room events
This commit is contained in:
parent
a45a824f41
commit
600f56b4b8
|
@ -23,6 +23,7 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
// InputRoomEvent is a matrix room event to add to the room server database.
|
// InputRoomEvent is a matrix room event to add to the room server database.
|
||||||
|
// TODO: Implement UnmarshalJSON/MarshalJSON in a way that does something sensible with the event JSON.
|
||||||
type InputRoomEvent struct {
|
type InputRoomEvent struct {
|
||||||
// Whether this event is new, backfilled or an outlier.
|
// Whether this event is new, backfilled or an outlier.
|
||||||
// This controls how the event is processed.
|
// This controls how the event is processed.
|
||||||
|
|
|
@ -2,12 +2,15 @@
|
||||||
package input
|
package input
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/roomserver/types"
|
"github.com/matrix-org/dendrite/roomserver/types"
|
||||||
sarama "gopkg.in/Shopify/sarama.v1"
|
sarama "gopkg.in/Shopify/sarama.v1"
|
||||||
)
|
)
|
||||||
|
|
||||||
// A ConsumerDatabase has the storage APIs needed by the consumer.
|
// A ConsumerDatabase has the storage APIs needed by the consumer.
|
||||||
type ConsumerDatabase interface {
|
type ConsumerDatabase interface {
|
||||||
|
RoomEventDatabase
|
||||||
// PartitionOffsets returns the offsets the consumer has reached for each partition.
|
// PartitionOffsets returns the offsets the consumer has reached for each partition.
|
||||||
PartitionOffsets(topic string) ([]types.PartitionOffset, error)
|
PartitionOffsets(topic string) ([]types.PartitionOffset, error)
|
||||||
// SetPartitionOffset records where the consumer has reached for a partition.
|
// SetPartitionOffset records where the consumer has reached for a partition.
|
||||||
|
@ -87,7 +90,21 @@ func (c *Consumer) Start() error {
|
||||||
func (c *Consumer) consumePartition(pc sarama.PartitionConsumer) {
|
func (c *Consumer) consumePartition(pc sarama.PartitionConsumer) {
|
||||||
defer pc.Close()
|
defer pc.Close()
|
||||||
for message := range pc.Messages() {
|
for message := range pc.Messages() {
|
||||||
// TODO: Do stuff with message.
|
var input api.InputRoomEvent
|
||||||
|
if err := json.Unmarshal(message.Value, &input); err != nil {
|
||||||
|
// If the message is invalid then log it and move onto the next message in the stream.
|
||||||
|
c.logError(message, err)
|
||||||
|
} else {
|
||||||
|
if err := processRoomEvent(c.DB, input); err != nil {
|
||||||
|
// If there was an error processing the message then log it and
|
||||||
|
// move onto the next message in the stream.
|
||||||
|
// TODO: If the error was due to a problem talking to the database
|
||||||
|
// then we shouldn't move onto the next message and we should either
|
||||||
|
// retry processing the message, or panic and kill ourselves.
|
||||||
|
c.logError(message, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Advance our position in the stream so that we will start at the right position after a restart.
|
||||||
if err := c.DB.SetPartitionOffset(c.RoomEventTopic, message.Partition, message.Offset); err != nil {
|
if err := c.DB.SetPartitionOffset(c.RoomEventTopic, message.Partition, message.Offset); err != nil {
|
||||||
c.logError(message, err)
|
c.logError(message, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,46 @@
|
||||||
|
package input
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
)
|
||||||
|
|
||||||
|
// A RoomEventDatabase has the storage APIs needed to store a room event.
|
||||||
|
type RoomEventDatabase interface {
|
||||||
|
StoreEvent(event gomatrixserverlib.Event) error
|
||||||
|
}
|
||||||
|
|
||||||
|
func processRoomEvent(db RoomEventDatabase, input api.InputRoomEvent) error {
|
||||||
|
// Parse and validate the event JSON
|
||||||
|
event, err := gomatrixserverlib.NewEventFromUntrustedJSON(input.Event)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := db.StoreEvent(event); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO:
|
||||||
|
// * Check that the event passes authentication checks.
|
||||||
|
|
||||||
|
if input.Kind == api.KindOutlier {
|
||||||
|
// For outliers we can stop after we've stored the event itself as it
|
||||||
|
// doesn't have any associated state to store and we don't need to
|
||||||
|
// notify anyone about it.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO:
|
||||||
|
// * Calcuate the state at the event if necessary.
|
||||||
|
// * Store the state at the event.
|
||||||
|
// * Update the extremities of the event graph for the room
|
||||||
|
// * Caculate the new current state for the room if the forward extremities have changed.
|
||||||
|
// * Work out the delta between the new current state and the previous current state.
|
||||||
|
// * Work out the visibility of the event.
|
||||||
|
// * Write a message to the output logs containing:
|
||||||
|
// - The event itself
|
||||||
|
// - The visiblity of the event, i.e. who is allowed to see the event.
|
||||||
|
// - The changes to the current state of the room.
|
||||||
|
panic("Not implemented")
|
||||||
|
}
|
|
@ -8,23 +8,58 @@ import (
|
||||||
type statements struct {
|
type statements struct {
|
||||||
selectPartitionOffsetsStmt *sql.Stmt
|
selectPartitionOffsetsStmt *sql.Stmt
|
||||||
upsertPartitionOffsetStmt *sql.Stmt
|
upsertPartitionOffsetStmt *sql.Stmt
|
||||||
|
insertEventTypeNIDStmt *sql.Stmt
|
||||||
|
selectEventTypeNIDStmt *sql.Stmt
|
||||||
|
insertEventStateKeyNIDStmt *sql.Stmt
|
||||||
|
selectEventStateKeyNIDStmt *sql.Stmt
|
||||||
|
insertRoomNIDStmt *sql.Stmt
|
||||||
|
selectRoomNIDStmt *sql.Stmt
|
||||||
|
insertEventStmt *sql.Stmt
|
||||||
|
insertEventJSONStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *statements) prepare(db *sql.DB) error {
|
func (s *statements) prepare(db *sql.DB) error {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
_, err = db.Exec(partitionOffsetsSchema)
|
if err = s.preparePartitionOffsets(db); err != nil {
|
||||||
if err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if s.selectPartitionOffsetsStmt, err = db.Prepare(selectPartitionOffsetsSQL); err != nil {
|
if err = s.prepareEventTypes(db); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err = s.prepareEventStateKeys(db); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = s.prepareRooms(db); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = s.prepareEvents(db); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = s.prepareEventJSON(db); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *statements) preparePartitionOffsets(db *sql.DB) (err error) {
|
||||||
|
_, err = db.Exec(partitionOffsetsSchema)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.selectPartitionOffsetsStmt, err = db.Prepare(selectPartitionOffsetsSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
if s.upsertPartitionOffsetStmt, err = db.Prepare(upsertPartitionOffsetsSQL); err != nil {
|
if s.upsertPartitionOffsetStmt, err = db.Prepare(upsertPartitionOffsetsSQL); err != nil {
|
||||||
return err
|
return
|
||||||
}
|
}
|
||||||
return nil
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
const partitionOffsetsSchema = `
|
const partitionOffsetsSchema = `
|
||||||
|
@ -68,3 +103,276 @@ func (s *statements) upsertPartitionOffset(topic string, partition int32, offset
|
||||||
_, err := s.upsertPartitionOffsetStmt.Exec(topic, partition, offset)
|
_, err := s.upsertPartitionOffsetStmt.Exec(topic, partition, offset)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *statements) prepareEventTypes(db *sql.DB) (err error) {
|
||||||
|
_, err = db.Exec(eventTypesSchema)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.insertEventTypeNIDStmt, err = db.Prepare(insertEventTypeNIDSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.selectEventTypeNIDStmt, err = db.Prepare(selectEventTypeNIDSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
const eventTypesSchema = `
|
||||||
|
-- Numeric versions of the event "type"s. Event types tend to be taken from a
|
||||||
|
-- small common pool. Assigning each a numeric ID should reduce the amount of
|
||||||
|
-- data that needs to be stored and fetched from the database.
|
||||||
|
-- It also means that many operations can work with int64 arrays rather than
|
||||||
|
-- string arrays which may help reduce GC pressure.
|
||||||
|
-- Well known event types are pre-assigned numeric IDs:
|
||||||
|
-- 1 -> m.room.create
|
||||||
|
-- 2 -> m.room.power_levels
|
||||||
|
-- 3 -> m.room.join_rules
|
||||||
|
-- 4 -> m.room.third_party_invite
|
||||||
|
-- 5 -> m.room.member
|
||||||
|
-- 6 -> m.room.redaction
|
||||||
|
-- 7 -> m.room.history_visibility
|
||||||
|
-- Picking well-known numeric IDs for the events types that require special
|
||||||
|
-- attention during state conflict resolution means that we write that code
|
||||||
|
-- using numeric constants.
|
||||||
|
-- It also means that the numeric IDs for common event types should be
|
||||||
|
-- consistent between different instances which might make ad-hoc debugging
|
||||||
|
-- easier.
|
||||||
|
-- Other event types are automatically assigned numeric IDs starting from 2**16.
|
||||||
|
-- This leaves room to add more pre-assigned numeric IDs and clearly separates
|
||||||
|
-- the automatically assigned IDs from the pre-assigned IDs.
|
||||||
|
CREATE SEQUENCE IF NOT EXISTS event_type_nid_seq START 65536;
|
||||||
|
CREATE TABLE IF NOT EXISTS event_types (
|
||||||
|
-- Local numeric ID for the event type.
|
||||||
|
event_type_nid BIGINT PRIMARY KEY DEFAULT nextval('event_type_nid_seq'),
|
||||||
|
-- The string event_type.
|
||||||
|
event_type TEXT NOT NULL CONSTRAINT event_type_unique UNIQUE
|
||||||
|
);
|
||||||
|
INSERT INTO event_types (event_type_nid, event_type) VALUES
|
||||||
|
(1, 'm.room.create'),
|
||||||
|
(2, 'm.room.power_levels'),
|
||||||
|
(3, 'm.room.join_rules'),
|
||||||
|
(4, 'm.room.third_party_invite'),
|
||||||
|
(5, 'm.room.member'),
|
||||||
|
(6, 'm.room.redaction'),
|
||||||
|
(7, 'm.room.history_visibility') ON CONFLICT DO NOTHING;
|
||||||
|
`
|
||||||
|
|
||||||
|
// Assign a new numeric event type ID.
|
||||||
|
// The usual case is that the event type is not in the database.
|
||||||
|
// In that case the ID will be assigned using the next value from the sequence.
|
||||||
|
// We use `RETURNING` to tell postgres to return the assigned ID.
|
||||||
|
// But it's possible that the type was added in a query that raced with us.
|
||||||
|
// This will result in a conflict on the event_type_unique constraint.
|
||||||
|
// We peform a update that does nothing rather that doing nothing at all because
|
||||||
|
// postgres won't return anything unless we touch a row in the table.
|
||||||
|
const insertEventTypeNIDSQL = "" +
|
||||||
|
"INSERT INTO event_types (event_type) VALUES ($1)" +
|
||||||
|
" ON CONFLICT ON CONSTRAINT event_type_unique" +
|
||||||
|
" DO UPDATE SET event_type = $1" +
|
||||||
|
" RETURNING (event_type_nid)"
|
||||||
|
|
||||||
|
const selectEventTypeNIDSQL = "" +
|
||||||
|
"SELECT event_type_nid FROM event_types WHERE event_type = $1"
|
||||||
|
|
||||||
|
func (s *statements) insertEventTypeNID(eventType string) (eventTypeNID int64, err error) {
|
||||||
|
err = s.insertEventTypeNIDStmt.QueryRow(eventType).Scan(&eventTypeNID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *statements) selectEventTypeNID(eventType string) (eventTypeNID int64, err error) {
|
||||||
|
err = s.selectEventTypeNIDStmt.QueryRow(eventType).Scan(&eventTypeNID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *statements) prepareEventStateKeys(db *sql.DB) (err error) {
|
||||||
|
_, err = db.Exec(eventStateKeysSchema)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.insertEventStateKeyNIDStmt, err = db.Prepare(insertEventStateKeyNIDSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.selectEventStateKeyNIDStmt, err = db.Prepare(selectEventStateKeyNIDSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
const eventStateKeysSchema = `
|
||||||
|
-- Numeric versions of the event "state_key"s. State keys tend to be reused so
|
||||||
|
-- assigning each string a numeric ID should reduce the amount of data that
|
||||||
|
-- needs to be stored and fetched from the database.
|
||||||
|
-- It also means that many operations can work with int64 arrays rather than
|
||||||
|
-- string arrays which may help reduce GC pressure.
|
||||||
|
-- Well known state keys are pre-assigned numeric IDs:
|
||||||
|
-- 1 -> "" (the empty string)
|
||||||
|
-- Other state keys are automatically assigned numeric IDs starting from 2**16.
|
||||||
|
-- This leaves room to add more pre-assigned numeric IDs and clearly separates
|
||||||
|
-- the automatically assigned IDs from the pre-assigned IDs.
|
||||||
|
CREATE SEQUENCE IF NOT EXISTS event_state_key_nid_seq START 65536;
|
||||||
|
CREATE TABLE IF NOT EXISTS event_state_keys (
|
||||||
|
-- Local numeric ID for the state key.
|
||||||
|
event_state_key_nid BIGINT PRIMARY KEY DEFAULT nextval('event_state_key_nid_seq'),
|
||||||
|
event_state_key TEXT NOT NULL CONSTRAINT event_state_key_unique UNIQUE
|
||||||
|
);
|
||||||
|
INSERT INTO event_state_keys (event_state_key_nid, event_state_key) VALUES
|
||||||
|
(1, '') ON CONFLICT DO NOTHING;
|
||||||
|
`
|
||||||
|
|
||||||
|
// Same as insertEventTypeNIDSQL
|
||||||
|
const insertEventStateKeyNIDSQL = "" +
|
||||||
|
"INSERT INTO event_state_keys (event_state_key) VALUES ($1)" +
|
||||||
|
" ON CONFLICT ON CONSTRAINT event_state_key_unique" +
|
||||||
|
" DO UPDATE SET event_state_key = $1" +
|
||||||
|
" RETURNING (event_state_key_nid)"
|
||||||
|
|
||||||
|
const selectEventStateKeyNIDSQL = "" +
|
||||||
|
"SELECT event_state_key_nid FROM event_state_keys WHERE event_state_key = $1"
|
||||||
|
|
||||||
|
func (s *statements) insertEventStateKeyNID(eventStateKey string) (eventStateKeyNID int64, err error) {
|
||||||
|
err = s.insertEventStateKeyNIDStmt.QueryRow(eventStateKey).Scan(&eventStateKeyNID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *statements) selectEventStateKeyNID(eventStateKey string) (eventStateKeyNID int64, err error) {
|
||||||
|
err = s.selectEventStateKeyNIDStmt.QueryRow(eventStateKey).Scan(&eventStateKeyNID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *statements) prepareRooms(db *sql.DB) (err error) {
|
||||||
|
_, err = db.Exec(roomsSchema)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.insertRoomNIDStmt, err = db.Prepare(insertRoomNIDSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.selectRoomNIDStmt, err = db.Prepare(selectRoomNIDSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
const roomsSchema = `
|
||||||
|
CREATE SEQUENCE IF NOT EXISTS room_nid_seq;
|
||||||
|
CREATE TABLE IF NOT EXISTS rooms (
|
||||||
|
-- Local numeric ID for the room.
|
||||||
|
room_nid BIGINT PRIMARY KEY DEFAULT nextval('room_nid_seq'),
|
||||||
|
-- Textual ID for the room.
|
||||||
|
room_id TEXT NOT NULL CONSTRAINT room_id_unique UNIQUE
|
||||||
|
);
|
||||||
|
`
|
||||||
|
|
||||||
|
// Same as insertEventTypeNIDSQL
|
||||||
|
const insertRoomNIDSQL = "" +
|
||||||
|
"INSERT INTO rooms (room_id) VALUES ($1)" +
|
||||||
|
" ON CONFLICT ON CONSTRAINT room_id_unique" +
|
||||||
|
" DO UPDATE SET room_id = $1" +
|
||||||
|
" RETURNING (room_nid)"
|
||||||
|
|
||||||
|
const selectRoomNIDSQL = "" +
|
||||||
|
"SELECT room_nid FROM rooms WHERE room_id = $1"
|
||||||
|
|
||||||
|
func (s *statements) insertRoomNID(roomID string) (roomNID int64, err error) {
|
||||||
|
err = s.insertRoomNIDStmt.QueryRow(roomID).Scan(&roomNID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *statements) selectRoomNID(roomID string) (roomNID int64, err error) {
|
||||||
|
err = s.selectRoomNIDStmt.QueryRow(roomID).Scan(&roomNID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
const eventsSchema = `
|
||||||
|
-- The events table holds metadata for each event, the actual JSON is stored
|
||||||
|
-- separately to keep the size of the rows small.
|
||||||
|
CREATE SEQUENCE IF NOT EXISTS event_nid_seq;
|
||||||
|
CREATE TABLE IF NOT EXISTS events (
|
||||||
|
-- Local numeric ID for the event.
|
||||||
|
event_nid BIGINT PRIMARY KEY DEFAULT nextval('event_nid_seq'),
|
||||||
|
-- Local numeric ID for the room the event is in.
|
||||||
|
-- This is never 0.
|
||||||
|
room_nid BIGINT NOT NULL,
|
||||||
|
-- Local numeric ID for the type of the event.
|
||||||
|
-- This is never 0.
|
||||||
|
event_type_nid BIGINT NOT NULL,
|
||||||
|
-- Local numeric ID for the state_key of the event
|
||||||
|
-- This is 0 if the event is not a state event.
|
||||||
|
event_state_key_nid BIGINT NOT NULL,
|
||||||
|
-- The textual event id.
|
||||||
|
-- Used to lookup the numeric ID when processing requests.
|
||||||
|
-- Needed for state resolution.
|
||||||
|
-- An event may only appear in this table once.
|
||||||
|
event_id TEXT NOT NULL CONSTRAINT event_id_unique UNIQUE,
|
||||||
|
-- The sha256 reference hash for the event.
|
||||||
|
-- Needed for setting reference hashes when sending new events.
|
||||||
|
reference_sha256 BYTEA NOT NULL
|
||||||
|
);
|
||||||
|
`
|
||||||
|
|
||||||
|
const insertEventSQL = "" +
|
||||||
|
"INSERT INTO events (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256)" +
|
||||||
|
" VALUES ($1, $2, $3, $4, $5)" +
|
||||||
|
" ON CONFLICT ON CONSTRAINT event_id_unique" +
|
||||||
|
" DO UPDATE SET event_id = $1" +
|
||||||
|
" RETURNING event_nid"
|
||||||
|
|
||||||
|
func (s *statements) prepareEvents(db *sql.DB) (err error) {
|
||||||
|
_, err = db.Exec(eventsSchema)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *statements) insertEvent(
|
||||||
|
roomNID, eventTypeNID, eventStateKeyNID int64,
|
||||||
|
eventID string,
|
||||||
|
referenceSHA256 []byte,
|
||||||
|
) (eventNID int64, err error) {
|
||||||
|
err = s.insertEventStmt.QueryRow(
|
||||||
|
roomNID, eventTypeNID, eventStateKeyNID, eventID, referenceSHA256,
|
||||||
|
).Scan(&eventNID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *statements) prepareEventJSON(db *sql.DB) (err error) {
|
||||||
|
_, err = db.Exec(eventJSONSchema)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.insertEventJSONStmt, err = db.Prepare(insertEventJSONSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
const eventJSONSchema = `
|
||||||
|
-- Stores the JSON for each event. This kept separate from the main events
|
||||||
|
-- table to keep the rows in the main events table small.
|
||||||
|
CREATE TABLE IF NOT EXISTS event_json (
|
||||||
|
-- Local numeric ID for the event.
|
||||||
|
event_nid BIGINT NOT NULL PRIMARY KEY,
|
||||||
|
-- The JSON for the event.
|
||||||
|
-- Stored as TEXT because this should be valid UTF-8.
|
||||||
|
-- Not stored as a JSONB because we always just pull the entire event
|
||||||
|
-- so there is no point in postgres parsing it.
|
||||||
|
-- Not stored as JSON because we already validate the JSON in the server
|
||||||
|
-- so there is no point in postgres validating it.
|
||||||
|
-- TODO: Should we be compressing the events with Snappy or DEFLATE?
|
||||||
|
event_json TEXT NOT NULL
|
||||||
|
);
|
||||||
|
`
|
||||||
|
|
||||||
|
const insertEventJSONSQL = "" +
|
||||||
|
"INSERT INTO event_json (event_nid, event_json) VALUES ($1, $2)" +
|
||||||
|
" ON CONFLICT DO NOTHING"
|
||||||
|
|
||||||
|
func (s *statements) insertEventJSON(eventNID int64, eventJSON []byte) error {
|
||||||
|
_, err := s.insertEventJSONStmt.Exec(eventNID, eventJSON)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
// Import the postgres database driver.
|
// Import the postgres database driver.
|
||||||
_ "github.com/lib/pq"
|
_ "github.com/lib/pq"
|
||||||
"github.com/matrix-org/dendrite/roomserver/types"
|
"github.com/matrix-org/dendrite/roomserver/types"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
)
|
)
|
||||||
|
|
||||||
// A Database is used to store room events and stream offsets.
|
// A Database is used to store room events and stream offsets.
|
||||||
|
@ -35,3 +36,82 @@ func (d *Database) PartitionOffsets(topic string) ([]types.PartitionOffset, erro
|
||||||
func (d *Database) SetPartitionOffset(topic string, partition int32, offset int64) error {
|
func (d *Database) SetPartitionOffset(topic string, partition int32, offset int64) error {
|
||||||
return d.statements.upsertPartitionOffset(topic, partition, offset)
|
return d.statements.upsertPartitionOffset(topic, partition, offset)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StoreEvent implements input.EventDatabase
|
||||||
|
func (d *Database) StoreEvent(event gomatrixserverlib.Event) error {
|
||||||
|
var (
|
||||||
|
roomNID int64
|
||||||
|
eventTypeNID int64
|
||||||
|
eventStateKeyNID int64
|
||||||
|
eventNID int64
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
|
||||||
|
if roomNID, err = d.assignRoomNID(event.RoomID()); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if eventTypeNID, err = d.assignEventTypeNID(event.Type()); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
eventStateKey := event.StateKey()
|
||||||
|
// Assigned a numeric ID for the state_key if there is one present.
|
||||||
|
// Otherwise set the numeric ID for the state_key to 0.
|
||||||
|
if eventStateKey != nil {
|
||||||
|
if eventStateKeyNID, err = d.assignStateKeyNID(*eventStateKey); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if eventNID, err = d.statements.insertEvent(
|
||||||
|
roomNID,
|
||||||
|
eventTypeNID,
|
||||||
|
eventStateKeyNID,
|
||||||
|
event.EventID(),
|
||||||
|
event.EventReference().EventSHA256,
|
||||||
|
); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return d.statements.insertEventJSON(eventNID, event.JSON())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Database) assignRoomNID(roomID string) (int64, error) {
|
||||||
|
// Check if we already have a numeric ID in the database.
|
||||||
|
roomNID, err := d.statements.selectRoomNID(roomID)
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
// We don't have a numeric ID so insert one into the database.
|
||||||
|
return d.statements.insertRoomNID(roomID)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return roomNID, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Database) assignEventTypeNID(eventType string) (int64, error) {
|
||||||
|
// Check if we already have a numeric ID in the database.
|
||||||
|
eventTypeNID, err := d.statements.selectEventTypeNID(eventType)
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
// We don't have a numeric ID so insert one into the database.
|
||||||
|
return d.statements.insertEventTypeNID(eventType)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return eventTypeNID, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Database) assignStateKeyNID(eventStateKey string) (int64, error) {
|
||||||
|
// Check if we already have a numeric ID in the database.
|
||||||
|
eventStateKeyNID, err := d.statements.selectEventStateKeyNID(eventStateKey)
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
// We don't have a numeric ID so insert one into the database.
|
||||||
|
return d.statements.insertEventStateKeyNID(eventStateKey)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return eventStateKeyNID, nil
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue