From de9e3e5417c86729db69724ff3fcc50e615be475 Mon Sep 17 00:00:00 2001 From: Kegsay Date: Fri, 7 Apr 2017 14:32:42 +0100 Subject: [PATCH] Add structs for HTTP long-polling (#56) --- .../dendrite/clientapi/readers/sync.go | 20 ---- .../dendrite/clientapi/routing/routing.go | 6 +- .../storage/output_room_events_table.go | 4 +- .../dendrite/clientapi/sync/requestpool.go | 101 ++++++++++++++++++ .../dendrite/clientapi/sync/syncserver.go | 5 +- .../dendrite/cmd/dendrite-sync-server/main.go | 6 +- 6 files changed, 114 insertions(+), 28 deletions(-) delete mode 100644 src/github.com/matrix-org/dendrite/clientapi/readers/sync.go create mode 100644 src/github.com/matrix-org/dendrite/clientapi/sync/requestpool.go diff --git a/src/github.com/matrix-org/dendrite/clientapi/readers/sync.go b/src/github.com/matrix-org/dendrite/clientapi/readers/sync.go deleted file mode 100644 index c4b3a2474..000000000 --- a/src/github.com/matrix-org/dendrite/clientapi/readers/sync.go +++ /dev/null @@ -1,20 +0,0 @@ -package readers - -import ( - "net/http" - - "github.com/matrix-org/dendrite/clientapi/auth" - "github.com/matrix-org/util" -) - -// Sync implements /sync -func Sync(req *http.Request) util.JSONResponse { - logger := util.GetLogger(req.Context()) - userID, resErr := auth.VerifyAccessToken(req) - if resErr != nil { - return *resErr - } - - logger.WithField("userID", userID).Info("Doing stuff...") - return util.MessageResponse(404, "Not implemented yet") -} diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go index 499881b41..9679fab63 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go +++ b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go @@ -6,7 +6,7 @@ import ( "github.com/gorilla/mux" "github.com/matrix-org/dendrite/clientapi/config" "github.com/matrix-org/dendrite/clientapi/producers" - "github.com/matrix-org/dendrite/clientapi/readers" + "github.com/matrix-org/dendrite/clientapi/sync" "github.com/matrix-org/dendrite/clientapi/writers" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/util" @@ -49,11 +49,11 @@ func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg config.ClientAPI } // SetupSyncServerListeners configures the given mux with sync-server listeners -func SetupSyncServerListeners(servMux *http.ServeMux, httpClient *http.Client, cfg config.Sync) { +func SetupSyncServerListeners(servMux *http.ServeMux, httpClient *http.Client, cfg config.Sync, srp sync.RequestPool) { apiMux := mux.NewRouter() r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter() r0mux.Handle("/sync", make("sync", util.NewJSONRequestHandler(func(req *http.Request) util.JSONResponse { - return readers.Sync(req) + return srp.OnIncomingSyncRequest(req) }))) servMux.Handle("/metrics", prometheus.Handler()) servMux.Handle("/api/", http.StripPrefix("/api", apiMux)) diff --git a/src/github.com/matrix-org/dendrite/clientapi/storage/output_room_events_table.go b/src/github.com/matrix-org/dendrite/clientapi/storage/output_room_events_table.go index a31759c3a..9c75940cf 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/storage/output_room_events_table.go +++ b/src/github.com/matrix-org/dendrite/clientapi/storage/output_room_events_table.go @@ -72,7 +72,7 @@ func (s *outputRoomEventsStatements) Events(txn *sql.Tx, eventIDs []string) ([]g } defer rows.Close() - result := make([]gomatrixserverlib.Event, len(eventIDs)) + var result []gomatrixserverlib.Event i := 0 for ; rows.Next(); i++ { var eventBytes []byte @@ -86,7 +86,7 @@ func (s *outputRoomEventsStatements) Events(txn *sql.Tx, eventIDs []string) ([]g result = append(result, ev) } if i != len(eventIDs) { - return nil, fmt.Errorf("failed to map all event IDs to events: (%d != %d)", i, len(eventIDs)) + return nil, fmt.Errorf("failed to map all event IDs to events: (got %d, wanted %d)", i, len(eventIDs)) } return result, nil } diff --git a/src/github.com/matrix-org/dendrite/clientapi/sync/requestpool.go b/src/github.com/matrix-org/dendrite/clientapi/sync/requestpool.go new file mode 100644 index 000000000..62b16a1f8 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/clientapi/sync/requestpool.go @@ -0,0 +1,101 @@ +package sync + +import ( + "net/http" + "strconv" + "time" + + log "github.com/Sirupsen/logrus" + "github.com/matrix-org/dendrite/clientapi/auth" + "github.com/matrix-org/dendrite/clientapi/httputil" + "github.com/matrix-org/dendrite/clientapi/storage" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" +) + +const defaultSyncTimeout = time.Duration(30) * time.Second + +type syncRequest struct { + userID string + timeout time.Duration + since string + wantFullState bool +} + +// RequestPool manages HTTP long-poll connections for /sync +type RequestPool struct { + db *storage.SyncServerDatabase +} + +// OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be +// called in a dedicated goroutine for this request. This function will block the goroutine +// until a response is ready, or it times out. +func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONResponse { + // Extract values from request + logger := util.GetLogger(req.Context()) + userID, resErr := auth.VerifyAccessToken(req) + if resErr != nil { + return *resErr + } + since := req.URL.Query().Get("since") + timeout := getTimeout(req.URL.Query().Get("timeout")) + fullState := req.URL.Query().Get("full_state") + wantFullState := fullState != "" && fullState != "false" + // TODO: Additional query params: set_presence, filter + syncReq := syncRequest{ + userID: userID, + timeout: timeout, + since: since, + wantFullState: wantFullState, + } + logger.WithFields(log.Fields{ + "userID": userID, + "since": since, + "timeout": timeout, + }).Info("Incoming /sync request") + + res, err := rp.currentSyncForUser(syncReq) + if err != nil { + return httputil.LogThenError(req, err) + } + return util.JSONResponse{ + Code: 200, + JSON: res, + } +} + +// OnNewEvent is called when a new event is received from the room server +func (rp *RequestPool) OnNewEvent(ev *gomatrixserverlib.Event) { + +} + +func (rp *RequestPool) currentSyncForUser(req syncRequest) ([]gomatrixserverlib.Event, error) { + // https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L179 + // Check if we are going to return immediately and if so, calculate the current + // sync for this user and return. + if req.since == "" || req.timeout == time.Duration(0) || req.wantFullState { + return []gomatrixserverlib.Event{}, nil + } + + // TODO: wait for an event which affects this user or one of their rooms, then recheck for new + // sync data. + time.Sleep(req.timeout) + + return nil, nil +} + +func getTimeout(timeoutMS string) time.Duration { + if timeoutMS == "" { + return defaultSyncTimeout + } + i, err := strconv.Atoi(timeoutMS) + if err != nil { + return defaultSyncTimeout + } + return time.Duration(i) * time.Millisecond +} + +// NewRequestPool makes a new RequestPool +func NewRequestPool(db *storage.SyncServerDatabase) RequestPool { + return RequestPool{db} +} diff --git a/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go b/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go index 1a0d0d610..5454c6985 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go +++ b/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go @@ -16,10 +16,11 @@ import ( type Server struct { roomServerConsumer *common.ContinualConsumer db *storage.SyncServerDatabase + rp RequestPool } // NewServer creates a new sync server. Call Start() to begin consuming from room servers. -func NewServer(cfg *config.Sync, store *storage.SyncServerDatabase) (*Server, error) { +func NewServer(cfg *config.Sync, rp RequestPool, store *storage.SyncServerDatabase) (*Server, error) { kafkaConsumer, err := sarama.NewConsumer(cfg.KafkaConsumerURIs, nil) if err != nil { return nil, err @@ -33,6 +34,7 @@ func NewServer(cfg *config.Sync, store *storage.SyncServerDatabase) (*Server, er s := &Server{ roomServerConsumer: &consumer, db: store, + rp: rp, } consumer.ProcessMessage = s.onMessage @@ -73,6 +75,7 @@ func (s *Server) onMessage(msg *sarama.ConsumerMessage) error { }).Panicf("roomserver output log: write event failure") return nil } + s.rp.OnNewEvent(&ev) return nil } diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go index 0c11448ad..e04797c28 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go @@ -74,7 +74,9 @@ func main() { log.Panicf("startup: failed to create sync server database with data source %s : %s", cfg.DataSource, err) } - server, err := sync.NewServer(cfg, db) + rp := sync.NewRequestPool(db) + + server, err := sync.NewServer(cfg, rp, db) if err != nil { log.Panicf("startup: failed to create sync server: %s", err) } @@ -83,6 +85,6 @@ func main() { } log.Info("Starting sync server on ", *bindAddr) - routing.SetupSyncServerListeners(http.DefaultServeMux, http.DefaultClient, *cfg) + routing.SetupSyncServerListeners(http.DefaultServeMux, http.DefaultClient, *cfg, rp) log.Fatal(http.ListenAndServe(*bindAddr, nil)) }