From 04f3c154b872b1ac5f09e86ef19358f47983c415 Mon Sep 17 00:00:00 2001 From: Kegsay Date: Wed, 10 May 2017 10:42:00 +0100 Subject: [PATCH 1/2] Split out notifying /sync requests and calculating sync responses (#96) * Split out notifying /sync requests and calculating sync responses The logic for notifying /sync requests is about to get really complicated as we optimise when to wake up requests, so split out that code into a separate struct to isolate it and make it easier to unit test. --- .../cmd/dendrite-sync-api-server/main.go | 10 +-- .../dendrite/syncapi/consumers/roomserver.go | 8 +-- .../dendrite/syncapi/sync/notifier.go | 67 +++++++++++++++++++ .../dendrite/syncapi/sync/requestpool.go | 44 ++---------- 4 files changed, 82 insertions(+), 47 deletions(-) create mode 100644 src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go index 750f02038..f67bf0e5e 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go @@ -26,6 +26,7 @@ import ( "github.com/matrix-org/dendrite/syncapi/routing" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/sync" + "github.com/matrix-org/dendrite/syncapi/types" log "github.com/Sirupsen/logrus" yaml "gopkg.in/yaml.v2" @@ -71,12 +72,13 @@ func main() { log.Panicf("startup: failed to create sync server database with data source %s : %s", cfg.DataSource, err) } - rp, err := sync.NewRequestPool(db) + pos, err := db.SyncStreamPosition() if err != nil { - log.Panicf("startup: Failed to create request pool : %s", err) + log.Panicf("startup: failed to get latest sync stream position : %s", err) } - server, err := consumers.NewServer(cfg, rp, db) + n := sync.NewNotifier(types.StreamPosition(pos)) + server, err := consumers.NewServer(cfg, n, db) if err != nil { log.Panicf("startup: failed to create sync server: %s", err) } @@ -85,6 +87,6 @@ func main() { } log.Info("Starting sync server on ", *bindAddr) - routing.SetupSyncServerListeners(http.DefaultServeMux, http.DefaultClient, *cfg, rp) + routing.SetupSyncServerListeners(http.DefaultServeMux, http.DefaultClient, *cfg, sync.NewRequestPool(db, n)) log.Fatal(http.ListenAndServe(*bindAddr, nil)) } diff --git a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go index b9737163d..4d703ab32 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go @@ -32,11 +32,11 @@ import ( type Server struct { roomServerConsumer *common.ContinualConsumer db *storage.SyncServerDatabase - rp *sync.RequestPool + notifier *sync.Notifier } // NewServer creates a new sync server. Call Start() to begin consuming from room servers. -func NewServer(cfg *config.Sync, rp *sync.RequestPool, store *storage.SyncServerDatabase) (*Server, error) { +func NewServer(cfg *config.Sync, n *sync.Notifier, store *storage.SyncServerDatabase) (*Server, error) { kafkaConsumer, err := sarama.NewConsumer(cfg.KafkaConsumerURIs, nil) if err != nil { return nil, err @@ -50,7 +50,7 @@ func NewServer(cfg *config.Sync, rp *sync.RequestPool, store *storage.SyncServer s := &Server{ roomServerConsumer: &consumer, db: store, - rp: rp, + notifier: n, } consumer.ProcessMessage = s.onMessage @@ -96,7 +96,7 @@ func (s *Server) onMessage(msg *sarama.ConsumerMessage) error { }).Panicf("roomserver output log: write event failure") return nil } - s.rp.OnNewEvent(&ev, types.StreamPosition(syncStreamPos)) + s.notifier.OnNewEvent(&ev, types.StreamPosition(syncStreamPos)) return nil } diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go new file mode 100644 index 000000000..cc986579f --- /dev/null +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go @@ -0,0 +1,67 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sync + +import ( + "sync" + + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" +) + +// Notifier will wake up sleeping requests in the request pool when there +// is some new data. It does not tell requests what that data is, only the +// stream position which they can use to get at it. +type Notifier struct { + // The latest sync stream position: guarded by 'cond'. + currPos types.StreamPosition + // A condition variable to notify all waiting goroutines of a new sync stream position + cond *sync.Cond +} + +// NewNotifier creates a new notifier set to the given stream position. +func NewNotifier(pos types.StreamPosition) *Notifier { + return &Notifier{ + pos, + sync.NewCond(&sync.Mutex{}), + } +} + +// OnNewEvent is called when a new event is received from the room server. Must only be +// called from a single goroutine, to avoid races between updates which could set the +// current position in the stream incorrectly. +func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, pos types.StreamPosition) { + // update the current position in a guard and then notify all /sync streams + n.cond.L.Lock() + n.currPos = pos + n.cond.L.Unlock() + + n.cond.Broadcast() // notify ALL waiting goroutines +} + +// WaitForEvents blocks until there are new events for this request. +func (n *Notifier) WaitForEvents(req syncRequest) types.StreamPosition { + // In a guard, check if the /sync request should block, and block it until we get a new position + n.cond.L.Lock() + currentPos := n.currPos + for req.since == currentPos { + // we need to wait for a new event. + // TODO: This waits for ANY new event, we need to only wait for events which we care about. + n.cond.Wait() // atomically unlocks and blocks goroutine, then re-acquires lock on unblock + currentPos = n.currPos + } + n.cond.L.Unlock() + return currentPos +} diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go index 124922b27..eee117e76 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go @@ -16,7 +16,6 @@ package sync import ( "net/http" - "sync" "time" log "github.com/Sirupsen/logrus" @@ -31,20 +30,13 @@ import ( // RequestPool manages HTTP long-poll connections for /sync type RequestPool struct { - db *storage.SyncServerDatabase - // The latest sync stream position: guarded by 'cond'. - currPos types.StreamPosition - // A condition variable to notify all waiting goroutines of a new sync stream position - cond *sync.Cond + db *storage.SyncServerDatabase + notifier *Notifier } // NewRequestPool makes a new RequestPool -func NewRequestPool(db *storage.SyncServerDatabase) (*RequestPool, error) { - pos, err := db.SyncStreamPosition() - if err != nil { - return nil, err - } - return &RequestPool{db, types.StreamPosition(pos), sync.NewCond(&sync.Mutex{})}, nil +func NewRequestPool(db *storage.SyncServerDatabase, n *Notifier) *RequestPool { + return &RequestPool{db, n} } // OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be @@ -106,34 +98,8 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons } } -// OnNewEvent is called when a new event is received from the room server. Must only be -// called from a single goroutine, to avoid races between updates which could set the -// current position in the stream incorrectly. -func (rp *RequestPool) OnNewEvent(ev *gomatrixserverlib.Event, pos types.StreamPosition) { - // update the current position in a guard and then notify all /sync streams - rp.cond.L.Lock() - rp.currPos = pos - rp.cond.L.Unlock() - - rp.cond.Broadcast() // notify ALL waiting goroutines -} - -func (rp *RequestPool) waitForEvents(req syncRequest) types.StreamPosition { - // In a guard, check if the /sync request should block, and block it until we get a new position - rp.cond.L.Lock() - currentPos := rp.currPos - for req.since == currentPos { - // we need to wait for a new event. - // TODO: This waits for ANY new event, we need to only wait for events which we care about. - rp.cond.Wait() // atomically unlocks and blocks goroutine, then re-acquires lock on unblock - currentPos = rp.currPos - } - rp.cond.L.Unlock() - return currentPos -} - func (rp *RequestPool) currentSyncForUser(req syncRequest) (*types.Response, error) { - currentPos := rp.waitForEvents(req) + currentPos := rp.notifier.WaitForEvents(req) if req.since == types.StreamPosition(0) { pos, data, err := rp.db.CompleteSync(req.userID, req.limit) From 042d636e3491c6dd4ca6a1e26fdf796d31a1a72d Mon Sep 17 00:00:00 2001 From: Robert Swain Date: Wed, 10 May 2017 15:43:58 +0200 Subject: [PATCH 2/2] Update github.com/matrix-org/util for request context fix for logging --- vendor/manifest | 4 +-- vendor/src/github.com/matrix-org/util/json.go | 33 ++++++++++++------- 2 files changed, 23 insertions(+), 14 deletions(-) diff --git a/vendor/manifest b/vendor/manifest index 35d58db8b..c2c83088a 100644 --- a/vendor/manifest +++ b/vendor/manifest @@ -98,7 +98,7 @@ { "importpath": "github.com/matrix-org/util", "repository": "https://github.com/matrix-org/util", - "revision": "bc9d5e2d2f68a2ca279fce0fa2f28a91ecf301ed", + "revision": "53326ed5598b226681112cbd441f59f3cffc9c82", "branch": "master" }, { @@ -212,4 +212,4 @@ "branch": "v2" } ] -} \ No newline at end of file +} diff --git a/vendor/src/github.com/matrix-org/util/json.go b/vendor/src/github.com/matrix-org/util/json.go index a30d73f8e..c02f08fe3 100644 --- a/vendor/src/github.com/matrix-org/util/json.go +++ b/vendor/src/github.com/matrix-org/util/json.go @@ -93,23 +93,32 @@ func Protect(handler http.HandlerFunc) http.HandlerFunc { } } +// RequestWithLogging sets up standard logging for http.Requests. +// http.Requests will have a logger (with a request ID/method/path logged) attached to the Context. +// This can be accessed via GetLogger(Context). +func RequestWithLogging(req *http.Request) *http.Request { + reqID := RandomString(12) + // Set a Logger and request ID on the context + ctx := context.WithValue(req.Context(), ctxValueLogger, log.WithFields(log.Fields{ + "req.method": req.Method, + "req.path": req.URL.Path, + "req.id": reqID, + })) + ctx = context.WithValue(ctx, ctxValueRequestID, reqID) + req = req.WithContext(ctx) + + logger := GetLogger(req.Context()) + logger.Print("Incoming request") + + return req +} + // MakeJSONAPI creates an HTTP handler which always responds to incoming requests with JSON responses. // Incoming http.Requests will have a logger (with a request ID/method/path logged) attached to the Context. // This can be accessed via GetLogger(Context). func MakeJSONAPI(handler JSONRequestHandler) http.HandlerFunc { return Protect(func(w http.ResponseWriter, req *http.Request) { - reqID := RandomString(12) - // Set a Logger and request ID on the context - ctx := context.WithValue(req.Context(), ctxValueLogger, log.WithFields(log.Fields{ - "req.method": req.Method, - "req.path": req.URL.Path, - "req.id": reqID, - })) - ctx = context.WithValue(ctx, ctxValueRequestID, reqID) - req = req.WithContext(ctx) - - logger := GetLogger(req.Context()) - logger.Print("Incoming request") + req = RequestWithLogging(req) if req.Method == "OPTIONS" { SetCORSHeaders(w)