WIP waiting on Cond

This commit is contained in:
Kegan Dougal 2017-04-07 14:33:28 +01:00
parent 3c43e514fb
commit 99b2923ce7

View file

@ -1,9 +1,11 @@
package sync
import (
"context"
"fmt"
"net/http"
"strconv"
"sync"
"time"
log "github.com/Sirupsen/logrus"
@ -26,8 +28,11 @@ type syncRequest struct {
// RequestPool manages HTTP long-poll connections for /sync
type RequestPool struct {
db *storage.SyncServerDatabase
db *storage.SyncServerDatabase
// The latest sync stream position: guarded by 'cond'.
currPos syncStreamPosition
// A condition variable to notify all waiting goroutines of a new sync stream position
cond *sync.Cond
}
// OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be
@ -61,9 +66,14 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons
"userID": userID,
"since": since,
"timeout": timeout,
"current": rp.currPos,
}).Info("Incoming /sync request")
res, err := rp.currentSyncForUser(syncReq)
// set the timeout going
ctx, cancel := context.WithTimeout(req.Context(), timeout)
defer cancel()
res, err := rp.currentSyncForUser(ctx, syncReq)
if err != nil {
return httputil.LogThenError(req, err)
}
@ -76,16 +86,32 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons
// OnNewEvent is called when a new event is received from the room server. Must only be
// called from a single goroutine.
func (rp *RequestPool) OnNewEvent(ev *gomatrixserverlib.Event, pos syncStreamPosition) {
// update the current position in a guard and then notify all /sync streams
rp.cond.L.Lock()
fmt.Println("OnNewEvent =>", ev.EventID(), " pos=", pos, " old_pos=", rp.currPos)
rp.currPos = pos
rp.cond.L.Unlock()
rp.cond.Broadcast() // notify ALL waiting goroutines
}
func (rp *RequestPool) currentSyncForUser(req syncRequest) ([]gomatrixserverlib.Event, error) {
if req.since == rp.currPos {
// wait for new event
func (rp *RequestPool) waitForEvents(req syncRequest) syncStreamPosition {
// In a guard, check if the /sync request should block, and block it until we get a new position
rp.cond.L.Lock()
currentPos := rp.currPos
for req.since == currentPos {
// we need to wait for a new event.
// TODO: This waits for ANY new event, we need to only wait for events which we care about.
rp.cond.Wait() // atomically unlocks and blocks goroutine, then re-acquires lock on unblock
currentPos = rp.currPos
}
rp.cond.L.Unlock()
return currentPos
}
return rp.db.EventsInRange(int64(req.since), int64(rp.currPos))
func (rp *RequestPool) currentSyncForUser(ctx context.Context, req syncRequest) ([]gomatrixserverlib.Event, error) {
currentPos := rp.waitForEvents(req)
return rp.db.EventsInRange(int64(req.since), int64(currentPos))
// https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L179
// Check if we are going to return immediately and if so, calculate the current
@ -112,12 +138,6 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest) ([]gomatrixserverlib.
// Steps: (partial token, more than threshold (too expensive to do the above))
// - Ignore for now, meaning this code path will be horrendously slow.
// TODO: wait for an event which affects this user or one of their rooms, then recheck for new
// sync data.
time.Sleep(req.timeout)
return nil, nil
}
func getTimeout(timeoutMS string) time.Duration {
@ -148,5 +168,5 @@ func NewRequestPool(db *storage.SyncServerDatabase) (*RequestPool, error) {
if err != nil {
return nil, err
}
return &RequestPool{db, syncStreamPosition(pos)}, nil
return &RequestPool{db, syncStreamPosition(pos), sync.NewCond(&sync.Mutex{})}, nil
}