Implement the /backfill federation endpoint

This commit is contained in:
Brendan Abolivier 2018-11-06 19:23:19 +00:00
parent 8ff136e595
commit fed468dcef
No known key found for this signature in database
GPG key ID: 8EF1500759F70623
4 changed files with 227 additions and 0 deletions

View file

@ -0,0 +1,92 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package routing
import (
"net/http"
"strconv"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
// Backfill implements the /backfill federation endpoint.
// https://matrix.org/docs/spec/server_server/unstable.html#get-matrix-federation-v1-backfill-roomid
func Backfill(
httpReq *http.Request,
request *gomatrixserverlib.FederationRequest,
query api.RoomserverQueryAPI,
roomID string,
) util.JSONResponse {
var res api.QueryPreviousEventsResponse
var eIDs []string
var limit string
var exists bool
var err error
// Check the room ID's format.
if _, _, err = gomatrixserverlib.SplitID('!', roomID); err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.MissingArgument("Bad room ID: " + err.Error()),
}
}
// Check if all of the required parameters are there.
eIDs, exists = httpReq.URL.Query()["v"]
if !exists {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.MissingArgument("v is missing"),
}
}
limit = httpReq.URL.Query().Get("limit")
if len(limit) == 0 {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.MissingArgument("limit is missing"),
}
}
// Populate the request.
req := api.QueryPreviousEventsRequest{
EarliestEventsIDs: eIDs,
ServerName: request.Origin(),
}
if req.Limit, err = strconv.Atoi(limit); err != nil {
return httputil.LogThenError(httpReq, err)
}
// Query the roomserver.
if err = query.QueryPreviousEvents(httpReq.Context(), &req, &res); err != nil {
return httputil.LogThenError(httpReq, err)
}
// Filter any event that's not from the requested room out.
evs := make([]gomatrixserverlib.Event, 0)
for _, ev := range res.Events {
if ev.RoomID() == roomID {
evs = append(evs, ev)
}
}
res.Events = evs
// Send the events to the client.
return util.JSONResponse{
Code: http.StatusOK,
JSON: res,
}
}

View file

@ -219,4 +219,12 @@ func Setup(
return GetMissingEvents(httpReq, request, query, vars["roomID"])
},
)).Methods(http.MethodGet)
v1fedmux.Handle("/backfill/{roomID}", common.MakeFedAPI(
"federation_backfill", cfg.Matrix.ServerName, keys,
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse {
vars := mux.Vars(httpReq)
return Backfill(httpReq, request, query, vars["roomID"])
},
)).Methods(http.MethodGet)
}

View file

