diff --git a/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go new file mode 100644 index 000000000..af764c016 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go @@ -0,0 +1,29 @@ +package consumers + +import ( + "github.com/matrix-org/dendrite/clientapi/config" + "github.com/matrix-org/dendrite/common" + sarama "gopkg.in/Shopify/sarama.v1" +) + +// RoomserverConsumer consumes events from the room server +type RoomserverConsumer struct { + Consumer common.ContinualConsumer +} + +// NewRoomserverConsumer creates a new roomserver consumer +func NewRoomserverConsumer(cfg *config.Sync, store common.PartitionStorer) (*RoomserverConsumer, error) { + kafkaConsumer, err := sarama.NewConsumer(cfg.KafkaConsumerURIs, nil) + if err != nil { + return nil, err + } + + return &RoomserverConsumer{ + Consumer: common.ContinualConsumer{ + Topic: cfg.RoomserverOutputTopic, + Consumer: kafkaConsumer, + PartitionStore: store, + }, + }, nil + +} diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go index f1d2dd5fe..a4ff04b31 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go +++ b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go @@ -5,6 +5,7 @@ import ( "github.com/gorilla/mux" "github.com/matrix-org/dendrite/clientapi/config" + "github.com/matrix-org/dendrite/clientapi/consumers" "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/clientapi/readers" "github.com/matrix-org/dendrite/clientapi/writers" @@ -49,7 +50,7 @@ func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg config.ClientAPI } // SetupSyncServer configures the given mux with sync-server listeners -func SetupSyncServer(servMux *http.ServeMux, httpClient *http.Client, cfg config.Sync) { +func SetupSyncServer(servMux *http.ServeMux, httpClient *http.Client, cfg config.Sync, consumer *consumers.RoomserverConsumer) { apiMux := mux.NewRouter() r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter() r0mux.Handle("/sync", make("sync", util.NewJSONRequestHandler(func(req *http.Request) util.JSONResponse { diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go index c92e70961..0ab56c9e4 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go @@ -6,6 +6,7 @@ import ( "path/filepath" "github.com/matrix-org/dendrite/clientapi/config" + "github.com/matrix-org/dendrite/clientapi/consumers" "github.com/matrix-org/dendrite/clientapi/routing" log "github.com/Sirupsen/logrus" @@ -44,6 +45,11 @@ func main() { log.Info("Starting sync server") - routing.SetupSyncServer(http.DefaultServeMux, http.DefaultClient, cfg) + consumer, err := consumers.NewRoomserverConsumer(&cfg, nil) // TODO: partition storer + if err != nil { + panic(err) + } + + routing.SetupSyncServer(http.DefaultServeMux, http.DefaultClient, cfg, consumer) log.Fatal(http.ListenAndServe(bindAddr, nil)) }