From db6f8beded771c1b823d9285549bfb16c722454b Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 9 Feb 2022 09:48:36 +0000 Subject: [PATCH] Refactor missing state and make sure that we really solve the problem for the new event --- roomserver/internal/input/input_events.go | 9 +- roomserver/internal/input/input_missing.go | 308 +++++++++++++-------- roomserver/storage/shared/room_updater.go | 6 + 3 files changed, 204 insertions(+), 119 deletions(-) diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index b44602270..b4afaf601 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -255,9 +255,16 @@ func (r *Inputer) processRoomEvent( hadEvents: map[string]bool{}, haveEvents: map[string]*gomatrixserverlib.HeaderedEvent{}, } - if err := missingState.processEventWithMissingState(ctx, event, headered.RoomVersion); err != nil { + if override, err := missingState.processEventWithMissingState(ctx, event, headered.RoomVersion); err != nil { isRejected = true rejectionErr = fmt.Errorf("missingState.processEventWithMissingState: %w", err) + } else if override != nil { + missingPrev = false + input.HasState = true + input.StateEventIDs = make([]string, 0, len(override.StateEvents)) + for _, e := range override.StateEvents { + input.StateEventIDs = append(input.StateEventIDs, e.EventID()) + } } else { missingPrev = false } diff --git a/roomserver/internal/input/input_missing.go b/roomserver/internal/input/input_missing.go index c53a2b2d5..e3ef72051 100644 --- a/roomserver/internal/input/input_missing.go +++ b/roomserver/internal/input/input_missing.go @@ -35,9 +35,10 @@ type missingStateReq struct { // processEventWithMissingState is the entrypoint for a missingStateReq // request, as called from processRoomEvent. +// nolint:gocyclo func (t *missingStateReq) processEventWithMissingState( ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion, -) error { +) (*gomatrixserverlib.RespState, error) { // We are missing the previous events for this events. // This means that there is a gap in our view of the history of the // room. There two ways that we can handle such a gap: @@ -63,15 +64,15 @@ func (t *missingStateReq) processEventWithMissingState( // - fill in the gap completely then process event `e` returning no backwards extremity // - fail to fill in the gap and tell us to terminate the transaction err=not nil // - fail to fill in the gap and tell us to fetch state at the new backwards extremity, and to not terminate the transaction - newEvents, isGapFilled, err := t.getMissingEvents(ctx, e, roomVersion) + newEvents, isGapFilled, prevStatesKnown, err := t.getMissingEvents(ctx, e, roomVersion) if err != nil { - return fmt.Errorf("t.getMissingEvents: %w", err) + return nil, fmt.Errorf("t.getMissingEvents: %w", err) } if len(newEvents) == 0 { - return fmt.Errorf("expected to find missing events but didn't") + return nil, fmt.Errorf("expected to find missing events but didn't") } if isGapFilled { - logger.Infof("gap filled by /get_missing_events, injecting %d new events", len(newEvents)) + logger.Infof("Gap filled by /get_missing_events, injecting %d new events", len(newEvents)) // we can just inject all the newEvents as new as we may have only missed 1 or 2 events and have filled // in the gap in the DAG for _, newEvent := range newEvents { @@ -83,86 +84,33 @@ func (t *missingStateReq) processEventWithMissingState( }) if err != nil { if _, ok := err.(types.RejectedError); !ok { - return fmt.Errorf("t.inputer.processRoomEvent (filling gap): %w", err) + return nil, fmt.Errorf("t.inputer.processRoomEvent (filling gap): %w", err) } } } - return nil } + // If we filled the gap *and* we know the state before the prev events + // then there's nothing else to do, we have everything we need to deal + // with the new event. + if isGapFilled && prevStatesKnown { + logger.Infof("Gap filled and state found for all prev events") + return nil, nil + } + + // At this point we have possibly filled the gap using /g_m_e, but + // we still don't actually know if that reconciled the problem with + // the event's prev events. It's possible that some of the events we + // fetched from /g_m_e didn't have state or were rejected for some + // reason. Let's double-check. backwardsExtremity := newEvents[0] newEvents = newEvents[1:] - type respState struct { - // A snapshot is considered trustworthy if it came from our own roomserver. - // That's because the state will have been through state resolution once - // already in QueryStateAfterEvent. - trustworthy bool - *gomatrixserverlib.RespState - } - - // at this point we know we're going to have a gap: we need to work out the room state at the new backwards extremity. - // Therefore, we cannot just query /state_ids with this event to get the state before. Instead, we need to query - // the state AFTER all the prev_events for this event, then apply state resolution to that to get the state before the event. - var states []*respState - for _, prevEventID := range backwardsExtremity.PrevEventIDs() { - // Look up what the state is after the backward extremity. This will either - // come from the roomserver, if we know all the required events, or it will - // come from a remote server via /state_ids if not. - prevState, trustworthy, lerr := t.lookupStateAfterEvent(ctx, roomVersion, backwardsExtremity.RoomID(), prevEventID) - if lerr != nil { - logger.WithError(lerr).Errorf("Failed to lookup state after prev_event: %s", prevEventID) - return lerr - } - // Append the state onto the collected state. We'll run this through the - // state resolution next. - states = append(states, &respState{trustworthy, prevState}) - } - - // Now that we have collected all of the state from the prev_events, we'll - // run the state through the appropriate state resolution algorithm for the - // room if needed. This does a couple of things: - // 1. Ensures that the state is deduplicated fully for each state-key tuple - // 2. Ensures that we pick the latest events from both sets, in the case that - // one of the prev_events is quite a bit older than the others - resolvedState := &gomatrixserverlib.RespState{} - switch len(states) { - case 0: - extremityIsCreate := backwardsExtremity.Type() == gomatrixserverlib.MRoomCreate && backwardsExtremity.StateKeyEquals("") - if !extremityIsCreate { - // There are no previous states and this isn't the beginning of the - // room - this is an error condition! - logger.Errorf("Failed to lookup any state after prev_events") - return fmt.Errorf("expected %d states but got %d", len(backwardsExtremity.PrevEventIDs()), len(states)) - } - case 1: - // There's only one previous state - if it's trustworthy (came from a - // local state snapshot which will already have been through state res), - // use it as-is. There's no point in resolving it again. Only trust a - // trustworthy state snapshot if it actually contains some state for all - // non-create events, otherwise we need to resolve what came from federation. - isCreate := backwardsExtremity.Type() == gomatrixserverlib.MRoomCreate && backwardsExtremity.StateKeyEquals("") - if states[0].trustworthy && (isCreate || len(states[0].StateEvents) > 0) { - resolvedState = states[0].RespState - logger.Infof("Found single trustworthy state snapshot for backward extremity %s", backwardsExtremity.EventID()) - break - } - // Otherwise, if it isn't trustworthy (came from federation), run it through - // state resolution anyway for safety, in case there are duplicates. - fallthrough - default: - respStates := make([]*gomatrixserverlib.RespState, len(states)) - for i := range states { - respStates[i] = states[i].RespState - } - // There's more than one previous state - run them all through state res - t.roomsMu.Lock(e.RoomID()) - resolvedState, err = t.resolveStatesAndCheck(ctx, roomVersion, respStates, backwardsExtremity) - t.roomsMu.Unlock(e.RoomID()) - if err != nil { - logger.WithError(err).Errorf("Failed to resolve state conflicts for event %s", backwardsExtremity.EventID()) - return err - } + // Retrieve the state at the backward extremity. This may allow us + // to roll forward and then re-check if the prev states are known. + resolvedState, err := t.lookupState(ctx, backwardsExtremity, roomVersion) + if err != nil { + return nil, fmt.Errorf("t.lookupState (backwards extremity): %w", err) } hadEvents := map[string]bool{} @@ -172,30 +120,37 @@ func (t *missingStateReq) processEventWithMissingState( } t.hadEventsMutex.Unlock() - // Send outliers first so we can send the new backwards extremity without causing errors - outliers, err := resolvedState.Events() - if err != nil { - return err - } - var outlierRoomEvents []api.InputRoomEvent - for _, outlier := range outliers { - if hadEvents[outlier.EventID()] { - continue - } - outlierRoomEvents = append(outlierRoomEvents, api.InputRoomEvent{ - Kind: api.KindOutlier, - Event: outlier.Headered(roomVersion), - Origin: t.origin, - }) - } - // TODO: we could do this concurrently? - for _, ire := range outlierRoomEvents { - _, err = t.inputer.processRoomEvent(ctx, t.db, &ire) + sendOutliers := func(resolvedState *gomatrixserverlib.RespState) error { + var outliers []*gomatrixserverlib.Event + outliers, err = resolvedState.Events() if err != nil { - if _, ok := err.(types.RejectedError); !ok { - return fmt.Errorf("t.inputer.processRoomEvent (outlier): %w", err) + return fmt.Errorf("resolvedState.Events: %w", err) + } + var outlierRoomEvents []api.InputRoomEvent + for _, outlier := range outliers { + if hadEvents[outlier.EventID()] { + continue + } + outlierRoomEvents = append(outlierRoomEvents, api.InputRoomEvent{ + Kind: api.KindOutlier, + Event: outlier.Headered(roomVersion), + Origin: t.origin, + }) + } + for _, ire := range outlierRoomEvents { + _, err = t.inputer.processRoomEvent(ctx, t.db, &ire) + if err != nil { + if _, ok := err.(types.RejectedError); !ok { + return fmt.Errorf("t.inputer.processRoomEvent (outlier): %w", err) + } } } + return nil + } + + // Send outliers first so we can send the new backwards extremity without causing errors + if err = sendOutliers(resolvedState); err != nil { + return nil, fmt.Errorf("sendOutliers: %w", err) } // Now send the backward extremity into the roomserver with the @@ -216,7 +171,7 @@ func (t *missingStateReq) processEventWithMissingState( }) if err != nil { if _, ok := err.(types.RejectedError); !ok { - return fmt.Errorf("t.inputer.processRoomEvent (backward extremity): %w", err) + return nil, fmt.Errorf("t.inputer.processRoomEvent (backward extremity): %w", err) } } @@ -233,12 +188,107 @@ func (t *missingStateReq) processEventWithMissingState( }) if err != nil { if _, ok := err.(types.RejectedError); !ok { - return fmt.Errorf("t.inputer.processRoomEvent (fast forward): %w", err) + return nil, fmt.Errorf("t.inputer.processRoomEvent (fast forward): %w", err) } } } - return nil + // Finally, check again if we know everything we need to know in order to + // make forward progress. This will be true if rolling forward the fetched + // events led to correctly fast-forwarded state. Otherwise we need to /state_ids + // again, but this time for the new event. + if t.isPrevStateKnown(ctx, e) { + return nil, nil + } + + // If we still haven't got the state for the prev events then we'll go and + // ask the federation for it if needed. + resolvedState, err = t.lookupState(ctx, e, roomVersion) + if err != nil { + return nil, fmt.Errorf("t.lookupState (new event): %w", err) + } + + // Send the outliers. + if err = sendOutliers(resolvedState); err != nil { + return nil, fmt.Errorf("sendOutliers: %w", err) + } + + // Then return the resolved state, for which the caller can replace the + // HasState with the event IDs to create a new state snapshot. + return resolvedState, nil +} + +func (t *missingStateReq) lookupState(ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (*gomatrixserverlib.RespState, error) { + type respState struct { + // A snapshot is considered trustworthy if it came from our own roomserver. + // That's because the state will have been through state resolution once + // already in QueryStateAfterEvent. + trustworthy bool + *gomatrixserverlib.RespState + } + + // at this point we know we're going to have a gap: we need to work out the room state at the new backwards extremity. + // Therefore, we cannot just query /state_ids with this event to get the state before. Instead, we need to query + // the state AFTER all the prev_events for this event, then apply state resolution to that to get the state before the event. + var states []*respState + for _, prevEventID := range e.PrevEventIDs() { + // Look up what the state is after the backward extremity. This will either + // come from the roomserver, if we know all the required events, or it will + // come from a remote server via /state_ids if not. + prevState, trustworthy, err := t.lookupStateAfterEvent(ctx, roomVersion, e.RoomID(), prevEventID) + if err != nil { + return nil, fmt.Errorf("t.lookupStateAfterEvent: %w", err) + } + // Append the state onto the collected state. We'll run this through the + // state resolution next. + states = append(states, &respState{trustworthy, prevState}) + } + + // Now that we have collected all of the state from the prev_events, we'll + // run the state through the appropriate state resolution algorithm for the + // room if needed. This does a couple of things: + // 1. Ensures that the state is deduplicated fully for each state-key tuple + // 2. Ensures that we pick the latest events from both sets, in the case that + // one of the prev_events is quite a bit older than the others + resolvedState := &gomatrixserverlib.RespState{} + switch len(states) { + case 0: + extremityIsCreate := e.Type() == gomatrixserverlib.MRoomCreate && e.StateKeyEquals("") + if !extremityIsCreate { + // There are no previous states and this isn't the beginning of the + // room - this is an error condition! + return nil, fmt.Errorf("expected %d states but got %d", len(e.PrevEventIDs()), len(states)) + } + case 1: + // There's only one previous state - if it's trustworthy (came from a + // local state snapshot which will already have been through state res), + // use it as-is. There's no point in resolving it again. Only trust a + // trustworthy state snapshot if it actually contains some state for all + // non-create events, otherwise we need to resolve what came from federation. + isCreate := e.Type() == gomatrixserverlib.MRoomCreate && e.StateKeyEquals("") + if states[0].trustworthy && (isCreate || len(states[0].StateEvents) > 0) { + resolvedState = states[0].RespState + break + } + // Otherwise, if it isn't trustworthy (came from federation), run it through + // state resolution anyway for safety, in case there are duplicates. + fallthrough + default: + respStates := make([]*gomatrixserverlib.RespState, len(states)) + for i := range states { + respStates[i] = states[i].RespState + } + // There's more than one previous state - run them all through state res + var err error + t.roomsMu.Lock(e.RoomID()) + resolvedState, err = t.resolveStatesAndCheck(ctx, roomVersion, respStates, e) + t.roomsMu.Unlock(e.RoomID()) + if err != nil { + return nil, fmt.Errorf("t.resolveStatesAndCheck: %w", err) + } + } + + return resolvedState, nil } // lookupStateAfterEvent returns the room state after `eventID`, which is the state before eventID with the state of `eventID` (if it's a state event) @@ -407,7 +457,7 @@ retryAllowedState: // get missing events for `e`. If `isGapFilled`=true then `newEvents` contains all the events to inject, // without `e`. If `isGapFilled=false` then `newEvents` contains the response to /get_missing_events -func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (newEvents []*gomatrixserverlib.Event, isGapFilled bool, err error) { +func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (newEvents []*gomatrixserverlib.Event, isGapFilled, prevStateKnown bool, err error) { logger := util.GetLogger(ctx).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID()) latest := t.db.LatestEvents() @@ -434,7 +484,7 @@ func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserve if errors.Is(err, context.DeadlineExceeded) { select { case <-ctx.Done(): // the parent request context timed out - return nil, false, context.DeadlineExceeded + return nil, false, false, context.DeadlineExceeded default: // this request exceed its own timeout continue } @@ -447,7 +497,7 @@ func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserve "%s pushed us an event but %d server(s) couldn't give us details about prev_events via /get_missing_events - dropping this event until it can", t.origin, len(t.servers), ) - return nil, false, missingPrevEventsError{ + return nil, false, false, missingPrevEventsError{ eventID: e.EventID(), err: err, } @@ -479,29 +529,51 @@ Event: "%s pushed us an event but couldn't give us details about prev_events via /get_missing_events - dropping this event until it can", t.origin, ) - return nil, false, missingPrevEventsError{ + return nil, false, false, missingPrevEventsError{ eventID: e.EventID(), err: err, } } if len(newEvents) == 0 { - return nil, false, nil // TODO: error instead? + return nil, false, false, nil // TODO: error instead? } - // now check if we can fill the gap. Look to see if we have state snapshot IDs for the earliest event earliestNewEvent := newEvents[0] - if state, err := t.db.StateAtEventIDs(ctx, []string{earliestNewEvent.EventID()}); err != nil || len(state) == 0 || state[0].BeforeStateSnapshotNID == 0 { - if earliestNewEvent.Type() == gomatrixserverlib.MRoomCreate && earliestNewEvent.StateKeyEquals("") { - // we got to the beginning of the room so there will be no state! It's all good we can process this - return newEvents, true, nil - } - // we don't have the state at this earliest event from /g_m_e so we won't have state for later events either - logger.Warnf("State unknown for backward extremity %s", earliestNewEvent.EventID()) - return newEvents, false, nil + + // If we retrieved back to the beginning of the room then there's nothing else + // to do - we closed the gap. + if len(earliestNewEvent.PrevEventIDs()) == 0 && earliestNewEvent.Type() == gomatrixserverlib.MRoomCreate && earliestNewEvent.StateKeyEquals("") { + return newEvents, true, t.isPrevStateKnown(ctx, e), nil } - // StateAtEventIDs returned some kind of state for the earliest event so we can fill in the gap! - logger.Infof("State known for backward extremity %s", earliestNewEvent.EventID()) - return newEvents, true, nil + + // If our backward extremity was not a known event to us then we obviously didn't + // close the gap. + if state, err := t.db.StateAtEventIDs(ctx, []string{earliestNewEvent.EventID()}); err != nil || len(state) == 0 && state[0].BeforeStateSnapshotNID == 0 { + return newEvents, false, false, nil + } + + // At this point we are satisfied that we know the state both at the earliest + // retrieved event and at the prev events of the new event. + return newEvents, true, t.isPrevStateKnown(ctx, e), nil +} + +func (t *missingStateReq) isPrevStateKnown(ctx context.Context, e *gomatrixserverlib.Event) bool { + expected := len(e.PrevEventIDs()) + state, err := t.db.StateAtEventIDs(ctx, e.PrevEventIDs()) + if err != nil || len(state) != expected { + // We didn't get as many state snapshots as we expected, or there was an error, + // so we haven't completely solved the problem for the new event. + return false + } + // Check to see if we have a populated state snapshot for all of the prev events. + for _, stateAtEvent := range state { + if stateAtEvent.BeforeStateSnapshotNID == 0 { + // One of the prev events still has unknown state, so we haven't really + // solved the problem. + return false + } + } + return true } func (t *missingStateReq) lookupMissingStateViaState(ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) ( diff --git a/roomserver/storage/shared/room_updater.go b/roomserver/storage/shared/room_updater.go index bb9f5dc62..fc75a2606 100644 --- a/roomserver/storage/shared/room_updater.go +++ b/roomserver/storage/shared/room_updater.go @@ -187,6 +187,12 @@ func (u *RoomUpdater) EventIDs( return u.d.EventsTable.BulkSelectEventID(ctx, u.txn, eventNIDs) } +func (u *RoomUpdater) EventNIDs( + ctx context.Context, eventIDs []string, +) (map[string]types.EventNID, error) { + return u.d.eventNIDs(ctx, u.txn, eventIDs) +} + func (u *RoomUpdater) StateAtEventIDs( ctx context.Context, eventIDs []string, ) ([]types.StateAtEvent, error) {