Introduced new Kafka topic and producer

This commit is contained in:
Brendan Abolivier 2017-07-10 11:08:48 +01:00
parent 89b792cfad
commit 6a320d6ab6
No known key found for this signature in database
GPG key ID: 8EF1500759F70623
4 changed files with 12 additions and 4 deletions

View file

@ -22,7 +22,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
// "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/util"
)
@ -70,6 +70,7 @@ func GetProfile(
// AvatarURL implements GET and PUT /profile/{userID}/avatar_url
func AvatarURL(
req *http.Request, accountDB *accounts.Database, userID string,
userUpdatesProducer *producers.RoomserverProducer,
) util.JSONResponse {
if req.Method == "GET" {
localpart := getLocalPart(userID)
@ -120,6 +121,7 @@ func AvatarURL(
// DisplayName implements GET and PUT /profile/{userID}/displayname
func DisplayName(
req *http.Request, accountDB *accounts.Database, userID string,
userUpdatesProducer *producers.RoomserverProducer,
) util.JSONResponse {
if req.Method == "GET" {
localpart := getLocalPart(userID)

View file

@ -46,6 +46,7 @@ func Setup(
deviceDB *devices.Database,
federation *gomatrixserverlib.FederationClient,
keyRing gomatrixserverlib.KeyRing,
userUpdatesProducer *producers.RoomserverProducer,
) {
apiMux := mux.NewRouter()
@ -171,14 +172,14 @@ func Setup(
r0mux.Handle("/profile/{userID}/avatar_url",
common.MakeAPI("profile_avatar_url", func(req *http.Request) util.JSONResponse {
vars := mux.Vars(req)
return readers.AvatarURL(req, accountDB, vars["userID"])
return readers.AvatarURL(req, accountDB, vars["userID"], userUpdatesProducer)
}),
)
r0mux.Handle("/profile/{userID}/displayname",
common.MakeAPI("profile_displayname", func(req *http.Request) util.JSONResponse {
vars := mux.Vars(req)
return readers.DisplayName(req, accountDB, vars["userID"])
return readers.DisplayName(req, accountDB, vars["userID"], userUpdatesProducer)
}),
)

View file

@ -53,6 +53,9 @@ func main() {
roomserverProducer, err := producers.NewRoomserverProducer(
cfg.Kafka.Addresses, string(cfg.Kafka.Topics.InputRoomEvent),
)
userUpdatesProducer, err := producers.NewRoomserverProducer(
cfg.Kafka.Addresses, string(cfg.Kafka.Topics.UserUpdates),
)
if err != nil {
log.Panicf("Failed to setup kafka producers(%q): %s", cfg.Kafka.Addresses, err)
}
@ -86,7 +89,7 @@ func main() {
log.Info("Starting client API server on ", cfg.Listen.ClientAPI)
routing.Setup(
http.DefaultServeMux, http.DefaultClient, *cfg, roomserverProducer,
queryAPI, accountDB, deviceDB, federation, keyRing,
queryAPI, accountDB, deviceDB, federation, keyRing, userUpdatesProducer,
)
log.Fatal(http.ListenAndServe(string(cfg.Listen.ClientAPI), nil))
}

View file

@ -99,6 +99,8 @@ type Dendrite struct {
InputRoomEvent Topic `yaml:"input_room_event"`
// Topic for roomserver/api.OutputRoomEvent events.
OutputRoomEvent Topic `yaml:"output_room_event"`
// Topic for user updates (profile, presence)
UserUpdates Topic `yaml:"user_updates"`
}
} `yaml:"kafka"`