Factor out writing events to the roomserver input log (#40)

This commit is contained in:
Kegsay 2017-03-15 13:36:26 +00:00 committed by GitHub
parent 6739f65752
commit 742e4aca85
5 changed files with 99 additions and 75 deletions

View file

@ -8,12 +8,12 @@ import (
"golang.org/x/crypto/ed25519" "golang.org/x/crypto/ed25519"
"github.com/matrix-org/dendrite/clientapi/config" "github.com/matrix-org/dendrite/clientapi/config"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/clientapi/routing" "github.com/matrix-org/dendrite/clientapi/routing"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
"github.com/matrix-org/dugong" "github.com/matrix-org/dugong"
sarama "gopkg.in/Shopify/sarama.v1"
) )
func setupLogging(logDir string) { func setupLogging(logDir string) {
@ -59,12 +59,13 @@ func main() {
log.Info("Starting clientapi") log.Info("Starting clientapi")
producer, err := sarama.NewSyncProducer(cfg.KafkaProducerURIs, nil) roomserverProducer, err := producers.NewRoomserverProducer(cfg.KafkaProducerURIs, cfg.ClientAPIOutputTopic)
if err != nil { if err != nil {
log.Panicf("Failed to setup kafka producers(%s): %s", cfg.KafkaProducerURIs, err) log.Panicf("Failed to setup kafka producers(%s): %s", cfg.KafkaProducerURIs, err)
} }
queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomserverURL, nil) queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomserverURL, nil)
routing.Setup(http.DefaultServeMux, http.DefaultClient, cfg, producer, queryAPI) routing.Setup(http.DefaultServeMux, http.DefaultClient, cfg, roomserverProducer, queryAPI)
log.Fatal(http.ListenAndServe(bindAddr, nil)) log.Fatal(http.ListenAndServe(bindAddr, nil))
} }

View file

@ -0,0 +1,82 @@
package producers
import (
"encoding/json"
"fmt"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
sarama "gopkg.in/Shopify/sarama.v1"
)
// RoomserverProducer produces events for the roomserver to consume.
type RoomserverProducer struct {
Topic string
Producer sarama.SyncProducer
}
// NewRoomserverProducer creates a new RoomserverProducer
func NewRoomserverProducer(kafkaURIs []string, topic string) (*RoomserverProducer, error) {
producer, err := sarama.NewSyncProducer(kafkaURIs, nil)
if err != nil {
return nil, err
}
return &RoomserverProducer{
Topic: topic,
Producer: producer,
}, nil
}
// SendEvents writes the given events to the roomserver input log. The events are written with KindNew.
func (c *RoomserverProducer) SendEvents(events []gomatrixserverlib.Event) error {
eventIDs := make([]string, len(events))
ires := make([]api.InputRoomEvent, len(events))
for i := range events {
var authEventIDs []string
for _, ref := range events[i].AuthEvents() {
authEventIDs = append(authEventIDs, ref.EventID)
}
ire := api.InputRoomEvent{
Kind: api.KindNew,
Event: events[i].JSON(),
AuthEventIDs: authEventIDs,
}
ires[i] = ire
eventIDs[i] = events[i].EventID()
}
return c.SendInputRoomEvents(ires, eventIDs)
}
// SendInputRoomEvents writes the given input room events to the roomserver input log. The length of both
// arrays must match, and each element must correspond to the same event.
func (c *RoomserverProducer) SendInputRoomEvents(ires []api.InputRoomEvent, eventIDs []string) error {
// TODO: Nicer way of doing this. Options are:
// A) Like this
// B) Add EventID field to InputRoomEvent
// C) Add wrapper struct with the EventID and the InputRoomEvent
if len(eventIDs) != len(ires) {
return fmt.Errorf("WriteInputRoomEvents: length mismatch %d != %d", len(eventIDs), len(ires))
}
msgs := make([]*sarama.ProducerMessage, len(ires))
for i := range ires {
msg, err := c.toProducerMessage(ires[i], eventIDs[i])
if err != nil {
return err
}
msgs[i] = msg
}
return c.Producer.SendMessages(msgs)
}
func (c *RoomserverProducer) toProducerMessage(ire api.InputRoomEvent, eventID string) (*sarama.ProducerMessage, error) {
value, err := json.Marshal(ire)
if err != nil {
return nil, err
}
var m sarama.ProducerMessage
m.Topic = c.Topic
m.Key = sarama.StringEncoder(eventID)
m.Value = sarama.ByteEncoder(value)
return &m, nil
}

View file

