mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-17 03:43:11 -06:00
fetch missing state on backfill to remember snapshots correctly
This commit is contained in:
parent
9bdf8465e3
commit
57b997c531
|
|
@ -16,7 +16,9 @@ package routing
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
|
@ -29,13 +31,20 @@ func GetEvent(
|
||||||
request *gomatrixserverlib.FederationRequest,
|
request *gomatrixserverlib.FederationRequest,
|
||||||
query api.RoomserverQueryAPI,
|
query api.RoomserverQueryAPI,
|
||||||
eventID string,
|
eventID string,
|
||||||
|
origin gomatrixserverlib.ServerName,
|
||||||
) util.JSONResponse {
|
) util.JSONResponse {
|
||||||
event, err := getEvent(ctx, request, query, eventID)
|
event, err := getEvent(ctx, request, query, eventID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return *err
|
return *err
|
||||||
}
|
}
|
||||||
|
|
||||||
return util.JSONResponse{Code: http.StatusOK, JSON: event}
|
return util.JSONResponse{Code: http.StatusOK, JSON: gomatrixserverlib.Transaction{
|
||||||
|
Origin: origin,
|
||||||
|
OriginServerTS: gomatrixserverlib.AsTimestamp(time.Now()),
|
||||||
|
PDUs: []json.RawMessage{
|
||||||
|
event.JSON(),
|
||||||
|
},
|
||||||
|
}}
|
||||||
}
|
}
|
||||||
|
|
||||||
// getEvent returns the requested event,
|
// getEvent returns the requested event,
|
||||||
|
|
|
||||||
|
|
@ -126,7 +126,7 @@ func Setup(
|
||||||
return util.ErrorResponse(err)
|
return util.ErrorResponse(err)
|
||||||
}
|
}
|
||||||
return GetEvent(
|
return GetEvent(
|
||||||
httpReq.Context(), request, query, vars["eventID"],
|
httpReq.Context(), request, query, vars["eventID"], cfg.Matrix.ServerName,
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
)).Methods(http.MethodGet)
|
)).Methods(http.MethodGet)
|
||||||
|
|
|
||||||
|
|
@ -572,9 +572,15 @@ func (r *RoomserverQueryAPI) backfillViaFederation(ctx context.Context, req *api
|
||||||
}
|
}
|
||||||
var entries []types.StateEntry
|
var entries []types.StateEntry
|
||||||
if entries, err = r.DB.StateEntriesForEventIDs(ctx, stateIDs); err != nil {
|
if entries, err = r.DB.StateEntriesForEventIDs(ctx, stateIDs); err != nil {
|
||||||
|
// attempt to fetch the missing events
|
||||||
|
r.fetchAndStoreMissingEvents(ctx, roomVer, requester, stateIDs)
|
||||||
|
// try again
|
||||||
|
entries, err = r.DB.StateEntriesForEventIDs(ctx, stateIDs)
|
||||||
|
if err != nil {
|
||||||
logrus.WithError(err).WithField("event_id", ev.EventID()).Error("backfillViaFederation: failed to get state entries for event")
|
logrus.WithError(err).WithField("event_id", ev.EventID()).Error("backfillViaFederation: failed to get state entries for event")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
var beforeStateSnapshotNID types.StateSnapshotNID
|
var beforeStateSnapshotNID types.StateSnapshotNID
|
||||||
if beforeStateSnapshotNID, err = r.DB.AddState(ctx, roomNID, nil, entries); err != nil {
|
if beforeStateSnapshotNID, err = r.DB.AddState(ctx, roomNID, nil, entries); err != nil {
|
||||||
|
|
@ -615,6 +621,66 @@ func (r *RoomserverQueryAPI) isServerCurrentlyInRoom(ctx context.Context, server
|
||||||
return auth.IsAnyUserOnServerWithMembership(serverName, gmslEvents, gomatrixserverlib.Join), nil
|
return auth.IsAnyUserOnServerWithMembership(serverName, gmslEvents, gomatrixserverlib.Join), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// fetchAndStoreMissingEvents does a best-effort fetch and store of missing events specified in stateIDs. Returns no error as it is just
|
||||||
|
// best effort.
|
||||||
|
func (r *RoomserverQueryAPI) fetchAndStoreMissingEvents(ctx context.Context, roomVer gomatrixserverlib.RoomVersion,
|
||||||
|
backfillRequester *backfillRequester, stateIDs []string) {
|
||||||
|
|
||||||
|
servers := backfillRequester.servers
|
||||||
|
|
||||||
|
// work out which are missing
|
||||||
|
nidMap, err := r.DB.EventNIDs(ctx, stateIDs)
|
||||||
|
if err != nil {
|
||||||
|
util.GetLogger(ctx).WithError(err).Warn("cannot query missing events")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
missingMap := make(map[string]*gomatrixserverlib.HeaderedEvent) // id -> event
|
||||||
|
for _, id := range stateIDs {
|
||||||
|
if _, ok := nidMap[id]; !ok {
|
||||||
|
missingMap[id] = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
util.GetLogger(ctx).Infof("Fetching %d missing state events (from %d possible servers)", len(missingMap), len(servers))
|
||||||
|
|
||||||
|
// fetch the events from federation. Loop the servers first so if we find one that works we stick with them
|
||||||
|
for _, srv := range servers {
|
||||||
|
for id, ev := range missingMap {
|
||||||
|
if ev != nil {
|
||||||
|
continue // already found
|
||||||
|
}
|
||||||
|
logger := util.GetLogger(ctx).WithField("server", srv).WithField("event_id", id)
|
||||||
|
res, err := r.FedClient.GetEvent(ctx, srv, id)
|
||||||
|
if err != nil {
|
||||||
|
logger.WithError(err).Warn("failed to get event from server")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
loader := gomatrixserverlib.NewEventsLoader(roomVer, r.KeyRing, backfillRequester, backfillRequester.ProvideEvents, false)
|
||||||
|
result, err := loader.LoadAndVerify(ctx, res.PDUs, gomatrixserverlib.TopologicalOrderByPrevEvents)
|
||||||
|
if err != nil {
|
||||||
|
logger.WithError(err).Warn("failed to load and verify event")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
logger.Infof("returned %d PDUs which made events %+v", len(res.PDUs), result)
|
||||||
|
for _, res := range result {
|
||||||
|
if res.Error != nil {
|
||||||
|
logger.WithError(err).Warn("event failed PDU checks")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
missingMap[id] = res.Event
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var newEvents []gomatrixserverlib.HeaderedEvent
|
||||||
|
for _, ev := range missingMap {
|
||||||
|
if ev != nil {
|
||||||
|
newEvents = append(newEvents, *ev)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
util.GetLogger(ctx).Infof("Persisting %d new events", len(newEvents))
|
||||||
|
persistEvents(ctx, r.DB, newEvents)
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: Remove this when we have tests to assert correctness of this function
|
// TODO: Remove this when we have tests to assert correctness of this function
|
||||||
// nolint:gocyclo
|
// nolint:gocyclo
|
||||||
func (r *RoomserverQueryAPI) scanEventTree(
|
func (r *RoomserverQueryAPI) scanEventTree(
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue