WIP get_missing_events work

This commit is contained in:
Kegan Dougal 2020-05-07 17:51:38 +01:00
parent 3b98535dc5
commit 30dfe0a64b
4 changed files with 252 additions and 18 deletions

View file

@ -226,6 +226,28 @@ func Setup(
},
)).Methods(http.MethodGet)
v1fedmux.Handle("/send_join/{roomID}/{eventID}", common.MakeFedAPI(
"federation_send_join", cfg.Matrix.ServerName, keys,
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse {
vars, err := common.URLDecodeMapValues(mux.Vars(httpReq))
if err != nil {
return util.ErrorResponse(err)
}
roomID := vars["roomID"]
eventID := vars["eventID"]
res := SendJoin(
httpReq, request, cfg, rsAPI, producer, keys, roomID, eventID,
)
return util.JSONResponse{
Headers: res.Headers,
Code: res.Code,
JSON: []interface{}{
res.Code, res.JSON,
},
}
},
)).Methods(http.MethodPut)
v2fedmux.Handle("/send_join/{roomID}/{eventID}", common.MakeFedAPI(
"federation_send_join", cfg.Matrix.ServerName, keys,
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse {

View file

@ -105,6 +105,7 @@ type txnReq struct {
eduProducer *producers.EDUServerProducer
keys gomatrixserverlib.JSONVerifier
federation txnFederationClient
getMissingEventRecursionCount int
}
// A subset of FederationClient functionality that txn requires. Useful for testing.
@ -114,6 +115,8 @@ type txnFederationClient interface {
)
LookupStateIDs(ctx context.Context, s gomatrixserverlib.ServerName, roomID string, eventID string) (res gomatrixserverlib.RespStateIDs, err error)
GetEvent(ctx context.Context, s gomatrixserverlib.ServerName, eventID string) (res gomatrixserverlib.Transaction, err error)
LookupMissingEvents(ctx context.Context, s gomatrixserverlib.ServerName, roomID string, missing gomatrixserverlib.MissingEvents,
roomVersion gomatrixserverlib.RoomVersion) (res gomatrixserverlib.RespMissingEvents, err error)
}
func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) {
@ -148,7 +151,7 @@ func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) {
// Process the events.
for _, e := range pdus {
err := t.processEvent(e.Unwrap())
err := t.processEvent(e.Unwrap(), true)
if err != nil {
// If the error is due to the event itself being bad then we skip
// it and move onto the next event. We report an error so that the
@ -168,6 +171,7 @@ func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) {
switch err.(type) {
case roomNotFoundError:
case *gomatrixserverlib.NotAllowed:
case missingPrevEventsError:
default:
// Any other error should be the result of a temporary error in
// our server so we should bail processing the transaction entirely.
@ -197,12 +201,19 @@ type verifySigError struct {
eventID string
err error
}
type missingPrevEventsError struct {
eventID string
err error
}
func (e roomNotFoundError) Error() string { return fmt.Sprintf("room %q not found", e.roomID) }
func (e unmarshalError) Error() string { return fmt.Sprintf("unable to parse event: %s", e.err) }
func (e verifySigError) Error() string {
return fmt.Sprintf("unable to verify signature of event %q: %s", e.eventID, e.err)
}
func (e missingPrevEventsError) Error() string {
return fmt.Sprintf("unable to get prev_events for event %q: %s", e.eventID, e.err)
}
func (t *txnReq) processEDUs(edus []gomatrixserverlib.EDU) {
for _, e := range edus {
@ -227,8 +238,9 @@ func (t *txnReq) processEDUs(edus []gomatrixserverlib.EDU) {
}
}
func (t *txnReq) processEvent(e gomatrixserverlib.Event) error {
func (t *txnReq) processEvent(e gomatrixserverlib.Event, isInboundTxn bool) error {
prevEventIDs := e.PrevEventIDs()
util.GetLogger(t.context).Infof("%s processEvent %s with prev_events %v", e.RoomID(), e.EventID(), prevEventIDs)
// Fetch the state needed to authenticate the event.
needed := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{e})
@ -241,6 +253,8 @@ func (t *txnReq) processEvent(e gomatrixserverlib.Event) error {
if err := t.rsAPI.QueryStateAfterEvents(t.context, &stateReq, &stateResp); err != nil {
return err
}
util.GetLogger(t.context).Infof("processEvent %s stateResp.PrevEventsExist: %v", e.EventID(), stateResp.PrevEventsExist)
util.GetLogger(t.context).Infof("NEEDED TUPLES: %+v", needed.Tuples())
if !stateResp.RoomExists {
// TODO: When synapse receives a message for a room it is not in it
@ -253,13 +267,14 @@ func (t *txnReq) processEvent(e gomatrixserverlib.Event) error {
}
if !stateResp.PrevEventsExist {
return t.processEventWithMissingState(e, stateResp.RoomVersion)
return t.processEventWithMissingState(e, stateResp.RoomVersion, isInboundTxn)
}
// Check that the event is allowed by the state at the event.
var events []gomatrixserverlib.Event
for _, headeredEvent := range stateResp.StateEvents {
events = append(events, headeredEvent.Unwrap())
util.GetLogger(t.context).Infof("Room state: %s -> %s", headeredEvent.Type(), string(headeredEvent.Content()))
}
if err := checkAllowedByState(e, events); err != nil {
return err
@ -291,7 +306,7 @@ func checkAllowedByState(e gomatrixserverlib.Event, stateEvents []gomatrixserver
return gomatrixserverlib.Allowed(e, &authUsingState)
}
func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) error {
func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion, isInboundTxn bool) error {
// We are missing the previous events for this events.
// This means that there is a gap in our view of the history of the
// room. There two ways that we can handle such a gap:
@ -306,7 +321,19 @@ func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event, roomVer
// event ids and then use /event to fetch the individual events.
// However not all version of synapse support /state_ids so you may
// need to fallback to /state.
// TODO: Attempt to fill in the gap using /get_missing_events
// Attempt to fill in the gap using /get_missing_events
// This will either:
// - fill in the gap completely then process event `e` returning ok=true err=nil
// - fail to fill in the gap and tell us to terminate the transaction ok=false, err=not nil
// - fail to fill in the gap and tell us to fetch state, and to not terminate the transaction, ok=false, err=nil
ok, err := t.getMissingEvents(e, roomVersion, isInboundTxn)
if err != nil {
return err
}
if ok {
return nil
}
// Attempt to fetch the missing state using /state_ids and /events
respState, haveEventIDs, err := t.lookupMissingStateViaStateIDs(e, roomVersion)
@ -319,6 +346,11 @@ func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event, roomVer
}
}
// security: every time we fetch state from a remote server we have to do state resolution.
// Say you have a forward extremity F with state you trust, and a malicious event M comes in which points to a prev event P,
// then if you take the state at M you don't do any state res. If you fetch state for P then to get the new current state of
// the room you have to do state res across F and P
// Check that the event is allowed by the state.
retryAllowedState:
if err := checkAllowedByState(e, respState.StateEvents); err != nil {
@ -329,7 +361,7 @@ retryAllowedState:
if s.EventID() != missing.AuthEventID {
continue
}
err = t.processEventWithMissingState(s, roomVersion)
err = t.processEventWithMissingState(s, roomVersion, isInboundTxn)
// If there was no error retrieving the event from federation then
// we assume that it succeeded, so retry the original state check
if err == nil {
@ -346,6 +378,99 @@ retryAllowedState:
return t.producer.SendEventWithState(context.Background(), respState, e.Headered(roomVersion), haveEventIDs)
}
// getMissingEvents returns ok=true if missing events were fetched and handled, else false. Returns an error only if we should
// terminate the transaction which initiated /get_missing_events
// This function recursively calls txnReq.processEvent with the missing events, which will be processed before this function returns.
// This means that we may recursively call this function, as we spider back up prev_events to the min depth.
func (t *txnReq) getMissingEvents(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion, isInboundTxn bool) (ok bool, err error) {
if !isInboundTxn {
// we've recursed here, so just take a state snapshot please!
return false, nil
}
logger := util.GetLogger(t.context).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID())
needed := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{e})
// query latest events (our trusted forward extremities)
req := api.QueryLatestEventsAndStateRequest{
RoomID: e.RoomID(),
StateToFetch: needed.Tuples(),
}
var res api.QueryLatestEventsAndStateResponse
if err = t.rsAPI.QueryLatestEventsAndState(t.context, &req, &res); err != nil {
logger.WithError(err).Warn("Failed to query latest events")
return false, nil
}
latestEvents := make([]string, len(res.LatestEvents))
for i := range res.LatestEvents {
latestEvents[i] = res.LatestEvents[i].EventID
}
// this server just sent us an event for which we do not know its prev_events - ask that server for those prev_events.
missingResp, err := t.federation.LookupMissingEvents(t.context, t.Origin, e.RoomID(), gomatrixserverlib.MissingEvents{
Limit: 20,
// synapse uses the min depth they've ever seen in that room
MinDepth: int(res.Depth) - 20,
// The latest event IDs that the sender already has. These are skipped when retrieving the previous events of latest_events.
EarliestEvents: latestEvents,
// The event IDs to retrieve the previous events for.
LatestEvents: []string{e.EventID()},
}, roomVersion)
// security: how we handle failures depends on whether or not this event will become the new forward extremity for the room.
// There's 2 scenarios to consider:
// - Case A: We got pushed an event and are now fetching missing prev_events. (isInboundTxn=true)
// - Case B: We are fetching missing prev_events already and now fetching some more (isInboundTxn=false)
// In Case B, we know for sure that the event we are currently processing will not become the new forward extremity for the room,
// as it was called in response to an inbound txn which had it as a prev_event.
// In Case A, the event is a forward extremity, and could eventually become the _only_ forward extremity in the room. This is bad
// because it means we would trust the state at that event to be the state for the entire room, and allows rooms to be hijacked.
// https://github.com/matrix-org/synapse/pull/3456
// https://github.com/matrix-org/synapse/blob/229eb81498b0fe1da81e9b5b333a0285acde9446/synapse/handlers/federation.py#L335
// For now, we do not allow Case B, so reject the event.
if err != nil {
logger.WithError(err).Errorf(
"%s pushed us an event but couldn't give us details about prev_events via /get_missing_events - dropping this event until it can",
t.Origin,
)
return false, missingPrevEventsError{
eventID: e.EventID(),
err: err,
}
}
// topologically sort and sanity check that we are making forward progress
newEvents := gomatrixserverlib.ReverseTopologicalOrdering(missingResp.Events, gomatrixserverlib.TopologicalOrderByPrevEvents)
shouldHaveSomeEventIDs := e.PrevEventIDs()
hasPrevEvent := false
Event:
for _, pe := range shouldHaveSomeEventIDs {
for _, ev := range newEvents {
if ev.EventID() == pe {
hasPrevEvent = true
break Event
}
}
}
if !hasPrevEvent {
err = fmt.Errorf("called /get_missing_events but server %s didn't return any prev_events with IDs %v", t.Origin, shouldHaveSomeEventIDs)
logger.WithError(err).Errorf(
"%s pushed us an event but couldn't give us details about prev_events via /get_missing_events - dropping this event until it can",
t.Origin,
)
return false, missingPrevEventsError{
eventID: e.EventID(),
err: err,
}
}
// process the missing events then the event which started this whole thing
for _, ev := range append(newEvents, e) {
err := t.processEvent(ev, false)
if err != nil {
return false, err
}
}
return true, nil
}
func (t *txnReq) lookupMissingStateViaState(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (
respState *gomatrixserverlib.RespState, err error) {
state, err := t.federation.LookupState(t.context, t.Origin, e.RoomID(), e.EventID(), roomVersion)

View file

@ -82,6 +82,7 @@ type testRoomserverAPI struct {
inputRoomEvents []api.InputRoomEvent
queryStateAfterEvents func(*api.QueryStateAfterEventsRequest) api.QueryStateAfterEventsResponse
queryEventsByID func(req *api.QueryEventsByIDRequest) api.QueryEventsByIDResponse
queryLatestEventsAndState func(*api.QueryLatestEventsAndStateRequest) api.QueryLatestEventsAndStateResponse
}
func (t *testRoomserverAPI) SetFederationSenderAPI(fsAPI fsAPI.FederationSenderInternalAPI) {}
@ -117,6 +118,13 @@ func (t *testRoomserverAPI) QueryLatestEventsAndState(
request *api.QueryLatestEventsAndStateRequest,
response *api.QueryLatestEventsAndStateResponse,
) error {
r := t.queryLatestEventsAndState(request)
response.QueryLatestEventsAndStateRequest = *request
response.RoomExists = r.RoomExists
response.RoomVersion = testRoomVersion
response.LatestEvents = r.LatestEvents
response.StateEvents = r.StateEvents
response.Depth = r.Depth
return nil
}
@ -279,6 +287,7 @@ type txnFedClient struct {
state map[string]gomatrixserverlib.RespState // event_id to response
stateIDs map[string]gomatrixserverlib.RespStateIDs // event_id to response
getEvent map[string]gomatrixserverlib.Transaction // event_id to response
getMissingEvents func(gomatrixserverlib.MissingEvents) (res gomatrixserverlib.RespMissingEvents, err error)
}
func (c *txnFedClient) LookupState(ctx context.Context, s gomatrixserverlib.ServerName, roomID string, eventID string, roomVersion gomatrixserverlib.RoomVersion) (
@ -310,6 +319,10 @@ func (c *txnFedClient) GetEvent(ctx context.Context, s gomatrixserverlib.ServerN
res = r
return
}
func (c *txnFedClient) LookupMissingEvents(ctx context.Context, s gomatrixserverlib.ServerName, roomID string, missing gomatrixserverlib.MissingEvents,
roomVersion gomatrixserverlib.RoomVersion) (res gomatrixserverlib.RespMissingEvents, err error) {
return c.getMissingEvents(missing)
}
func mustCreateTransaction(rsAPI api.RoomserverInternalAPI, fedClient txnFederationClient, pdus []json.RawMessage) *txnReq {
t := &txnReq{
@ -424,6 +437,77 @@ func TestTransactionFailAuthChecks(t *testing.T) {
assertInputRoomEvents(t, rsAPI.inputRoomEvents, nil) // expect no messages to be sent to the roomserver
}
// The purpose of this test is to make sure that when an event is received for which we do not know the prev_events,
// we request them from /get_missing_events. It works by setting PrevEventsExist=false in the roomserver query response,
// resulting in a call to /get_missing_events which returns the missing prev event. Both events should be processed in
// topological order and sent to the roomserver.
func TestTransactionFetchMissingPrevEvents(t *testing.T) {
haveEvent := testEvents[len(testEvents)-3]
prevEvent := testEvents[len(testEvents)-2]
inputEvent := testEvents[len(testEvents)-1]
var rsAPI *testRoomserverAPI // ref here so we can refer to inputRoomEvents inside these functions
rsAPI = &testRoomserverAPI{
queryStateAfterEvents: func(req *api.QueryStateAfterEventsRequest) api.QueryStateAfterEventsResponse {
// we expect this to be called three times:
// - first with input event to realise there's a gap
// - second with the prevEvent to realise there is no gap
// - third with the input event to realise there is no longer a gap
prevEventsExist := false
if len(req.PrevEventIDs) == 1 {
switch req.PrevEventIDs[0] {
case haveEvent.EventID():
prevEventsExist = true
case prevEvent.EventID():
// we only have this event if we've been send prevEvent
if len(rsAPI.inputRoomEvents) == 1 && rsAPI.inputRoomEvents[0].Event.EventID() == prevEvent.EventID() {
prevEventsExist = true
}
}
}
return api.QueryStateAfterEventsResponse{
PrevEventsExist: prevEventsExist,
RoomExists: true,
StateEvents: fromStateTuples(req.StateToFetch, nil),
}
},
queryLatestEventsAndState: func(req *api.QueryLatestEventsAndStateRequest) api.QueryLatestEventsAndStateResponse {
return api.QueryLatestEventsAndStateResponse{
RoomExists: true,
Depth: haveEvent.Depth(),
LatestEvents: []gomatrixserverlib.EventReference{
haveEvent.EventReference(),
},
StateEvents: fromStateTuples(req.StateToFetch, nil),
}
},
}
cli := &txnFedClient{
getMissingEvents: func(missing gomatrixserverlib.MissingEvents) (res gomatrixserverlib.RespMissingEvents, err error) {
if !reflect.DeepEqual(missing.EarliestEvents, []string{haveEvent.EventID()}) {
t.Errorf("call to /get_missing_events wrong earliest events: got %v want %v", missing.EarliestEvents, haveEvent.EventID())
}
if !reflect.DeepEqual(missing.LatestEvents, []string{inputEvent.EventID()}) {
t.Errorf("call to /get_missing_events wrong latest events: got %v want %v", missing.LatestEvents, inputEvent.EventID())
}
return gomatrixserverlib.RespMissingEvents{
Events: []gomatrixserverlib.Event{
prevEvent.Unwrap(),
},
}, nil
},
}
pdus := []json.RawMessage{
inputEvent.JSON(),
}
txn := mustCreateTransaction(rsAPI, cli, pdus)
mustProcessTransaction(t, txn, nil)
assertInputRoomEvents(t, rsAPI.inputRoomEvents, []gomatrixserverlib.HeaderedEvent{prevEvent, inputEvent})
}
// The purpose of this test is to check that when there are missing prev_events that state is fetched via /state_ids
// and /event and not /state. It works by setting PrevEventsExist=false in the roomserver query response, resulting in
// a call to /state_ids which returns the whole room state. It should attempt to fetch as many of these events from the

View file

@ -264,3 +264,6 @@ User can invite local user to room with version 5
remote user can join room with version 5
User can invite remote user to room with version 5
Remote user can backfill in a room with version 5
Federation rejects inbound events where the prev_events cannot be found
Outbound federation requests missing prev_events and then asks for /state_ids and resolves the state
Should not be able to take over the room by pretending there is no PL event