From 57b997c531ccc29b22c72e12a4ac0830a914d916 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Wed, 29 Apr 2020 16:29:34 +0100 Subject: [PATCH] fetch missing state on backfill to remember snapshots correctly --- federationapi/routing/events.go | 11 ++++- federationapi/routing/routing.go | 2 +- roomserver/query/query.go | 70 +++++++++++++++++++++++++++++++- 3 files changed, 79 insertions(+), 4 deletions(-) diff --git a/federationapi/routing/events.go b/federationapi/routing/events.go index a91528b3d..03492db45 100644 --- a/federationapi/routing/events.go +++ b/federationapi/routing/events.go @@ -16,7 +16,9 @@ package routing import ( "context" + "encoding/json" "net/http" + "time" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" @@ -29,13 +31,20 @@ func GetEvent( request *gomatrixserverlib.FederationRequest, query api.RoomserverQueryAPI, eventID string, + origin gomatrixserverlib.ServerName, ) util.JSONResponse { event, err := getEvent(ctx, request, query, eventID) if err != nil { return *err } - return util.JSONResponse{Code: http.StatusOK, JSON: event} + return util.JSONResponse{Code: http.StatusOK, JSON: gomatrixserverlib.Transaction{ + Origin: origin, + OriginServerTS: gomatrixserverlib.AsTimestamp(time.Now()), + PDUs: []json.RawMessage{ + event.JSON(), + }, + }} } // getEvent returns the requested event, diff --git a/federationapi/routing/routing.go b/federationapi/routing/routing.go index 83bac5550..d61ff0bb0 100644 --- a/federationapi/routing/routing.go +++ b/federationapi/routing/routing.go @@ -126,7 +126,7 @@ func Setup( return util.ErrorResponse(err) } return GetEvent( - httpReq.Context(), request, query, vars["eventID"], + httpReq.Context(), request, query, vars["eventID"], cfg.Matrix.ServerName, ) }, )).Methods(http.MethodGet) diff --git a/roomserver/query/query.go b/roomserver/query/query.go index 2b80dde84..7ea81d7ea 100644 --- a/roomserver/query/query.go +++ b/roomserver/query/query.go @@ -572,8 +572,14 @@ func (r *RoomserverQueryAPI) backfillViaFederation(ctx context.Context, req *api } var entries []types.StateEntry if entries, err = r.DB.StateEntriesForEventIDs(ctx, stateIDs); err != nil { - logrus.WithError(err).WithField("event_id", ev.EventID()).Error("backfillViaFederation: failed to get state entries for event") - return err + // attempt to fetch the missing events + r.fetchAndStoreMissingEvents(ctx, roomVer, requester, stateIDs) + // try again + entries, err = r.DB.StateEntriesForEventIDs(ctx, stateIDs) + if err != nil { + logrus.WithError(err).WithField("event_id", ev.EventID()).Error("backfillViaFederation: failed to get state entries for event") + return err + } } var beforeStateSnapshotNID types.StateSnapshotNID @@ -615,6 +621,66 @@ func (r *RoomserverQueryAPI) isServerCurrentlyInRoom(ctx context.Context, server return auth.IsAnyUserOnServerWithMembership(serverName, gmslEvents, gomatrixserverlib.Join), nil } +// fetchAndStoreMissingEvents does a best-effort fetch and store of missing events specified in stateIDs. Returns no error as it is just +// best effort. +func (r *RoomserverQueryAPI) fetchAndStoreMissingEvents(ctx context.Context, roomVer gomatrixserverlib.RoomVersion, + backfillRequester *backfillRequester, stateIDs []string) { + + servers := backfillRequester.servers + + // work out which are missing + nidMap, err := r.DB.EventNIDs(ctx, stateIDs) + if err != nil { + util.GetLogger(ctx).WithError(err).Warn("cannot query missing events") + return + } + missingMap := make(map[string]*gomatrixserverlib.HeaderedEvent) // id -> event + for _, id := range stateIDs { + if _, ok := nidMap[id]; !ok { + missingMap[id] = nil + } + } + util.GetLogger(ctx).Infof("Fetching %d missing state events (from %d possible servers)", len(missingMap), len(servers)) + + // fetch the events from federation. Loop the servers first so if we find one that works we stick with them + for _, srv := range servers { + for id, ev := range missingMap { + if ev != nil { + continue // already found + } + logger := util.GetLogger(ctx).WithField("server", srv).WithField("event_id", id) + res, err := r.FedClient.GetEvent(ctx, srv, id) + if err != nil { + logger.WithError(err).Warn("failed to get event from server") + continue + } + loader := gomatrixserverlib.NewEventsLoader(roomVer, r.KeyRing, backfillRequester, backfillRequester.ProvideEvents, false) + result, err := loader.LoadAndVerify(ctx, res.PDUs, gomatrixserverlib.TopologicalOrderByPrevEvents) + if err != nil { + logger.WithError(err).Warn("failed to load and verify event") + continue + } + logger.Infof("returned %d PDUs which made events %+v", len(res.PDUs), result) + for _, res := range result { + if res.Error != nil { + logger.WithError(err).Warn("event failed PDU checks") + continue + } + missingMap[id] = res.Event + } + } + } + + var newEvents []gomatrixserverlib.HeaderedEvent + for _, ev := range missingMap { + if ev != nil { + newEvents = append(newEvents, *ev) + } + } + util.GetLogger(ctx).Infof("Persisting %d new events", len(newEvents)) + persistEvents(ctx, r.DB, newEvents) +} + // TODO: Remove this when we have tests to assert correctness of this function // nolint:gocyclo func (r *RoomserverQueryAPI) scanEventTree(