Try that again

This commit is contained in:
Neil Alexander 2022-05-09 14:56:48 +01:00
parent a2600cd27b
commit 74baae0669
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944

View file

@ -154,49 +154,71 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
ctx context.Context, msg api.OutputNewRoomEvent, ctx context.Context, msg api.OutputNewRoomEvent,
) error { ) error {
ev := msg.Event ev := msg.Event
addsStateEvents := []*gomatrixserverlib.HeaderedEvent{} addsStateEvents := []*gomatrixserverlib.HeaderedEvent{}
missingAddStateEventIDs := make([]string, 0, len(msg.AddsStateEventIDs))
foundEventIDs := map[string]bool{ // Work out the list of events we need to find out about. Either
msg.Event.EventID(): true, // they will be the event supplied in the request, we will find it
// in the sync API database or we'll need to ask the roomserver.
knownEventIDs := make(map[string]bool, len(msg.AddsStateEventIDs))
for _, eventID := range msg.AddsStateEventIDs {
knownEventIDs[eventID] = eventID == ev.EventID()
} }
if len(msg.AddsStateEventIDs) > 0 {
for _, eventID := range msg.AddsStateEventIDs { // Work out which events we want to look up in the sync API database.
if _, ok := foundEventIDs[eventID]; !ok { // At this stage the only event that should be excluded is the event
foundEventIDs[eventID] = false // supplied in the request, if it appears in the adds_state_event_ids.
missingAddStateEventIDs = append(missingAddStateEventIDs, eventID) missingEventIDs := make([]string, 0, len(msg.AddsStateEventIDs))
} for eventID, known := range knownEventIDs {
if !known {
missingEventIDs = append(missingEventIDs, eventID)
} }
} }
if len(missingAddStateEventIDs) > 0 {
foundEvents, err := s.db.Events(ctx, missingAddStateEventIDs) // Look the events up in the database. If we know them, add them into
// the set of adds state events.
if len(missingEventIDs) > 0 {
alreadyKnown, err := s.db.Events(ctx, msg.AddsStateEventIDs)
if err != nil { if err != nil {
return fmt.Errorf("s.db.Events: %w", err) return fmt.Errorf("s.db.Events: %w", err)
} }
for _, event := range foundEvents { for _, knownEvent := range alreadyKnown {
foundEventIDs[event.EventID()] = true knownEventIDs[knownEvent.EventID()] = true
addsStateEvents = append(addsStateEvents, knownEvent)
}
}
// Now work out if there are any remaining events we don't know. For
// these we will need to ask the roomserver for help.
missingEventIDs = missingEventIDs[:0]
for eventID, known := range knownEventIDs {
if !known {
missingEventIDs = append(missingEventIDs, eventID)
}
}
// Ask the roomserver and add in the rest of the results into the set.
// Finally, work out if there are any more events missing.
if len(missingEventIDs) > 0 {
eventsReq := &api.QueryEventsByIDRequest{
EventIDs: missingEventIDs,
} }
eventsReq := &api.QueryEventsByIDRequest{}
eventsRes := &api.QueryEventsByIDResponse{} eventsRes := &api.QueryEventsByIDResponse{}
for eventID, found := range foundEventIDs { if err := s.rsAPI.QueryEventsByID(ctx, eventsReq, eventsRes); err != nil {
if !found {
eventsReq.EventIDs = append(eventsReq.EventIDs, eventID)
}
}
if err = s.rsAPI.QueryEventsByID(ctx, eventsReq, eventsRes); err != nil {
return fmt.Errorf("s.rsAPI.QueryEventsByID: %w", err) return fmt.Errorf("s.rsAPI.QueryEventsByID: %w", err)
} }
for _, event := range eventsRes.Events { for _, event := range eventsRes.Events {
eventID := event.EventID() addsStateEvents = append(addsStateEvents, event)
foundEvents = append(foundEvents, event) knownEventIDs[event.EventID()] = true
foundEventIDs[eventID] = true
} }
for eventID, found := range foundEventIDs {
// This should never happen because this would imply that the
// roomserver has sent us adds_state_event_ids for events that it
// also doesn't know about, but let's just be sure.
for eventID, found := range knownEventIDs {
if !found { if !found {
return fmt.Errorf("event %s is missing", eventID) return fmt.Errorf("event %s is missing", eventID)
} }
} }
addsStateEvents = foundEvents
} }
ev, err := s.updateStateEvent(ev) ev, err := s.updateStateEvent(ev)