diff --git a/roomserver/internal/input/input_missing.go b/roomserver/internal/input/input_missing.go index 5296d9213..e1e2d597c 100644 --- a/roomserver/internal/input/input_missing.go +++ b/roomserver/internal/input/input_missing.go @@ -51,19 +51,42 @@ func (t *missingStateReq) processEventWithMissingState( // event ids and then use /event to fetch the individual events. // However not all version of synapse support /state_ids so you may // need to fallback to /state. + logger := util.GetLogger(ctx).WithFields(map[string]interface{}{ + "txn_event": e.EventID(), + "room_id": e.RoomID(), + "txn_prev_events": e.PrevEventIDs(), + }) // Attempt to fill in the gap using /get_missing_events // This will either: // - fill in the gap completely then process event `e` returning no backwards extremity // - fail to fill in the gap and tell us to terminate the transaction err=not nil // - fail to fill in the gap and tell us to fetch state at the new backwards extremity, and to not terminate the transaction - newEvents, err := t.getMissingEvents(ctx, e, roomVersion) + newEvents, isGapFilled, err := t.getMissingEvents(ctx, e, roomVersion) if err != nil { return fmt.Errorf("t.getMissingEvents: %w", err) } if len(newEvents) == 0 { return fmt.Errorf("expected to find missing events but didn't") } + if isGapFilled { + logger.Infof("gap filled by /get_missing_events, injecting %d new events", len(newEvents)) + // we can just inject all the newEvents as new as we may have only missed 1 or 2 events and have filled + // in the gap in the DAG + for _, newEvent := range newEvents { + err = t.inputer.processRoomEvent(ctx, &api.InputRoomEvent{ + Kind: api.KindNew, + Event: newEvent.Headered(roomVersion), + Origin: t.origin, + AuthEventIDs: newEvent.AuthEventIDs(), + SendAsServer: api.DoNotSendToOtherServers, + }) + if err != nil { + return fmt.Errorf("t.inputer.processRoomEvent: %w", err) + } + } + return nil + } backwardsExtremity := newEvents[0] newEvents = newEvents[1:] @@ -86,7 +109,7 @@ func (t *missingStateReq) processEventWithMissingState( // come from a remote server via /state_ids if not. prevState, trustworthy, lerr := t.lookupStateAfterEvent(ctx, roomVersion, backwardsExtremity.RoomID(), prevEventID) if lerr != nil { - util.GetLogger(ctx).WithError(lerr).Errorf("Failed to lookup state after prev_event: %s", prevEventID) + logger.WithError(lerr).Errorf("Failed to lookup state after prev_event: %s", prevEventID) return lerr } // Append the state onto the collected state. We'll run this through the @@ -107,7 +130,7 @@ func (t *missingStateReq) processEventWithMissingState( if !extremityIsCreate { // There are no previous states and this isn't the beginning of the // room - this is an error condition! - util.GetLogger(ctx).Errorf("Failed to lookup any state after prev_events") + logger.Errorf("Failed to lookup any state after prev_events") return fmt.Errorf("expected %d states but got %d", len(backwardsExtremity.PrevEventIDs()), len(states)) } case 1: @@ -131,7 +154,7 @@ func (t *missingStateReq) processEventWithMissingState( resolvedState, err = t.resolveStatesAndCheck(ctx, roomVersion, respStates, backwardsExtremity) t.roomsMu.Unlock(e.RoomID()) if err != nil { - util.GetLogger(ctx).WithError(err).Errorf("Failed to resolve state conflicts for event %s", backwardsExtremity.EventID()) + logger.WithError(err).Errorf("Failed to resolve state conflicts for event %s", backwardsExtremity.EventID()) return err } } @@ -197,7 +220,7 @@ func (t *missingStateReq) processEventWithMissingState( Kind: api.KindOld, Event: newEvent.Headered(roomVersion), Origin: t.origin, - AuthEventIDs: backwardsExtremity.AuthEventIDs(), + AuthEventIDs: newEvent.AuthEventIDs(), SendAsServer: api.DoNotSendToOtherServers, }) if err != nil { @@ -372,7 +395,9 @@ retryAllowedState: }, nil } -func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (newEvents []*gomatrixserverlib.Event, err error) { +// get missing events for `e`. If `isGapFilled`=true then `newEvents` contains all the events to inject, +// without `e`. If `isGapFilled=false` then `newEvents` contains the response to /get_missing_events +func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (newEvents []*gomatrixserverlib.Event, isGapFilled bool, err error) { logger := util.GetLogger(ctx).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID()) needed := gomatrixserverlib.StateNeededForAuth([]*gomatrixserverlib.Event{e}) // query latest events (our trusted forward extremities) @@ -383,7 +408,7 @@ func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserve var res api.QueryLatestEventsAndStateResponse if err = t.queryer.QueryLatestEventsAndState(ctx, &req, &res); err != nil { logger.WithError(err).Warn("Failed to query latest events") - return nil, err + return nil, false, err } latestEvents := make([]string, len(res.LatestEvents)) for i, ev := range res.LatestEvents { @@ -408,7 +433,7 @@ func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserve if errors.Is(err, context.DeadlineExceeded) { select { case <-ctx.Done(): // the parent request context timed out - return nil, context.DeadlineExceeded + return nil, false, context.DeadlineExceeded default: // this request exceed its own timeout continue } @@ -421,7 +446,7 @@ func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserve "%s pushed us an event but %d server(s) couldn't give us details about prev_events via /get_missing_events - dropping this event until it can", t.origin, len(t.servers), ) - return nil, missingPrevEventsError{ + return nil, false, missingPrevEventsError{ eventID: e.EventID(), err: err, } @@ -453,13 +478,27 @@ Event: "%s pushed us an event but couldn't give us details about prev_events via /get_missing_events - dropping this event until it can", t.origin, ) - return nil, missingPrevEventsError{ + return nil, false, missingPrevEventsError{ eventID: e.EventID(), err: err, } } + if len(newEvents) == 0 { + return nil, false, nil // TODO: error instead? + } - return newEvents, nil + // now check if we can fill the gap. Look to see if we have state snapshot IDs for the earliest event + earliestNewEvent := newEvents[0] + if state, err := t.db.StateAtEventIDs(ctx, []string{earliestNewEvent.EventID()}); err != nil || len(state) == 0 { + if earliestNewEvent.Type() == gomatrixserverlib.MRoomCreate && earliestNewEvent.StateKeyEquals("") { + // we got to the beginning of the room so there will be no state! It's all good we can process this + return newEvents, true, nil + } + // we don't have the state at this earliest event from /g_m_e so we won't have state for later events either + return newEvents, false, nil + } + // StateAtEventIDs returned some kind of state for the earliest event so we can fill in the gap! + return newEvents, true, nil } func (t *missingStateReq) lookupMissingStateViaState(ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) (