mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-17 03:43:11 -06:00
Review comments: send in auth event order
This commit is contained in:
parent
83a4498fbf
commit
ecb574b490
|
|
@ -51,35 +51,11 @@ func (c *RoomserverProducer) SendEvents(
|
||||||
return c.SendInputRoomEvents(ctx, ires)
|
return c.SendInputRoomEvents(ctx, ires)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SendEventWithKnownMissingState sends the missing state events followed by the new event to the roomserver with the given stateEventIDs.
|
|
||||||
func (c *RoomserverProducer) SendEventWithKnownMissingState(
|
|
||||||
ctx context.Context, stateEventIDs []string, missingStateEvents []gomatrixserverlib.HeaderedEvent, event gomatrixserverlib.HeaderedEvent,
|
|
||||||
) error {
|
|
||||||
var ires []api.InputRoomEvent
|
|
||||||
for _, outlier := range missingStateEvents {
|
|
||||||
ires = append(ires, api.InputRoomEvent{
|
|
||||||
Kind: api.KindOutlier,
|
|
||||||
Event: outlier.Headered(event.RoomVersion),
|
|
||||||
AuthEventIDs: outlier.AuthEventIDs(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
ires = append(ires, api.InputRoomEvent{
|
|
||||||
Kind: api.KindNew,
|
|
||||||
Event: event,
|
|
||||||
AuthEventIDs: event.AuthEventIDs(),
|
|
||||||
HasState: true,
|
|
||||||
StateEventIDs: stateEventIDs,
|
|
||||||
})
|
|
||||||
|
|
||||||
_, err := c.SendInputRoomEvents(ctx, ires)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// SendEventWithState writes an event with KindNew to the roomserver input log
|
// SendEventWithState writes an event with KindNew to the roomserver input log
|
||||||
// with the state at the event as KindOutlier before it.
|
// with the state at the event as KindOutlier before it. Will not send any event that is
|
||||||
|
// marked as `true` in haveEventIDs
|
||||||
func (c *RoomserverProducer) SendEventWithState(
|
func (c *RoomserverProducer) SendEventWithState(
|
||||||
ctx context.Context, state *gomatrixserverlib.RespState, event gomatrixserverlib.HeaderedEvent,
|
ctx context.Context, state *gomatrixserverlib.RespState, event gomatrixserverlib.HeaderedEvent, haveEventIDs map[string]bool,
|
||||||
) error {
|
) error {
|
||||||
outliers, err := state.Events()
|
outliers, err := state.Events()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -88,6 +64,9 @@ func (c *RoomserverProducer) SendEventWithState(
|
||||||
|
|
||||||
var ires []api.InputRoomEvent
|
var ires []api.InputRoomEvent
|
||||||
for _, outlier := range outliers {
|
for _, outlier := range outliers {
|
||||||
|
if haveEventIDs[outlier.EventID()] {
|
||||||
|
continue
|
||||||
|
}
|
||||||
ires = append(ires, api.InputRoomEvent{
|
ires = append(ires, api.InputRoomEvent{
|
||||||
Kind: api.KindOutlier,
|
Kind: api.KindOutlier,
|
||||||
Event: outlier.Headered(event.RoomVersion),
|
Event: outlier.Headered(event.RoomVersion),
|
||||||
|
|
|
||||||
|
|
@ -309,7 +309,7 @@ func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event, roomVer
|
||||||
// TODO: Attempt to fill in the gap using /get_missing_events
|
// TODO: Attempt to fill in the gap using /get_missing_events
|
||||||
|
|
||||||
// Attempt to fetch the missing state using /state_ids and /events
|
// Attempt to fetch the missing state using /state_ids and /events
|
||||||
respState, newEvents, err := t.lookupMissingStateViaStateIDs(e, roomVersion)
|
respState, haveEventIDs, err := t.lookupMissingStateViaStateIDs(e, roomVersion)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Fallback to /state
|
// Fallback to /state
|
||||||
util.GetLogger(t.context).WithError(err).Warn("processEventWithMissingState failed to /state_ids, falling back to /state")
|
util.GetLogger(t.context).WithError(err).Warn("processEventWithMissingState failed to /state_ids, falling back to /state")
|
||||||
|
|
@ -341,16 +341,9 @@ retryAllowedState:
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(newEvents) > 0 {
|
|
||||||
stateEventIDs := make([]string, len(respState.StateEvents))
|
|
||||||
for i := range respState.StateEvents {
|
|
||||||
stateEventIDs[i] = respState.StateEvents[i].EventID()
|
|
||||||
}
|
|
||||||
return t.producer.SendEventWithKnownMissingState(context.Background(), stateEventIDs, newEvents, e.Headered(roomVersion))
|
|
||||||
}
|
|
||||||
// pass the event along with the state to the roomserver using a background context so we don't
|
// pass the event along with the state to the roomserver using a background context so we don't
|
||||||
// needlessly expire
|
// needlessly expire
|
||||||
return t.producer.SendEventWithState(context.Background(), respState, e.Headered(roomVersion))
|
return t.producer.SendEventWithState(context.Background(), respState, e.Headered(roomVersion), haveEventIDs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *txnReq) lookupMissingStateViaState(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (
|
func (t *txnReq) lookupMissingStateViaState(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (
|
||||||
|
|
@ -367,7 +360,7 @@ func (t *txnReq) lookupMissingStateViaState(e gomatrixserverlib.Event, roomVersi
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *txnReq) lookupMissingStateViaStateIDs(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (
|
func (t *txnReq) lookupMissingStateViaStateIDs(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (
|
||||||
*gomatrixserverlib.RespState, []gomatrixserverlib.HeaderedEvent, error) {
|
*gomatrixserverlib.RespState, map[string]bool, error) {
|
||||||
|
|
||||||
// fetch the state event IDs at the time of the event
|
// fetch the state event IDs at the time of the event
|
||||||
stateIDs, err := t.federation.LookupStateIDs(t.context, t.Origin, e.RoomID(), e.EventID())
|
stateIDs, err := t.federation.LookupStateIDs(t.context, t.Origin, e.RoomID(), e.EventID())
|
||||||
|
|
@ -378,6 +371,7 @@ func (t *txnReq) lookupMissingStateViaStateIDs(e gomatrixserverlib.Event, roomVe
|
||||||
// fetch as many as we can from the roomserver, do them as 2 calls rather than
|
// fetch as many as we can from the roomserver, do them as 2 calls rather than
|
||||||
// 1 to try to reduce the number of parameters in the bulk query this will use
|
// 1 to try to reduce the number of parameters in the bulk query this will use
|
||||||
haveEventMap := make(map[string]*gomatrixserverlib.HeaderedEvent, len(stateIDs.StateEventIDs))
|
haveEventMap := make(map[string]*gomatrixserverlib.HeaderedEvent, len(stateIDs.StateEventIDs))
|
||||||
|
haveEventIDs := make(map[string]bool)
|
||||||
for _, eventList := range [][]string{stateIDs.StateEventIDs, stateIDs.AuthEventIDs} {
|
for _, eventList := range [][]string{stateIDs.StateEventIDs, stateIDs.AuthEventIDs} {
|
||||||
queryReq := api.QueryEventsByIDRequest{
|
queryReq := api.QueryEventsByIDRequest{
|
||||||
EventIDs: eventList,
|
EventIDs: eventList,
|
||||||
|
|
@ -389,6 +383,7 @@ func (t *txnReq) lookupMissingStateViaStateIDs(e gomatrixserverlib.Event, roomVe
|
||||||
// allow indexing of current state by event ID
|
// allow indexing of current state by event ID
|
||||||
for i := range queryRes.Events {
|
for i := range queryRes.Events {
|
||||||
haveEventMap[queryRes.Events[i].EventID()] = &queryRes.Events[i]
|
haveEventMap[queryRes.Events[i].EventID()] = &queryRes.Events[i]
|
||||||
|
haveEventIDs[queryRes.Events[i].EventID()] = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -408,7 +403,6 @@ func (t *txnReq) lookupMissingStateViaStateIDs(e gomatrixserverlib.Event, roomVe
|
||||||
"total_state": len(stateIDs.StateEventIDs),
|
"total_state": len(stateIDs.StateEventIDs),
|
||||||
"total_auth_events": len(stateIDs.AuthEventIDs),
|
"total_auth_events": len(stateIDs.AuthEventIDs),
|
||||||
}).Info("Fetching missing state at event")
|
}).Info("Fetching missing state at event")
|
||||||
var newEvents []gomatrixserverlib.HeaderedEvent
|
|
||||||
|
|
||||||
for missingEventID := range missing {
|
for missingEventID := range missing {
|
||||||
var txn gomatrixserverlib.Transaction
|
var txn gomatrixserverlib.Transaction
|
||||||
|
|
@ -430,11 +424,10 @@ func (t *txnReq) lookupMissingStateViaStateIDs(e gomatrixserverlib.Event, roomVe
|
||||||
}
|
}
|
||||||
h := event.Headered(roomVersion)
|
h := event.Headered(roomVersion)
|
||||||
haveEventMap[event.EventID()] = &h
|
haveEventMap[event.EventID()] = &h
|
||||||
newEvents = append(newEvents, h)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
resp, err := t.createRespStateFromStateIDs(stateIDs, haveEventMap)
|
resp, err := t.createRespStateFromStateIDs(stateIDs, haveEventMap)
|
||||||
return resp, newEvents, err
|
return resp, haveEventIDs, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *txnReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStateIDs, haveEventMap map[string]*gomatrixserverlib.HeaderedEvent) (
|
func (t *txnReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStateIDs, haveEventMap map[string]*gomatrixserverlib.HeaderedEvent) (
|
||||||
|
|
|
||||||
|
|
@ -505,10 +505,9 @@ func TestTransactionFetchMissingStateByFallbackState(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
inputEvent := testEvents[len(testEvents)-1]
|
inputEvent := testEvents[len(testEvents)-1]
|
||||||
var stateEvents []gomatrixserverlib.HeaderedEvent
|
// first 5 events are the state events, in auth event order.
|
||||||
for _, ev := range testStateEvents {
|
stateEvents := testEvents[:5]
|
||||||
stateEvents = append(stateEvents, ev)
|
|
||||||
}
|
|
||||||
cli := &txnFedClient{
|
cli := &txnFedClient{
|
||||||
// /state_ids purposefully unset
|
// /state_ids purposefully unset
|
||||||
stateIDs: nil,
|
stateIDs: nil,
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue