Refactor missing state and make sure that we really solve the problem for the new event

This commit is contained in:
Neil Alexander 2022-02-09 09:48:36 +00:00
parent ffcc04b980
commit db6f8beded
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
3 changed files with 204 additions and 119 deletions

View file

@ -255,9 +255,16 @@ func (r *Inputer) processRoomEvent(
hadEvents: map[string]bool{}, hadEvents: map[string]bool{},
haveEvents: map[string]*gomatrixserverlib.HeaderedEvent{}, haveEvents: map[string]*gomatrixserverlib.HeaderedEvent{},
} }
if err := missingState.processEventWithMissingState(ctx, event, headered.RoomVersion); err != nil { if override, err := missingState.processEventWithMissingState(ctx, event, headered.RoomVersion); err != nil {
isRejected = true isRejected = true
rejectionErr = fmt.Errorf("missingState.processEventWithMissingState: %w", err) rejectionErr = fmt.Errorf("missingState.processEventWithMissingState: %w", err)
} else if override != nil {
missingPrev = false
input.HasState = true
input.StateEventIDs = make([]string, 0, len(override.StateEvents))
for _, e := range override.StateEvents {
input.StateEventIDs = append(input.StateEventIDs, e.EventID())
}
} else { } else {
missingPrev = false missingPrev = false
} }

View file

