Add a RoomserverConsumer

This commit is contained in:
Kegan Dougal 2017-03-24 16:20:39 +00:00
parent 349b991178
commit 39f129918a
3 changed files with 38 additions and 2 deletions

View file

@ -0,0 +1,29 @@
package consumers
import (
"github.com/matrix-org/dendrite/clientapi/config"
"github.com/matrix-org/dendrite/common"
sarama "gopkg.in/Shopify/sarama.v1"
)
// RoomserverConsumer consumes events from the room server
type RoomserverConsumer struct {
Consumer common.ContinualConsumer
}
// NewRoomserverConsumer creates a new roomserver consumer
func NewRoomserverConsumer(cfg *config.Sync, store common.PartitionStorer) (*RoomserverConsumer, error) {
kafkaConsumer, err := sarama.NewConsumer(cfg.KafkaConsumerURIs, nil)
if err != nil {
return nil, err
}
return &RoomserverConsumer{
Consumer: common.ContinualConsumer{
Topic: cfg.RoomserverOutputTopic,
Consumer: kafkaConsumer,
PartitionStore: store,
},
}, nil
}

View file

@ -5,6 +5,7 @@ import (
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/clientapi/config"
"github.com/matrix-org/dendrite/clientapi/consumers"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/clientapi/readers"
"github.com/matrix-org/dendrite/clientapi/writers"
@ -49,7 +50,7 @@ func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg config.ClientAPI
}
// SetupSyncServer configures the given mux with sync-server listeners
func SetupSyncServer(servMux *http.ServeMux, httpClient *http.Client, cfg config.Sync) {
func SetupSyncServer(servMux *http.ServeMux, httpClient *http.Client, cfg config.Sync, consumer *consumers.RoomserverConsumer) {
apiMux := mux.NewRouter()
r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter()
r0mux.Handle("/sync", make("sync", util.NewJSONRequestHandler(func(req *http.Request) util.JSONResponse {

View file

@ -6,6 +6,7 @@ import (
"path/filepath"
"github.com/matrix-org/dendrite/clientapi/config"
"github.com/matrix-org/dendrite/clientapi/consumers"
"github.com/matrix-org/dendrite/clientapi/routing"
log "github.com/Sirupsen/logrus"
@ -44,6 +45,11 @@ func main() {
log.Info("Starting sync server")
routing.SetupSyncServer(http.DefaultServeMux, http.DefaultClient, cfg)
consumer, err := consumers.NewRoomserverConsumer(&cfg, nil) // TODO: partition storer
if err != nil {
panic(err)
}
routing.SetupSyncServer(http.DefaultServeMux, http.DefaultClient, cfg, consumer)
log.Fatal(http.ListenAndServe(bindAddr, nil))
}