@ -5,19 +5,19 @@ import (
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/matrix-org/dendrite/clientapi/config" "github.com/matrix-org/dendrite/clientapi/config"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/clientapi/readers" "github.com/matrix-org/dendrite/clientapi/readers"
"github.com/matrix-org/dendrite/clientapi/writers" "github.com/matrix-org/dendrite/clientapi/writers"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/util" "github.com/matrix-org/util"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
sarama "gopkg.in/Shopify/sarama.v1"
) )
const pathPrefixR0 = "/_matrix/client/r0" const pathPrefixR0 = "/_matrix/client/r0"
// Setup registers HTTP handlers with the given ServeMux. It also supplies the given http.Client // Setup registers HTTP handlers with the given ServeMux. It also supplies the given http.Client
// to clients which need to make outbound HTTP requests. // to clients which need to make outbound HTTP requests.
func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg config.ClientAPI, producer sarama.SyncProducer, queryAPI api.RoomserverQueryAPI) { func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg config.ClientAPI, producer *producers.RoomserverProducer, queryAPI api.RoomserverQueryAPI) {
apiMux := mux.NewRouter() apiMux := mux.NewRouter()
r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter() r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter()
r0mux.Handle("/createRoom", make("createRoom", util.NewJSONRequestHandler(func(req *http.Request) util.JSONResponse { r0mux.Handle("/createRoom", make("createRoom", util.NewJSONRequestHandler(func(req *http.Request) util.JSONResponse {

View file

@ -13,11 +13,10 @@ import (
"github.com/matrix-org/dendrite/clientapi/events" "github.com/matrix-org/dendrite/clientapi/events"
"github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
sarama "gopkg.in/Shopify/sarama.v1"
) )
// https://matrix.org/docs/spec/client_server/r0.2.0.html#post-matrix-client-r0-createroom // https://matrix.org/docs/spec/client_server/r0.2.0.html#post-matrix-client-r0-createroom
@ -79,7 +78,7 @@ type fledglingEvent struct {
} }
// CreateRoom implements /createRoom // CreateRoom implements /createRoom
func CreateRoom(req *http.Request, cfg config.ClientAPI, producer sarama.SyncProducer) util.JSONResponse { func CreateRoom(req *http.Request, cfg config.ClientAPI, producer *producers.RoomserverProducer) util.JSONResponse {
// TODO: Check room ID doesn't clash with an existing one, and we // TODO: Check room ID doesn't clash with an existing one, and we
// probably shouldn't be using pseudo-random strings, maybe GUIDs? // probably shouldn't be using pseudo-random strings, maybe GUIDs?
roomID := fmt.Sprintf("!%s:%s", util.RandomString(16), cfg.ServerName) roomID := fmt.Sprintf("!%s:%s", util.RandomString(16), cfg.ServerName)
@ -87,7 +86,7 @@ func CreateRoom(req *http.Request, cfg config.ClientAPI, producer sarama.SyncPro
} }
// createRoom implements /createRoom // createRoom implements /createRoom
func createRoom(req *http.Request, cfg config.ClientAPI, roomID string, producer sarama.SyncProducer) util.JSONResponse { func createRoom(req *http.Request, cfg config.ClientAPI, roomID string, producer *producers.RoomserverProducer) util.JSONResponse {
logger := util.GetLogger(req.Context()) logger := util.GetLogger(req.Context())
userID, resErr := auth.VerifyAccessToken(req) userID, resErr := auth.VerifyAccessToken(req)
if resErr != nil { if resErr != nil {
@ -115,7 +114,7 @@ func createRoom(req *http.Request, cfg config.ClientAPI, roomID string, producer
// Remember events we've built and key off the state tuple so we can look them up easily when filling in auth_events // Remember events we've built and key off the state tuple so we can look them up easily when filling in auth_events
builtEventMap := make(map[common.StateKeyTuple]*gomatrixserverlib.Event) builtEventMap := make(map[common.StateKeyTuple]*gomatrixserverlib.Event)
var builtEvents []*gomatrixserverlib.Event var builtEvents []gomatrixserverlib.Event
// send events into the room in order of: // send events into the room in order of:
// 1- m.room.create // 1- m.room.create
@ -177,16 +176,12 @@ func createRoom(req *http.Request, cfg config.ClientAPI, roomID string, producer
// Add the event to the list of auth events // Add the event to the list of auth events
builtEventMap[common.StateKeyTuple{e.Type, e.StateKey}] = ev builtEventMap[common.StateKeyTuple{e.Type, e.StateKey}] = ev
builtEvents = append(builtEvents, ev) builtEvents = append(builtEvents, *ev)
authEvents.AddEvent(ev) authEvents.AddEvent(ev)
} }
// send events to the room server // send events to the room server
msgs, err := eventsToMessages(builtEvents, cfg.ClientAPIOutputTopic) if err := producer.SendEvents(builtEvents); err != nil {
if err != nil {
return httputil.LogThenError(req, err)
}
if err = producer.SendMessages(msgs); err != nil {
return httputil.LogThenError(req, err) return httputil.LogThenError(req, err)
} }
@ -252,32 +247,3 @@ func authEventsFromStateNeeded(eventsNeeded gomatrixserverlib.StateNeeded,
} }
return return
} }
func eventsToMessages(events []*gomatrixserverlib.Event, topic string) ([]*sarama.ProducerMessage, error) {
msgs := make([]*sarama.ProducerMessage, len(events))
for i, e := range events {
var m sarama.ProducerMessage
// map auth event references to IDs
var authEventIDs []string
for _, ref := range e.AuthEvents() {
authEventIDs = append(authEventIDs, ref.EventID)
}
ire := api.InputRoomEvent{
Kind: api.KindNew,
Event: e.JSON(),
AuthEventIDs: authEventIDs,
}
value, err := json.Marshal(ire)
if err != nil {
return nil, err
}
m.Topic = topic
m.Key = sarama.StringEncoder(e.EventID())
m.Value = sarama.ByteEncoder(value)
msgs[i] = &m
}
return msgs, nil
}

View file

@ -3,18 +3,18 @@ package writers
import ( import (
"net/http" "net/http"
"encoding/json"
"fmt" "fmt"
"time"
"github.com/matrix-org/dendrite/clientapi/auth" "github.com/matrix-org/dendrite/clientapi/auth"
"github.com/matrix-org/dendrite/clientapi/config" "github.com/matrix-org/dendrite/clientapi/config"
"github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
sarama "gopkg.in/Shopify/sarama.v1"
"time"
) )
// http://matrix.org/docs/spec/client_server/r0.2.0.html#put-matrix-client-r0-rooms-roomid-send-eventtype-txnid // http://matrix.org/docs/spec/client_server/r0.2.0.html#put-matrix-client-r0-rooms-roomid-send-eventtype-txnid
@ -23,7 +23,7 @@ type sendMessageResponse struct {
} }
// SendMessage implements /rooms/{roomID}/send/{eventType}/{txnID} // SendMessage implements /rooms/{roomID}/send/{eventType}/{txnID}
func SendMessage(req *http.Request, roomID, eventType, txnID string, cfg config.ClientAPI, queryAPI api.RoomserverQueryAPI, producer sarama.SyncProducer) util.JSONResponse { func SendMessage(req *http.Request, roomID, eventType, txnID string, cfg config.ClientAPI, queryAPI api.RoomserverQueryAPI, producer *producers.RoomserverProducer) util.JSONResponse {
// parse the incoming http request // parse the incoming http request
userID, resErr := auth.VerifyAccessToken(req) userID, resErr := auth.VerifyAccessToken(req)
if resErr != nil { if resErr != nil {
@ -93,7 +93,7 @@ func SendMessage(req *http.Request, roomID, eventType, txnID string, cfg config.
} }
// pass the new event to the roomserver // pass the new event to the roomserver
if err := sendToRoomserver(e, producer, cfg.ClientAPIOutputTopic); err != nil { if err := producer.SendEvents([]gomatrixserverlib.Event{e}); err != nil {
return httputil.LogThenError(req, err) return httputil.LogThenError(req, err)
} }
@ -103,31 +103,6 @@ func SendMessage(req *http.Request, roomID, eventType, txnID string, cfg config.
} }
} }
func sendToRoomserver(e gomatrixserverlib.Event, producer sarama.SyncProducer, topic string) error {
var authEventIDs []string
for _, ref := range e.AuthEvents() {
authEventIDs = append(authEventIDs, ref.EventID)
}
ire := api.InputRoomEvent{
Kind: api.KindNew,
Event: e.JSON(),
AuthEventIDs: authEventIDs,
}
value, err := json.Marshal(ire)
if err != nil {
return err
}
var m sarama.ProducerMessage
m.Topic = topic
m.Key = sarama.StringEncoder(e.EventID())
m.Value = sarama.ByteEncoder(value)
if _, _, err := producer.SendMessage(&m); err != nil {
return err
}
return nil
}
func stateNeeded(builder *gomatrixserverlib.EventBuilder) (requiredStateEvents []common.StateKeyTuple, err error) { func stateNeeded(builder *gomatrixserverlib.EventBuilder) (requiredStateEvents []common.StateKeyTuple, err error) {
authEvents, err := gomatrixserverlib.StateNeededForEventBuilder(builder) authEvents, err := gomatrixserverlib.StateNeededForEventBuilder(builder)
if err != nil { if err != nil {