mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-24 07:13:09 -06:00
Adjust backfill to send backward extremity with state before other backfilled events, include prev_events with no state amongst missing events
This commit is contained in:
parent
0804594a61
commit
9ebea8ecce
|
|
@ -342,7 +342,11 @@ func (t *txnReq) processEvent(ctx context.Context, e gomatrixserverlib.Event, is
|
||||||
logger := util.GetLogger(ctx).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID())
|
logger := util.GetLogger(ctx).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID())
|
||||||
|
|
||||||
// Work out if the roomserver knows everything it needs to know to auth
|
// Work out if the roomserver knows everything it needs to know to auth
|
||||||
// the event.
|
// the event. This includes the prev_events and auth_events.
|
||||||
|
// NOTE! This is going to include prev_events that have an empty state
|
||||||
|
// snapshot. This is because we will need to re-request the event, and
|
||||||
|
// it's /state_ids, in order for it to exist in the roomserver correctly
|
||||||
|
// before the roomserver tries to work out
|
||||||
stateReq := api.QueryMissingAuthPrevEventsRequest{
|
stateReq := api.QueryMissingAuthPrevEventsRequest{
|
||||||
RoomID: e.RoomID(),
|
RoomID: e.RoomID(),
|
||||||
AuthEventIDs: e.AuthEventIDs(),
|
AuthEventIDs: e.AuthEventIDs(),
|
||||||
|
|
@ -464,7 +468,7 @@ func (t *txnReq) processEventWithMissingState(ctx context.Context, e gomatrixser
|
||||||
// - fill in the gap completely then process event `e` returning no backwards extremity
|
// - fill in the gap completely then process event `e` returning no backwards extremity
|
||||||
// - fail to fill in the gap and tell us to terminate the transaction err=not nil
|
// - fail to fill in the gap and tell us to terminate the transaction err=not nil
|
||||||
// - fail to fill in the gap and tell us to fetch state at the new backwards extremity, and to not terminate the transaction
|
// - fail to fill in the gap and tell us to fetch state at the new backwards extremity, and to not terminate the transaction
|
||||||
backwardsExtremity, err := t.getMissingEvents(gmectx, e, roomVersion, isInboundTxn)
|
newEvents, backwardsExtremity, err := t.getMissingEvents(gmectx, e, roomVersion, isInboundTxn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -472,31 +476,75 @@ func (t *txnReq) processEventWithMissingState(ctx context.Context, e gomatrixser
|
||||||
// we filled in the gap!
|
// we filled in the gap!
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
if len(newEvents) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// at this point we know we're going to have a gap: we need to work out the room state at the new backwards extremity.
|
// at this point we know we're going to have a gap: we need to work out the room state at the new backwards extremity.
|
||||||
// security: we have to do state resolution on the new backwards extremity (TODO: WHY)
|
|
||||||
// Therefore, we cannot just query /state_ids with this event to get the state before. Instead, we need to query
|
// Therefore, we cannot just query /state_ids with this event to get the state before. Instead, we need to query
|
||||||
// the state AFTER all the prev_events for this event, then apply state resolution to that to get the state before the event.
|
// the state AFTER all the prev_events for this event, then apply state resolution to that to get the state before the event.
|
||||||
var states []*gomatrixserverlib.RespState
|
var states []*gomatrixserverlib.RespState
|
||||||
needed := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{*backwardsExtremity}).Tuples()
|
needed := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{*backwardsExtremity}).Tuples()
|
||||||
for _, prevEventID := range backwardsExtremity.PrevEventIDs() {
|
for _, prevEventID := range backwardsExtremity.PrevEventIDs() {
|
||||||
|
// Look up what the state is after the backward extremity. This will either
|
||||||
|
// come from the roomserver, if we know all the required events, or it will
|
||||||
|
// come from a remote server via /state_ids if not.
|
||||||
var prevState *gomatrixserverlib.RespState
|
var prevState *gomatrixserverlib.RespState
|
||||||
prevState, err = t.lookupStateAfterEvent(gmectx, roomVersion, backwardsExtremity.RoomID(), prevEventID, needed)
|
prevState, err = t.lookupStateAfterEvent(gmectx, roomVersion, backwardsExtremity.RoomID(), prevEventID, needed)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
util.GetLogger(ctx).WithError(err).Errorf("Failed to lookup state after prev_event: %s", prevEventID)
|
util.GetLogger(ctx).WithError(err).Errorf("Failed to lookup state after prev_event: %s", prevEventID)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
// Append the state onto the collected state. We'll run this through the
|
||||||
|
// state resolution next.
|
||||||
states = append(states, prevState)
|
states = append(states, prevState)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Now that we have collected all of the state from the prev_events, we'll
|
||||||
|
// run the state through the appropriate state resolution algorithm for the
|
||||||
|
// room. This does a couple of things:
|
||||||
|
// 1. Ensures that the state is deduplicated fully for each state-key tuple
|
||||||
|
// 2. Ensures that we pick the latest events from both sets, in the case that
|
||||||
|
// one of the prev_events is quite a bit older than the others
|
||||||
resolvedState, err := t.resolveStatesAndCheck(gmectx, roomVersion, states, backwardsExtremity)
|
resolvedState, err := t.resolveStatesAndCheck(gmectx, roomVersion, states, backwardsExtremity)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
util.GetLogger(ctx).WithError(err).Errorf("Failed to resolve state conflicts for event %s", backwardsExtremity.EventID())
|
util.GetLogger(ctx).WithError(err).Errorf("Failed to resolve state conflicts for event %s", backwardsExtremity.EventID())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// pass the event along with the state to the roomserver using a background context so we don't
|
// First of all, send the backward extremity into the roomserver with the
|
||||||
// needlessly expire
|
// newly resolved state. This marks the "oldest" point in the backfill and
|
||||||
return api.SendEventWithState(context.Background(), t.rsAPI, resolvedState, e.Headered(roomVersion), t.haveEventIDs())
|
// sets the baseline state for any new events after this.
|
||||||
|
err = api.SendEventWithState(
|
||||||
|
context.Background(),
|
||||||
|
t.rsAPI,
|
||||||
|
resolvedState,
|
||||||
|
backwardsExtremity.Headered(roomVersion),
|
||||||
|
t.haveEventIDs(),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("api.SendEventWithState: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Then send all of the newer backfilled events, of which will all be newer
|
||||||
|
// than the backward extremity, into the roomserver without state. This way
|
||||||
|
// they will automatically fast-forward based on the room state at the
|
||||||
|
// extremity in the last step.
|
||||||
|
headeredNewEvents := make([]gomatrixserverlib.HeaderedEvent, len(newEvents))
|
||||||
|
for i, newEvent := range newEvents {
|
||||||
|
headeredNewEvents[i] = newEvent.Headered(roomVersion)
|
||||||
|
}
|
||||||
|
if err = api.SendEvents(
|
||||||
|
context.Background(),
|
||||||
|
t.rsAPI,
|
||||||
|
headeredNewEvents,
|
||||||
|
api.DoNotSendToOtherServers,
|
||||||
|
nil,
|
||||||
|
); err != nil {
|
||||||
|
return fmt.Errorf("api.SendEvents: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// lookupStateAfterEvent returns the room state after `eventID`, which is the state before eventID with the state of `eventID` (if it's a state event)
|
// lookupStateAfterEvent returns the room state after `eventID`, which is the state before eventID with the state of `eventID` (if it's a state event)
|
||||||
|
|
@ -652,10 +700,10 @@ retryAllowedState:
|
||||||
// This function recursively calls txnReq.processEvent with the missing events, which will be processed before this function returns.
|
// This function recursively calls txnReq.processEvent with the missing events, which will be processed before this function returns.
|
||||||
// This means that we may recursively call this function, as we spider back up prev_events.
|
// This means that we may recursively call this function, as we spider back up prev_events.
|
||||||
// nolint:gocyclo
|
// nolint:gocyclo
|
||||||
func (t *txnReq) getMissingEvents(ctx context.Context, e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion, isInboundTxn bool) (backwardsExtremity *gomatrixserverlib.Event, err error) {
|
func (t *txnReq) getMissingEvents(ctx context.Context, e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion, isInboundTxn bool) (newEvents []gomatrixserverlib.Event, backwardsExtremity *gomatrixserverlib.Event, err error) {
|
||||||
if !isInboundTxn {
|
if !isInboundTxn {
|
||||||
// we've recursed here, so just take a state snapshot please!
|
// we've recursed here, so just take a state snapshot please!
|
||||||
return &e, nil
|
return nil, &e, nil
|
||||||
}
|
}
|
||||||
logger := util.GetLogger(ctx).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID())
|
logger := util.GetLogger(ctx).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID())
|
||||||
needed := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{e})
|
needed := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{e})
|
||||||
|
|
@ -667,7 +715,7 @@ func (t *txnReq) getMissingEvents(ctx context.Context, e gomatrixserverlib.Event
|
||||||
var res api.QueryLatestEventsAndStateResponse
|
var res api.QueryLatestEventsAndStateResponse
|
||||||
if err = t.rsAPI.QueryLatestEventsAndState(ctx, &req, &res); err != nil {
|
if err = t.rsAPI.QueryLatestEventsAndState(ctx, &req, &res); err != nil {
|
||||||
logger.WithError(err).Warn("Failed to query latest events")
|
logger.WithError(err).Warn("Failed to query latest events")
|
||||||
return &e, nil
|
return nil, &e, nil
|
||||||
}
|
}
|
||||||
latestEvents := make([]string, len(res.LatestEvents))
|
latestEvents := make([]string, len(res.LatestEvents))
|
||||||
for i := range res.LatestEvents {
|
for i := range res.LatestEvents {
|
||||||
|
|
@ -706,7 +754,7 @@ func (t *txnReq) getMissingEvents(ctx context.Context, e gomatrixserverlib.Event
|
||||||
"%s pushed us an event but %d server(s) couldn't give us details about prev_events via /get_missing_events - dropping this event until it can",
|
"%s pushed us an event but %d server(s) couldn't give us details about prev_events via /get_missing_events - dropping this event until it can",
|
||||||
t.Origin, len(servers),
|
t.Origin, len(servers),
|
||||||
)
|
)
|
||||||
return nil, missingPrevEventsError{
|
return nil, nil, missingPrevEventsError{
|
||||||
eventID: e.EventID(),
|
eventID: e.EventID(),
|
||||||
err: err,
|
err: err,
|
||||||
}
|
}
|
||||||
|
|
@ -726,7 +774,7 @@ func (t *txnReq) getMissingEvents(ctx context.Context, e gomatrixserverlib.Event
|
||||||
logger.Infof("get_missing_events returned %d events", len(missingResp.Events))
|
logger.Infof("get_missing_events returned %d events", len(missingResp.Events))
|
||||||
|
|
||||||
// topologically sort and sanity check that we are making forward progress
|
// topologically sort and sanity check that we are making forward progress
|
||||||
newEvents := gomatrixserverlib.ReverseTopologicalOrdering(missingResp.Events, gomatrixserverlib.TopologicalOrderByPrevEvents)
|
newEvents = gomatrixserverlib.ReverseTopologicalOrdering(missingResp.Events, gomatrixserverlib.TopologicalOrderByPrevEvents)
|
||||||
shouldHaveSomeEventIDs := e.PrevEventIDs()
|
shouldHaveSomeEventIDs := e.PrevEventIDs()
|
||||||
hasPrevEvent := false
|
hasPrevEvent := false
|
||||||
Event:
|
Event:
|
||||||
|
|
@ -744,21 +792,14 @@ Event:
|
||||||
"%s pushed us an event but couldn't give us details about prev_events via /get_missing_events - dropping this event until it can",
|
"%s pushed us an event but couldn't give us details about prev_events via /get_missing_events - dropping this event until it can",
|
||||||
t.Origin,
|
t.Origin,
|
||||||
)
|
)
|
||||||
return nil, missingPrevEventsError{
|
return nil, nil, missingPrevEventsError{
|
||||||
eventID: e.EventID(),
|
eventID: e.EventID(),
|
||||||
err: err,
|
err: err,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// process the missing events then the event which started this whole thing
|
|
||||||
for _, ev := range append(newEvents, e) {
|
|
||||||
err := t.processEvent(ctx, ev, false)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// we processed everything!
|
// we processed everything!
|
||||||
return nil, nil
|
return newEvents, nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *txnReq) lookupMissingStateViaState(ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) (
|
func (t *txnReq) lookupMissingStateViaState(ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) (
|
||||||
|
|
|
||||||
|
|
@ -122,7 +122,7 @@ func (r *Queryer) QueryMissingAuthPrevEvents(
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, prevEventID := range request.PrevEventIDs {
|
for _, prevEventID := range request.PrevEventIDs {
|
||||||
if nids, err := r.DB.EventNIDs(ctx, []string{prevEventID}); err != nil || len(nids) == 0 {
|
if state, err := r.DB.StateAtEventIDs(ctx, []string{prevEventID}); err != nil || len(state) == 0 {
|
||||||
response.MissingPrevEventIDs = append(response.MissingPrevEventIDs, prevEventID)
|
response.MissingPrevEventIDs = append(response.MissingPrevEventIDs, prevEventID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue