diff --git a/clientapi/producers/roomserver.go b/clientapi/producers/roomserver.go index d5add903a..f0733db9c 100644 --- a/clientapi/producers/roomserver.go +++ b/clientapi/producers/roomserver.go @@ -51,35 +51,11 @@ func (c *RoomserverProducer) SendEvents( return c.SendInputRoomEvents(ctx, ires) } -// SendEventWithKnownMissingState sends the missing state events followed by the new event to the roomserver with the given stateEventIDs. -func (c *RoomserverProducer) SendEventWithKnownMissingState( - ctx context.Context, stateEventIDs []string, missingStateEvents []gomatrixserverlib.HeaderedEvent, event gomatrixserverlib.HeaderedEvent, -) error { - var ires []api.InputRoomEvent - for _, outlier := range missingStateEvents { - ires = append(ires, api.InputRoomEvent{ - Kind: api.KindOutlier, - Event: outlier.Headered(event.RoomVersion), - AuthEventIDs: outlier.AuthEventIDs(), - }) - } - - ires = append(ires, api.InputRoomEvent{ - Kind: api.KindNew, - Event: event, - AuthEventIDs: event.AuthEventIDs(), - HasState: true, - StateEventIDs: stateEventIDs, - }) - - _, err := c.SendInputRoomEvents(ctx, ires) - return err -} - // SendEventWithState writes an event with KindNew to the roomserver input log -// with the state at the event as KindOutlier before it. +// with the state at the event as KindOutlier before it. Will not send any event that is +// marked as `true` in haveEventIDs func (c *RoomserverProducer) SendEventWithState( - ctx context.Context, state *gomatrixserverlib.RespState, event gomatrixserverlib.HeaderedEvent, + ctx context.Context, state *gomatrixserverlib.RespState, event gomatrixserverlib.HeaderedEvent, haveEventIDs map[string]bool, ) error { outliers, err := state.Events() if err != nil { @@ -88,6 +64,9 @@ func (c *RoomserverProducer) SendEventWithState( var ires []api.InputRoomEvent for _, outlier := range outliers { + if haveEventIDs[outlier.EventID()] { + continue + } ires = append(ires, api.InputRoomEvent{ Kind: api.KindOutlier, Event: outlier.Headered(event.RoomVersion), diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 8251e9ff4..e6f91d94a 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -309,7 +309,7 @@ func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event, roomVer // TODO: Attempt to fill in the gap using /get_missing_events // Attempt to fetch the missing state using /state_ids and /events - respState, newEvents, err := t.lookupMissingStateViaStateIDs(e, roomVersion) + respState, haveEventIDs, err := t.lookupMissingStateViaStateIDs(e, roomVersion) if err != nil { // Fallback to /state util.GetLogger(t.context).WithError(err).Warn("processEventWithMissingState failed to /state_ids, falling back to /state") @@ -341,16 +341,9 @@ retryAllowedState: return err } - if len(newEvents) > 0 { - stateEventIDs := make([]string, len(respState.StateEvents)) - for i := range respState.StateEvents { - stateEventIDs[i] = respState.StateEvents[i].EventID() - } - return t.producer.SendEventWithKnownMissingState(context.Background(), stateEventIDs, newEvents, e.Headered(roomVersion)) - } // pass the event along with the state to the roomserver using a background context so we don't // needlessly expire - return t.producer.SendEventWithState(context.Background(), respState, e.Headered(roomVersion)) + return t.producer.SendEventWithState(context.Background(), respState, e.Headered(roomVersion), haveEventIDs) } func (t *txnReq) lookupMissingStateViaState(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) ( @@ -367,7 +360,7 @@ func (t *txnReq) lookupMissingStateViaState(e gomatrixserverlib.Event, roomVersi } func (t *txnReq) lookupMissingStateViaStateIDs(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) ( - *gomatrixserverlib.RespState, []gomatrixserverlib.HeaderedEvent, error) { + *gomatrixserverlib.RespState, map[string]bool, error) { // fetch the state event IDs at the time of the event stateIDs, err := t.federation.LookupStateIDs(t.context, t.Origin, e.RoomID(), e.EventID()) @@ -378,6 +371,7 @@ func (t *txnReq) lookupMissingStateViaStateIDs(e gomatrixserverlib.Event, roomVe // fetch as many as we can from the roomserver, do them as 2 calls rather than // 1 to try to reduce the number of parameters in the bulk query this will use haveEventMap := make(map[string]*gomatrixserverlib.HeaderedEvent, len(stateIDs.StateEventIDs)) + haveEventIDs := make(map[string]bool) for _, eventList := range [][]string{stateIDs.StateEventIDs, stateIDs.AuthEventIDs} { queryReq := api.QueryEventsByIDRequest{ EventIDs: eventList, @@ -389,6 +383,7 @@ func (t *txnReq) lookupMissingStateViaStateIDs(e gomatrixserverlib.Event, roomVe // allow indexing of current state by event ID for i := range queryRes.Events { haveEventMap[queryRes.Events[i].EventID()] = &queryRes.Events[i] + haveEventIDs[queryRes.Events[i].EventID()] = true } } @@ -408,7 +403,6 @@ func (t *txnReq) lookupMissingStateViaStateIDs(e gomatrixserverlib.Event, roomVe "total_state": len(stateIDs.StateEventIDs), "total_auth_events": len(stateIDs.AuthEventIDs), }).Info("Fetching missing state at event") - var newEvents []gomatrixserverlib.HeaderedEvent for missingEventID := range missing { var txn gomatrixserverlib.Transaction @@ -430,11 +424,10 @@ func (t *txnReq) lookupMissingStateViaStateIDs(e gomatrixserverlib.Event, roomVe } h := event.Headered(roomVersion) haveEventMap[event.EventID()] = &h - newEvents = append(newEvents, h) } } resp, err := t.createRespStateFromStateIDs(stateIDs, haveEventMap) - return resp, newEvents, err + return resp, haveEventIDs, err } func (t *txnReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStateIDs, haveEventMap map[string]*gomatrixserverlib.HeaderedEvent) ( diff --git a/federationapi/routing/send_test.go b/federationapi/routing/send_test.go index 3649aee6d..04dd3e390 100644 --- a/federationapi/routing/send_test.go +++ b/federationapi/routing/send_test.go @@ -505,10 +505,9 @@ func TestTransactionFetchMissingStateByFallbackState(t *testing.T) { }, } inputEvent := testEvents[len(testEvents)-1] - var stateEvents []gomatrixserverlib.HeaderedEvent - for _, ev := range testStateEvents { - stateEvents = append(stateEvents, ev) - } + // first 5 events are the state events, in auth event order. + stateEvents := testEvents[:5] + cli := &txnFedClient{ // /state_ids purposefully unset stateIDs: nil,