From 2e5667a4e209e4facc5e5aafcbe4ff815b7ea790 Mon Sep 17 00:00:00 2001 From: Anant Prakash Date: Fri, 22 Jun 2018 15:59:59 +0530 Subject: [PATCH] [federation] implement get_missing_events Does not check whether server is allowed to see the events --- .../federationapi/routing/missingevents.go | 74 +++++++++++++++++++ .../dendrite/federationapi/routing/routing.go | 8 ++ .../dendrite/roomserver/api/query.go | 39 ++++++++++ .../dendrite/roomserver/query/query.go | 61 +++++++++++++++ 4 files changed, 182 insertions(+) create mode 100644 src/github.com/matrix-org/dendrite/federationapi/routing/missingevents.go diff --git a/src/github.com/matrix-org/dendrite/federationapi/routing/missingevents.go b/src/github.com/matrix-org/dendrite/federationapi/routing/missingevents.go new file mode 100644 index 000000000..e2c439f18 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/federationapi/routing/missingevents.go @@ -0,0 +1,74 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package routing + +import ( + "encoding/json" + "net/http" + + "github.com/matrix-org/dendrite/clientapi/httputil" + "github.com/matrix-org/dendrite/clientapi/jsonerror" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" +) + +type getMissingEventRequest struct { + api.QueryMissingEventsRequest + MinDepth int64 `json:"min_depth"` +} + +// GetMissingEvents returns missing event between earliest_events & latest_events. +// Events are fetched from room DAG starting from latest_events until we reach earliest_events or the limit +func GetMissingEvents( + httpReq *http.Request, + request *gomatrixserverlib.FederationRequest, + query api.RoomserverQueryAPI, + roomID string, +) util.JSONResponse { + var gme getMissingEventRequest + if err := json.Unmarshal(request.Content(), &gme); err != nil { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.NotJSON("The request body could not be decoded into valid JSON. " + err.Error()), + } + } + + var eventsResponse api.QueryMissingEventsResponse + if err := query.QueryMissingEvents( + httpReq.Context(), &api.QueryMissingEventsRequest{ + EarliestEvents: gme.EarliestEvents, + LatestEvents: gme.LatestEvents, + Limit: gme.Limit, + }, + &eventsResponse, + ); err != nil { + return httputil.LogThenError(httpReq, err) + } + + eventsResponse.Events = filterEvents(eventsResponse.Events, gme.MinDepth) + return util.JSONResponse{ + Code: http.StatusOK, + JSON: eventsResponse, + } +} + +func filterEvents(events []gomatrixserverlib.Event, minDepth int64) []gomatrixserverlib.Event { + ref := events[:0] + for _, ev := range events { + if ev.Depth() >= minDepth { + ref = append(ref, ev) + } + } + return ref +} diff --git a/src/github.com/matrix-org/dendrite/federationapi/routing/routing.go b/src/github.com/matrix-org/dendrite/federationapi/routing/routing.go index f43866ea1..bbef06451 100644 --- a/src/github.com/matrix-org/dendrite/federationapi/routing/routing.go +++ b/src/github.com/matrix-org/dendrite/federationapi/routing/routing.go @@ -154,4 +154,12 @@ func Setup( return Version() }, )).Methods(http.MethodGet) + + v1fedmux.Handle("get_missing_events/{roomID}", common.MakeFedAPI( + "federation_get_missing_events", cfg.Matrix.ServerName, keys, + func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse { + vars := mux.Vars(httpReq) + return GetMissingEvents(httpReq, request, query, vars["roomID"]) + }, + )).Methods(http.MethodGet) } diff --git a/src/github.com/matrix-org/dendrite/roomserver/api/query.go b/src/github.com/matrix-org/dendrite/roomserver/api/query.go index aea7facef..a95495239 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/api/query.go +++ b/src/github.com/matrix-org/dendrite/roomserver/api/query.go @@ -154,6 +154,22 @@ type QueryServerAllowedToSeeEventResponse struct { AllowedToSeeEvent bool `json:"can_see_event"` } +// QueryMissingEventsRequest is request to QueryMissingEvents +type QueryMissingEventsRequest struct { + // Events which are known previous to the gap in timeline. + EarliestEvents []string `json:"earliest_events"` + // Latest known events. + LatestEvents []string `json:"latest_events"` + // Limit the number of events this query returns. + Limit int `json:"limit"` +} + +// QueryMissingEventsResponse is response to QueryMissingEvents +type QueryMissingEventsResponse struct { + // Missing events, arbritrary order. + Events []gomatrixserverlib.Event `json:"events"` +} + // QueryStateAndAuthChainRequest is a request to QueryStateAndAuthChain type QueryStateAndAuthChainRequest struct { // The room ID to query the state in. @@ -225,6 +241,13 @@ type RoomserverQueryAPI interface { response *QueryServerAllowedToSeeEventResponse, ) error + // Query missing events for a room from roomserver + QueryMissingEvents( + ctx context.Context, + request *QueryMissingEventsRequest, + response *QueryMissingEventsResponse, + ) error + // Query to get state and auth chain for a (potentially hypothetical) event. // Takes lists of PrevEventIDs and AuthEventsIDs and uses them to calculate // the state and auth chain to return. @@ -253,6 +276,9 @@ const RoomserverQueryInvitesForUserPath = "/api/roomserver/queryInvitesForUser" // RoomserverQueryServerAllowedToSeeEventPath is the HTTP path for the QueryServerAllowedToSeeEvent API const RoomserverQueryServerAllowedToSeeEventPath = "/api/roomserver/queryServerAllowedToSeeEvent" +// RoomserverQueryMissingEventsPath is the HTTP path for the QueryMissingEvents API +const RoomserverQueryMissingEventsPath = "/api/roomserver/queryMissingEvents" + // RoomserverQueryStateAndAuthChainPath is the HTTP path for the QueryStateAndAuthChain API const RoomserverQueryStateAndAuthChainPath = "/api/roomserver/queryStateAndAuthChain" @@ -348,6 +374,19 @@ func (h *httpRoomserverQueryAPI) QueryServerAllowedToSeeEvent( return postJSON(ctx, span, h.httpClient, apiURL, request, response) } +// QueryMissingEvents implements RoomServerQueryAPI +func (h *httpRoomserverQueryAPI) QueryMissingEvents( + ctx context.Context, + request *QueryMissingEventsRequest, + response *QueryMissingEventsResponse, +) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "QueryMissingEvents") + defer span.Finish() + + apiURL := h.roomserverURL + RoomserverQueryMissingEventsPath + return postJSON(ctx, span, h.httpClient, apiURL, request, response) +} + // QueryStateAndAuthChain implements RoomserverQueryAPI func (h *httpRoomserverQueryAPI) QueryStateAndAuthChain( ctx context.Context, diff --git a/src/github.com/matrix-org/dendrite/roomserver/query/query.go b/src/github.com/matrix-org/dendrite/roomserver/query/query.go index bc8a3d22e..7a03f543c 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/query/query.go +++ b/src/github.com/matrix-org/dendrite/roomserver/query/query.go @@ -428,6 +428,53 @@ func (r *RoomserverQueryAPI) QueryServerAllowedToSeeEvent( return nil } +// QueryMissingEvents implements api.RoomserverQueryAPI +func (r *RoomserverQueryAPI) QueryMissingEvents( + ctx context.Context, + request *api.QueryMissingEventsRequest, + response *api.QueryMissingEventsResponse, +) error { + resultIDs := make([]types.EventNID, 0, request.Limit) + var front []string + visited := make(map[string]bool, request.Limit) // request.Limit acts as a hint to size. + for _, id := range request.EarliestEvents { + visited[id] = true + } + + for _, id := range request.LatestEvents { + if !visited[id] { + front = append(front, id) + } + } + +BFSLoop: + for len(front) > 0 && len(resultIDs) <= request.Limit { + var next []string + events, err := r.DB.EventsFromIDs(ctx, front) + if err != nil { + return err + } + + for _, ev := range events { + if len(resultIDs) > request.Limit { + break BFSLoop + } + resultIDs = append(resultIDs, ev.EventNID) + for _, pre := range ev.PrevEventIDs() { + if !visited[pre] { + visited[pre] = true + next = append(next, pre) + } + } + } + front = next + } + + var err error + response.Events, err = r.loadEvents(ctx, resultIDs) + return err +} + // QueryStateAndAuthChain implements api.RoomserverQueryAPI func (r *RoomserverQueryAPI) QueryStateAndAuthChain( ctx context.Context, @@ -607,6 +654,20 @@ func (r *RoomserverQueryAPI) SetupHTTP(servMux *http.ServeMux) { return util.JSONResponse{Code: http.StatusOK, JSON: &response} }), ) + servMux.Handle( + api.RoomserverQueryMissingEventsPath, + common.MakeInternalAPI("queryMissingEvents", func(req *http.Request) util.JSONResponse { + var request api.QueryMissingEventsRequest + var response api.QueryMissingEventsResponse + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.ErrorResponse(err) + } + if err := r.QueryMissingEvents(req.Context(), &request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) servMux.Handle( api.RoomserverQueryStateAndAuthChainPath, common.MakeInternalAPI("queryStateAndAuthChain", func(req *http.Request) util.JSONResponse {