Federation consumer adds_state_event_ids tweak (#2441)

* Don't ask roomserver for events we already have in federation API

* Check number of events returned is as expected

* Preallocate array

* Improve shape a bit
This commit is contained in:
Neil Alexander 2022-05-09 16:19:35 +01:00 committed by GitHub
parent 1a7f4c8aa9
commit 79da75d483
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 28 additions and 33 deletions

View file

@ -146,28 +146,25 @@ func (s *OutputRoomEventConsumer) processInboundPeek(orp api.OutputNewInboundPee
// processMessage updates the list of currently joined hosts in the room // processMessage updates the list of currently joined hosts in the room
// and then sends the event to the hosts that were joined before the event. // and then sends the event to the hosts that were joined before the event.
func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error { func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error {
eventsRes := &api.QueryEventsByIDResponse{} addsStateEvents, missingEventIDs := ore.NeededStateEventIDs()
if len(ore.AddsStateEventIDs) > 0 {
// Ask the roomserver and add in the rest of the results into the set.
// Finally, work out if there are any more events missing.
if len(missingEventIDs) > 0 {
eventsReq := &api.QueryEventsByIDRequest{ eventsReq := &api.QueryEventsByIDRequest{
EventIDs: ore.AddsStateEventIDs, EventIDs: missingEventIDs,
} }
eventsRes := &api.QueryEventsByIDResponse{}
if err := s.rsAPI.QueryEventsByID(s.ctx, eventsReq, eventsRes); err != nil { if err := s.rsAPI.QueryEventsByID(s.ctx, eventsReq, eventsRes); err != nil {
return fmt.Errorf("s.rsAPI.QueryEventsByID: %w", err) return fmt.Errorf("s.rsAPI.QueryEventsByID: %w", err)
} }
if len(eventsRes.Events) != len(missingEventIDs) {
found := false return fmt.Errorf("missing state events")
for _, event := range eventsRes.Events {
if event.EventID() == ore.Event.EventID() {
found = true
break
}
}
if !found {
eventsRes.Events = append(eventsRes.Events, ore.Event)
} }
addsStateEvents = append(addsStateEvents, eventsRes.Events...)
} }
addsJoinedHosts, err := joinedHostsFromEvents(gomatrixserverlib.UnwrapEventHeaders(eventsRes.Events)) addsJoinedHosts, err := joinedHostsFromEvents(gomatrixserverlib.UnwrapEventHeaders(addsStateEvents))
if err != nil { if err != nil {
return err return err
} }

View file

@ -163,6 +163,19 @@ type OutputNewRoomEvent struct {
TransactionID *TransactionID `json:"transaction_id,omitempty"` TransactionID *TransactionID `json:"transaction_id,omitempty"`
} }
func (o *OutputNewRoomEvent) NeededStateEventIDs() ([]*gomatrixserverlib.HeaderedEvent, []string) {
addsStateEvents := make([]*gomatrixserverlib.HeaderedEvent, 0, 1)
missingEventIDs := make([]string, 0, len(o.AddsStateEventIDs))
for _, eventID := range o.AddsStateEventIDs {
if eventID != o.Event.EventID() {
missingEventIDs = append(missingEventIDs, eventID)
} else {
addsStateEvents = append(addsStateEvents, o.Event)
}
}
return addsStateEvents, missingEventIDs
}
// An OutputOldRoomEvent is written when the roomserver receives an old event. // An OutputOldRoomEvent is written when the roomserver receives an old event.
// This will typically happen as a result of getting either missing events // This will typically happen as a result of getting either missing events
// or backfilling. Downstream components may wish to send these events to // or backfilling. Downstream components may wish to send these events to

View file

@ -154,35 +154,20 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
ctx context.Context, msg api.OutputNewRoomEvent, ctx context.Context, msg api.OutputNewRoomEvent,
) error { ) error {
ev := msg.Event ev := msg.Event
addsStateEvents := []*gomatrixserverlib.HeaderedEvent{} addsStateEvents, missingEventIDs := msg.NeededStateEventIDs()
// Work out the list of events we need to find out about. Either // Work out the list of events we need to find out about. Either
// they will be the event supplied in the request, we will find it // they will be the event supplied in the request, we will find it
// in the sync API database or we'll need to ask the roomserver. // in the sync API database or we'll need to ask the roomserver.
knownEventIDs := make(map[string]bool, len(msg.AddsStateEventIDs)) knownEventIDs := make(map[string]bool, len(msg.AddsStateEventIDs))
for _, eventID := range msg.AddsStateEventIDs { for _, eventID := range missingEventIDs {
if eventID == ev.EventID() {
knownEventIDs[eventID] = true
addsStateEvents = append(addsStateEvents, ev)
} else {
knownEventIDs[eventID] = false knownEventIDs[eventID] = false
} }
}
// Work out which events we want to look up in the sync API database.
// At this stage the only event that should be excluded is the event
// supplied in the request, if it appears in the adds_state_event_ids.
missingEventIDs := make([]string, 0, len(msg.AddsStateEventIDs))
for eventID, known := range knownEventIDs {
if !known {
missingEventIDs = append(missingEventIDs, eventID)
}
}
// Look the events up in the database. If we know them, add them into // Look the events up in the database. If we know them, add them into
// the set of adds state events. // the set of adds state events.
if len(missingEventIDs) > 0 { if len(missingEventIDs) > 0 {
alreadyKnown, err := s.db.Events(ctx, msg.AddsStateEventIDs) alreadyKnown, err := s.db.Events(ctx, missingEventIDs)
if err != nil { if err != nil {
return fmt.Errorf("s.db.Events: %w", err) return fmt.Errorf("s.db.Events: %w", err)
} }