From 39f129918af5140b2c775fcb9a19f9a34f6fe56c Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 24 Mar 2017 16:20:39 +0000 Subject: [PATCH] Add a RoomserverConsumer --- .../clientapi/consumers/roomserver.go | 29 +++++++++++++++++++ .../dendrite/clientapi/routing/routing.go | 3 +- .../dendrite/cmd/dendrite-sync-server/main.go | 8 ++++- 3 files changed, 38 insertions(+), 2 deletions(-) create mode 100644 src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go diff --git a/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go new file mode 100644 index 000000000..af764c016 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go @@ -0,0 +1,29 @@ +package consumers + +import ( + "github.com/matrix-org/dendrite/clientapi/config" + "github.com/matrix-org/dendrite/common" + sarama "gopkg.in/Shopify/sarama.v1" +) + +// RoomserverConsumer consumes events from the room server +type RoomserverConsumer struct { + Consumer common.ContinualConsumer +} + +// NewRoomserverConsumer creates a new roomserver consumer +func NewRoomserverConsumer(cfg *config.Sync, store common.PartitionStorer) (*RoomserverConsumer, error) { + kafkaConsumer, err := sarama.NewConsumer(cfg.KafkaConsumerURIs, nil) + if err != nil { + return nil, err + } + + return &RoomserverConsumer{ + Consumer: common.ContinualConsumer{ + Topic: cfg.RoomserverOutputTopic, + Consumer: kafkaConsumer, + PartitionStore: store, + }, + }, nil + +} diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go index f1d2dd5fe..a4ff04b31 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go +++ b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go @@ -5,6 +5,7 @@ import ( "github.com/gorilla/mux" "github.com/matrix-org/dendrite/clientapi/config" + "github.com/matrix-org/dendrite/clientapi/consumers" "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/clientapi/readers" "github.com/matrix-org/dendrite/clientapi/writers" @@ -49,7 +50,7 @@ func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg config.ClientAPI } // SetupSyncServer configures the given mux with sync-server listeners -func SetupSyncServer(servMux *http.ServeMux, httpClient *http.Client, cfg config.Sync) { +func SetupSyncServer(servMux *http.ServeMux, httpClient *http.Client, cfg config.Sync, consumer *consumers.RoomserverConsumer) { apiMux := mux.NewRouter() r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter() r0mux.Handle("/sync", make("sync", util.NewJSONRequestHandler(func(req *http.Request) util.JSONResponse { diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go index c92e70961..0ab56c9e4 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go @@ -6,6 +6,7 @@ import ( "path/filepath" "github.com/matrix-org/dendrite/clientapi/config" + "github.com/matrix-org/dendrite/clientapi/consumers" "github.com/matrix-org/dendrite/clientapi/routing" log "github.com/Sirupsen/logrus" @@ -44,6 +45,11 @@ func main() { log.Info("Starting sync server") - routing.SetupSyncServer(http.DefaultServeMux, http.DefaultClient, cfg) + consumer, err := consumers.NewRoomserverConsumer(&cfg, nil) // TODO: partition storer + if err != nil { + panic(err) + } + + routing.SetupSyncServer(http.DefaultServeMux, http.DefaultClient, cfg, consumer) log.Fatal(http.ListenAndServe(bindAddr, nil)) }