From fad7e9541bab73c83a70687d05cdee7b35884723 Mon Sep 17 00:00:00 2001 From: Kegsay Date: Thu, 20 Apr 2017 11:18:26 +0100 Subject: [PATCH 1/2] /sync: Handle missing state events and return events in the correct order (#72) --- .../storage/output_room_events_table.go | 49 ++++++++++++++----- 1 file changed, 38 insertions(+), 11 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/syncserver/storage/output_room_events_table.go b/src/github.com/matrix-org/dendrite/syncserver/storage/output_room_events_table.go index 5d9fba9b4..0a873c4ee 100644 --- a/src/github.com/matrix-org/dendrite/syncserver/storage/output_room_events_table.go +++ b/src/github.com/matrix-org/dendrite/syncserver/storage/output_room_events_table.go @@ -146,19 +146,15 @@ func (s *outputRoomEventsStatements) StateBetween(txn *sql.Tx, oldPos, newPos ty eventIDToEvent[ev.EventID()] = ev } - stateBetween, missingEvents := mapEventIDsToEvents(eventIDToEvent, stateNeeded) - - if len(missingEvents) > 0 { - return nil, fmt.Errorf("error StateBetween: TODO missing events") - } - return stateBetween, nil + return s.fetchStateEvents(txn, stateNeeded, eventIDToEvent) } -// convert the set of event IDs into a set of events. Mark any which are missing. -func mapEventIDsToEvents(eventIDToEvent map[string]gomatrixserverlib.Event, stateNeeded map[string]map[string]bool) (map[string][]gomatrixserverlib.Event, map[string][]string) { +// fetchStateEvents converts the set of event IDs into a set of events. It will fetch any which are missing from the database. +// Returns a map of room ID to list of events. +func (s *outputRoomEventsStatements) fetchStateEvents(txn *sql.Tx, roomIDToEventIDSet map[string]map[string]bool, eventIDToEvent map[string]gomatrixserverlib.Event) (map[string][]gomatrixserverlib.Event, error) { stateBetween := make(map[string][]gomatrixserverlib.Event) missingEvents := make(map[string][]string) - for roomID, ids := range stateNeeded { + for roomID, ids := range roomIDToEventIDSet { events := stateBetween[roomID] for id, need := range ids { if !need { @@ -175,7 +171,25 @@ func mapEventIDsToEvents(eventIDToEvent map[string]gomatrixserverlib.Event, stat } stateBetween[roomID] = events } - return stateBetween, missingEvents + + if len(missingEvents) > 0 { + // This happens when add_state_ids has an event ID which is not in the provided range. + // We need to explicitly fetch them. + allMissingEventIDs := []string{} + for _, missingEvIDs := range missingEvents { + allMissingEventIDs = append(allMissingEventIDs, missingEvIDs...) + } + evs, err := s.Events(txn, allMissingEventIDs) + if err != nil { + return nil, err + } + // we know we got them all otherwise an error would've been returned, so just loop the events + for _, ev := range evs { + roomID := ev.RoomID() + stateBetween[roomID] = append(stateBetween[roomID], ev) + } + } + return stateBetween, nil } // MaxID returns the ID of the last inserted event in this table. 'txn' is optional. If it is not supplied, @@ -210,7 +224,13 @@ func (s *outputRoomEventsStatements) RecentEventsInRoom(txn *sql.Tx, roomID stri return nil, err } defer rows.Close() - return rowsToEvents(rows) + events, err := rowsToEvents(rows) + if err != nil { + return nil, err + } + // reverse the order because [0] is the newest event due to the ORDER BY in SQL-land. The reverse order makes [0] the oldest event, + // which is correct for /sync responses. + return reverseEvents(events), nil } // Events returns the events for the given event IDs. Returns an error if any one of the event IDs given are missing @@ -248,3 +268,10 @@ func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) { } return result, nil } + +func reverseEvents(input []gomatrixserverlib.Event) (output []gomatrixserverlib.Event) { + for i := len(input) - 1; i >= 0; i-- { + output = append(output, input[i]) + } + return +} From db428174d2dfcf3e10d201241a64e4bac20ec426 Mon Sep 17 00:00:00 2001 From: Kegsay Date: Thu, 20 Apr 2017 14:00:34 +0100 Subject: [PATCH 2/2] tool: Add client-api-proxy (#73) --- .../dendrite/cmd/client-api-proxy/main.go | 122 ++++++++++++++++++ 1 file changed, 122 insertions(+) create mode 100644 src/github.com/matrix-org/dendrite/cmd/client-api-proxy/main.go diff --git a/src/github.com/matrix-org/dendrite/cmd/client-api-proxy/main.go b/src/github.com/matrix-org/dendrite/cmd/client-api-proxy/main.go new file mode 100644 index 000000000..b725e6cf2 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/cmd/client-api-proxy/main.go @@ -0,0 +1,122 @@ +package main + +import ( + "flag" + "fmt" + log "github.com/Sirupsen/logrus" + "net/http" + "net/http/httputil" + "net/url" + "os" + "strings" + "time" +) + +const usage = `Usage: %s + +Create a single endpoint URL which clients can be pointed at. + +The client-server API in Dendrite is split across multiple processes +which listen on multiple ports. You cannot point a Matrix client at +any of those ports, as there will be unimplemented functionality. +In addition, all client-server API processes start with the additional +path prefix '/api', which Matrix clients will be unaware of. + +This tool will proxy requests for all client-server URLs and forward +them to their respective process. It will also add the '/api' path +prefix to incoming requests. + +THIS TOOL IS FOR TESTING AND NOT INTENDED FOR PRODUCTION USE. + +Arguments: + +` + +var ( + syncServerURL = flag.String("sync-server-url", "", "The base URL of the listening 'dendrite-sync-server' process. E.g. 'http://localhost:4200'") + clientAPIURL = flag.String("client-api-url", "", "The base URL of the listening 'dendrite-client-api' process. E.g. 'http://localhost:4321'") + bindAddress = flag.String("bind-address", ":8008", "The listening port for the proxy.") +) + +func makeProxy(targetURL string) (*httputil.ReverseProxy, error) { + if !strings.HasSuffix(targetURL, "/") { + targetURL += "/" + } + // Check that we can parse the URL. + _, err := url.Parse(targetURL) + if err != nil { + return nil, err + } + return &httputil.ReverseProxy{ + Director: func(req *http.Request) { + // URL.Path() removes the % escaping from the path. + // The % encoding will be added back when the url is encoded + // when the request is forwarded. + // This means that we will lose any unessecary escaping from the URL. + // Pratically this means that any distinction between '%2F' and '/' + // in the URL will be lost by the time it reaches the target. + path := req.URL.Path + path = "api" + path + log.WithFields(log.Fields{ + "path": path, + "url": targetURL, + "method": req.Method, + }).Print("proxying request") + newURL, err := url.Parse(targetURL + path) + if err != nil { + // We already checked that we can parse the URL + // So this shouldn't ever get hit. + panic(err) + } + // Copy the query parameters from the request. + newURL.RawQuery = req.URL.RawQuery + req.URL = newURL + }, + }, nil +} + +func main() { + flag.Usage = func() { + fmt.Fprintf(os.Stderr, usage, os.Args[0]) + flag.PrintDefaults() + } + + flag.Parse() + + if *syncServerURL == "" { + flag.Usage() + fmt.Fprintln(os.Stderr, "no --sync-server-url specified.") + os.Exit(1) + } + + if *clientAPIURL == "" { + flag.Usage() + fmt.Fprintln(os.Stderr, "no --client-api-url specified.") + os.Exit(1) + } + + syncProxy, err := makeProxy(*syncServerURL) + if err != nil { + panic(err) + } + clientProxy, err := makeProxy(*clientAPIURL) + if err != nil { + panic(err) + } + + http.Handle("/_matrix/client/r0/sync", syncProxy) + http.Handle("/", clientProxy) + + srv := &http.Server{ + Addr: *bindAddress, + ReadTimeout: 1 * time.Minute, // how long we wait for the client to send the entire request (after connection accept) + WriteTimeout: 5 * time.Minute, // how long the proxy has to write the full response + } + + fmt.Println("Proxying requests to:") + fmt.Println(" /_matrix/client/r0/sync => ", *syncServerURL+"/api/_matrix/client/r0/sync") + fmt.Println(" /* => ", *clientAPIURL+"/api/*") + fmt.Println("Listening on ", *bindAddress) + srv.ListenAndServe() + +}