This commit is contained in:
Kegan Dougal 2020-05-12 11:44:07 +01:00
parent 3acdc3a0c3
commit e8665bae69
2 changed files with 65 additions and 59 deletions

View file

@ -351,7 +351,8 @@ func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event, roomVer
var states []*gomatrixserverlib.RespState var states []*gomatrixserverlib.RespState
needed := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{*backwardsExtremity}).Tuples() needed := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{*backwardsExtremity}).Tuples()
for _, prevEventID := range backwardsExtremity.PrevEventIDs() { for _, prevEventID := range backwardsExtremity.PrevEventIDs() {
prevState, err := t.lookupStateAfterEvent(roomVersion, backwardsExtremity.RoomID(), prevEventID, needed) var prevState *gomatrixserverlib.RespState
prevState, err = t.lookupStateAfterEvent(roomVersion, backwardsExtremity.RoomID(), prevEventID, needed)
if err != nil { if err != nil {
util.GetLogger(t.context).WithError(err).Errorf("Failed to lookup state after prev_event: %s", prevEventID) util.GetLogger(t.context).WithError(err).Errorf("Failed to lookup state after prev_event: %s", prevEventID)
return err return err
@ -379,55 +380,10 @@ func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event, roomVer
// lookupStateAfterEvent returns the room state after `eventID`, which is the state before eventID with the state of `eventID` (if it's a state event) // lookupStateAfterEvent returns the room state after `eventID`, which is the state before eventID with the state of `eventID` (if it's a state event)
// added into the mix. // added into the mix.
func (t *txnReq) lookupStateAfterEvent(roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string, needed []gomatrixserverlib.StateKeyTuple) (*gomatrixserverlib.RespState, error) { func (t *txnReq) lookupStateAfterEvent(roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string, needed []gomatrixserverlib.StateKeyTuple) (*gomatrixserverlib.RespState, error) {
var res api.QueryStateAfterEventsResponse // try doing all this locally before we resort to querying federation
err := t.rsAPI.QueryStateAfterEvents(t.context, &api.QueryStateAfterEventsRequest{ respState := t.lookupStateAfterEventLocally(roomID, eventID, needed)
RoomID: roomID, if respState != nil {
PrevEventIDs: []string{eventID}, return respState, nil
StateToFetch: needed,
}, &res)
if err != nil || !res.PrevEventsExist {
util.GetLogger(t.context).WithError(err).Warnf("failed to query state after %s locally; trying remotely", eventID)
} else {
for i, ev := range res.StateEvents {
t.haveEvents[ev.EventID()] = &res.StateEvents[i]
}
var authEvents []gomatrixserverlib.Event
missingAuthEvents := make(map[string]bool)
for _, ev := range res.StateEvents {
for _, ae := range ev.AuthEventIDs() {
aev, ok := t.haveEvents[ae]
if ok {
authEvents = append(authEvents, aev.Unwrap())
} else {
missingAuthEvents[ae] = true
}
}
}
// QueryStateAfterEvents does not return the auth events, so fetch them now. We know the roomserver has them else it wouldn't
// have stored the event.
var missingEventList []string
for evID := range missingAuthEvents {
missingEventList = append(missingEventList, evID)
}
queryReq := api.QueryEventsByIDRequest{
EventIDs: missingEventList,
}
util.GetLogger(t.context).Infof("Fetching missing auth events: %v", missingEventList)
var queryRes api.QueryEventsByIDResponse
if err = t.rsAPI.QueryEventsByID(t.context, &queryReq, &queryRes); err != nil {
return nil, err
}
for i := range queryRes.Events {
evID := queryRes.Events[i].EventID()
t.haveEvents[evID] = &queryRes.Events[i]
authEvents = append(authEvents, queryRes.Events[i].Unwrap())
}
evs := gomatrixserverlib.UnwrapEventHeaders(res.StateEvents)
return &gomatrixserverlib.RespState{
StateEvents: evs,
AuthEvents: authEvents,
}, nil
} }
respState, err := t.lookupStateBeforeEvent(roomVersion, roomID, eventID) respState, err := t.lookupStateBeforeEvent(roomVersion, roomID, eventID)
@ -459,6 +415,59 @@ func (t *txnReq) lookupStateAfterEvent(roomVersion gomatrixserverlib.RoomVersion
return respState, nil return respState, nil
} }
func (t *txnReq) lookupStateAfterEventLocally(roomID, eventID string, needed []gomatrixserverlib.StateKeyTuple) *gomatrixserverlib.RespState {
var res api.QueryStateAfterEventsResponse
err := t.rsAPI.QueryStateAfterEvents(t.context, &api.QueryStateAfterEventsRequest{
RoomID: roomID,
PrevEventIDs: []string{eventID},
StateToFetch: needed,
}, &res)
if err != nil || !res.PrevEventsExist {
util.GetLogger(t.context).WithError(err).Warnf("failed to query state after %s locally", eventID)
return nil
}
for i, ev := range res.StateEvents {
t.haveEvents[ev.EventID()] = &res.StateEvents[i]
}
var authEvents []gomatrixserverlib.Event
missingAuthEvents := make(map[string]bool)
for _, ev := range res.StateEvents {
for _, ae := range ev.AuthEventIDs() {
aev, ok := t.haveEvents[ae]
if ok {
authEvents = append(authEvents, aev.Unwrap())
} else {
missingAuthEvents[ae] = true
}
}
}
// QueryStateAfterEvents does not return the auth events, so fetch them now. We know the roomserver has them else it wouldn't
// have stored the event.
var missingEventList []string
for evID := range missingAuthEvents {
missingEventList = append(missingEventList, evID)
}
queryReq := api.QueryEventsByIDRequest{
EventIDs: missingEventList,
}
util.GetLogger(t.context).Infof("Fetching missing auth events: %v", missingEventList)
var queryRes api.QueryEventsByIDResponse
if err = t.rsAPI.QueryEventsByID(t.context, &queryReq, &queryRes); err != nil {
return nil
}
for i := range queryRes.Events {
evID := queryRes.Events[i].EventID()
t.haveEvents[evID] = &queryRes.Events[i]
authEvents = append(authEvents, queryRes.Events[i].Unwrap())
}
evs := gomatrixserverlib.UnwrapEventHeaders(res.StateEvents)
return &gomatrixserverlib.RespState{
StateEvents: evs,
AuthEvents: authEvents,
}
}
func (t *txnReq) lookupCurrentState(newEvent *gomatrixserverlib.Event) (*gomatrixserverlib.RespState, error) { func (t *txnReq) lookupCurrentState(newEvent *gomatrixserverlib.Event) (*gomatrixserverlib.RespState, error) {
// Ask the roomserver for information about this room // Ask the roomserver for information about this room
queryReq := api.QueryLatestEventsAndStateRequest{ queryReq := api.QueryLatestEventsAndStateRequest{
@ -497,12 +506,8 @@ func (t *txnReq) resolveStatesAndCheck(roomVersion gomatrixserverlib.RoomVersion
var authEventList []gomatrixserverlib.Event var authEventList []gomatrixserverlib.Event
var stateEventList []gomatrixserverlib.Event var stateEventList []gomatrixserverlib.Event
for _, state := range states { for _, state := range states {
for _, ae := range state.AuthEvents { authEventList = append(authEventList, state.AuthEvents...)
authEventList = append(authEventList, ae) stateEventList = append(stateEventList, state.StateEvents...)
}
for _, se := range state.StateEvents {
stateEventList = append(stateEventList, se)
}
} }
resolvedStateEvents, err := gomatrixserverlib.ResolveConflicts(roomVersion, stateEventList, authEventList) resolvedStateEvents, err := gomatrixserverlib.ResolveConflicts(roomVersion, stateEventList, authEventList)
if err != nil { if err != nil {
@ -513,9 +518,9 @@ retryAllowedState:
if err = checkAllowedByState(*backwardsExtremity, resolvedStateEvents); err != nil { if err = checkAllowedByState(*backwardsExtremity, resolvedStateEvents); err != nil {
switch missing := err.(type) { switch missing := err.(type) {
case gomatrixserverlib.MissingAuthEventError: case gomatrixserverlib.MissingAuthEventError:
h, err := t.lookupEvent(roomVersion, missing.AuthEventID, true) h, err2 := t.lookupEvent(roomVersion, missing.AuthEventID, true)
if err != nil { if err2 != nil {
return nil, fmt.Errorf("missing auth event %s and failed to look it up: %w", missing.AuthEventID, err) return nil, fmt.Errorf("missing auth event %s and failed to look it up: %w", missing.AuthEventID, err2)
} }
util.GetLogger(t.context).Infof("fetched event %s", missing.AuthEventID) util.GetLogger(t.context).Infof("fetched event %s", missing.AuthEventID)
resolvedStateEvents = append(resolvedStateEvents, h.Unwrap()) resolvedStateEvents = append(resolvedStateEvents, h.Unwrap())

View file

@ -264,6 +264,7 @@ User can invite local user to room with version 5
remote user can join room with version 5 remote user can join room with version 5
User can invite remote user to room with version 5 User can invite remote user to room with version 5
Remote user can backfill in a room with version 5 Remote user can backfill in a room with version 5
Inbound federation can receive v1 /send_join
Inbound federation can get state for a room Inbound federation can get state for a room
Inbound federation of state requires event_id as a mandatory paramater Inbound federation of state requires event_id as a mandatory paramater
Inbound federation can get state_ids for a room Inbound federation can get state_ids for a room