From c9fbe45475eb12ae44d2a8da7c0fc3a002ad9819 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 7 Mar 2022 16:30:57 +0000 Subject: [PATCH] Add `AddsState` helper function --- appservice/consumers/roomserver.go | 15 +++++---------- federationapi/consumers/roomserver.go | 25 +++++-------------------- roomserver/api/output.go | 18 ++++++++++++++++++ 3 files changed, 28 insertions(+), 30 deletions(-) diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index 9d723bed1..60b8b7d81 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -87,17 +87,12 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) return true } - events := []*gomatrixserverlib.HeaderedEvent{output.NewRoomEvent.Event} - if len(output.NewRoomEvent.AddsStateEventIDs) > 0 { - eventsReq := &api.QueryEventsByIDRequest{ - EventIDs: output.NewRoomEvent.AddsStateEventIDs, - } - eventsRes := &api.QueryEventsByIDResponse{} - if err := s.rsAPI.QueryEventsByID(s.ctx, eventsReq, eventsRes); err != nil { - return false - } - events = append(events, eventsRes.Events...) + events, err := output.NewRoomEvent.AddsState(ctx, s.rsAPI) + if err != nil { + log.WithError(err).Errorf("roomserver output log: failed to get state events") + return false } + events = append(events, output.NewRoomEvent.Event) // Send event to any relevant application services if err := s.filterRoomserverEvents(context.TODO(), events); err != nil { diff --git a/federationapi/consumers/roomserver.go b/federationapi/consumers/roomserver.go index 989f7cf49..ba1089821 100644 --- a/federationapi/consumers/roomserver.go +++ b/federationapi/consumers/roomserver.go @@ -146,28 +146,13 @@ func (s *OutputRoomEventConsumer) processInboundPeek(orp api.OutputNewInboundPee // processMessage updates the list of currently joined hosts in the room // and then sends the event to the hosts that were joined before the event. func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error { - eventsRes := &api.QueryEventsByIDResponse{} - if len(ore.AddsStateEventIDs) > 0 { - eventsReq := &api.QueryEventsByIDRequest{ - EventIDs: ore.AddsStateEventIDs, - } - if err := s.rsAPI.QueryEventsByID(s.ctx, eventsReq, eventsRes); err != nil { - return fmt.Errorf("s.rsAPI.QueryEventsByID: %w", err) - } - - found := false - for _, event := range eventsRes.Events { - if event.EventID() == ore.Event.EventID() { - found = true - break - } - } - if !found { - eventsRes.Events = append(eventsRes.Events, ore.Event) - } + stateEvents, err := ore.AddsState(s.ctx, s.rsAPI) + if err != nil { + return fmt.Errorf("ore.AddsState: %w", err) } + stateEvents = append(stateEvents, ore.Event) - addsJoinedHosts, err := joinedHostsFromEvents(gomatrixserverlib.UnwrapEventHeaders(eventsRes.Events)) + addsJoinedHosts, err := joinedHostsFromEvents(gomatrixserverlib.UnwrapEventHeaders(stateEvents)) if err != nil { return err } diff --git a/roomserver/api/output.go b/roomserver/api/output.go index 767611ec4..edc2251ce 100644 --- a/roomserver/api/output.go +++ b/roomserver/api/output.go @@ -15,6 +15,9 @@ package api import ( + "context" + "fmt" + "github.com/matrix-org/gomatrixserverlib" ) @@ -163,6 +166,21 @@ type OutputNewRoomEvent struct { TransactionID *TransactionID `json:"transaction_id,omitempty"` } +// AddsState asks the roomserver API for events specified in `adds_state_event_ids`. +// The slice returned contains the output room event itself in all cases. +func (o *OutputNewRoomEvent) AddsState(ctx context.Context, rsAPI RoomserverInternalAPI) ([]*gomatrixserverlib.HeaderedEvent, error) { + events := make([]*gomatrixserverlib.HeaderedEvent, 0, len(o.AddsStateEventIDs)) + eventsReq := &QueryEventsByIDRequest{ + EventIDs: o.AddsStateEventIDs, + } + eventsRes := &QueryEventsByIDResponse{} + if err := rsAPI.QueryEventsByID(ctx, eventsReq, eventsRes); err != nil { + return nil, fmt.Errorf("s.rsAPI.QueryEventsByID: %w", err) + } + events = append(events, eventsRes.Events...) + return events, nil +} + // An OutputOldRoomEvent is written when the roomserver receives an old event. // This will typically happen as a result of getting either missing events // or backfilling. Downstream components may wish to send these events to