mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-31 10:43:10 -06:00
Fill in gaps again in /gme code
This commit is contained in:
parent
ffe93f103f
commit
c83851b9b0
|
|
@ -51,19 +51,42 @@ func (t *missingStateReq) processEventWithMissingState(
|
||||||
// event ids and then use /event to fetch the individual events.
|
// event ids and then use /event to fetch the individual events.
|
||||||
// However not all version of synapse support /state_ids so you may
|
// However not all version of synapse support /state_ids so you may
|
||||||
// need to fallback to /state.
|
// need to fallback to /state.
|
||||||
|
logger := util.GetLogger(ctx).WithFields(map[string]interface{}{
|
||||||
|
"txn_event": e.EventID(),
|
||||||
|
"room_id": e.RoomID(),
|
||||||
|
"txn_prev_events": e.PrevEventIDs(),
|
||||||
|
})
|
||||||
|
|
||||||
// Attempt to fill in the gap using /get_missing_events
|
// Attempt to fill in the gap using /get_missing_events
|
||||||
// This will either:
|
// This will either:
|
||||||
// - fill in the gap completely then process event `e` returning no backwards extremity
|
// - fill in the gap completely then process event `e` returning no backwards extremity
|
||||||
// - fail to fill in the gap and tell us to terminate the transaction err=not nil
|
// - fail to fill in the gap and tell us to terminate the transaction err=not nil
|
||||||
// - fail to fill in the gap and tell us to fetch state at the new backwards extremity, and to not terminate the transaction
|
// - fail to fill in the gap and tell us to fetch state at the new backwards extremity, and to not terminate the transaction
|
||||||
newEvents, err := t.getMissingEvents(ctx, e, roomVersion)
|
newEvents, isGapFilled, err := t.getMissingEvents(ctx, e, roomVersion)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("t.getMissingEvents: %w", err)
|
return fmt.Errorf("t.getMissingEvents: %w", err)
|
||||||
}
|
}
|
||||||
if len(newEvents) == 0 {
|
if len(newEvents) == 0 {
|
||||||
return fmt.Errorf("expected to find missing events but didn't")
|
return fmt.Errorf("expected to find missing events but didn't")
|
||||||
}
|
}
|
||||||
|
if isGapFilled {
|
||||||
|
logger.Infof("gap filled by /get_missing_events, injecting %d new events", len(newEvents))
|
||||||
|
// we can just inject all the newEvents as new as we may have only missed 1 or 2 events and have filled
|
||||||
|
// in the gap in the DAG
|
||||||
|
for _, newEvent := range newEvents {
|
||||||
|
err = t.inputer.processRoomEvent(ctx, &api.InputRoomEvent{
|
||||||
|
Kind: api.KindNew,
|
||||||
|
Event: newEvent.Headered(roomVersion),
|
||||||
|
Origin: t.origin,
|
||||||
|
AuthEventIDs: newEvent.AuthEventIDs(),
|
||||||
|
SendAsServer: api.DoNotSendToOtherServers,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("t.inputer.processRoomEvent: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
backwardsExtremity := newEvents[0]
|
backwardsExtremity := newEvents[0]
|
||||||
newEvents = newEvents[1:]
|
newEvents = newEvents[1:]
|
||||||
|
|
@ -86,7 +109,7 @@ func (t *missingStateReq) processEventWithMissingState(
|
||||||
// come from a remote server via /state_ids if not.
|
// come from a remote server via /state_ids if not.
|
||||||
prevState, trustworthy, lerr := t.lookupStateAfterEvent(ctx, roomVersion, backwardsExtremity.RoomID(), prevEventID)
|
prevState, trustworthy, lerr := t.lookupStateAfterEvent(ctx, roomVersion, backwardsExtremity.RoomID(), prevEventID)
|
||||||
if lerr != nil {
|
if lerr != nil {
|
||||||
util.GetLogger(ctx).WithError(lerr).Errorf("Failed to lookup state after prev_event: %s", prevEventID)
|
logger.WithError(lerr).Errorf("Failed to lookup state after prev_event: %s", prevEventID)
|
||||||
return lerr
|
return lerr
|
||||||
}
|
}
|
||||||
// Append the state onto the collected state. We'll run this through the
|
// Append the state onto the collected state. We'll run this through the
|
||||||
|
|
@ -107,7 +130,7 @@ func (t *missingStateReq) processEventWithMissingState(
|
||||||
if !extremityIsCreate {
|
if !extremityIsCreate {
|
||||||
// There are no previous states and this isn't the beginning of the
|
// There are no previous states and this isn't the beginning of the
|
||||||
// room - this is an error condition!
|
// room - this is an error condition!
|
||||||
util.GetLogger(ctx).Errorf("Failed to lookup any state after prev_events")
|
logger.Errorf("Failed to lookup any state after prev_events")
|
||||||
return fmt.Errorf("expected %d states but got %d", len(backwardsExtremity.PrevEventIDs()), len(states))
|
return fmt.Errorf("expected %d states but got %d", len(backwardsExtremity.PrevEventIDs()), len(states))
|
||||||
}
|
}
|
||||||
case 1:
|
case 1:
|
||||||
|
|
@ -131,7 +154,7 @@ func (t *missingStateReq) processEventWithMissingState(
|
||||||
resolvedState, err = t.resolveStatesAndCheck(ctx, roomVersion, respStates, backwardsExtremity)
|
resolvedState, err = t.resolveStatesAndCheck(ctx, roomVersion, respStates, backwardsExtremity)
|
||||||
t.roomsMu.Unlock(e.RoomID())
|
t.roomsMu.Unlock(e.RoomID())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
util.GetLogger(ctx).WithError(err).Errorf("Failed to resolve state conflicts for event %s", backwardsExtremity.EventID())
|
logger.WithError(err).Errorf("Failed to resolve state conflicts for event %s", backwardsExtremity.EventID())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -197,7 +220,7 @@ func (t *missingStateReq) processEventWithMissingState(
|
||||||
Kind: api.KindOld,
|
Kind: api.KindOld,
|
||||||
Event: newEvent.Headered(roomVersion),
|
Event: newEvent.Headered(roomVersion),
|
||||||
Origin: t.origin,
|
Origin: t.origin,
|
||||||
AuthEventIDs: backwardsExtremity.AuthEventIDs(),
|
AuthEventIDs: newEvent.AuthEventIDs(),
|
||||||
SendAsServer: api.DoNotSendToOtherServers,
|
SendAsServer: api.DoNotSendToOtherServers,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -372,7 +395,9 @@ retryAllowedState:
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (newEvents []*gomatrixserverlib.Event, err error) {
|
// get missing events for `e`. If `isGapFilled`=true then `newEvents` contains all the events to inject,
|
||||||
|
// without `e`. If `isGapFilled=false` then `newEvents` contains the response to /get_missing_events
|
||||||
|
func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (newEvents []*gomatrixserverlib.Event, isGapFilled bool, err error) {
|
||||||
logger := util.GetLogger(ctx).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID())
|
logger := util.GetLogger(ctx).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID())
|
||||||
needed := gomatrixserverlib.StateNeededForAuth([]*gomatrixserverlib.Event{e})
|
needed := gomatrixserverlib.StateNeededForAuth([]*gomatrixserverlib.Event{e})
|
||||||
// query latest events (our trusted forward extremities)
|
// query latest events (our trusted forward extremities)
|
||||||
|
|
@ -383,7 +408,7 @@ func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserve
|
||||||
var res api.QueryLatestEventsAndStateResponse
|
var res api.QueryLatestEventsAndStateResponse
|
||||||
if err = t.queryer.QueryLatestEventsAndState(ctx, &req, &res); err != nil {
|
if err = t.queryer.QueryLatestEventsAndState(ctx, &req, &res); err != nil {
|
||||||
logger.WithError(err).Warn("Failed to query latest events")
|
logger.WithError(err).Warn("Failed to query latest events")
|
||||||
return nil, err
|
return nil, false, err
|
||||||
}
|
}
|
||||||
latestEvents := make([]string, len(res.LatestEvents))
|
latestEvents := make([]string, len(res.LatestEvents))
|
||||||
for i, ev := range res.LatestEvents {
|
for i, ev := range res.LatestEvents {
|
||||||
|
|
@ -408,7 +433,7 @@ func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserve
|
||||||
if errors.Is(err, context.DeadlineExceeded) {
|
if errors.Is(err, context.DeadlineExceeded) {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done(): // the parent request context timed out
|
case <-ctx.Done(): // the parent request context timed out
|
||||||
return nil, context.DeadlineExceeded
|
return nil, false, context.DeadlineExceeded
|
||||||
default: // this request exceed its own timeout
|
default: // this request exceed its own timeout
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
@ -421,7 +446,7 @@ func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserve
|
||||||
"%s pushed us an event but %d server(s) couldn't give us details about prev_events via /get_missing_events - dropping this event until it can",
|
"%s pushed us an event but %d server(s) couldn't give us details about prev_events via /get_missing_events - dropping this event until it can",
|
||||||
t.origin, len(t.servers),
|
t.origin, len(t.servers),
|
||||||
)
|
)
|
||||||
return nil, missingPrevEventsError{
|
return nil, false, missingPrevEventsError{
|
||||||
eventID: e.EventID(),
|
eventID: e.EventID(),
|
||||||
err: err,
|
err: err,
|
||||||
}
|
}
|
||||||
|
|
@ -453,13 +478,27 @@ Event:
|
||||||
"%s pushed us an event but couldn't give us details about prev_events via /get_missing_events - dropping this event until it can",
|
"%s pushed us an event but couldn't give us details about prev_events via /get_missing_events - dropping this event until it can",
|
||||||
t.origin,
|
t.origin,
|
||||||
)
|
)
|
||||||
return nil, missingPrevEventsError{
|
return nil, false, missingPrevEventsError{
|
||||||
eventID: e.EventID(),
|
eventID: e.EventID(),
|
||||||
err: err,
|
err: err,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if len(newEvents) == 0 {
|
||||||
|
return nil, false, nil // TODO: error instead?
|
||||||
|
}
|
||||||
|
|
||||||
return newEvents, nil
|
// now check if we can fill the gap. Look to see if we have state snapshot IDs for the earliest event
|
||||||
|
earliestNewEvent := newEvents[0]
|
||||||
|
if state, err := t.db.StateAtEventIDs(ctx, []string{earliestNewEvent.EventID()}); err != nil || len(state) == 0 {
|
||||||
|
if earliestNewEvent.Type() == gomatrixserverlib.MRoomCreate && earliestNewEvent.StateKeyEquals("") {
|
||||||
|
// we got to the beginning of the room so there will be no state! It's all good we can process this
|
||||||
|
return newEvents, true, nil
|
||||||
|
}
|
||||||
|
// we don't have the state at this earliest event from /g_m_e so we won't have state for later events either
|
||||||
|
return newEvents, false, nil
|
||||||
|
}
|
||||||
|
// StateAtEventIDs returned some kind of state for the earliest event so we can fill in the gap!
|
||||||
|
return newEvents, true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *missingStateReq) lookupMissingStateViaState(ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) (
|
func (t *missingStateReq) lookupMissingStateViaState(ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) (
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue