From 99b2923ce754580fdf436ce631eaa218142f1e84 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 7 Apr 2017 14:33:28 +0100 Subject: [PATCH] WIP waiting on Cond --- .../dendrite/clientapi/sync/requestpool.go | 46 +++++++++++++------ 1 file changed, 33 insertions(+), 13 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/clientapi/sync/requestpool.go b/src/github.com/matrix-org/dendrite/clientapi/sync/requestpool.go index 2465be0c5..c97d011a5 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/sync/requestpool.go +++ b/src/github.com/matrix-org/dendrite/clientapi/sync/requestpool.go @@ -1,9 +1,11 @@ package sync import ( + "context" "fmt" "net/http" "strconv" + "sync" "time" log "github.com/Sirupsen/logrus" @@ -26,8 +28,11 @@ type syncRequest struct { // RequestPool manages HTTP long-poll connections for /sync type RequestPool struct { - db *storage.SyncServerDatabase + db *storage.SyncServerDatabase + // The latest sync stream position: guarded by 'cond'. currPos syncStreamPosition + // A condition variable to notify all waiting goroutines of a new sync stream position + cond *sync.Cond } // OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be @@ -61,9 +66,14 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons "userID": userID, "since": since, "timeout": timeout, + "current": rp.currPos, }).Info("Incoming /sync request") - res, err := rp.currentSyncForUser(syncReq) + // set the timeout going + ctx, cancel := context.WithTimeout(req.Context(), timeout) + defer cancel() + + res, err := rp.currentSyncForUser(ctx, syncReq) if err != nil { return httputil.LogThenError(req, err) } @@ -76,16 +86,32 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons // OnNewEvent is called when a new event is received from the room server. Must only be // called from a single goroutine. func (rp *RequestPool) OnNewEvent(ev *gomatrixserverlib.Event, pos syncStreamPosition) { + // update the current position in a guard and then notify all /sync streams + rp.cond.L.Lock() fmt.Println("OnNewEvent =>", ev.EventID(), " pos=", pos, " old_pos=", rp.currPos) rp.currPos = pos + rp.cond.L.Unlock() + + rp.cond.Broadcast() // notify ALL waiting goroutines } -func (rp *RequestPool) currentSyncForUser(req syncRequest) ([]gomatrixserverlib.Event, error) { - if req.since == rp.currPos { - // wait for new event +func (rp *RequestPool) waitForEvents(req syncRequest) syncStreamPosition { + // In a guard, check if the /sync request should block, and block it until we get a new position + rp.cond.L.Lock() + currentPos := rp.currPos + for req.since == currentPos { + // we need to wait for a new event. + // TODO: This waits for ANY new event, we need to only wait for events which we care about. + rp.cond.Wait() // atomically unlocks and blocks goroutine, then re-acquires lock on unblock + currentPos = rp.currPos } + rp.cond.L.Unlock() + return currentPos +} - return rp.db.EventsInRange(int64(req.since), int64(rp.currPos)) +func (rp *RequestPool) currentSyncForUser(ctx context.Context, req syncRequest) ([]gomatrixserverlib.Event, error) { + currentPos := rp.waitForEvents(req) + return rp.db.EventsInRange(int64(req.since), int64(currentPos)) // https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L179 // Check if we are going to return immediately and if so, calculate the current @@ -112,12 +138,6 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest) ([]gomatrixserverlib. // Steps: (partial token, more than threshold (too expensive to do the above)) // - Ignore for now, meaning this code path will be horrendously slow. - - // TODO: wait for an event which affects this user or one of their rooms, then recheck for new - // sync data. - time.Sleep(req.timeout) - - return nil, nil } func getTimeout(timeoutMS string) time.Duration { @@ -148,5 +168,5 @@ func NewRequestPool(db *storage.SyncServerDatabase) (*RequestPool, error) { if err != nil { return nil, err } - return &RequestPool{db, syncStreamPosition(pos)}, nil + return &RequestPool{db, syncStreamPosition(pos), sync.NewCond(&sync.Mutex{})}, nil }