From 2fcf6fd6eb7e97e6bf1f2d27f7d25a26dc75fb2a Mon Sep 17 00:00:00 2001 From: Kegsay Date: Fri, 10 Mar 2017 16:19:23 +0000 Subject: [PATCH] Send /createRoom events to kafka (#33) --- .../dendrite/clientapi/clientapi.go | 25 ++++++++--- .../dendrite/clientapi/config/config.go | 10 ++++- .../dendrite/clientapi/routing/routing.go | 5 ++- .../dendrite/clientapi/writers/createroom.go | 45 +++++++++++++++++-- 4 files changed, 73 insertions(+), 12 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/clientapi/clientapi.go b/src/github.com/matrix-org/dendrite/clientapi/clientapi.go index a487135bc..9f17edea1 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/clientapi.go +++ b/src/github.com/matrix-org/dendrite/clientapi/clientapi.go @@ -12,6 +12,7 @@ import ( log "github.com/Sirupsen/logrus" "github.com/matrix-org/dugong" + sarama "gopkg.in/Shopify/sarama.v1" ) func setupLogging(logDir string) { @@ -38,17 +39,29 @@ func main() { if logDir != "" { setupLogging(logDir) } - log.Info("Starting clientapi") + // TODO: Rather than generating a new key on every startup, we should be // reading a PEM formatted file instead. _, privKey, err := ed25519.GenerateKey(nil) if err != nil { log.Panicf("Failed to generate private key: %s", err) } - routing.Setup(http.DefaultServeMux, http.DefaultClient, config.ClientAPI{ - ServerName: "localhost", - KeyID: "ed25519:something", - PrivateKey: privKey, - }) + + cfg := config.ClientAPI{ + ServerName: "localhost", + KeyID: "ed25519:something", + PrivateKey: privKey, + KafkaProducerURIs: []string{"localhost:9092"}, + ClientAPIOutputTopic: "clientapiOutput", + } + + log.Info("Starting clientapi") + + producer, err := sarama.NewSyncProducer(cfg.KafkaProducerURIs, nil) + if err != nil { + log.Panicf("Failed to setup kafka producers(%s): %s", cfg.KafkaProducerURIs, err) + } + + routing.Setup(http.DefaultServeMux, http.DefaultClient, cfg, producer) log.Fatal(http.ListenAndServe(bindAddr, nil)) } diff --git a/src/github.com/matrix-org/dendrite/clientapi/config/config.go b/src/github.com/matrix-org/dendrite/clientapi/config/config.go index f743dcba8..001da837b 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/config/config.go +++ b/src/github.com/matrix-org/dendrite/clientapi/config/config.go @@ -4,7 +4,15 @@ import "golang.org/x/crypto/ed25519" // ClientAPI contains the config information necessary to spin up a clientapi process. type ClientAPI struct { + // The name of the server. This is usually the domain name, e.g 'matrix.org', 'localhost'. ServerName string + // The private key which will be used to sign events. PrivateKey ed25519.PrivateKey - KeyID string + // An arbitrary string used to uniquely identify the PrivateKey. Must start with the + // prefix "ed25519:". + KeyID string + // A list of URIs to send events to. These kafka logs should be consumed by a Room Server. + KafkaProducerURIs []string + // The topic for events which are written to the logs. + ClientAPIOutputTopic string } diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go index 50f502405..a98c05b57 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go +++ b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go @@ -9,17 +9,18 @@ import ( "github.com/matrix-org/dendrite/clientapi/writers" "github.com/matrix-org/util" "github.com/prometheus/client_golang/prometheus" + sarama "gopkg.in/Shopify/sarama.v1" ) const pathPrefixR0 = "/_matrix/client/r0" // Setup registers HTTP handlers with the given ServeMux. It also supplies the given http.Client // to clients which need to make outbound HTTP requests. -func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg config.ClientAPI) { +func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg config.ClientAPI, producer sarama.SyncProducer) { apiMux := mux.NewRouter() r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter() r0mux.Handle("/createRoom", make("createRoom", util.NewJSONRequestHandler(func(req *http.Request) util.JSONResponse { - return writers.CreateRoom(req, cfg) + return writers.CreateRoom(req, cfg, producer) }))) r0mux.Handle("/sync", make("sync", util.NewJSONRequestHandler(func(req *http.Request) util.JSONResponse { return readers.Sync(req) diff --git a/src/github.com/matrix-org/dendrite/clientapi/writers/createroom.go b/src/github.com/matrix-org/dendrite/clientapi/writers/createroom.go index 6e7d07f45..9f7f7efd9 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/writers/createroom.go +++ b/src/github.com/matrix-org/dendrite/clientapi/writers/createroom.go @@ -14,8 +14,10 @@ import ( "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" + sarama "gopkg.in/Shopify/sarama.v1" ) // https://matrix.org/docs/spec/client_server/r0.2.0.html#post-matrix-client-r0-createroom @@ -77,15 +79,15 @@ type fledglingEvent struct { } // CreateRoom implements /createRoom -func CreateRoom(req *http.Request, cfg config.ClientAPI) util.JSONResponse { +func CreateRoom(req *http.Request, cfg config.ClientAPI, producer sarama.SyncProducer) util.JSONResponse { // TODO: Check room ID doesn't clash with an existing one, and we // probably shouldn't be using pseudo-random strings, maybe GUIDs? roomID := fmt.Sprintf("!%s:%s", util.RandomString(16), cfg.ServerName) - return createRoom(req, cfg, roomID) + return createRoom(req, cfg, roomID, producer) } // createRoom implements /createRoom -func createRoom(req *http.Request, cfg config.ClientAPI, roomID string) util.JSONResponse { +func createRoom(req *http.Request, cfg config.ClientAPI, roomID string, producer sarama.SyncProducer) util.JSONResponse { logger := util.GetLogger(req.Context()) userID, resErr := auth.VerifyAccessToken(req) if resErr != nil { @@ -176,7 +178,15 @@ func createRoom(req *http.Request, cfg config.ClientAPI, roomID string) util.JSO // Add the event to the list of auth events builtEventMap[common.StateKeyTuple{e.Type, e.StateKey}] = ev builtEvents = append(builtEvents, ev) + } + // send events to the room server + msgs, err := eventsToMessages(builtEvents, cfg.ClientAPIOutputTopic) + if err != nil { + return util.ErrorResponse(err) + } + if err = producer.SendMessages(msgs); err != nil { + return util.ErrorResponse(err) } return util.JSONResponse{ @@ -242,6 +252,35 @@ func authEventsFromStateNeeded(eventsNeeded gomatrixserverlib.StateNeeded, return } +func eventsToMessages(events []*gomatrixserverlib.Event, topic string) ([]*sarama.ProducerMessage, error) { + msgs := make([]*sarama.ProducerMessage, len(events)) + for i, e := range events { + var m sarama.ProducerMessage + + // map auth event references to IDs + var authEventIDs []string + for _, ref := range e.AuthEvents() { + authEventIDs = append(authEventIDs, ref.EventID) + } + + ire := api.InputRoomEvent{ + Kind: api.KindNew, + Event: e.JSON(), + AuthEventIDs: authEventIDs, + } + + value, err := json.Marshal(ire) + if err != nil { + return nil, err + } + m.Topic = topic + m.Key = sarama.StringEncoder(e.EventID()) + m.Value = sarama.ByteEncoder(value) + msgs[i] = &m + } + return msgs, nil +} + type authEventProvider struct { events map[common.StateKeyTuple]*gomatrixserverlib.Event }