diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go index 750f02038..f67bf0e5e 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go @@ -26,6 +26,7 @@ import ( "github.com/matrix-org/dendrite/syncapi/routing" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/sync" + "github.com/matrix-org/dendrite/syncapi/types" log "github.com/Sirupsen/logrus" yaml "gopkg.in/yaml.v2" @@ -71,12 +72,13 @@ func main() { log.Panicf("startup: failed to create sync server database with data source %s : %s", cfg.DataSource, err) } - rp, err := sync.NewRequestPool(db) + pos, err := db.SyncStreamPosition() if err != nil { - log.Panicf("startup: Failed to create request pool : %s", err) + log.Panicf("startup: failed to get latest sync stream position : %s", err) } - server, err := consumers.NewServer(cfg, rp, db) + n := sync.NewNotifier(types.StreamPosition(pos)) + server, err := consumers.NewServer(cfg, n, db) if err != nil { log.Panicf("startup: failed to create sync server: %s", err) } @@ -85,6 +87,6 @@ func main() { } log.Info("Starting sync server on ", *bindAddr) - routing.SetupSyncServerListeners(http.DefaultServeMux, http.DefaultClient, *cfg, rp) + routing.SetupSyncServerListeners(http.DefaultServeMux, http.DefaultClient, *cfg, sync.NewRequestPool(db, n)) log.Fatal(http.ListenAndServe(*bindAddr, nil)) } diff --git a/src/github.com/matrix-org/dendrite/mediaapi/routing/routing.go b/src/github.com/matrix-org/dendrite/mediaapi/routing/routing.go index 844a2c5ab..7d9725f72 100644 --- a/src/github.com/matrix-org/dendrite/mediaapi/routing/routing.go +++ b/src/github.com/matrix-org/dendrite/mediaapi/routing/routing.go @@ -39,7 +39,7 @@ func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg config.MediaAPI, r0mux.Handle("/download/{serverName}/{mediaId}", prometheus.InstrumentHandler("download", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - util.SetupRequestLogging(req) + req = util.RequestWithLogging(req) // Set common headers returned regardless of the outcome of the request util.SetCORSHeaders(w) diff --git a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go index b9737163d..4d703ab32 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go @@ -32,11 +32,11 @@ import ( type Server struct { roomServerConsumer *common.ContinualConsumer db *storage.SyncServerDatabase - rp *sync.RequestPool + notifier *sync.Notifier } // NewServer creates a new sync server. Call Start() to begin consuming from room servers. -func NewServer(cfg *config.Sync, rp *sync.RequestPool, store *storage.SyncServerDatabase) (*Server, error) { +func NewServer(cfg *config.Sync, n *sync.Notifier, store *storage.SyncServerDatabase) (*Server, error) { kafkaConsumer, err := sarama.NewConsumer(cfg.KafkaConsumerURIs, nil) if err != nil { return nil, err @@ -50,7 +50,7 @@ func NewServer(cfg *config.Sync, rp *sync.RequestPool, store *storage.SyncServer s := &Server{ roomServerConsumer: &consumer, db: store, - rp: rp, + notifier: n, } consumer.ProcessMessage = s.onMessage @@ -96,7 +96,7 @@ func (s *Server) onMessage(msg *sarama.ConsumerMessage) error { }).Panicf("roomserver output log: write event failure") return nil } - s.rp.OnNewEvent(&ev, types.StreamPosition(syncStreamPos)) + s.notifier.OnNewEvent(&ev, types.StreamPosition(syncStreamPos)) return nil } diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go new file mode 100644 index 000000000..cc986579f --- /dev/null +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go @@ -0,0 +1,67 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sync + +import ( + "sync" + + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" +) + +// Notifier will wake up sleeping requests in the request pool when there +// is some new data. It does not tell requests what that data is, only the +// stream position which they can use to get at it. +type Notifier struct { + // The latest sync stream position: guarded by 'cond'. + currPos types.StreamPosition + // A condition variable to notify all waiting goroutines of a new sync stream position + cond *sync.Cond +} + +// NewNotifier creates a new notifier set to the given stream position. +func NewNotifier(pos types.StreamPosition) *Notifier { + return &Notifier{ + pos, + sync.NewCond(&sync.Mutex{}), + } +} + +// OnNewEvent is called when a new event is received from the room server. Must only be +// called from a single goroutine, to avoid races between updates which could set the +// current position in the stream incorrectly. +func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, pos types.StreamPosition) { + // update the current position in a guard and then notify all /sync streams + n.cond.L.Lock() + n.currPos = pos + n.cond.L.Unlock() + + n.cond.Broadcast() // notify ALL waiting goroutines +} + +// WaitForEvents blocks until there are new events for this request. +func (n *Notifier) WaitForEvents(req syncRequest) types.StreamPosition { + // In a guard, check if the /sync request should block, and block it until we get a new position + n.cond.L.Lock() + currentPos := n.currPos + for req.since == currentPos { + // we need to wait for a new event. + // TODO: This waits for ANY new event, we need to only wait for events which we care about. + n.cond.Wait() // atomically unlocks and blocks goroutine, then re-acquires lock on unblock + currentPos = n.currPos + } + n.cond.L.Unlock() + return currentPos +} diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go index 124922b27..eee117e76 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go @@ -16,7 +16,6 @@ package sync import ( "net/http" - "sync" "time" log "github.com/Sirupsen/logrus" @@ -31,20 +30,13 @@ import ( // RequestPool manages HTTP long-poll connections for /sync type RequestPool struct { - db *storage.SyncServerDatabase - // The latest sync stream position: guarded by 'cond'. - currPos types.StreamPosition - // A condition variable to notify all waiting goroutines of a new sync stream position - cond *sync.Cond + db *storage.SyncServerDatabase + notifier *Notifier } // NewRequestPool makes a new RequestPool -func NewRequestPool(db *storage.SyncServerDatabase) (*RequestPool, error) { - pos, err := db.SyncStreamPosition() - if err != nil { - return nil, err - } - return &RequestPool{db, types.StreamPosition(pos), sync.NewCond(&sync.Mutex{})}, nil +func NewRequestPool(db *storage.SyncServerDatabase, n *Notifier) *RequestPool { + return &RequestPool{db, n} } // OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be @@ -106,34 +98,8 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons } } -// OnNewEvent is called when a new event is received from the room server. Must only be -// called from a single goroutine, to avoid races between updates which could set the -// current position in the stream incorrectly. -func (rp *RequestPool) OnNewEvent(ev *gomatrixserverlib.Event, pos types.StreamPosition) { - // update the current position in a guard and then notify all /sync streams - rp.cond.L.Lock() - rp.currPos = pos - rp.cond.L.Unlock() - - rp.cond.Broadcast() // notify ALL waiting goroutines -} - -func (rp *RequestPool) waitForEvents(req syncRequest) types.StreamPosition { - // In a guard, check if the /sync request should block, and block it until we get a new position - rp.cond.L.Lock() - currentPos := rp.currPos - for req.since == currentPos { - // we need to wait for a new event. - // TODO: This waits for ANY new event, we need to only wait for events which we care about. - rp.cond.Wait() // atomically unlocks and blocks goroutine, then re-acquires lock on unblock - currentPos = rp.currPos - } - rp.cond.L.Unlock() - return currentPos -} - func (rp *RequestPool) currentSyncForUser(req syncRequest) (*types.Response, error) { - currentPos := rp.waitForEvents(req) + currentPos := rp.notifier.WaitForEvents(req) if req.since == types.StreamPosition(0) { pos, data, err := rp.db.CompleteSync(req.userID, req.limit) diff --git a/vendor/manifest b/vendor/manifest index 550e04d13..c2c83088a 100644 --- a/vendor/manifest +++ b/vendor/manifest @@ -98,7 +98,7 @@ { "importpath": "github.com/matrix-org/util", "repository": "https://github.com/matrix-org/util", - "revision": "8b11d9882e131d58ff40525c8f7b9a7f4b811a43", + "revision": "53326ed5598b226681112cbd441f59f3cffc9c82", "branch": "master" }, { @@ -212,4 +212,4 @@ "branch": "v2" } ] -} \ No newline at end of file +} diff --git a/vendor/src/github.com/matrix-org/util/json.go b/vendor/src/github.com/matrix-org/util/json.go index a1b67656f..c02f08fe3 100644 --- a/vendor/src/github.com/matrix-org/util/json.go +++ b/vendor/src/github.com/matrix-org/util/json.go @@ -93,10 +93,10 @@ func Protect(handler http.HandlerFunc) http.HandlerFunc { } } -// SetupRequestLogging sets up standard logging for http.Requests. +// RequestWithLogging sets up standard logging for http.Requests. // http.Requests will have a logger (with a request ID/method/path logged) attached to the Context. // This can be accessed via GetLogger(Context). -func SetupRequestLogging(req *http.Request) { +func RequestWithLogging(req *http.Request) *http.Request { reqID := RandomString(12) // Set a Logger and request ID on the context ctx := context.WithValue(req.Context(), ctxValueLogger, log.WithFields(log.Fields{ @@ -109,6 +109,8 @@ func SetupRequestLogging(req *http.Request) { logger := GetLogger(req.Context()) logger.Print("Incoming request") + + return req } // MakeJSONAPI creates an HTTP handler which always responds to incoming requests with JSON responses. @@ -116,7 +118,7 @@ func SetupRequestLogging(req *http.Request) { // This can be accessed via GetLogger(Context). func MakeJSONAPI(handler JSONRequestHandler) http.HandlerFunc { return Protect(func(w http.ResponseWriter, req *http.Request) { - SetupRequestLogging(req) + req = RequestWithLogging(req) if req.Method == "OPTIONS" { SetCORSHeaders(w)