diff --git a/src/github.com/matrix-org/dendrite/clientapi/clientapi.go b/src/github.com/matrix-org/dendrite/clientapi/clientapi.go index 03d03d33d..70df0a2d1 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/clientapi.go +++ b/src/github.com/matrix-org/dendrite/clientapi/clientapi.go @@ -8,12 +8,12 @@ import ( "golang.org/x/crypto/ed25519" "github.com/matrix-org/dendrite/clientapi/config" + "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/clientapi/routing" "github.com/matrix-org/dendrite/roomserver/api" log "github.com/Sirupsen/logrus" "github.com/matrix-org/dugong" - sarama "gopkg.in/Shopify/sarama.v1" ) func setupLogging(logDir string) { @@ -59,12 +59,13 @@ func main() { log.Info("Starting clientapi") - producer, err := sarama.NewSyncProducer(cfg.KafkaProducerURIs, nil) + roomserverProducer, err := producers.NewRoomserverProducer(cfg.KafkaProducerURIs, cfg.ClientAPIOutputTopic) if err != nil { log.Panicf("Failed to setup kafka producers(%s): %s", cfg.KafkaProducerURIs, err) } + queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomserverURL, nil) - routing.Setup(http.DefaultServeMux, http.DefaultClient, cfg, producer, queryAPI) + routing.Setup(http.DefaultServeMux, http.DefaultClient, cfg, roomserverProducer, queryAPI) log.Fatal(http.ListenAndServe(bindAddr, nil)) } diff --git a/src/github.com/matrix-org/dendrite/clientapi/producers/roomserver.go b/src/github.com/matrix-org/dendrite/clientapi/producers/roomserver.go new file mode 100644 index 000000000..c30342c98 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/clientapi/producers/roomserver.go @@ -0,0 +1,82 @@ +package producers + +import ( + "encoding/json" + "fmt" + + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrixserverlib" + sarama "gopkg.in/Shopify/sarama.v1" +) + +// RoomserverProducer produces events for the roomserver to consume. +type RoomserverProducer struct { + Topic string + Producer sarama.SyncProducer +} + +// NewRoomserverProducer creates a new RoomserverProducer +func NewRoomserverProducer(kafkaURIs []string, topic string) (*RoomserverProducer, error) { + producer, err := sarama.NewSyncProducer(kafkaURIs, nil) + if err != nil { + return nil, err + } + return &RoomserverProducer{ + Topic: topic, + Producer: producer, + }, nil +} + +// SendEvents writes the given events to the roomserver input log. The events are written with KindNew. +func (c *RoomserverProducer) SendEvents(events []gomatrixserverlib.Event) error { + eventIDs := make([]string, len(events)) + ires := make([]api.InputRoomEvent, len(events)) + for i := range events { + var authEventIDs []string + for _, ref := range events[i].AuthEvents() { + authEventIDs = append(authEventIDs, ref.EventID) + } + ire := api.InputRoomEvent{ + Kind: api.KindNew, + Event: events[i].JSON(), + AuthEventIDs: authEventIDs, + } + ires[i] = ire + eventIDs[i] = events[i].EventID() + } + return c.SendInputRoomEvents(ires, eventIDs) +} + +// SendInputRoomEvents writes the given input room events to the roomserver input log. The length of both +// arrays must match, and each element must correspond to the same event. +func (c *RoomserverProducer) SendInputRoomEvents(ires []api.InputRoomEvent, eventIDs []string) error { + // TODO: Nicer way of doing this. Options are: + // A) Like this + // B) Add EventID field to InputRoomEvent + // C) Add wrapper struct with the EventID and the InputRoomEvent + if len(eventIDs) != len(ires) { + return fmt.Errorf("WriteInputRoomEvents: length mismatch %d != %d", len(eventIDs), len(ires)) + } + + msgs := make([]*sarama.ProducerMessage, len(ires)) + for i := range ires { + msg, err := c.toProducerMessage(ires[i], eventIDs[i]) + if err != nil { + return err + } + msgs[i] = msg + } + return c.Producer.SendMessages(msgs) +} + +func (c *RoomserverProducer) toProducerMessage(ire api.InputRoomEvent, eventID string) (*sarama.ProducerMessage, error) { + value, err := json.Marshal(ire) + if err != nil { + return nil, err + } + var m sarama.ProducerMessage + m.Topic = c.Topic + m.Key = sarama.StringEncoder(eventID) + m.Value = sarama.ByteEncoder(value) + return &m, nil +} diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go index 24943da2d..b076f268d 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go +++ b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go @@ -5,19 +5,19 @@ import ( "github.com/gorilla/mux" "github.com/matrix-org/dendrite/clientapi/config" + "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/clientapi/readers" "github.com/matrix-org/dendrite/clientapi/writers" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/util" "github.com/prometheus/client_golang/prometheus" - sarama "gopkg.in/Shopify/sarama.v1" ) const pathPrefixR0 = "/_matrix/client/r0" // Setup registers HTTP handlers with the given ServeMux. It also supplies the given http.Client // to clients which need to make outbound HTTP requests. -func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg config.ClientAPI, producer sarama.SyncProducer, queryAPI api.RoomserverQueryAPI) { +func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg config.ClientAPI, producer *producers.RoomserverProducer, queryAPI api.RoomserverQueryAPI) { apiMux := mux.NewRouter() r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter() r0mux.Handle("/createRoom", make("createRoom", util.NewJSONRequestHandler(func(req *http.Request) util.JSONResponse { diff --git a/src/github.com/matrix-org/dendrite/clientapi/writers/createroom.go b/src/github.com/matrix-org/dendrite/clientapi/writers/createroom.go index 790fd5989..79bc285a7 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/writers/createroom.go +++ b/src/github.com/matrix-org/dendrite/clientapi/writers/createroom.go @@ -13,11 +13,10 @@ import ( "github.com/matrix-org/dendrite/clientapi/events" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" + "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/common" - "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" - sarama "gopkg.in/Shopify/sarama.v1" ) // https://matrix.org/docs/spec/client_server/r0.2.0.html#post-matrix-client-r0-createroom @@ -79,7 +78,7 @@ type fledglingEvent struct { } // CreateRoom implements /createRoom -func CreateRoom(req *http.Request, cfg config.ClientAPI, producer sarama.SyncProducer) util.JSONResponse { +func CreateRoom(req *http.Request, cfg config.ClientAPI, producer *producers.RoomserverProducer) util.JSONResponse { // TODO: Check room ID doesn't clash with an existing one, and we // probably shouldn't be using pseudo-random strings, maybe GUIDs? roomID := fmt.Sprintf("!%s:%s", util.RandomString(16), cfg.ServerName) @@ -87,7 +86,7 @@ func CreateRoom(req *http.Request, cfg config.ClientAPI, producer sarama.SyncPro } // createRoom implements /createRoom -func createRoom(req *http.Request, cfg config.ClientAPI, roomID string, producer sarama.SyncProducer) util.JSONResponse { +func createRoom(req *http.Request, cfg config.ClientAPI, roomID string, producer *producers.RoomserverProducer) util.JSONResponse { logger := util.GetLogger(req.Context()) userID, resErr := auth.VerifyAccessToken(req) if resErr != nil { @@ -115,7 +114,7 @@ func createRoom(req *http.Request, cfg config.ClientAPI, roomID string, producer // Remember events we've built and key off the state tuple so we can look them up easily when filling in auth_events builtEventMap := make(map[common.StateKeyTuple]*gomatrixserverlib.Event) - var builtEvents []*gomatrixserverlib.Event + var builtEvents []gomatrixserverlib.Event // send events into the room in order of: // 1- m.room.create @@ -177,16 +176,12 @@ func createRoom(req *http.Request, cfg config.ClientAPI, roomID string, producer // Add the event to the list of auth events builtEventMap[common.StateKeyTuple{e.Type, e.StateKey}] = ev - builtEvents = append(builtEvents, ev) + builtEvents = append(builtEvents, *ev) authEvents.AddEvent(ev) } // send events to the room server - msgs, err := eventsToMessages(builtEvents, cfg.ClientAPIOutputTopic) - if err != nil { - return httputil.LogThenError(req, err) - } - if err = producer.SendMessages(msgs); err != nil { + if err := producer.SendEvents(builtEvents); err != nil { return httputil.LogThenError(req, err) } @@ -252,32 +247,3 @@ func authEventsFromStateNeeded(eventsNeeded gomatrixserverlib.StateNeeded, } return } - -func eventsToMessages(events []*gomatrixserverlib.Event, topic string) ([]*sarama.ProducerMessage, error) { - msgs := make([]*sarama.ProducerMessage, len(events)) - for i, e := range events { - var m sarama.ProducerMessage - - // map auth event references to IDs - var authEventIDs []string - for _, ref := range e.AuthEvents() { - authEventIDs = append(authEventIDs, ref.EventID) - } - - ire := api.InputRoomEvent{ - Kind: api.KindNew, - Event: e.JSON(), - AuthEventIDs: authEventIDs, - } - - value, err := json.Marshal(ire) - if err != nil { - return nil, err - } - m.Topic = topic - m.Key = sarama.StringEncoder(e.EventID()) - m.Value = sarama.ByteEncoder(value) - msgs[i] = &m - } - return msgs, nil -} diff --git a/src/github.com/matrix-org/dendrite/clientapi/writers/sendmessage.go b/src/github.com/matrix-org/dendrite/clientapi/writers/sendmessage.go index 717025fa8..7e236c3a1 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/writers/sendmessage.go +++ b/src/github.com/matrix-org/dendrite/clientapi/writers/sendmessage.go @@ -3,18 +3,18 @@ package writers import ( "net/http" - "encoding/json" "fmt" + "time" + "github.com/matrix-org/dendrite/clientapi/auth" "github.com/matrix-org/dendrite/clientapi/config" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" + "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" - sarama "gopkg.in/Shopify/sarama.v1" - "time" ) // http://matrix.org/docs/spec/client_server/r0.2.0.html#put-matrix-client-r0-rooms-roomid-send-eventtype-txnid @@ -23,7 +23,7 @@ type sendMessageResponse struct { } // SendMessage implements /rooms/{roomID}/send/{eventType}/{txnID} -func SendMessage(req *http.Request, roomID, eventType, txnID string, cfg config.ClientAPI, queryAPI api.RoomserverQueryAPI, producer sarama.SyncProducer) util.JSONResponse { +func SendMessage(req *http.Request, roomID, eventType, txnID string, cfg config.ClientAPI, queryAPI api.RoomserverQueryAPI, producer *producers.RoomserverProducer) util.JSONResponse { // parse the incoming http request userID, resErr := auth.VerifyAccessToken(req) if resErr != nil { @@ -93,7 +93,7 @@ func SendMessage(req *http.Request, roomID, eventType, txnID string, cfg config. } // pass the new event to the roomserver - if err := sendToRoomserver(e, producer, cfg.ClientAPIOutputTopic); err != nil { + if err := producer.SendEvents([]gomatrixserverlib.Event{e}); err != nil { return httputil.LogThenError(req, err) } @@ -103,31 +103,6 @@ func SendMessage(req *http.Request, roomID, eventType, txnID string, cfg config. } } -func sendToRoomserver(e gomatrixserverlib.Event, producer sarama.SyncProducer, topic string) error { - var authEventIDs []string - for _, ref := range e.AuthEvents() { - authEventIDs = append(authEventIDs, ref.EventID) - } - ire := api.InputRoomEvent{ - Kind: api.KindNew, - Event: e.JSON(), - AuthEventIDs: authEventIDs, - } - - value, err := json.Marshal(ire) - if err != nil { - return err - } - var m sarama.ProducerMessage - m.Topic = topic - m.Key = sarama.StringEncoder(e.EventID()) - m.Value = sarama.ByteEncoder(value) - if _, _, err := producer.SendMessage(&m); err != nil { - return err - } - return nil -} - func stateNeeded(builder *gomatrixserverlib.EventBuilder) (requiredStateEvents []common.StateKeyTuple, err error) { authEvents, err := gomatrixserverlib.StateNeededForEventBuilder(builder) if err != nil {