diff --git a/clientapi/producers/roomserver.go b/clientapi/producers/roomserver.go index f0733db9c..e9f2d7d01 100644 --- a/clientapi/producers/roomserver.go +++ b/clientapi/producers/roomserver.go @@ -16,6 +16,7 @@ package producers import ( "context" + "fmt" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" @@ -67,6 +68,7 @@ func (c *RoomserverProducer) SendEventWithState( if haveEventIDs[outlier.EventID()] { continue } + fmt.Println("append outlier ", outlier.EventID()) ires = append(ires, api.InputRoomEvent{ Kind: api.KindOutlier, Event: outlier.Headered(event.RoomVersion), diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index b16dbd0fa..2ee303d68 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -99,13 +99,12 @@ func Send( type txnReq struct { gomatrixserverlib.Transaction - context context.Context - rsAPI api.RoomserverInternalAPI - producer *producers.RoomserverProducer - eduProducer *producers.EDUServerProducer - keys gomatrixserverlib.JSONVerifier - federation txnFederationClient - getMissingEventRecursionCount int + context context.Context + rsAPI api.RoomserverInternalAPI + producer *producers.RoomserverProducer + eduProducer *producers.EDUServerProducer + keys gomatrixserverlib.JSONVerifier + federation txnFederationClient } // A subset of FederationClient functionality that txn requires. Useful for testing. @@ -240,7 +239,7 @@ func (t *txnReq) processEDUs(edus []gomatrixserverlib.EDU) { func (t *txnReq) processEvent(e gomatrixserverlib.Event, isInboundTxn bool) error { prevEventIDs := e.PrevEventIDs() - util.GetLogger(t.context).Infof("%s processEvent %s with prev_events %v", e.RoomID(), e.EventID(), prevEventIDs) + util.GetLogger(t.context).Infof("processEvent %s with prev_events %v", e.EventID(), prevEventIDs) // Fetch the state needed to authenticate the event. needed := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{e}) @@ -254,7 +253,6 @@ func (t *txnReq) processEvent(e gomatrixserverlib.Event, isInboundTxn bool) erro return err } util.GetLogger(t.context).Infof("processEvent %s stateResp.PrevEventsExist: %v", e.EventID(), stateResp.PrevEventsExist) - util.GetLogger(t.context).Infof("NEEDED TUPLES: %+v", needed.Tuples()) if !stateResp.RoomExists { // TODO: When synapse receives a message for a room it is not in it @@ -324,68 +322,171 @@ func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event, roomVer // Attempt to fill in the gap using /get_missing_events // This will either: - // - fill in the gap completely then process event `e` returning ok=true err=nil - // - fail to fill in the gap and tell us to terminate the transaction ok=false, err=not nil - // - fail to fill in the gap and tell us to fetch state, and to not terminate the transaction, ok=false, err=nil - ok, err := t.getMissingEvents(e, roomVersion, isInboundTxn) + // - fill in the gap completely then process event `e` returning no backwards extremity + // - fail to fill in the gap and tell us to terminate the transaction err=not nil + // - fail to fill in the gap and tell us to fetch state at the new backwards extremity, and to not terminate the transaction + backwardsExtremity, err := t.getMissingEvents(e, roomVersion, isInboundTxn) if err != nil { return err } - if ok { + if backwardsExtremity == nil { + // we filled in the gap! return nil } - // Attempt to fetch the missing state using /state_ids and /events - respState, haveEventIDs, err := t.lookupMissingStateViaStateIDs(e, roomVersion) + // fetch the state BEFORE the event then check that the event is allowed + respState, haveEventIDs, err := t.lookupStateAfterEvent(roomVersion, *backwardsExtremity) if err != nil { - // Fallback to /state - util.GetLogger(t.context).WithError(err).Warn("processEventWithMissingState failed to /state_ids, falling back to /state") - respState, err = t.lookupMissingStateViaState(e, roomVersion) - if err != nil { - return err - } - } - - // security: every time we fetch state from a remote server we have to do state resolution. - // Say you have a forward extremity F with state you trust, and a malicious event M comes in which points to a prev event P, - // then if you take the state at M you don't do any state res. If you fetch state for P then to get the new current state of - // the room you have to do state res across F and P - - // Check that the event is allowed by the state. -retryAllowedState: - if err := checkAllowedByState(e, respState.StateEvents); err != nil { - switch missing := err.(type) { - case gomatrixserverlib.MissingAuthEventError: - // An auth event was missing so let's look up that event over federation - for _, s := range respState.StateEvents { - if s.EventID() != missing.AuthEventID { - continue - } - err = t.processEventWithMissingState(s, roomVersion, isInboundTxn) - // If there was no error retrieving the event from federation then - // we assume that it succeeded, so retry the original state check - if err == nil { - goto retryAllowedState - } - } - default: - } return err } + fmt.Println("Calcuated lookupStateAfterEvent") // pass the event along with the state to the roomserver using a background context so we don't // needlessly expire return t.producer.SendEventWithState(context.Background(), respState, e.Headered(roomVersion), haveEventIDs) } -// getMissingEvents returns ok=true if missing events were fetched and handled, else false. Returns an error only if we should -// terminate the transaction which initiated /get_missing_events +// lookupStateAfterEvent returns the room state after the event e, which is all the states before e resolved via state resolution +// then having e applied to the resulting state. +func (t *txnReq) lookupStateAfterEvent(roomVersion gomatrixserverlib.RoomVersion, e gomatrixserverlib.Event) (*gomatrixserverlib.RespState, map[string]bool, error) { + // de-dupe all the events + authEvents := make(map[string]*gomatrixserverlib.Event) + stateEvents := make(map[string]*gomatrixserverlib.Event) + haveEventIDs := make(map[string]bool) + for _, prevEventID := range e.PrevEventIDs() { + // don't do auth checks on this RespState as we're just interested in grabbing state/auth events and putting it into the pot + respState, haveIDs, err := t.lookupStateBeforeEvent(roomVersion, false, e.RoomID(), prevEventID) + if err != nil { + return nil, nil, err + } + for i := range respState.StateEvents { + stateEvents[respState.StateEvents[i].EventID()] = &respState.StateEvents[i] + } + for i := range respState.AuthEvents { + authEvents[respState.AuthEvents[i].EventID()] = &respState.AuthEvents[i] + } + for id := range haveIDs { + haveEventIDs[id] = true + } + // fetch the event we're missing and add it to the pile + h, err := t.lookupEvent(roomVersion, prevEventID) + if err != nil { + return nil, nil, err + } + if h.StateKey() != nil { + he := h.Unwrap() + stateEvents[h.EventID()] = &he + } + } + authEventList := make([]gomatrixserverlib.Event, len(authEvents)) + i := 0 + for _, ev := range authEvents { + authEventList[i] = *ev + i++ + } + stateEventList := make([]gomatrixserverlib.Event, len(stateEvents)) + i = 0 + for _, ev := range stateEvents { + stateEventList[i] = *ev + i++ + } + resolvedStateEvents, err := gomatrixserverlib.ResolveConflicts(roomVersion, stateEventList, authEventList) + if err != nil { + return nil, nil, err + } + // apply the current event + if err = checkAllowedByState(e, resolvedStateEvents); err != nil { + return nil, nil, err + } + // roll forward state if this event is a state event + if e.StateKey() != nil { + for i := range resolvedStateEvents { + if resolvedStateEvents[i].Type() == e.Type() && resolvedStateEvents[i].StateKeyEquals(*e.StateKey()) { + resolvedStateEvents[i] = e + break + } + } + } + for _, s := range resolvedStateEvents { + util.GetLogger(t.context).Infof("resolved: %s -> %s", s.Type(), string(s.Content())) + } + for _, s := range authEventList { + util.GetLogger(t.context).Infof("authEventList: %s -> %s", s.Type(), string(s.Content())) + } + + resp := &gomatrixserverlib.RespState{ + AuthEvents: authEventList, + StateEvents: resolvedStateEvents, + } + if err = resp.Check(t.context, t.keys); err != nil { + return nil, nil, fmt.Errorf("lookupStateAfterEvent: resolved state is not valid: %w", err) + } + + return resp, haveEventIDs, nil +} + +// lookuptStateBeforeEvent returns the room state before the event e, which is just /state_ids and/or /state depending on what +// the server supports. +func (t *txnReq) lookupStateBeforeEvent(roomVersion gomatrixserverlib.RoomVersion, doAuthCheck bool, roomID, eventID string) ( + respState *gomatrixserverlib.RespState, haveEventIDs map[string]bool, err error) { + + util.GetLogger(t.context).Infof("lookupStateBeforeEvent %s", eventID) + // It's entirely possible that we know this state, as QueryStateAfterEventsRequest only returns success if ALL prev_events + // exist, so query the roomserver for the state with just this prev event + stateReq := api.QueryStateAfterEventsRequest{ + RoomID: roomID, + StateToFetch: nil, // TODO: do we need everything? + PrevEventIDs: []string{eventID}, + } + var stateResp api.QueryStateAfterEventsResponse + if err = t.rsAPI.QueryStateAfterEvents(t.context, &stateReq, &stateResp); err != nil || stateResp.StateEvents == nil { + util.GetLogger(t.context).WithError(err).Warnf("Failed to lookup state before event %s via roomserver - asking remote", eventID) + // fallthrough to remote lookup + } else { + util.GetLogger(t.context).Infof("lookupStateBeforeEvent %s returned locally", eventID) + // we have all the events + haveEvents := make(map[string]*gomatrixserverlib.HeaderedEvent) + haveEventIDs = make(map[string]bool) + for i, ev := range stateResp.StateEvents { + haveEventIDs[ev.EventID()] = true + haveEvents[ev.EventID()] = &stateResp.StateEvents[i] + } + var authEvents []gomatrixserverlib.Event + for _, ev := range stateResp.StateEvents { + for _, ae := range ev.AuthEventIDs() { + aev, ok := haveEvents[ae] + if ok { + authEvents = append(authEvents, aev.Unwrap()) + } + } + } + + respState = &gomatrixserverlib.RespState{ + AuthEvents: authEvents, + StateEvents: gomatrixserverlib.UnwrapEventHeaders(stateResp.StateEvents), + } + return + } + + // Attempt to fetch the missing state using /state_ids and /events + respState, haveEventIDs, err = t.lookupMissingStateViaStateIDs(roomID, eventID, doAuthCheck, roomVersion) + if err != nil { + // Fallback to /state + util.GetLogger(t.context).WithError(err).Warn("lookupStateBeforeEvent failed to /state_ids, falling back to /state") + respState, err = t.lookupMissingStateViaState(roomID, eventID, roomVersion) + } + return +} + +// getMissingEvents returns a nil backwardsExtremity if missing events were fetched and handled, else returns the new backwards extremity which we should +// begin from. Returns an error only if we should terminate the transaction which initiated /get_missing_events // This function recursively calls txnReq.processEvent with the missing events, which will be processed before this function returns. // This means that we may recursively call this function, as we spider back up prev_events to the min depth. -func (t *txnReq) getMissingEvents(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion, isInboundTxn bool) (ok bool, err error) { +func (t *txnReq) getMissingEvents(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion, isInboundTxn bool) (backwardsExtremity *gomatrixserverlib.Event, err error) { if !isInboundTxn { // we've recursed here, so just take a state snapshot please! - return false, nil + fmt.Println("backwards extremity is now ", e.EventID()) + return &e, nil } logger := util.GetLogger(t.context).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID()) needed := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{e}) @@ -397,7 +498,7 @@ func (t *txnReq) getMissingEvents(e gomatrixserverlib.Event, roomVersion gomatri var res api.QueryLatestEventsAndStateResponse if err = t.rsAPI.QueryLatestEventsAndState(t.context, &req, &res); err != nil { logger.WithError(err).Warn("Failed to query latest events") - return false, nil + return &e, nil } latestEvents := make([]string, len(res.LatestEvents)) for i := range res.LatestEvents { @@ -430,11 +531,12 @@ func (t *txnReq) getMissingEvents(e gomatrixserverlib.Event, roomVersion gomatri "%s pushed us an event but couldn't give us details about prev_events via /get_missing_events - dropping this event until it can", t.Origin, ) - return false, missingPrevEventsError{ + return nil, missingPrevEventsError{ eventID: e.EventID(), err: err, } } + logger.Infof("get_missing_events returned %d events", len(missingResp.Events)) // topologically sort and sanity check that we are making forward progress newEvents := gomatrixserverlib.ReverseTopologicalOrdering(missingResp.Events, gomatrixserverlib.TopologicalOrderByPrevEvents) @@ -455,7 +557,7 @@ Event: "%s pushed us an event but couldn't give us details about prev_events via /get_missing_events - dropping this event until it can", t.Origin, ) - return false, missingPrevEventsError{ + return nil, missingPrevEventsError{ eventID: e.EventID(), err: err, } @@ -464,16 +566,17 @@ Event: for _, ev := range append(newEvents, e) { err := t.processEvent(ev, false) if err != nil { - return false, err + return nil, err } } - return true, nil + // we processed everything! + return nil, nil } -func (t *txnReq) lookupMissingStateViaState(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) ( +func (t *txnReq) lookupMissingStateViaState(roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) ( respState *gomatrixserverlib.RespState, err error) { - state, err := t.federation.LookupState(t.context, t.Origin, e.RoomID(), e.EventID(), roomVersion) + state, err := t.federation.LookupState(t.context, t.Origin, roomID, eventID, roomVersion) if err != nil { return nil, err } @@ -484,11 +587,11 @@ func (t *txnReq) lookupMissingStateViaState(e gomatrixserverlib.Event, roomVersi return &state, nil } -func (t *txnReq) lookupMissingStateViaStateIDs(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) ( +func (t *txnReq) lookupMissingStateViaStateIDs(roomID, eventID string, doAuthCheck bool, roomVersion gomatrixserverlib.RoomVersion) ( *gomatrixserverlib.RespState, map[string]bool, error) { - + util.GetLogger(t.context).Infof("lookupMissingStateViaStateIDs %s", eventID) // fetch the state event IDs at the time of the event - stateIDs, err := t.federation.LookupStateIDs(t.context, t.Origin, e.RoomID(), e.EventID()) + stateIDs, err := t.federation.LookupStateIDs(t.context, t.Origin, roomID, eventID) if err != nil { return nil, nil, err } @@ -522,46 +625,33 @@ func (t *txnReq) lookupMissingStateViaStateIDs(e gomatrixserverlib.Event, roomVe } util.GetLogger(t.context).WithFields(logrus.Fields{ "missing": len(missing), - "event_id": e.EventID(), - "room_id": e.RoomID(), + "event_id": eventID, + "room_id": roomID, "already_have": len(haveEventMap), "total_state": len(stateIDs.StateEventIDs), "total_auth_events": len(stateIDs.AuthEventIDs), }).Info("Fetching missing state at event") for missingEventID := range missing { - var txn gomatrixserverlib.Transaction - txn, err = t.federation.GetEvent(t.context, t.Origin, missingEventID) + var h *gomatrixserverlib.HeaderedEvent + h, err = t.lookupEvent(roomVersion, missingEventID) if err != nil { - util.GetLogger(t.context).WithError(err).WithField("event_id", missingEventID).Warn("failed to get missing /event for event ID") return nil, nil, err } - for _, pdu := range txn.PDUs { - var event gomatrixserverlib.Event - event, err = gomatrixserverlib.NewEventFromUntrustedJSON(pdu, roomVersion) - if err != nil { - util.GetLogger(t.context).WithError(err).Warnf("Transaction: Failed to parse event JSON of event %q", event.EventID()) - return nil, nil, unmarshalError{err} - } - if err = gomatrixserverlib.VerifyAllEventSignatures(t.context, []gomatrixserverlib.Event{event}, t.keys); err != nil { - util.GetLogger(t.context).WithError(err).Warnf("Transaction: Couldn't validate signature of event %q", event.EventID()) - return nil, nil, verifySigError{event.EventID(), err} - } - h := event.Headered(roomVersion) - haveEventMap[event.EventID()] = &h - } + haveEventMap[h.EventID()] = h } - resp, err := t.createRespStateFromStateIDs(stateIDs, haveEventMap) + resp, err := t.createRespStateFromStateIDs(stateIDs, doAuthCheck, haveEventMap) return resp, haveEventIDs, err } -func (t *txnReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStateIDs, haveEventMap map[string]*gomatrixserverlib.HeaderedEvent) ( +func (t *txnReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStateIDs, doAuthCheck bool, haveEventMap map[string]*gomatrixserverlib.HeaderedEvent) ( *gomatrixserverlib.RespState, error) { // create a RespState response using the response to /state_ids as a guide respState := gomatrixserverlib.RespState{ AuthEvents: make([]gomatrixserverlib.Event, len(stateIDs.AuthEventIDs)), StateEvents: make([]gomatrixserverlib.Event, len(stateIDs.StateEventIDs)), } + var roomVer gomatrixserverlib.RoomVersion for i := range stateIDs.StateEventIDs { ev, ok := haveEventMap[stateIDs.StateEventIDs[i]] @@ -569,6 +659,7 @@ func (t *txnReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStat return nil, fmt.Errorf("missing state event %s", stateIDs.StateEventIDs[i]) } respState.StateEvents[i] = ev.Unwrap() + roomVer = ev.RoomVersion } for i := range stateIDs.AuthEventIDs { ev, ok := haveEventMap[stateIDs.AuthEventIDs[i]] @@ -578,8 +669,46 @@ func (t *txnReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStat respState.AuthEvents[i] = ev.Unwrap() } // Check that the returned state is valid. +retryCheck: if err := respState.Check(t.context, t.keys); err != nil { - return nil, err + switch missing := err.(type) { + case gomatrixserverlib.MissingAuthEventError: + // An auth event was missing so let's look up that event over federation + var newEv *gomatrixserverlib.HeaderedEvent + newEv, err = t.lookupEvent(roomVer, missing.AuthEventID) + if err != nil { + // we can't find this event, fail + return nil, fmt.Errorf("missing auth event %s and cannot find it: %w", missing.AuthEventID, err) + } + respState.AuthEvents = append(respState.AuthEvents, newEv.Unwrap()) + goto retryCheck + } + if doAuthCheck { + return nil, err + } else { + return &respState, nil + } } return &respState, nil } + +func (t *txnReq) lookupEvent(roomVersion gomatrixserverlib.RoomVersion, missingEventID string) (*gomatrixserverlib.HeaderedEvent, error) { + txn, err := t.federation.GetEvent(t.context, t.Origin, missingEventID) + if err != nil || len(txn.PDUs) == 0 { + util.GetLogger(t.context).WithError(err).WithField("event_id", missingEventID).Warn("failed to get missing /event for event ID") + return nil, err + } + pdu := txn.PDUs[0] + var event gomatrixserverlib.Event + event, err = gomatrixserverlib.NewEventFromUntrustedJSON(pdu, roomVersion) + if err != nil { + util.GetLogger(t.context).WithError(err).Warnf("Transaction: Failed to parse event JSON of event %q", event.EventID()) + return nil, unmarshalError{err} + } + if err = gomatrixserverlib.VerifyAllEventSignatures(t.context, []gomatrixserverlib.Event{event}, t.keys); err != nil { + util.GetLogger(t.context).WithError(err).Warnf("Transaction: Couldn't validate signature of event %q", event.EventID()) + return nil, verifySigError{event.EventID(), err} + } + h := event.Headered(roomVersion) + return &h, nil +} diff --git a/federationapi/routing/send_test.go b/federationapi/routing/send_test.go index d0da1ac1f..14fe0a820 100644 --- a/federationapi/routing/send_test.go +++ b/federationapi/routing/send_test.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" "reflect" - "sort" "testing" "time" @@ -160,7 +159,7 @@ func (t *testRoomserverAPI) QueryMembershipForUser( request *api.QueryMembershipForUserRequest, response *api.QueryMembershipForUserResponse, ) error { - return nil + return fmt.Errorf("not implemented") } // Query a list of membership events for a room @@ -169,7 +168,7 @@ func (t *testRoomserverAPI) QueryMembershipsForRoom( request *api.QueryMembershipsForRoomRequest, response *api.QueryMembershipsForRoomResponse, ) error { - return nil + return fmt.Errorf("not implemented") } // Query a list of invite event senders for a user in a room. @@ -178,7 +177,7 @@ func (t *testRoomserverAPI) QueryInvitesForUser( request *api.QueryInvitesForUserRequest, response *api.QueryInvitesForUserResponse, ) error { - return nil + return fmt.Errorf("not implemented") } // Query whether a server is allowed to see an event @@ -187,7 +186,7 @@ func (t *testRoomserverAPI) QueryServerAllowedToSeeEvent( request *api.QueryServerAllowedToSeeEventRequest, response *api.QueryServerAllowedToSeeEventResponse, ) error { - return nil + return fmt.Errorf("not implemented") } // Query missing events for a room from roomserver @@ -196,7 +195,7 @@ func (t *testRoomserverAPI) QueryMissingEvents( request *api.QueryMissingEventsRequest, response *api.QueryMissingEventsResponse, ) error { - return nil + return fmt.Errorf("not implemented") } // Query to get state and auth chain for a (potentially hypothetical) event. @@ -207,7 +206,7 @@ func (t *testRoomserverAPI) QueryStateAndAuthChain( request *api.QueryStateAndAuthChainRequest, response *api.QueryStateAndAuthChainResponse, ) error { - return nil + return fmt.Errorf("not implemented") } // Query a given amount (or less) of events prior to a given set of events. @@ -216,7 +215,7 @@ func (t *testRoomserverAPI) QueryBackfill( request *api.QueryBackfillRequest, response *api.QueryBackfillResponse, ) error { - return nil + return fmt.Errorf("not implemented") } // Asks for the default room version as preferred by the server. @@ -225,7 +224,7 @@ func (t *testRoomserverAPI) QueryRoomVersionCapabilities( request *api.QueryRoomVersionCapabilitiesRequest, response *api.QueryRoomVersionCapabilitiesResponse, ) error { - return nil + return fmt.Errorf("not implemented") } // Asks for the room version for a given room. @@ -244,7 +243,7 @@ func (t *testRoomserverAPI) SetRoomAlias( req *api.SetRoomAliasRequest, response *api.SetRoomAliasResponse, ) error { - return nil + return fmt.Errorf("not implemented") } // Get the room ID for an alias @@ -253,7 +252,7 @@ func (t *testRoomserverAPI) GetRoomIDForAlias( req *api.GetRoomIDForAliasRequest, response *api.GetRoomIDForAliasResponse, ) error { - return nil + return fmt.Errorf("not implemented") } // Get all known aliases for a room ID @@ -262,7 +261,7 @@ func (t *testRoomserverAPI) GetAliasesForRoomID( req *api.GetAliasesForRoomIDRequest, response *api.GetAliasesForRoomIDResponse, ) error { - return nil + return fmt.Errorf("not implemented") } // Get the user ID of the creator of an alias @@ -271,7 +270,7 @@ func (t *testRoomserverAPI) GetCreatorIDForAlias( req *api.GetCreatorIDForAliasRequest, response *api.GetCreatorIDForAliasResponse, ) error { - return nil + return fmt.Errorf("not implemented") } // Remove a room alias @@ -280,7 +279,7 @@ func (t *testRoomserverAPI) RemoveRoomAlias( req *api.RemoveRoomAliasRequest, response *api.RemoveRoomAliasResponse, ) error { - return nil + return fmt.Errorf("not implemented") } type txnFedClient struct { @@ -381,6 +380,9 @@ NextTuple: } func assertInputRoomEvents(t *testing.T, got []api.InputRoomEvent, want []gomatrixserverlib.HeaderedEvent) { + for _, g := range got { + fmt.Println("GOT ", g.Event.EventID()) + } if len(got) != len(want) { t.Errorf("wrong number of InputRoomEvents: got %d want %d", len(got), len(want)) return @@ -508,32 +510,91 @@ func TestTransactionFetchMissingPrevEvents(t *testing.T) { assertInputRoomEvents(t, rsAPI.inputRoomEvents, []gomatrixserverlib.HeaderedEvent{prevEvent, inputEvent}) } -// The purpose of this test is to check that when there are missing prev_events that state is fetched via /state_ids -// and /event and not /state. It works by setting PrevEventsExist=false in the roomserver query response, resulting in -// a call to /state_ids which returns the whole room state. It should attempt to fetch as many of these events from the -// roomserver FIRST, resulting in a call to QueryEventsByID. However, this will be missing the m.room.power_levels event which -// should then be requested via /event. The net result is that the transaction should succeed and there should be 2 -// new events, first the m.room.power_levels event we were missing, then the transaction PDU. +// The purpose of this test is to check that when there are missing prev_events and we still haven't been able to fill +// in the hole with /get_missing_events that the state BEFORE the events we want to persist is fetched via /state_ids +// and /event. It works by setting PrevEventsExist=false in the roomserver query response, resulting in +// a call to /get_missing_events which returns 1 out of the 2 events it needs to fill in the gap. Synapse and Dendrite +// both give up after 1x /get_missing_events call, relying on requesting the state AFTER the missing event in order to +// continue. The DAG looks something like: +// FE GME TXN +// A ---> B ---> C ---> D +// TXN=event in the txn, GME=response to /get_missing_events, FE=roomserver's forward extremity. Should result in: +// - /state_ids?event=B is requested, then /event/B to get the state AFTER B. B is a state event. +// - state resolution is done to check C is allowed. +// This results in B being sent as an outlier FIRST, then C,D. func TestTransactionFetchMissingStateByStateIDs(t *testing.T) { - missingStateEvent := testStateEvents[gomatrixserverlib.StateKeyTuple{ + eventA := testEvents[len(testEvents)-5] + // this is also len(testEvents)-4 + eventB := testStateEvents[gomatrixserverlib.StateKeyTuple{ EventType: gomatrixserverlib.MRoomPowerLevels, StateKey: "", }] - rsAPI := &testRoomserverAPI{ + eventC := testEvents[len(testEvents)-3] + eventD := testEvents[len(testEvents)-2] + fmt.Println("a:", eventA.EventID()) + fmt.Println("b:", eventB.EventID()) + fmt.Println("c:", eventC.EventID()) + fmt.Println("d:", eventD.EventID()) + var rsAPI *testRoomserverAPI + rsAPI = &testRoomserverAPI{ queryStateAfterEvents: func(req *api.QueryStateAfterEventsRequest) api.QueryStateAfterEventsResponse { + // if we have event C from GME, then PrevEventsExist: True, else it is false + prevEventExists := false + omitTuples := []gomatrixserverlib.StateKeyTuple{ + gomatrixserverlib.StateKeyTuple{ + EventType: gomatrixserverlib.MRoomPowerLevels, + StateKey: "", + }, + } + for _, ev := range rsAPI.inputRoomEvents { + if ev.Event.EventID() == eventC.EventID() && len(req.PrevEventIDs) == 1 && req.PrevEventIDs[0] == eventC.EventID() { + prevEventExists = true + } + if ev.Event.EventID() == eventB.EventID() { + omitTuples = nil + } + } + var stateEvents []gomatrixserverlib.HeaderedEvent + if prevEventExists { + stateEvents = fromStateTuples(req.StateToFetch, omitTuples) + } return api.QueryStateAfterEventsResponse{ - // setting this to false should trigger a call to /state_ids - PrevEventsExist: false, + // setting this to false should trigger a call to /get_missing_events or /state_ids depending + // on far back we've gone. The first time should trigger /get_missing_events but we should + // give up on subsequent calls and just use the /state_ids + PrevEventsExist: prevEventExists, RoomExists: true, - StateEvents: nil, + StateEvents: stateEvents, + } + }, + queryLatestEventsAndState: func(req *api.QueryLatestEventsAndStateRequest) api.QueryLatestEventsAndStateResponse { + omitTuples := []gomatrixserverlib.StateKeyTuple{ + {EventType: gomatrixserverlib.MRoomPowerLevels, StateKey: ""}, + } + return api.QueryLatestEventsAndStateResponse{ + RoomExists: true, + Depth: eventA.Depth(), + LatestEvents: []gomatrixserverlib.EventReference{ + eventA.EventReference(), + }, + StateEvents: fromStateTuples(req.StateToFetch, omitTuples), } }, queryEventsByID: func(req *api.QueryEventsByIDRequest) api.QueryEventsByIDResponse { var res api.QueryEventsByIDResponse + fmt.Println("queryEventsByID ", req.EventIDs) for _, wantEventID := range req.EventIDs { for _, ev := range testStateEvents { - // roomserver is missing the power levels event - if wantEventID == missingStateEvent.EventID() { + // roomserver is missing the power levels event unless it's been sent to us recently as an outlier + if wantEventID == eventB.EventID() { + fmt.Println("Asked for pl event") + for _, inEv := range rsAPI.inputRoomEvents { + fmt.Println("recv ", inEv.Event.EventID()) + if inEv.Event.EventID() == wantEventID { + res.Events = append(res.Events, inEv.Event) + break + } + } continue } if ev.EventID() == wantEventID { @@ -545,91 +606,55 @@ func TestTransactionFetchMissingStateByStateIDs(t *testing.T) { return res }, } - inputEvent := testEvents[len(testEvents)-1] + // /state_ids for event B returns every state event but B (it's the state before) + var authEventIDs []string var stateEventIDs []string for _, ev := range testStateEvents { + if ev.EventID() == eventB.EventID() { + continue + } + // state res checks what auth events you give it, and this isn't a valid auth event + if ev.Type() != gomatrixserverlib.MRoomHistoryVisibility { + authEventIDs = append(authEventIDs, ev.EventID()) + } stateEventIDs = append(stateEventIDs, ev.EventID()) } cli := &txnFedClient{ - // /state_ids returns all the state events stateIDs: map[string]gomatrixserverlib.RespStateIDs{ - inputEvent.EventID(): gomatrixserverlib.RespStateIDs{ + eventB.EventID(): gomatrixserverlib.RespStateIDs{ StateEventIDs: stateEventIDs, - AuthEventIDs: stateEventIDs, + AuthEventIDs: authEventIDs, }, }, - // /event for the missing state event returns it + // /event for event B returns it getEvent: map[string]gomatrixserverlib.Transaction{ - missingStateEvent.EventID(): gomatrixserverlib.Transaction{ + eventB.EventID(): gomatrixserverlib.Transaction{ PDUs: []json.RawMessage{ - missingStateEvent.JSON(), + eventB.JSON(), }, }, }, - } - - pdus := []json.RawMessage{ - testData[len(testData)-1], // a message event - } - txn := mustCreateTransaction(rsAPI, cli, pdus) - mustProcessTransaction(t, txn, nil) - assertInputRoomEvents(t, rsAPI.inputRoomEvents, []gomatrixserverlib.HeaderedEvent{missingStateEvent, inputEvent}) -} - -// The purpose of this test is to check that when there are missing prev_events and /state_ids fails, that we fallback to -// calling /state which returns the entire room state at that event. It works by setting PrevEventsExist=false in the -// roomserver query response, resulting in a call to /state_ids which fails (unset). It should then fetch via /state. -func TestTransactionFetchMissingStateByFallbackState(t *testing.T) { - rsAPI := &testRoomserverAPI{ - queryStateAfterEvents: func(req *api.QueryStateAfterEventsRequest) api.QueryStateAfterEventsResponse { - return api.QueryStateAfterEventsResponse{ - // setting this to false should trigger a call to /state_ids - PrevEventsExist: false, - RoomExists: true, - StateEvents: nil, + // /get_missing_events should be done exactly once + getMissingEvents: func(missing gomatrixserverlib.MissingEvents) (res gomatrixserverlib.RespMissingEvents, err error) { + if !reflect.DeepEqual(missing.EarliestEvents, []string{eventA.EventID()}) { + t.Errorf("call to /get_missing_events wrong earliest events: got %v want %v", missing.EarliestEvents, eventA.EventID()) } - }, - } - inputEvent := testEvents[len(testEvents)-1] - // first 5 events are the state events, in auth event order. - stateEvents := testEvents[:5] - - cli := &txnFedClient{ - // /state_ids purposefully unset - stateIDs: nil, - // /state returns the state at that event (which is the current state) - state: map[string]gomatrixserverlib.RespState{ - inputEvent.EventID(): gomatrixserverlib.RespState{ - AuthEvents: gomatrixserverlib.UnwrapEventHeaders(stateEvents), - StateEvents: gomatrixserverlib.UnwrapEventHeaders(stateEvents), - }, + if !reflect.DeepEqual(missing.LatestEvents, []string{eventD.EventID()}) { + t.Errorf("call to /get_missing_events wrong latest events: got %v want %v", missing.LatestEvents, eventD.EventID()) + } + // just return event C, not event B so /state_ids logic kicks in as there will STILL be missing prev_events + return gomatrixserverlib.RespMissingEvents{ + Events: []gomatrixserverlib.Event{ + eventC.Unwrap(), + }, + }, nil }, } pdus := []json.RawMessage{ - testData[len(testData)-1], // a message event + eventD.JSON(), } txn := mustCreateTransaction(rsAPI, cli, pdus) mustProcessTransaction(t, txn, nil) - // the roomserver should get all state events and the new input event - // TODO: it should really be only giving the missing ones - got := rsAPI.inputRoomEvents - if len(got) != len(stateEvents)+1 { - t.Fatalf("wrong number of InputRoomEvents: got %d want %d", len(got), len(stateEvents)+1) - } - last := got[len(got)-1] - if last.Event.EventID() != inputEvent.EventID() { - t.Errorf("last event should be the input event but it wasn't. got %s want %s", last.Event.EventID(), inputEvent.EventID()) - } - gots := make([]string, len(stateEvents)) - wants := make([]string, len(stateEvents)) - for i := range stateEvents { - gots[i] = got[i].Event.EventID() - wants[i] = stateEvents[i].EventID() - } - sort.Strings(gots) - sort.Strings(wants) - if !reflect.DeepEqual(gots, wants) { - t.Errorf("state events returned mismatch, got (sorted): %+v want %+v", gots, wants) - } + assertInputRoomEvents(t, rsAPI.inputRoomEvents, []gomatrixserverlib.HeaderedEvent{eventB, eventC, eventD}) } diff --git a/go.mod b/go.mod index fd1bb3d70..88710cc8b 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/matrix-org/go-http-js-libp2p v0.0.0-20200318135427-31631a9ef51f github.com/matrix-org/go-sqlite3-js v0.0.0-20200325174927-327088cdef10 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 - github.com/matrix-org/gomatrixserverlib v0.0.0-20200505092542-ef8abbde3f6b + github.com/matrix-org/gomatrixserverlib v0.0.0-20200507185533-bc21abd9ca04 github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 github.com/mattn/go-sqlite3 v2.0.2+incompatible diff --git a/go.sum b/go.sum index 7ab035364..ff89049c7 100644 --- a/go.sum +++ b/go.sum @@ -367,8 +367,8 @@ github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 h1:Hr3zjRsq2bh github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5 h1:kmRjpmFOenVpOaV/DRlo9p6z/IbOKlUC+hhKsAAh8Qg= github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI= -github.com/matrix-org/gomatrixserverlib v0.0.0-20200505092542-ef8abbde3f6b h1:gxLun/noFJ7DplX7rqT8E4v4NkeDJ45tqW7LXC6k4C4= -github.com/matrix-org/gomatrixserverlib v0.0.0-20200505092542-ef8abbde3f6b/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200507185533-bc21abd9ca04 h1:8+6bOm9r2TCD6cudtt0zpAY2St8sko2+Xe7fqHYAH0Y= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200507185533-bc21abd9ca04/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 h1:osLoFdOy+ChQqVUn2PeTDETFftVkl4w9t/OW18g3lnk= github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1/go.mod h1:cXoYQIENbdWIQHt1SyCo6Bl3C3raHwJ0wgVrXHSqf+A= github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f h1:pRz4VTiRCO4zPlEMc3ESdUOcW4PXHH4Kj+YDz1XyE+Y=