diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index 60b8b7d81..9d723bed1 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -87,12 +87,17 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) return true } - events, err := output.NewRoomEvent.AddsState(ctx, s.rsAPI) - if err != nil { - log.WithError(err).Errorf("roomserver output log: failed to get state events") - return false + events := []*gomatrixserverlib.HeaderedEvent{output.NewRoomEvent.Event} + if len(output.NewRoomEvent.AddsStateEventIDs) > 0 { + eventsReq := &api.QueryEventsByIDRequest{ + EventIDs: output.NewRoomEvent.AddsStateEventIDs, + } + eventsRes := &api.QueryEventsByIDResponse{} + if err := s.rsAPI.QueryEventsByID(s.ctx, eventsReq, eventsRes); err != nil { + return false + } + events = append(events, eventsRes.Events...) } - events = append(events, output.NewRoomEvent.Event) // Send event to any relevant application services if err := s.filterRoomserverEvents(context.TODO(), events); err != nil { diff --git a/federationapi/consumers/roomserver.go b/federationapi/consumers/roomserver.go index ba1089821..989f7cf49 100644 --- a/federationapi/consumers/roomserver.go +++ b/federationapi/consumers/roomserver.go @@ -146,13 +146,28 @@ func (s *OutputRoomEventConsumer) processInboundPeek(orp api.OutputNewInboundPee // processMessage updates the list of currently joined hosts in the room // and then sends the event to the hosts that were joined before the event. func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error { - stateEvents, err := ore.AddsState(s.ctx, s.rsAPI) - if err != nil { - return fmt.Errorf("ore.AddsState: %w", err) - } - stateEvents = append(stateEvents, ore.Event) + eventsRes := &api.QueryEventsByIDResponse{} + if len(ore.AddsStateEventIDs) > 0 { + eventsReq := &api.QueryEventsByIDRequest{ + EventIDs: ore.AddsStateEventIDs, + } + if err := s.rsAPI.QueryEventsByID(s.ctx, eventsReq, eventsRes); err != nil { + return fmt.Errorf("s.rsAPI.QueryEventsByID: %w", err) + } - addsJoinedHosts, err := joinedHostsFromEvents(gomatrixserverlib.UnwrapEventHeaders(stateEvents)) + found := false + for _, event := range eventsRes.Events { + if event.EventID() == ore.Event.EventID() { + found = true + break + } + } + if !found { + eventsRes.Events = append(eventsRes.Events, ore.Event) + } + } + + addsJoinedHosts, err := joinedHostsFromEvents(gomatrixserverlib.UnwrapEventHeaders(eventsRes.Events)) if err != nil { return err } diff --git a/roomserver/api/output.go b/roomserver/api/output.go index edc2251ce..767611ec4 100644 --- a/roomserver/api/output.go +++ b/roomserver/api/output.go @@ -15,9 +15,6 @@ package api import ( - "context" - "fmt" - "github.com/matrix-org/gomatrixserverlib" ) @@ -166,21 +163,6 @@ type OutputNewRoomEvent struct { TransactionID *TransactionID `json:"transaction_id,omitempty"` } -// AddsState asks the roomserver API for events specified in `adds_state_event_ids`. -// The slice returned contains the output room event itself in all cases. -func (o *OutputNewRoomEvent) AddsState(ctx context.Context, rsAPI RoomserverInternalAPI) ([]*gomatrixserverlib.HeaderedEvent, error) { - events := make([]*gomatrixserverlib.HeaderedEvent, 0, len(o.AddsStateEventIDs)) - eventsReq := &QueryEventsByIDRequest{ - EventIDs: o.AddsStateEventIDs, - } - eventsRes := &QueryEventsByIDResponse{} - if err := rsAPI.QueryEventsByID(ctx, eventsReq, eventsRes); err != nil { - return nil, fmt.Errorf("s.rsAPI.QueryEventsByID: %w", err) - } - events = append(events, eventsRes.Events...) - return events, nil -} - // An OutputOldRoomEvent is written when the roomserver receives an old event. // This will typically happen as a result of getting either missing events // or backfilling. Downstream components may wish to send these events to