diff --git a/src/github.com/matrix-org/dendrite/syncserver/sync/request.go b/src/github.com/matrix-org/dendrite/syncserver/sync/request.go new file mode 100644 index 000000000..40108a2b8 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/syncserver/sync/request.go @@ -0,0 +1,60 @@ +package sync + +import ( + "github.com/matrix-org/dendrite/syncserver/types" + "net/http" + "strconv" + "time" +) + +const defaultSyncTimeout = time.Duration(30) * time.Second +const defaultTimelineLimit = 20 + +// syncRequest represents a /sync request, with sensible defaults/sanity checks applied. +type syncRequest struct { + userID string + limit int + timeout time.Duration + since types.StreamPosition + wantFullState bool +} + +func newSyncRequest(req *http.Request, userID string) (*syncRequest, error) { + timeout := getTimeout(req.URL.Query().Get("timeout")) + fullState := req.URL.Query().Get("full_state") + wantFullState := fullState != "" && fullState != "false" + since, err := getSyncStreamPosition(req.URL.Query().Get("since")) + if err != nil { + return nil, err + } + // TODO: Additional query params: set_presence, filter + return &syncRequest{ + userID: userID, + timeout: timeout, + since: since, + wantFullState: wantFullState, + limit: defaultTimelineLimit, // TODO: read from filter + }, nil +} + +func getTimeout(timeoutMS string) time.Duration { + if timeoutMS == "" { + return defaultSyncTimeout + } + i, err := strconv.Atoi(timeoutMS) + if err != nil { + return defaultSyncTimeout + } + return time.Duration(i) * time.Millisecond +} + +func getSyncStreamPosition(since string) (types.StreamPosition, error) { + if since == "" { + return types.StreamPosition(0), nil + } + i, err := strconv.Atoi(since) + if err != nil { + return types.StreamPosition(0), err + } + return types.StreamPosition(i), nil +} diff --git a/src/github.com/matrix-org/dendrite/syncserver/sync/requestpool.go b/src/github.com/matrix-org/dendrite/syncserver/sync/requestpool.go index 5d988e2ca..15c588455 100644 --- a/src/github.com/matrix-org/dendrite/syncserver/sync/requestpool.go +++ b/src/github.com/matrix-org/dendrite/syncserver/sync/requestpool.go @@ -2,7 +2,6 @@ package sync import ( "net/http" - "strconv" "sync" "time" @@ -16,15 +15,6 @@ import ( "github.com/matrix-org/util" ) -const defaultSyncTimeout = time.Duration(30) * time.Second - -type syncRequest struct { - userID string - timeout time.Duration - since types.StreamPosition - wantFullState bool -} - // RequestPool manages HTTP long-poll connections for /sync type RequestPool struct { db *storage.SyncServerDatabase @@ -53,40 +43,30 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons if resErr != nil { return *resErr } - since, err := getSyncStreamPosition(req.URL.Query().Get("since")) + syncReq, err := newSyncRequest(req, userID) if err != nil { return util.JSONResponse{ Code: 400, JSON: jsonerror.Unknown(err.Error()), } } - timeout := getTimeout(req.URL.Query().Get("timeout")) - fullState := req.URL.Query().Get("full_state") - wantFullState := fullState != "" && fullState != "false" - // TODO: Additional query params: set_presence, filter - syncReq := syncRequest{ - userID: userID, - timeout: timeout, - since: since, - wantFullState: wantFullState, - } logger.WithFields(log.Fields{ "userID": userID, - "since": since, - "timeout": timeout, + "since": syncReq.since, + "timeout": syncReq.timeout, }).Info("Incoming /sync request") // Fork off 2 goroutines: one to do the work, and one to serve as a timeout. // Whichever returns first is the one we will serve back to the client. // TODO: Currently this means that cpu work is timed, which may not be what we want long term. timeoutChan := make(chan struct{}) - timer := time.AfterFunc(timeout, func() { + timer := time.AfterFunc(syncReq.timeout, func() { close(timeoutChan) // signal that the timeout has expired }) done := make(chan util.JSONResponse) go func() { - syncData, err := rp.currentSyncForUser(syncReq) + syncData, err := rp.currentSyncForUser(*syncReq) timer.Stop() var res util.JSONResponse if err != nil { @@ -105,7 +85,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons case <-timeoutChan: // timeout fired return util.JSONResponse{ Code: 200, - JSON: []struct{}{}, // return empty array for now + JSON: types.NewResponse(syncReq.since), } case res := <-done: // received a response return res @@ -140,12 +120,11 @@ func (rp *RequestPool) waitForEvents(req syncRequest) types.StreamPosition { func (rp *RequestPool) currentSyncForUser(req syncRequest) (*types.Response, error) { if req.since == types.StreamPosition(0) { - pos, data, err := rp.db.CompleteSync(req.userID, 3) + pos, data, err := rp.db.CompleteSync(req.userID, req.limit) if err != nil { return nil, err } - res := types.NewResponse() - res.NextBatch = pos.String() + res := types.NewResponse(pos) for roomID, d := range data { jr := types.NewJoinResponse() jr.Timeline.Events = gomatrixserverlib.ToClientEvents(d.RecentEvents, gomatrixserverlib.FormatSync) @@ -175,7 +154,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest) (*types.Response, err return nil, err } - res := types.NewResponse() + res := types.NewResponse(currentPos) // for now, dump everything as join timeline events for _, ev := range evs { roomData := res.Rooms.Join[ev.RoomID()] @@ -188,25 +167,3 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest) (*types.Response, err res.NextBatch = currentPos.String() return res, nil } - -func getTimeout(timeoutMS string) time.Duration { - if timeoutMS == "" { - return defaultSyncTimeout - } - i, err := strconv.Atoi(timeoutMS) - if err != nil { - return defaultSyncTimeout - } - return time.Duration(i) * time.Millisecond -} - -func getSyncStreamPosition(since string) (types.StreamPosition, error) { - if since == "" { - return types.StreamPosition(0), nil - } - i, err := strconv.Atoi(since) - if err != nil { - return types.StreamPosition(0), err - } - return types.StreamPosition(i), nil -} diff --git a/src/github.com/matrix-org/dendrite/syncserver/types/types.go b/src/github.com/matrix-org/dendrite/syncserver/types/types.go index 1e8a37a08..7d904c146 100644 --- a/src/github.com/matrix-org/dendrite/syncserver/types/types.go +++ b/src/github.com/matrix-org/dendrite/syncserver/types/types.go @@ -36,8 +36,9 @@ type Response struct { } // NewResponse creates an empty response with initialised maps. -func NewResponse() *Response { +func NewResponse(pos StreamPosition) *Response { res := Response{} + res.NextBatch = pos.String() // Pre-initalise the maps. Synapse will return {} even if there are no rooms under a specific section, // so let's do the same thing. Bonus: this means we can't get dreaded 'assignment to entry in nil map' errors. res.Rooms.Join = make(map[string]JoinResponse)