@ -214,6 +214,22 @@ type QueryStateAndAuthChainResponse struct {
AuthChainEvents []gomatrixserverlib.Event `json:"auth_chain_events"`
}
// QueryPreviousEventsRequest is a request to QueryPreviousEvents.
type QueryPreviousEventsRequest struct {
// Events to start paginating from.
EarliestEventsIDs []string `json:"earliest_event_ids"`
// The maximum number of events to retrieve.
Limit int `json:"limit"`
// The server interested in the events.
ServerName gomatrixserverlib.ServerName `json:"server_name"`
}
// QueryPreviousEventsResponse is a response to QueryPreviousEvents.
type QueryPreviousEventsResponse struct {
// Missing events, arbritrary order.
Events []gomatrixserverlib.Event `json:"events"`
}
// RoomserverQueryAPI is used to query information from the room server.
type RoomserverQueryAPI interface {
// Query the latest events and state for a room from the room server.
@ -280,6 +296,13 @@ type RoomserverQueryAPI interface {
request *QueryStateAndAuthChainRequest,
response *QueryStateAndAuthChainResponse,
) error
// Query a given amount (or less) of events prior to a given set of events.
QueryPreviousEvents(
ctx context.Context,
request *QueryPreviousEventsRequest,
response *QueryPreviousEventsResponse,
) error
}
// RoomserverQueryLatestEventsAndStatePath is the HTTP path for the QueryLatestEventsAndState API.
@ -309,6 +332,9 @@ const RoomserverQueryMissingEventsPath = "/api/roomserver/queryMissingEvents"
// RoomserverQueryStateAndAuthChainPath is the HTTP path for the QueryStateAndAuthChain API
const RoomserverQueryStateAndAuthChainPath = "/api/roomserver/queryStateAndAuthChain"
// RoomserverQueryPreviousEventsPath is the HTTP path for the QueryMissingEvents API
const RoomserverQueryPreviousEventsPath = "/api/roomserver/queryPreviousEvents"
// NewRoomserverQueryAPIHTTP creates a RoomserverQueryAPI implemented by talking to a HTTP POST API.
// If httpClient is nil then it uses the http.DefaultClient
func NewRoomserverQueryAPIHTTP(roomserverURL string, httpClient *http.Client) RoomserverQueryAPI {
@ -439,3 +465,16 @@ func (h *httpRoomserverQueryAPI) QueryStateAndAuthChain(
apiURL := h.roomserverURL + RoomserverQueryStateAndAuthChainPath
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
// QueryPreviousEvents implements RoomServerQueryAPI
func (h *httpRoomserverQueryAPI) QueryPreviousEvents(
ctx context.Context,
request *QueryPreviousEventsRequest,
response *QueryPreviousEventsResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryPreviousEvents")
defer span.Finish()
apiURL := h.roomserverURL + RoomserverQueryMissingEventsPath
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}

View file

@ -487,6 +487,80 @@ BFSLoop:
return err
}
// QueryPreviousEvents implements api.RoomServerQueryAPI
func (r *RoomserverQueryAPI) QueryPreviousEvents(
ctx context.Context,
request *api.QueryPreviousEventsRequest,
response *api.QueryPreviousEventsResponse,
) error {
var allowed bool
var events []types.Event
var err error
var front []string
// The limit defines the maximum number of events to retrieve, so it also
// defines the highest number of elements in the slice and the map below.
resultNIDs := make([]types.EventNID, 0, request.Limit)
visited := make(map[string]bool, request.Limit)
// The provided event IDs have already been seen by the request's emitter,
// and will be retrieved anyway, so there's no need to care about them if
// they appear in our exploration of the event tree.
for _, id := range request.EarliestEventsIDs {
visited[id] = true
}
front = request.EarliestEventsIDs
// Loop through the event IDs to retrieve the related events and go through
// the whole tree (up to the provided limit) using the events' "prev_event"
// key.
BFSLoop:
for len(front) > 0 {
var next []string
// Retrieve the events to process from the database.
events, err = r.DB.EventsFromIDs(ctx, front)
if err != nil {
return err
}
for _, ev := range events {
// Break out of the loop if the provided limit is reached.
if len(resultNIDs) == request.Limit {
break BFSLoop
}
// Update the list of events to retrieve.
resultNIDs = append(resultNIDs, ev.EventNID)
// Loop through the event's parents.
for _, pre := range ev.PrevEventIDs() {
// Only add an event to the list of next events to process if it
// hasn't been seen before.
if !visited[pre] {
visited[pre] = true
allowed, err = r.checkServerAllowedToSeeEvent(
ctx, pre, request.ServerName,
)
if err != nil {
return err
}
// If the event hasn't been seen before and the HS
// requesting to retrieve it is allowed to do so, add it to
// the list of events to retrieve.
if allowed {
next = append(next, pre)
}
}
}
}
front = next
}
// Retrieve events from the list that was filled previously.
response.Events, err = r.loadEvents(ctx, resultNIDs)
return err
}
// QueryStateAndAuthChain implements api.RoomserverQueryAPI
func (r *RoomserverQueryAPI) QueryStateAndAuthChain(
ctx context.Context,
@ -708,4 +782,18 @@ func (r *RoomserverQueryAPI) SetupHTTP(servMux *http.ServeMux) {
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
servMux.Handle(
api.RoomserverQueryPreviousEventsPath,
common.MakeInternalAPI("queryPreviousEvents", func(req *http.Request) util.JSONResponse {
var request api.QueryPreviousEventsRequest
var response api.QueryPreviousEventsResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.ErrorResponse(err)
}
if err := r.QueryPreviousEvents(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
}