Write OutputRoomEvents into the database and remember the arrival order (#54)

This commit is contained in:
Kegsay 2017-03-30 15:29:23 +01:00 committed by GitHub
parent 2d2c7e7169
commit a3c66f7fa0
3 changed files with 94 additions and 5 deletions

View file

@ -0,0 +1,48 @@
package storage
import (
"database/sql"
"github.com/lib/pq"
)
const outputRoomEventsSchema = `
-- Stores output room events received from the roomserver.
CREATE TABLE IF NOT EXISTS output_room_events (
-- An incrementing ID which denotes the position in the log that this event resides at.
-- NB: 'serial' makes no guarantees to increment by 1 every time, only that it increments.
-- This isn't a problem for us since we just want to order by this field.
id BIGSERIAL PRIMARY KEY,
-- The 'room_id' key for the event.
room_id TEXT NOT NULL,
-- The JSON for the event. Stored as TEXT because this should be valid UTF-8.
event_json TEXT NOT NULL,
-- A list of event IDs which represent a delta of added/removed room state.
add_state_ids TEXT[] NOT NULL,
remove_state_ids TEXT[] NOT NULL
);
`
const insertEventSQL = "" +
"INSERT INTO output_room_events (room_id, event_json, add_state_ids, remove_state_ids) VALUES ($1, $2, $3, $4)"
type outputRoomEventsStatements struct {
insertEventStmt *sql.Stmt
}
func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
_, err = db.Exec(outputRoomEventsSchema)
if err != nil {
return
}
if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil {
return
}
return
}
// InsertEvent into the output_room_events table. addState and removeState are an optional list of state event IDs.
func (s *outputRoomEventsStatements) InsertEvent(roomID string, eventJSON []byte, addState, removeState []string) error {
_, err := s.insertEventStmt.Exec(roomID, eventJSON, pq.StringArray(addState), pq.StringArray(removeState))
return err
}

View file

@ -2,16 +2,17 @@ package storage
import ( import (
"database/sql" "database/sql"
"github.com/matrix-org/dendrite/common"
// Import the postgres database driver. // Import the postgres database driver.
_ "github.com/lib/pq" _ "github.com/lib/pq"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/gomatrixserverlib"
) )
// SyncServerDatabase represents a sync server database // SyncServerDatabase represents a sync server database
type SyncServerDatabase struct { type SyncServerDatabase struct {
db *sql.DB db *sql.DB
partitions common.PartitionOffsetStatements partitions common.PartitionOffsetStatements
events outputRoomEventsStatements
} }
// NewSyncServerDatabase creates a new sync server database // NewSyncServerDatabase creates a new sync server database
@ -25,7 +26,17 @@ func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) {
if err = partitions.Prepare(db); err != nil { if err = partitions.Prepare(db); err != nil {
return nil, err return nil, err
} }
return &SyncServerDatabase{db, partitions}, nil events := outputRoomEventsStatements{}
if err = events.prepare(db); err != nil {
return nil, err
}
return &SyncServerDatabase{db, partitions, events}, nil
}
// WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races
// when generating the stream position for this event. Returns an error if there was a problem inserting this event.
func (d *SyncServerDatabase) WriteEvent(ev *gomatrixserverlib.Event, addStateEventIDs, removeStateEventIDs []string) error {
return d.events.InsertEvent(ev.RoomID(), ev.JSON(), addStateEventIDs, removeStateEventIDs)
} }
// PartitionOffsets implements common.PartitionStorer // PartitionOffsets implements common.PartitionStorer

View file

@ -1,19 +1,25 @@
package sync package sync
import ( import (
"encoding/json"
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
"github.com/matrix-org/dendrite/clientapi/config" "github.com/matrix-org/dendrite/clientapi/config"
"github.com/matrix-org/dendrite/clientapi/storage"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
sarama "gopkg.in/Shopify/sarama.v1" sarama "gopkg.in/Shopify/sarama.v1"
) )
// Server contains all the logic for running a sync server // Server contains all the logic for running a sync server
type Server struct { type Server struct {
roomServerConsumer *common.ContinualConsumer roomServerConsumer *common.ContinualConsumer
db *storage.SyncServerDatabase
} }
// NewServer creates a new sync server. Call Start() to begin consuming from room servers. // NewServer creates a new sync server. Call Start() to begin consuming from room servers.
func NewServer(cfg *config.Sync, store common.PartitionStorer) (*Server, error) { func NewServer(cfg *config.Sync, store *storage.SyncServerDatabase) (*Server, error) {
kafkaConsumer, err := sarama.NewConsumer(cfg.KafkaConsumerURIs, nil) kafkaConsumer, err := sarama.NewConsumer(cfg.KafkaConsumerURIs, nil)
if err != nil { if err != nil {
return nil, err return nil, err
@ -26,6 +32,7 @@ func NewServer(cfg *config.Sync, store common.PartitionStorer) (*Server, error)
} }
s := &Server{ s := &Server{
roomServerConsumer: &consumer, roomServerConsumer: &consumer,
db: store,
} }
consumer.ProcessMessage = s.onMessage consumer.ProcessMessage = s.onMessage
@ -38,6 +45,29 @@ func (s *Server) Start() error {
} }
func (s *Server) onMessage(msg *sarama.ConsumerMessage) error { func (s *Server) onMessage(msg *sarama.ConsumerMessage) error {
log.WithField("key", string(msg.Key)).WithField("val", string(msg.Value)).Info("Recv") // Parse out the event JSON
var output api.OutputRoomEvent
if err := json.Unmarshal(msg.Value, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("roomserver output log: message parse failure")
return nil
}
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(output.Event, false)
if err != nil {
log.WithError(err).Errorf("roomserver output log: event parse failure")
return nil
}
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"room_id": ev.RoomID(),
}).Info("received event from roomserver")
if err := s.db.WriteEvent(&ev, output.AddsStateEventIDs, output.RemovesStateEventIDs); err != nil {
// panic rather than continue with an inconsistent database
log.WithError(err).WithField("OutputRoomEvent", output).Panicf("roomserver output log: write event failure")
return nil
}
return nil return nil
} }