@ -35,9 +35,10 @@ type missingStateReq struct {
// processEventWithMissingState is the entrypoint for a missingStateReq // processEventWithMissingState is the entrypoint for a missingStateReq
// request, as called from processRoomEvent. // request, as called from processRoomEvent.
// nolint:gocyclo
func (t *missingStateReq) processEventWithMissingState( func (t *missingStateReq) processEventWithMissingState(
ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion, ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion,
) error { ) (*gomatrixserverlib.RespState, error) {
// We are missing the previous events for this events. // We are missing the previous events for this events.
// This means that there is a gap in our view of the history of the // This means that there is a gap in our view of the history of the
// room. There two ways that we can handle such a gap: // room. There two ways that we can handle such a gap:
@ -63,15 +64,15 @@ func (t *missingStateReq) processEventWithMissingState(
// - fill in the gap completely then process event `e` returning no backwards extremity // - fill in the gap completely then process event `e` returning no backwards extremity
// - fail to fill in the gap and tell us to terminate the transaction err=not nil // - fail to fill in the gap and tell us to terminate the transaction err=not nil
// - fail to fill in the gap and tell us to fetch state at the new backwards extremity, and to not terminate the transaction // - fail to fill in the gap and tell us to fetch state at the new backwards extremity, and to not terminate the transaction
newEvents, isGapFilled, err := t.getMissingEvents(ctx, e, roomVersion) newEvents, isGapFilled, prevStatesKnown, err := t.getMissingEvents(ctx, e, roomVersion)
if err != nil { if err != nil {
return fmt.Errorf("t.getMissingEvents: %w", err) return nil, fmt.Errorf("t.getMissingEvents: %w", err)
} }
if len(newEvents) == 0 { if len(newEvents) == 0 {
return fmt.Errorf("expected to find missing events but didn't") return nil, fmt.Errorf("expected to find missing events but didn't")
} }
if isGapFilled { if isGapFilled {
logger.Infof("gap filled by /get_missing_events, injecting %d new events", len(newEvents)) logger.Infof("Gap filled by /get_missing_events, injecting %d new events", len(newEvents))
// we can just inject all the newEvents as new as we may have only missed 1 or 2 events and have filled // we can just inject all the newEvents as new as we may have only missed 1 or 2 events and have filled
// in the gap in the DAG // in the gap in the DAG
for _, newEvent := range newEvents { for _, newEvent := range newEvents {
@ -83,86 +84,33 @@ func (t *missingStateReq) processEventWithMissingState(
}) })
if err != nil { if err != nil {
if _, ok := err.(types.RejectedError); !ok { if _, ok := err.(types.RejectedError); !ok {
return fmt.Errorf("t.inputer.processRoomEvent (filling gap): %w", err) return nil, fmt.Errorf("t.inputer.processRoomEvent (filling gap): %w", err)
} }
} }
} }
return nil
} }
// If we filled the gap *and* we know the state before the prev events
// then there's nothing else to do, we have everything we need to deal
// with the new event.
if isGapFilled && prevStatesKnown {
logger.Infof("Gap filled and state found for all prev events")
return nil, nil
}
// At this point we have possibly filled the gap using /g_m_e, but
// we still don't actually know if that reconciled the problem with
// the event's prev events. It's possible that some of the events we
// fetched from /g_m_e didn't have state or were rejected for some
// reason. Let's double-check.
backwardsExtremity := newEvents[0] backwardsExtremity := newEvents[0]
newEvents = newEvents[1:] newEvents = newEvents[1:]
type respState struct { // Retrieve the state at the backward extremity. This may allow us
// A snapshot is considered trustworthy if it came from our own roomserver. // to roll forward and then re-check if the prev states are known.
// That's because the state will have been through state resolution once resolvedState, err := t.lookupState(ctx, backwardsExtremity, roomVersion)
// already in QueryStateAfterEvent.
trustworthy bool
*gomatrixserverlib.RespState
}
// at this point we know we're going to have a gap: we need to work out the room state at the new backwards extremity.
// Therefore, we cannot just query /state_ids with this event to get the state before. Instead, we need to query
// the state AFTER all the prev_events for this event, then apply state resolution to that to get the state before the event.
var states []*respState
for _, prevEventID := range backwardsExtremity.PrevEventIDs() {
// Look up what the state is after the backward extremity. This will either
// come from the roomserver, if we know all the required events, or it will
// come from a remote server via /state_ids if not.
prevState, trustworthy, lerr := t.lookupStateAfterEvent(ctx, roomVersion, backwardsExtremity.RoomID(), prevEventID)
if lerr != nil {
logger.WithError(lerr).Errorf("Failed to lookup state after prev_event: %s", prevEventID)
return lerr
}
// Append the state onto the collected state. We'll run this through the
// state resolution next.
states = append(states, &respState{trustworthy, prevState})
}
// Now that we have collected all of the state from the prev_events, we'll
// run the state through the appropriate state resolution algorithm for the
// room if needed. This does a couple of things:
// 1. Ensures that the state is deduplicated fully for each state-key tuple
// 2. Ensures that we pick the latest events from both sets, in the case that
// one of the prev_events is quite a bit older than the others
resolvedState := &gomatrixserverlib.RespState{}
switch len(states) {
case 0:
extremityIsCreate := backwardsExtremity.Type() == gomatrixserverlib.MRoomCreate && backwardsExtremity.StateKeyEquals("")
if !extremityIsCreate {
// There are no previous states and this isn't the beginning of the
// room - this is an error condition!
logger.Errorf("Failed to lookup any state after prev_events")
return fmt.Errorf("expected %d states but got %d", len(backwardsExtremity.PrevEventIDs()), len(states))
}
case 1:
// There's only one previous state - if it's trustworthy (came from a
// local state snapshot which will already have been through state res),
// use it as-is. There's no point in resolving it again. Only trust a
// trustworthy state snapshot if it actually contains some state for all
// non-create events, otherwise we need to resolve what came from federation.
isCreate := backwardsExtremity.Type() == gomatrixserverlib.MRoomCreate && backwardsExtremity.StateKeyEquals("")
if states[0].trustworthy && (isCreate || len(states[0].StateEvents) > 0) {
resolvedState = states[0].RespState
logger.Infof("Found single trustworthy state snapshot for backward extremity %s", backwardsExtremity.EventID())
break
}
// Otherwise, if it isn't trustworthy (came from federation), run it through
// state resolution anyway for safety, in case there are duplicates.
fallthrough
default:
respStates := make([]*gomatrixserverlib.RespState, len(states))
for i := range states {
respStates[i] = states[i].RespState
}
// There's more than one previous state - run them all through state res
t.roomsMu.Lock(e.RoomID())
resolvedState, err = t.resolveStatesAndCheck(ctx, roomVersion, respStates, backwardsExtremity)
t.roomsMu.Unlock(e.RoomID())
if err != nil { if err != nil {
logger.WithError(err).Errorf("Failed to resolve state conflicts for event %s", backwardsExtremity.EventID()) return nil, fmt.Errorf("t.lookupState (backwards extremity): %w", err)
return err
}
} }
hadEvents := map[string]bool{} hadEvents := map[string]bool{}
@ -172,10 +120,11 @@ func (t *missingStateReq) processEventWithMissingState(
} }
t.hadEventsMutex.Unlock() t.hadEventsMutex.Unlock()
// Send outliers first so we can send the new backwards extremity without causing errors sendOutliers := func(resolvedState *gomatrixserverlib.RespState) error {
outliers, err := resolvedState.Events() var outliers []*gomatrixserverlib.Event
outliers, err = resolvedState.Events()
if err != nil { if err != nil {
return err return fmt.Errorf("resolvedState.Events: %w", err)
} }
var outlierRoomEvents []api.InputRoomEvent var outlierRoomEvents []api.InputRoomEvent
for _, outlier := range outliers { for _, outlier := range outliers {
@ -188,7 +137,6 @@ func (t *missingStateReq) processEventWithMissingState(
Origin: t.origin, Origin: t.origin,
}) })
} }
// TODO: we could do this concurrently?
for _, ire := range outlierRoomEvents { for _, ire := range outlierRoomEvents {
_, err = t.inputer.processRoomEvent(ctx, t.db, &ire) _, err = t.inputer.processRoomEvent(ctx, t.db, &ire)
if err != nil { if err != nil {
@ -197,6 +145,13 @@ func (t *missingStateReq) processEventWithMissingState(
} }
} }
} }
return nil
}
// Send outliers first so we can send the new backwards extremity without causing errors
if err = sendOutliers(resolvedState); err != nil {
return nil, fmt.Errorf("sendOutliers: %w", err)
}
// Now send the backward extremity into the roomserver with the // Now send the backward extremity into the roomserver with the
// newly resolved state. This marks the "oldest" point in the backfill and // newly resolved state. This marks the "oldest" point in the backfill and
@ -216,7 +171,7 @@ func (t *missingStateReq) processEventWithMissingState(
}) })
if err != nil { if err != nil {
if _, ok := err.(types.RejectedError); !ok { if _, ok := err.(types.RejectedError); !ok {
return fmt.Errorf("t.inputer.processRoomEvent (backward extremity): %w", err) return nil, fmt.Errorf("t.inputer.processRoomEvent (backward extremity): %w", err)
} }
} }
@ -233,12 +188,107 @@ func (t *missingStateReq) processEventWithMissingState(
}) })
if err != nil { if err != nil {
if _, ok := err.(types.RejectedError); !ok { if _, ok := err.(types.RejectedError); !ok {
return fmt.Errorf("t.inputer.processRoomEvent (fast forward): %w", err) return nil, fmt.Errorf("t.inputer.processRoomEvent (fast forward): %w", err)
} }
} }
} }
return nil // Finally, check again if we know everything we need to know in order to
// make forward progress. This will be true if rolling forward the fetched
// events led to correctly fast-forwarded state. Otherwise we need to /state_ids
// again, but this time for the new event.
if t.isPrevStateKnown(ctx, e) {
return nil, nil
}
// If we still haven't got the state for the prev events then we'll go and
// ask the federation for it if needed.
resolvedState, err = t.lookupState(ctx, e, roomVersion)
if err != nil {
return nil, fmt.Errorf("t.lookupState (new event): %w", err)
}
// Send the outliers.
if err = sendOutliers(resolvedState); err != nil {
return nil, fmt.Errorf("sendOutliers: %w", err)
}
// Then return the resolved state, for which the caller can replace the
// HasState with the event IDs to create a new state snapshot.
return resolvedState, nil
}
func (t *missingStateReq) lookupState(ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (*gomatrixserverlib.RespState, error) {
type respState struct {
// A snapshot is considered trustworthy if it came from our own roomserver.
// That's because the state will have been through state resolution once
// already in QueryStateAfterEvent.
trustworthy bool
*gomatrixserverlib.RespState
}
// at this point we know we're going to have a gap: we need to work out the room state at the new backwards extremity.
// Therefore, we cannot just query /state_ids with this event to get the state before. Instead, we need to query
// the state AFTER all the prev_events for this event, then apply state resolution to that to get the state before the event.
var states []*respState
for _, prevEventID := range e.PrevEventIDs() {
// Look up what the state is after the backward extremity. This will either
// come from the roomserver, if we know all the required events, or it will
// come from a remote server via /state_ids if not.
prevState, trustworthy, err := t.lookupStateAfterEvent(ctx, roomVersion, e.RoomID(), prevEventID)
if err != nil {
return nil, fmt.Errorf("t.lookupStateAfterEvent: %w", err)
}
// Append the state onto the collected state. We'll run this through the
// state resolution next.
states = append(states, &respState{trustworthy, prevState})
}
// Now that we have collected all of the state from the prev_events, we'll
// run the state through the appropriate state resolution algorithm for the
// room if needed. This does a couple of things:
// 1. Ensures that the state is deduplicated fully for each state-key tuple
// 2. Ensures that we pick the latest events from both sets, in the case that
// one of the prev_events is quite a bit older than the others
resolvedState := &gomatrixserverlib.RespState{}
switch len(states) {
case 0:
extremityIsCreate := e.Type() == gomatrixserverlib.MRoomCreate && e.StateKeyEquals("")
if !extremityIsCreate {
// There are no previous states and this isn't the beginning of the
// room - this is an error condition!
return nil, fmt.Errorf("expected %d states but got %d", len(e.PrevEventIDs()), len(states))
}
case 1:
// There's only one previous state - if it's trustworthy (came from a
// local state snapshot which will already have been through state res),
// use it as-is. There's no point in resolving it again. Only trust a
// trustworthy state snapshot if it actually contains some state for all
// non-create events, otherwise we need to resolve what came from federation.
isCreate := e.Type() == gomatrixserverlib.MRoomCreate && e.StateKeyEquals("")
if states[0].trustworthy && (isCreate || len(states[0].StateEvents) > 0) {
resolvedState = states[0].RespState
break
}
// Otherwise, if it isn't trustworthy (came from federation), run it through
// state resolution anyway for safety, in case there are duplicates.
fallthrough
default:
respStates := make([]*gomatrixserverlib.RespState, len(states))
for i := range states {
respStates[i] = states[i].RespState
}
// There's more than one previous state - run them all through state res
var err error
t.roomsMu.Lock(e.RoomID())
resolvedState, err = t.resolveStatesAndCheck(ctx, roomVersion, respStates, e)
t.roomsMu.Unlock(e.RoomID())
if err != nil {
return nil, fmt.Errorf("t.resolveStatesAndCheck: %w", err)
}
}
return resolvedState, nil
} }
// lookupStateAfterEvent returns the room state after `eventID`, which is the state before eventID with the state of `eventID` (if it's a state event) // lookupStateAfterEvent returns the room state after `eventID`, which is the state before eventID with the state of `eventID` (if it's a state event)
@ -407,7 +457,7 @@ retryAllowedState:
// get missing events for `e`. If `isGapFilled`=true then `newEvents` contains all the events to inject, // get missing events for `e`. If `isGapFilled`=true then `newEvents` contains all the events to inject,
// without `e`. If `isGapFilled=false` then `newEvents` contains the response to /get_missing_events // without `e`. If `isGapFilled=false` then `newEvents` contains the response to /get_missing_events
func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (newEvents []*gomatrixserverlib.Event, isGapFilled bool, err error) { func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (newEvents []*gomatrixserverlib.Event, isGapFilled, prevStateKnown bool, err error) {
logger := util.GetLogger(ctx).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID()) logger := util.GetLogger(ctx).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID())
latest := t.db.LatestEvents() latest := t.db.LatestEvents()
@ -434,7 +484,7 @@ func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserve
if errors.Is(err, context.DeadlineExceeded) { if errors.Is(err, context.DeadlineExceeded) {
select { select {
case <-ctx.Done(): // the parent request context timed out case <-ctx.Done(): // the parent request context timed out
return nil, false, context.DeadlineExceeded return nil, false, false, context.DeadlineExceeded
default: // this request exceed its own timeout default: // this request exceed its own timeout
continue continue
} }
@ -447,7 +497,7 @@ func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserve
"%s pushed us an event but %d server(s) couldn't give us details about prev_events via /get_missing_events - dropping this event until it can", "%s pushed us an event but %d server(s) couldn't give us details about prev_events via /get_missing_events - dropping this event until it can",
t.origin, len(t.servers), t.origin, len(t.servers),
) )
return nil, false, missingPrevEventsError{ return nil, false, false, missingPrevEventsError{
eventID: e.EventID(), eventID: e.EventID(),
err: err, err: err,
} }
@ -479,29 +529,51 @@ Event:
"%s pushed us an event but couldn't give us details about prev_events via /get_missing_events - dropping this event until it can", "%s pushed us an event but couldn't give us details about prev_events via /get_missing_events - dropping this event until it can",
t.origin, t.origin,
) )
return nil, false, missingPrevEventsError{ return nil, false, false, missingPrevEventsError{
eventID: e.EventID(), eventID: e.EventID(),
err: err, err: err,
} }
} }
if len(newEvents) == 0 { if len(newEvents) == 0 {
return nil, false, nil // TODO: error instead? return nil, false, false, nil // TODO: error instead?
} }
// now check if we can fill the gap. Look to see if we have state snapshot IDs for the earliest event
earliestNewEvent := newEvents[0] earliestNewEvent := newEvents[0]
if state, err := t.db.StateAtEventIDs(ctx, []string{earliestNewEvent.EventID()}); err != nil || len(state) == 0 || state[0].BeforeStateSnapshotNID == 0 {
if earliestNewEvent.Type() == gomatrixserverlib.MRoomCreate && earliestNewEvent.StateKeyEquals("") { // If we retrieved back to the beginning of the room then there's nothing else
// we got to the beginning of the room so there will be no state! It's all good we can process this // to do - we closed the gap.
return newEvents, true, nil if len(earliestNewEvent.PrevEventIDs()) == 0 && earliestNewEvent.Type() == gomatrixserverlib.MRoomCreate && earliestNewEvent.StateKeyEquals("") {
return newEvents, true, t.isPrevStateKnown(ctx, e), nil
} }
// we don't have the state at this earliest event from /g_m_e so we won't have state for later events either
logger.Warnf("State unknown for backward extremity %s", earliestNewEvent.EventID()) // If our backward extremity was not a known event to us then we obviously didn't
return newEvents, false, nil // close the gap.
if state, err := t.db.StateAtEventIDs(ctx, []string{earliestNewEvent.EventID()}); err != nil || len(state) == 0 && state[0].BeforeStateSnapshotNID == 0 {
return newEvents, false, false, nil
} }
// StateAtEventIDs returned some kind of state for the earliest event so we can fill in the gap!
logger.Infof("State known for backward extremity %s", earliestNewEvent.EventID()) // At this point we are satisfied that we know the state both at the earliest
return newEvents, true, nil // retrieved event and at the prev events of the new event.
return newEvents, true, t.isPrevStateKnown(ctx, e), nil
}
func (t *missingStateReq) isPrevStateKnown(ctx context.Context, e *gomatrixserverlib.Event) bool {
expected := len(e.PrevEventIDs())
state, err := t.db.StateAtEventIDs(ctx, e.PrevEventIDs())
if err != nil || len(state) != expected {
// We didn't get as many state snapshots as we expected, or there was an error,
// so we haven't completely solved the problem for the new event.
return false
}
// Check to see if we have a populated state snapshot for all of the prev events.
for _, stateAtEvent := range state {
if stateAtEvent.BeforeStateSnapshotNID == 0 {
// One of the prev events still has unknown state, so we haven't really
// solved the problem.
return false
}
}
return true
} }
func (t *missingStateReq) lookupMissingStateViaState(ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) ( func (t *missingStateReq) lookupMissingStateViaState(ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) (

View file

@ -187,6 +187,12 @@ func (u *RoomUpdater) EventIDs(
return u.d.EventsTable.BulkSelectEventID(ctx, u.txn, eventNIDs) return u.d.EventsTable.BulkSelectEventID(ctx, u.txn, eventNIDs)
} }
func (u *RoomUpdater) EventNIDs(
ctx context.Context, eventIDs []string,
) (map[string]types.EventNID, error) {
return u.d.eventNIDs(ctx, u.txn, eventIDs)
}
func (u *RoomUpdater) StateAtEventIDs( func (u *RoomUpdater) StateAtEventIDs(
ctx context.Context, eventIDs []string, ctx context.Context, eventIDs []string,
) ([]types.StateAtEvent, error) { ) ([]types.StateAtEvent, error) {