diff --git a/federationapi/routing/routing.go b/federationapi/routing/routing.go index a5b8ce24e..67f3a957c 100644 --- a/federationapi/routing/routing.go +++ b/federationapi/routing/routing.go @@ -226,6 +226,28 @@ func Setup( }, )).Methods(http.MethodGet) + v1fedmux.Handle("/send_join/{roomID}/{eventID}", common.MakeFedAPI( + "federation_send_join", cfg.Matrix.ServerName, keys, + func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse { + vars, err := common.URLDecodeMapValues(mux.Vars(httpReq)) + if err != nil { + return util.ErrorResponse(err) + } + roomID := vars["roomID"] + eventID := vars["eventID"] + res := SendJoin( + httpReq, request, cfg, rsAPI, producer, keys, roomID, eventID, + ) + return util.JSONResponse{ + Headers: res.Headers, + Code: res.Code, + JSON: []interface{}{ + res.Code, res.JSON, + }, + } + }, + )).Methods(http.MethodPut) + v2fedmux.Handle("/send_join/{roomID}/{eventID}", common.MakeFedAPI( "federation_send_join", cfg.Matrix.ServerName, keys, func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse { diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index e6f91d94a..b16dbd0fa 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -99,12 +99,13 @@ func Send( type txnReq struct { gomatrixserverlib.Transaction - context context.Context - rsAPI api.RoomserverInternalAPI - producer *producers.RoomserverProducer - eduProducer *producers.EDUServerProducer - keys gomatrixserverlib.JSONVerifier - federation txnFederationClient + context context.Context + rsAPI api.RoomserverInternalAPI + producer *producers.RoomserverProducer + eduProducer *producers.EDUServerProducer + keys gomatrixserverlib.JSONVerifier + federation txnFederationClient + getMissingEventRecursionCount int } // A subset of FederationClient functionality that txn requires. Useful for testing. @@ -114,6 +115,8 @@ type txnFederationClient interface { ) LookupStateIDs(ctx context.Context, s gomatrixserverlib.ServerName, roomID string, eventID string) (res gomatrixserverlib.RespStateIDs, err error) GetEvent(ctx context.Context, s gomatrixserverlib.ServerName, eventID string) (res gomatrixserverlib.Transaction, err error) + LookupMissingEvents(ctx context.Context, s gomatrixserverlib.ServerName, roomID string, missing gomatrixserverlib.MissingEvents, + roomVersion gomatrixserverlib.RoomVersion) (res gomatrixserverlib.RespMissingEvents, err error) } func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) { @@ -148,7 +151,7 @@ func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) { // Process the events. for _, e := range pdus { - err := t.processEvent(e.Unwrap()) + err := t.processEvent(e.Unwrap(), true) if err != nil { // If the error is due to the event itself being bad then we skip // it and move onto the next event. We report an error so that the @@ -168,6 +171,7 @@ func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) { switch err.(type) { case roomNotFoundError: case *gomatrixserverlib.NotAllowed: + case missingPrevEventsError: default: // Any other error should be the result of a temporary error in // our server so we should bail processing the transaction entirely. @@ -197,12 +201,19 @@ type verifySigError struct { eventID string err error } +type missingPrevEventsError struct { + eventID string + err error +} func (e roomNotFoundError) Error() string { return fmt.Sprintf("room %q not found", e.roomID) } func (e unmarshalError) Error() string { return fmt.Sprintf("unable to parse event: %s", e.err) } func (e verifySigError) Error() string { return fmt.Sprintf("unable to verify signature of event %q: %s", e.eventID, e.err) } +func (e missingPrevEventsError) Error() string { + return fmt.Sprintf("unable to get prev_events for event %q: %s", e.eventID, e.err) +} func (t *txnReq) processEDUs(edus []gomatrixserverlib.EDU) { for _, e := range edus { @@ -227,8 +238,9 @@ func (t *txnReq) processEDUs(edus []gomatrixserverlib.EDU) { } } -func (t *txnReq) processEvent(e gomatrixserverlib.Event) error { +func (t *txnReq) processEvent(e gomatrixserverlib.Event, isInboundTxn bool) error { prevEventIDs := e.PrevEventIDs() + util.GetLogger(t.context).Infof("%s processEvent %s with prev_events %v", e.RoomID(), e.EventID(), prevEventIDs) // Fetch the state needed to authenticate the event. needed := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{e}) @@ -241,6 +253,8 @@ func (t *txnReq) processEvent(e gomatrixserverlib.Event) error { if err := t.rsAPI.QueryStateAfterEvents(t.context, &stateReq, &stateResp); err != nil { return err } + util.GetLogger(t.context).Infof("processEvent %s stateResp.PrevEventsExist: %v", e.EventID(), stateResp.PrevEventsExist) + util.GetLogger(t.context).Infof("NEEDED TUPLES: %+v", needed.Tuples()) if !stateResp.RoomExists { // TODO: When synapse receives a message for a room it is not in it @@ -253,13 +267,14 @@ func (t *txnReq) processEvent(e gomatrixserverlib.Event) error { } if !stateResp.PrevEventsExist { - return t.processEventWithMissingState(e, stateResp.RoomVersion) + return t.processEventWithMissingState(e, stateResp.RoomVersion, isInboundTxn) } // Check that the event is allowed by the state at the event. var events []gomatrixserverlib.Event for _, headeredEvent := range stateResp.StateEvents { events = append(events, headeredEvent.Unwrap()) + util.GetLogger(t.context).Infof("Room state: %s -> %s", headeredEvent.Type(), string(headeredEvent.Content())) } if err := checkAllowedByState(e, events); err != nil { return err @@ -291,7 +306,7 @@ func checkAllowedByState(e gomatrixserverlib.Event, stateEvents []gomatrixserver return gomatrixserverlib.Allowed(e, &authUsingState) } -func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) error { +func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion, isInboundTxn bool) error { // We are missing the previous events for this events. // This means that there is a gap in our view of the history of the // room. There two ways that we can handle such a gap: @@ -306,7 +321,19 @@ func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event, roomVer // event ids and then use /event to fetch the individual events. // However not all version of synapse support /state_ids so you may // need to fallback to /state. - // TODO: Attempt to fill in the gap using /get_missing_events + + // Attempt to fill in the gap using /get_missing_events + // This will either: + // - fill in the gap completely then process event `e` returning ok=true err=nil + // - fail to fill in the gap and tell us to terminate the transaction ok=false, err=not nil + // - fail to fill in the gap and tell us to fetch state, and to not terminate the transaction, ok=false, err=nil + ok, err := t.getMissingEvents(e, roomVersion, isInboundTxn) + if err != nil { + return err + } + if ok { + return nil + } // Attempt to fetch the missing state using /state_ids and /events respState, haveEventIDs, err := t.lookupMissingStateViaStateIDs(e, roomVersion) @@ -319,6 +346,11 @@ func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event, roomVer } } + // security: every time we fetch state from a remote server we have to do state resolution. + // Say you have a forward extremity F with state you trust, and a malicious event M comes in which points to a prev event P, + // then if you take the state at M you don't do any state res. If you fetch state for P then to get the new current state of + // the room you have to do state res across F and P + // Check that the event is allowed by the state. retryAllowedState: if err := checkAllowedByState(e, respState.StateEvents); err != nil { @@ -329,7 +361,7 @@ retryAllowedState: if s.EventID() != missing.AuthEventID { continue } - err = t.processEventWithMissingState(s, roomVersion) + err = t.processEventWithMissingState(s, roomVersion, isInboundTxn) // If there was no error retrieving the event from federation then // we assume that it succeeded, so retry the original state check if err == nil { @@ -346,6 +378,99 @@ retryAllowedState: return t.producer.SendEventWithState(context.Background(), respState, e.Headered(roomVersion), haveEventIDs) } +// getMissingEvents returns ok=true if missing events were fetched and handled, else false. Returns an error only if we should +// terminate the transaction which initiated /get_missing_events +// This function recursively calls txnReq.processEvent with the missing events, which will be processed before this function returns. +// This means that we may recursively call this function, as we spider back up prev_events to the min depth. +func (t *txnReq) getMissingEvents(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion, isInboundTxn bool) (ok bool, err error) { + if !isInboundTxn { + // we've recursed here, so just take a state snapshot please! + return false, nil + } + logger := util.GetLogger(t.context).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID()) + needed := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{e}) + // query latest events (our trusted forward extremities) + req := api.QueryLatestEventsAndStateRequest{ + RoomID: e.RoomID(), + StateToFetch: needed.Tuples(), + } + var res api.QueryLatestEventsAndStateResponse + if err = t.rsAPI.QueryLatestEventsAndState(t.context, &req, &res); err != nil { + logger.WithError(err).Warn("Failed to query latest events") + return false, nil + } + latestEvents := make([]string, len(res.LatestEvents)) + for i := range res.LatestEvents { + latestEvents[i] = res.LatestEvents[i].EventID + } + // this server just sent us an event for which we do not know its prev_events - ask that server for those prev_events. + missingResp, err := t.federation.LookupMissingEvents(t.context, t.Origin, e.RoomID(), gomatrixserverlib.MissingEvents{ + Limit: 20, + // synapse uses the min depth they've ever seen in that room + MinDepth: int(res.Depth) - 20, + // The latest event IDs that the sender already has. These are skipped when retrieving the previous events of latest_events. + EarliestEvents: latestEvents, + // The event IDs to retrieve the previous events for. + LatestEvents: []string{e.EventID()}, + }, roomVersion) + + // security: how we handle failures depends on whether or not this event will become the new forward extremity for the room. + // There's 2 scenarios to consider: + // - Case A: We got pushed an event and are now fetching missing prev_events. (isInboundTxn=true) + // - Case B: We are fetching missing prev_events already and now fetching some more (isInboundTxn=false) + // In Case B, we know for sure that the event we are currently processing will not become the new forward extremity for the room, + // as it was called in response to an inbound txn which had it as a prev_event. + // In Case A, the event is a forward extremity, and could eventually become the _only_ forward extremity in the room. This is bad + // because it means we would trust the state at that event to be the state for the entire room, and allows rooms to be hijacked. + // https://github.com/matrix-org/synapse/pull/3456 + // https://github.com/matrix-org/synapse/blob/229eb81498b0fe1da81e9b5b333a0285acde9446/synapse/handlers/federation.py#L335 + // For now, we do not allow Case B, so reject the event. + if err != nil { + logger.WithError(err).Errorf( + "%s pushed us an event but couldn't give us details about prev_events via /get_missing_events - dropping this event until it can", + t.Origin, + ) + return false, missingPrevEventsError{ + eventID: e.EventID(), + err: err, + } + } + + // topologically sort and sanity check that we are making forward progress + newEvents := gomatrixserverlib.ReverseTopologicalOrdering(missingResp.Events, gomatrixserverlib.TopologicalOrderByPrevEvents) + shouldHaveSomeEventIDs := e.PrevEventIDs() + hasPrevEvent := false +Event: + for _, pe := range shouldHaveSomeEventIDs { + for _, ev := range newEvents { + if ev.EventID() == pe { + hasPrevEvent = true + break Event + } + } + } + if !hasPrevEvent { + err = fmt.Errorf("called /get_missing_events but server %s didn't return any prev_events with IDs %v", t.Origin, shouldHaveSomeEventIDs) + logger.WithError(err).Errorf( + "%s pushed us an event but couldn't give us details about prev_events via /get_missing_events - dropping this event until it can", + t.Origin, + ) + return false, missingPrevEventsError{ + eventID: e.EventID(), + err: err, + } + } + // process the missing events then the event which started this whole thing + for _, ev := range append(newEvents, e) { + err := t.processEvent(ev, false) + if err != nil { + return false, err + } + } + + return true, nil +} + func (t *txnReq) lookupMissingStateViaState(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) ( respState *gomatrixserverlib.RespState, err error) { state, err := t.federation.LookupState(t.context, t.Origin, e.RoomID(), e.EventID(), roomVersion) diff --git a/federationapi/routing/send_test.go b/federationapi/routing/send_test.go index 89d28aa1c..d0da1ac1f 100644 --- a/federationapi/routing/send_test.go +++ b/federationapi/routing/send_test.go @@ -79,9 +79,10 @@ func (p *testEDUProducer) InputTypingEvent( } type testRoomserverAPI struct { - inputRoomEvents []api.InputRoomEvent - queryStateAfterEvents func(*api.QueryStateAfterEventsRequest) api.QueryStateAfterEventsResponse - queryEventsByID func(req *api.QueryEventsByIDRequest) api.QueryEventsByIDResponse + inputRoomEvents []api.InputRoomEvent + queryStateAfterEvents func(*api.QueryStateAfterEventsRequest) api.QueryStateAfterEventsResponse + queryEventsByID func(req *api.QueryEventsByIDRequest) api.QueryEventsByIDResponse + queryLatestEventsAndState func(*api.QueryLatestEventsAndStateRequest) api.QueryLatestEventsAndStateResponse } func (t *testRoomserverAPI) SetFederationSenderAPI(fsAPI fsAPI.FederationSenderInternalAPI) {} @@ -117,6 +118,13 @@ func (t *testRoomserverAPI) QueryLatestEventsAndState( request *api.QueryLatestEventsAndStateRequest, response *api.QueryLatestEventsAndStateResponse, ) error { + r := t.queryLatestEventsAndState(request) + response.QueryLatestEventsAndStateRequest = *request + response.RoomExists = r.RoomExists + response.RoomVersion = testRoomVersion + response.LatestEvents = r.LatestEvents + response.StateEvents = r.StateEvents + response.Depth = r.Depth return nil } @@ -276,9 +284,10 @@ func (t *testRoomserverAPI) RemoveRoomAlias( } type txnFedClient struct { - state map[string]gomatrixserverlib.RespState // event_id to response - stateIDs map[string]gomatrixserverlib.RespStateIDs // event_id to response - getEvent map[string]gomatrixserverlib.Transaction // event_id to response + state map[string]gomatrixserverlib.RespState // event_id to response + stateIDs map[string]gomatrixserverlib.RespStateIDs // event_id to response + getEvent map[string]gomatrixserverlib.Transaction // event_id to response + getMissingEvents func(gomatrixserverlib.MissingEvents) (res gomatrixserverlib.RespMissingEvents, err error) } func (c *txnFedClient) LookupState(ctx context.Context, s gomatrixserverlib.ServerName, roomID string, eventID string, roomVersion gomatrixserverlib.RoomVersion) ( @@ -310,6 +319,10 @@ func (c *txnFedClient) GetEvent(ctx context.Context, s gomatrixserverlib.ServerN res = r return } +func (c *txnFedClient) LookupMissingEvents(ctx context.Context, s gomatrixserverlib.ServerName, roomID string, missing gomatrixserverlib.MissingEvents, + roomVersion gomatrixserverlib.RoomVersion) (res gomatrixserverlib.RespMissingEvents, err error) { + return c.getMissingEvents(missing) +} func mustCreateTransaction(rsAPI api.RoomserverInternalAPI, fedClient txnFederationClient, pdus []json.RawMessage) *txnReq { t := &txnReq{ @@ -424,6 +437,77 @@ func TestTransactionFailAuthChecks(t *testing.T) { assertInputRoomEvents(t, rsAPI.inputRoomEvents, nil) // expect no messages to be sent to the roomserver } +// The purpose of this test is to make sure that when an event is received for which we do not know the prev_events, +// we request them from /get_missing_events. It works by setting PrevEventsExist=false in the roomserver query response, +// resulting in a call to /get_missing_events which returns the missing prev event. Both events should be processed in +// topological order and sent to the roomserver. +func TestTransactionFetchMissingPrevEvents(t *testing.T) { + haveEvent := testEvents[len(testEvents)-3] + prevEvent := testEvents[len(testEvents)-2] + inputEvent := testEvents[len(testEvents)-1] + + var rsAPI *testRoomserverAPI // ref here so we can refer to inputRoomEvents inside these functions + rsAPI = &testRoomserverAPI{ + queryStateAfterEvents: func(req *api.QueryStateAfterEventsRequest) api.QueryStateAfterEventsResponse { + // we expect this to be called three times: + // - first with input event to realise there's a gap + // - second with the prevEvent to realise there is no gap + // - third with the input event to realise there is no longer a gap + prevEventsExist := false + if len(req.PrevEventIDs) == 1 { + switch req.PrevEventIDs[0] { + case haveEvent.EventID(): + prevEventsExist = true + case prevEvent.EventID(): + // we only have this event if we've been send prevEvent + if len(rsAPI.inputRoomEvents) == 1 && rsAPI.inputRoomEvents[0].Event.EventID() == prevEvent.EventID() { + prevEventsExist = true + } + } + } + + return api.QueryStateAfterEventsResponse{ + PrevEventsExist: prevEventsExist, + RoomExists: true, + StateEvents: fromStateTuples(req.StateToFetch, nil), + } + }, + queryLatestEventsAndState: func(req *api.QueryLatestEventsAndStateRequest) api.QueryLatestEventsAndStateResponse { + return api.QueryLatestEventsAndStateResponse{ + RoomExists: true, + Depth: haveEvent.Depth(), + LatestEvents: []gomatrixserverlib.EventReference{ + haveEvent.EventReference(), + }, + StateEvents: fromStateTuples(req.StateToFetch, nil), + } + }, + } + + cli := &txnFedClient{ + getMissingEvents: func(missing gomatrixserverlib.MissingEvents) (res gomatrixserverlib.RespMissingEvents, err error) { + if !reflect.DeepEqual(missing.EarliestEvents, []string{haveEvent.EventID()}) { + t.Errorf("call to /get_missing_events wrong earliest events: got %v want %v", missing.EarliestEvents, haveEvent.EventID()) + } + if !reflect.DeepEqual(missing.LatestEvents, []string{inputEvent.EventID()}) { + t.Errorf("call to /get_missing_events wrong latest events: got %v want %v", missing.LatestEvents, inputEvent.EventID()) + } + return gomatrixserverlib.RespMissingEvents{ + Events: []gomatrixserverlib.Event{ + prevEvent.Unwrap(), + }, + }, nil + }, + } + + pdus := []json.RawMessage{ + inputEvent.JSON(), + } + txn := mustCreateTransaction(rsAPI, cli, pdus) + mustProcessTransaction(t, txn, nil) + assertInputRoomEvents(t, rsAPI.inputRoomEvents, []gomatrixserverlib.HeaderedEvent{prevEvent, inputEvent}) +} + // The purpose of this test is to check that when there are missing prev_events that state is fetched via /state_ids // and /event and not /state. It works by setting PrevEventsExist=false in the roomserver query response, resulting in // a call to /state_ids which returns the whole room state. It should attempt to fetch as many of these events from the diff --git a/sytest-whitelist b/sytest-whitelist index c957021cb..6e741611e 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -264,3 +264,6 @@ User can invite local user to room with version 5 remote user can join room with version 5 User can invite remote user to room with version 5 Remote user can backfill in a room with version 5 +Federation rejects inbound events where the prev_events cannot be found +Outbound federation requests missing prev_events and then asks for /state_ids and resolves the state +Should not be able to take over the room by pretending there is no PL event