From 37c804fa90a3b528f1790024913722960133a400 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Wed, 5 Apr 2017 11:53:15 +0100 Subject: [PATCH] Add sync.RequestPool and glue in callbacks in the right places --- .../dendrite/clientapi/routing/routing.go | 6 ++-- .../clientapi/sync/syncrequestpool.go | 33 +++++++++++++++++++ .../dendrite/clientapi/sync/syncserver.go | 5 ++- .../dendrite/cmd/dendrite-sync-server/main.go | 6 ++-- 4 files changed, 44 insertions(+), 6 deletions(-) create mode 100644 src/github.com/matrix-org/dendrite/clientapi/sync/syncrequestpool.go diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go index 499881b41..9679fab63 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go +++ b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go @@ -6,7 +6,7 @@ import ( "github.com/gorilla/mux" "github.com/matrix-org/dendrite/clientapi/config" "github.com/matrix-org/dendrite/clientapi/producers" - "github.com/matrix-org/dendrite/clientapi/readers" + "github.com/matrix-org/dendrite/clientapi/sync" "github.com/matrix-org/dendrite/clientapi/writers" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/util" @@ -49,11 +49,11 @@ func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg config.ClientAPI } // SetupSyncServerListeners configures the given mux with sync-server listeners -func SetupSyncServerListeners(servMux *http.ServeMux, httpClient *http.Client, cfg config.Sync) { +func SetupSyncServerListeners(servMux *http.ServeMux, httpClient *http.Client, cfg config.Sync, srp sync.RequestPool) { apiMux := mux.NewRouter() r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter() r0mux.Handle("/sync", make("sync", util.NewJSONRequestHandler(func(req *http.Request) util.JSONResponse { - return readers.Sync(req) + return srp.OnIncomingSyncRequest(req) }))) servMux.Handle("/metrics", prometheus.Handler()) servMux.Handle("/api/", http.StripPrefix("/api", apiMux)) diff --git a/src/github.com/matrix-org/dendrite/clientapi/sync/syncrequestpool.go b/src/github.com/matrix-org/dendrite/clientapi/sync/syncrequestpool.go new file mode 100644 index 000000000..066026d1e --- /dev/null +++ b/src/github.com/matrix-org/dendrite/clientapi/sync/syncrequestpool.go @@ -0,0 +1,33 @@ +package sync + +import ( + "net/http" + + "github.com/matrix-org/dendrite/clientapi/storage" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" +) + +// RequestPool manages HTTP long-poll connections for /sync +type RequestPool struct { + db *storage.SyncServerDatabase + conns map[string]chan interface{} +} + +// OnIncomingSyncRequest is called when a client makes a /sync request +func (srp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONResponse { + return util.JSONResponse{ + Code: 200, + JSON: struct{}{}, + } +} + +// OnNewEvent is called when a new event is received from the room server +func (srp *RequestPool) OnNewEvent(ev *gomatrixserverlib.Event) { + +} + +// NewRequestPool makes a new RequestPool +func NewRequestPool(db *storage.SyncServerDatabase) RequestPool { + return RequestPool{db, make(map[string]chan interface{})} +} diff --git a/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go b/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go index 1a0d0d610..5454c6985 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go +++ b/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go @@ -16,10 +16,11 @@ import ( type Server struct { roomServerConsumer *common.ContinualConsumer db *storage.SyncServerDatabase + rp RequestPool } // NewServer creates a new sync server. Call Start() to begin consuming from room servers. -func NewServer(cfg *config.Sync, store *storage.SyncServerDatabase) (*Server, error) { +func NewServer(cfg *config.Sync, rp RequestPool, store *storage.SyncServerDatabase) (*Server, error) { kafkaConsumer, err := sarama.NewConsumer(cfg.KafkaConsumerURIs, nil) if err != nil { return nil, err @@ -33,6 +34,7 @@ func NewServer(cfg *config.Sync, store *storage.SyncServerDatabase) (*Server, er s := &Server{ roomServerConsumer: &consumer, db: store, + rp: rp, } consumer.ProcessMessage = s.onMessage @@ -73,6 +75,7 @@ func (s *Server) onMessage(msg *sarama.ConsumerMessage) error { }).Panicf("roomserver output log: write event failure") return nil } + s.rp.OnNewEvent(&ev) return nil } diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go index 0c11448ad..e04797c28 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go @@ -74,7 +74,9 @@ func main() { log.Panicf("startup: failed to create sync server database with data source %s : %s", cfg.DataSource, err) } - server, err := sync.NewServer(cfg, db) + rp := sync.NewRequestPool(db) + + server, err := sync.NewServer(cfg, rp, db) if err != nil { log.Panicf("startup: failed to create sync server: %s", err) } @@ -83,6 +85,6 @@ func main() { } log.Info("Starting sync server on ", *bindAddr) - routing.SetupSyncServerListeners(http.DefaultServeMux, http.DefaultClient, *cfg) + routing.SetupSyncServerListeners(http.DefaultServeMux, http.DefaultClient, *cfg, rp) log.Fatal(http.ListenAndServe(*bindAddr, nil)) }