Factor out syncRequest parsing to its own file (#70)

Also ensure every sync response has a `next_batch` token by forcing
it in the constructor, as previously timeouts would not have a token.
This commit is contained in:
Kegsay 2017-04-18 10:32:32 +01:00 committed by GitHub
parent 53ec4a255b
commit f18d935134
3 changed files with 71 additions and 53 deletions

View file

@ -0,0 +1,60 @@
package sync
import (
"github.com/matrix-org/dendrite/syncserver/types"
"net/http"
"strconv"
"time"
)
const defaultSyncTimeout = time.Duration(30) * time.Second
const defaultTimelineLimit = 20
// syncRequest represents a /sync request, with sensible defaults/sanity checks applied.
type syncRequest struct {
userID string
limit int
timeout time.Duration
since types.StreamPosition
wantFullState bool
}
func newSyncRequest(req *http.Request, userID string) (*syncRequest, error) {
timeout := getTimeout(req.URL.Query().Get("timeout"))
fullState := req.URL.Query().Get("full_state")
wantFullState := fullState != "" && fullState != "false"
since, err := getSyncStreamPosition(req.URL.Query().Get("since"))
if err != nil {
return nil, err
}
// TODO: Additional query params: set_presence, filter
return &syncRequest{
userID: userID,
timeout: timeout,
since: since,
wantFullState: wantFullState,
limit: defaultTimelineLimit, // TODO: read from filter
}, nil
}
func getTimeout(timeoutMS string) time.Duration {
if timeoutMS == "" {
return defaultSyncTimeout
}
i, err := strconv.Atoi(timeoutMS)
if err != nil {
return defaultSyncTimeout
}
return time.Duration(i) * time.Millisecond
}
func getSyncStreamPosition(since string) (types.StreamPosition, error) {
if since == "" {
return types.StreamPosition(0), nil
}
i, err := strconv.Atoi(since)
if err != nil {
return types.StreamPosition(0), err
}
return types.StreamPosition(i), nil
}

View file

@ -2,7 +2,6 @@ package sync
import ( import (
"net/http" "net/http"
"strconv"
"sync" "sync"
"time" "time"
@ -16,15 +15,6 @@ import (
"github.com/matrix-org/util" "github.com/matrix-org/util"
) )
const defaultSyncTimeout = time.Duration(30) * time.Second
type syncRequest struct {
userID string
timeout time.Duration
since types.StreamPosition
wantFullState bool
}
// RequestPool manages HTTP long-poll connections for /sync // RequestPool manages HTTP long-poll connections for /sync
type RequestPool struct { type RequestPool struct {
db *storage.SyncServerDatabase db *storage.SyncServerDatabase
@ -53,40 +43,30 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons
if resErr != nil { if resErr != nil {
return *resErr return *resErr
} }
since, err := getSyncStreamPosition(req.URL.Query().Get("since")) syncReq, err := newSyncRequest(req, userID)
if err != nil { if err != nil {
return util.JSONResponse{ return util.JSONResponse{
Code: 400, Code: 400,
JSON: jsonerror.Unknown(err.Error()), JSON: jsonerror.Unknown(err.Error()),
} }
} }
timeout := getTimeout(req.URL.Query().Get("timeout"))
fullState := req.URL.Query().Get("full_state")
wantFullState := fullState != "" && fullState != "false"
// TODO: Additional query params: set_presence, filter
syncReq := syncRequest{
userID: userID,
timeout: timeout,
since: since,
wantFullState: wantFullState,
}
logger.WithFields(log.Fields{ logger.WithFields(log.Fields{
"userID": userID, "userID": userID,
"since": since, "since": syncReq.since,
"timeout": timeout, "timeout": syncReq.timeout,
}).Info("Incoming /sync request") }).Info("Incoming /sync request")
// Fork off 2 goroutines: one to do the work, and one to serve as a timeout. // Fork off 2 goroutines: one to do the work, and one to serve as a timeout.
// Whichever returns first is the one we will serve back to the client. // Whichever returns first is the one we will serve back to the client.
// TODO: Currently this means that cpu work is timed, which may not be what we want long term. // TODO: Currently this means that cpu work is timed, which may not be what we want long term.
timeoutChan := make(chan struct{}) timeoutChan := make(chan struct{})
timer := time.AfterFunc(timeout, func() { timer := time.AfterFunc(syncReq.timeout, func() {
close(timeoutChan) // signal that the timeout has expired close(timeoutChan) // signal that the timeout has expired
}) })
done := make(chan util.JSONResponse) done := make(chan util.JSONResponse)
go func() { go func() {
syncData, err := rp.currentSyncForUser(syncReq) syncData, err := rp.currentSyncForUser(*syncReq)
timer.Stop() timer.Stop()
var res util.JSONResponse var res util.JSONResponse
if err != nil { if err != nil {
@ -105,7 +85,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons
case <-timeoutChan: // timeout fired case <-timeoutChan: // timeout fired
return util.JSONResponse{ return util.JSONResponse{
Code: 200, Code: 200,
JSON: []struct{}{}, // return empty array for now JSON: types.NewResponse(syncReq.since),
} }
case res := <-done: // received a response case res := <-done: // received a response
return res return res
@ -140,12 +120,11 @@ func (rp *RequestPool) waitForEvents(req syncRequest) types.StreamPosition {
func (rp *RequestPool) currentSyncForUser(req syncRequest) (*types.Response, error) { func (rp *RequestPool) currentSyncForUser(req syncRequest) (*types.Response, error) {
if req.since == types.StreamPosition(0) { if req.since == types.StreamPosition(0) {
pos, data, err := rp.db.CompleteSync(req.userID, 3) pos, data, err := rp.db.CompleteSync(req.userID, req.limit)
if err != nil { if err != nil {
return nil, err return nil, err
} }
res := types.NewResponse() res := types.NewResponse(pos)
res.NextBatch = pos.String()
for roomID, d := range data { for roomID, d := range data {
jr := types.NewJoinResponse() jr := types.NewJoinResponse()
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(d.RecentEvents, gomatrixserverlib.FormatSync) jr.Timeline.Events = gomatrixserverlib.ToClientEvents(d.RecentEvents, gomatrixserverlib.FormatSync)
@ -175,7 +154,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest) (*types.Response, err
return nil, err return nil, err
} }
res := types.NewResponse() res := types.NewResponse(currentPos)
// for now, dump everything as join timeline events // for now, dump everything as join timeline events
for _, ev := range evs { for _, ev := range evs {
roomData := res.Rooms.Join[ev.RoomID()] roomData := res.Rooms.Join[ev.RoomID()]
@ -188,25 +167,3 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest) (*types.Response, err
res.NextBatch = currentPos.String() res.NextBatch = currentPos.String()
return res, nil return res, nil
} }
func getTimeout(timeoutMS string) time.Duration {
if timeoutMS == "" {
return defaultSyncTimeout
}
i, err := strconv.Atoi(timeoutMS)
if err != nil {
return defaultSyncTimeout
}
return time.Duration(i) * time.Millisecond
}
func getSyncStreamPosition(since string) (types.StreamPosition, error) {
if since == "" {
return types.StreamPosition(0), nil
}
i, err := strconv.Atoi(since)
if err != nil {
return types.StreamPosition(0), err
}
return types.StreamPosition(i), nil
}

View file

@ -36,8 +36,9 @@ type Response struct {
} }
// NewResponse creates an empty response with initialised maps. // NewResponse creates an empty response with initialised maps.
func NewResponse() *Response { func NewResponse(pos StreamPosition) *Response {
res := Response{} res := Response{}
res.NextBatch = pos.String()
// Pre-initalise the maps. Synapse will return {} even if there are no rooms under a specific section, // Pre-initalise the maps. Synapse will return {} even if there are no rooms under a specific section,
// so let's do the same thing. Bonus: this means we can't get dreaded 'assignment to entry in nil map' errors. // so let's do the same thing. Bonus: this means we can't get dreaded 'assignment to entry in nil map' errors.
res.Rooms.Join = make(map[string]JoinResponse) res.Rooms.Join = make(map[string]JoinResponse)