Don't try to re-fetch the event if it is listed in adds_state_event_ids (#2437)

* Don't try to re-fetch the event in the output message

* Try that again

* Add the initial event into the set
This commit is contained in:
Neil Alexander 2022-05-09 15:22:33 +01:00 committed by GitHub
parent f69ebc6af2
commit 1a7f4c8aa9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -154,41 +154,76 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
ctx context.Context, msg api.OutputNewRoomEvent, ctx context.Context, msg api.OutputNewRoomEvent,
) error { ) error {
ev := msg.Event ev := msg.Event
addsStateEvents := []*gomatrixserverlib.HeaderedEvent{} addsStateEvents := []*gomatrixserverlib.HeaderedEvent{}
foundEventIDs := map[string]bool{}
if len(msg.AddsStateEventIDs) > 0 { // Work out the list of events we need to find out about. Either
for _, eventID := range msg.AddsStateEventIDs { // they will be the event supplied in the request, we will find it
foundEventIDs[eventID] = false // in the sync API database or we'll need to ask the roomserver.
knownEventIDs := make(map[string]bool, len(msg.AddsStateEventIDs))
for _, eventID := range msg.AddsStateEventIDs {
if eventID == ev.EventID() {
knownEventIDs[eventID] = true
addsStateEvents = append(addsStateEvents, ev)
} else {
knownEventIDs[eventID] = false
} }
foundEvents, err := s.db.Events(ctx, msg.AddsStateEventIDs) }
// Work out which events we want to look up in the sync API database.
// At this stage the only event that should be excluded is the event
// supplied in the request, if it appears in the adds_state_event_ids.
missingEventIDs := make([]string, 0, len(msg.AddsStateEventIDs))
for eventID, known := range knownEventIDs {
if !known {
missingEventIDs = append(missingEventIDs, eventID)
}
}
// Look the events up in the database. If we know them, add them into
// the set of adds state events.
if len(missingEventIDs) > 0 {
alreadyKnown, err := s.db.Events(ctx, msg.AddsStateEventIDs)
if err != nil { if err != nil {
return fmt.Errorf("s.db.Events: %w", err) return fmt.Errorf("s.db.Events: %w", err)
} }
for _, event := range foundEvents { for _, knownEvent := range alreadyKnown {
foundEventIDs[event.EventID()] = true knownEventIDs[knownEvent.EventID()] = true
addsStateEvents = append(addsStateEvents, knownEvent)
}
}
// Now work out if there are any remaining events we don't know. For
// these we will need to ask the roomserver for help.
missingEventIDs = missingEventIDs[:0]
for eventID, known := range knownEventIDs {
if !known {
missingEventIDs = append(missingEventIDs, eventID)
}
}
// Ask the roomserver and add in the rest of the results into the set.
// Finally, work out if there are any more events missing.
if len(missingEventIDs) > 0 {
eventsReq := &api.QueryEventsByIDRequest{
EventIDs: missingEventIDs,
} }
eventsReq := &api.QueryEventsByIDRequest{}
eventsRes := &api.QueryEventsByIDResponse{} eventsRes := &api.QueryEventsByIDResponse{}
for eventID, found := range foundEventIDs { if err := s.rsAPI.QueryEventsByID(ctx, eventsReq, eventsRes); err != nil {
if !found {
eventsReq.EventIDs = append(eventsReq.EventIDs, eventID)
}
}
if err = s.rsAPI.QueryEventsByID(ctx, eventsReq, eventsRes); err != nil {
return fmt.Errorf("s.rsAPI.QueryEventsByID: %w", err) return fmt.Errorf("s.rsAPI.QueryEventsByID: %w", err)
} }
for _, event := range eventsRes.Events { for _, event := range eventsRes.Events {
eventID := event.EventID() addsStateEvents = append(addsStateEvents, event)
foundEvents = append(foundEvents, event) knownEventIDs[event.EventID()] = true
foundEventIDs[eventID] = true
} }
for eventID, found := range foundEventIDs {
// This should never happen because this would imply that the
// roomserver has sent us adds_state_event_ids for events that it
// also doesn't know about, but let's just be sure.
for eventID, found := range knownEventIDs {
if !found { if !found {
return fmt.Errorf("event %s is missing", eventID) return fmt.Errorf("event %s is missing", eventID)
} }
} }
addsStateEvents = foundEvents
} }
ev, err := s.updateStateEvent(ev) ev, err := s.updateStateEvent(ev)