From a4230089873475f4f8c16963ced4f21cb58dfc35 Mon Sep 17 00:00:00 2001 From: Kegsay Date: Wed, 29 Mar 2017 14:05:43 +0100 Subject: [PATCH] Read roomserver output log and remember position across restarts (#52) --- .../dendrite/clientapi/config/config.go | 2 + .../dendrite/clientapi/routing/routing.go | 4 +- .../dendrite/clientapi/storage/syncserver.go | 39 +++++++++++++++++ .../dendrite/clientapi/sync/syncserver.go | 43 +++++++++++++++++++ .../dendrite/cmd/dendrite-sync-server/main.go | 18 +++++++- .../dendrite/common/partition_offset_table.go | 1 + 6 files changed, 104 insertions(+), 3 deletions(-) create mode 100644 src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go create mode 100644 src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go diff --git a/src/github.com/matrix-org/dendrite/clientapi/config/config.go b/src/github.com/matrix-org/dendrite/clientapi/config/config.go index 4f431bee3..08e1f4d14 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/config/config.go +++ b/src/github.com/matrix-org/dendrite/clientapi/config/config.go @@ -25,4 +25,6 @@ type Sync struct { RoomserverOutputTopic string // A list of URIs to consume events from. These kafka logs should be produced by a Room Server. KafkaConsumerURIs []string + // The postgres connection config for connecting to the database e.g a postgres:// URI + DataSource string } diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go index f1d2dd5fe..499881b41 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go +++ b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go @@ -48,8 +48,8 @@ func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg config.ClientAPI servMux.Handle("/api/", http.StripPrefix("/api", apiMux)) } -// SetupSyncServer configures the given mux with sync-server listeners -func SetupSyncServer(servMux *http.ServeMux, httpClient *http.Client, cfg config.Sync) { +// SetupSyncServerListeners configures the given mux with sync-server listeners +func SetupSyncServerListeners(servMux *http.ServeMux, httpClient *http.Client, cfg config.Sync) { apiMux := mux.NewRouter() r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter() r0mux.Handle("/sync", make("sync", util.NewJSONRequestHandler(func(req *http.Request) util.JSONResponse { diff --git a/src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go new file mode 100644 index 000000000..7d8f4f504 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go @@ -0,0 +1,39 @@ +package storage + +import ( + "database/sql" + + "github.com/matrix-org/dendrite/common" + // Import the postgres database driver. + _ "github.com/lib/pq" +) + +// SyncServerDatabase represents a sync server database +type SyncServerDatabase struct { + db *sql.DB + partitions common.PartitionOffsetStatements +} + +// NewSyncServerDatabase creates a new sync server database +func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) { + var db *sql.DB + var err error + if db, err = sql.Open("postgres", dataSourceName); err != nil { + return nil, err + } + partitions := common.PartitionOffsetStatements{} + if err = partitions.Prepare(db); err != nil { + return nil, err + } + return &SyncServerDatabase{db, partitions}, nil +} + +// PartitionOffsets implements common.PartitionStorer +func (d *SyncServerDatabase) PartitionOffsets(topic string) ([]common.PartitionOffset, error) { + return d.partitions.SelectPartitionOffsets(topic) +} + +// SetPartitionOffset implements common.PartitionStorer +func (d *SyncServerDatabase) SetPartitionOffset(topic string, partition int32, offset int64) error { + return d.partitions.UpsertPartitionOffset(topic, partition, offset) +} diff --git a/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go b/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go new file mode 100644 index 000000000..71c6ec887 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go @@ -0,0 +1,43 @@ +package sync + +import ( + log "github.com/Sirupsen/logrus" + "github.com/matrix-org/dendrite/clientapi/config" + "github.com/matrix-org/dendrite/common" + sarama "gopkg.in/Shopify/sarama.v1" +) + +// Server contains all the logic for running a sync server +type Server struct { + roomServerConsumer *common.ContinualConsumer +} + +// NewServer creates a new sync server. Call Start() to begin consuming from room servers. +func NewServer(cfg *config.Sync, store common.PartitionStorer) (*Server, error) { + kafkaConsumer, err := sarama.NewConsumer(cfg.KafkaConsumerURIs, nil) + if err != nil { + return nil, err + } + + consumer := common.ContinualConsumer{ + Topic: cfg.RoomserverOutputTopic, + Consumer: kafkaConsumer, + PartitionStore: store, + } + s := &Server{ + roomServerConsumer: &consumer, + } + consumer.ProcessMessage = s.onMessage + + return s, nil +} + +// Start consuming from room servers +func (s *Server) Start() error { + return s.roomServerConsumer.Start() +} + +func (s *Server) onMessage(msg *sarama.ConsumerMessage) error { + log.WithField("key", string(msg.Key)).WithField("val", string(msg.Value)).Info("Recv") + return nil +} diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go index c92e70961..83ea51c3d 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go @@ -7,6 +7,8 @@ import ( "github.com/matrix-org/dendrite/clientapi/config" "github.com/matrix-org/dendrite/clientapi/routing" + "github.com/matrix-org/dendrite/clientapi/storage" + "github.com/matrix-org/dendrite/clientapi/sync" log "github.com/Sirupsen/logrus" "github.com/matrix-org/dugong" @@ -40,10 +42,24 @@ func main() { cfg := config.Sync{ KafkaConsumerURIs: []string{"localhost:9092"}, RoomserverOutputTopic: "roomserverOutput", + DataSource: "postgres://dendrite:itsasecret@localhost/syncserver?sslmode=disable", } log.Info("Starting sync server") - routing.SetupSyncServer(http.DefaultServeMux, http.DefaultClient, cfg) + db, err := storage.NewSyncServerDatabase(cfg.DataSource) + if err != nil { + log.Panicf("startup: failed to create sync server database with data source %s : %s", cfg.DataSource, err) + } + + server, err := sync.NewServer(&cfg, db) + if err != nil { + log.Panicf("startup: failed to create sync server: %s", err) + } + if err = server.Start(); err != nil { + log.Panicf("startup: failed to start sync server") + } + + routing.SetupSyncServerListeners(http.DefaultServeMux, http.DefaultClient, cfg) log.Fatal(http.ListenAndServe(bindAddr, nil)) } diff --git a/src/github.com/matrix-org/dendrite/common/partition_offset_table.go b/src/github.com/matrix-org/dendrite/common/partition_offset_table.go index 904e0be81..5b7a3fd17 100644 --- a/src/github.com/matrix-org/dendrite/common/partition_offset_table.go +++ b/src/github.com/matrix-org/dendrite/common/partition_offset_table.go @@ -57,6 +57,7 @@ func (s *PartitionOffsetStatements) SelectPartitionOffsets(topic string) ([]Part if err := rows.Scan(&offset.Partition, &offset.Offset); err != nil { return nil, err } + results = append(results, offset) } return results, nil }