mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-01 03:03:10 -06:00
parent
e8764e59e9
commit
5f2521d4f6
|
|
@ -87,12 +87,17 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msg *nats.Msg)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
events, err := output.NewRoomEvent.AddsState(ctx, s.rsAPI)
|
events := []*gomatrixserverlib.HeaderedEvent{output.NewRoomEvent.Event}
|
||||||
if err != nil {
|
if len(output.NewRoomEvent.AddsStateEventIDs) > 0 {
|
||||||
log.WithError(err).Errorf("roomserver output log: failed to get state events")
|
eventsReq := &api.QueryEventsByIDRequest{
|
||||||
return false
|
EventIDs: output.NewRoomEvent.AddsStateEventIDs,
|
||||||
|
}
|
||||||
|
eventsRes := &api.QueryEventsByIDResponse{}
|
||||||
|
if err := s.rsAPI.QueryEventsByID(s.ctx, eventsReq, eventsRes); err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
events = append(events, eventsRes.Events...)
|
||||||
}
|
}
|
||||||
events = append(events, output.NewRoomEvent.Event)
|
|
||||||
|
|
||||||
// Send event to any relevant application services
|
// Send event to any relevant application services
|
||||||
if err := s.filterRoomserverEvents(context.TODO(), events); err != nil {
|
if err := s.filterRoomserverEvents(context.TODO(), events); err != nil {
|
||||||
|
|
|
||||||
|
|
@ -146,13 +146,28 @@ func (s *OutputRoomEventConsumer) processInboundPeek(orp api.OutputNewInboundPee
|
||||||
// processMessage updates the list of currently joined hosts in the room
|
// processMessage updates the list of currently joined hosts in the room
|
||||||
// and then sends the event to the hosts that were joined before the event.
|
// and then sends the event to the hosts that were joined before the event.
|
||||||
func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error {
|
func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error {
|
||||||
stateEvents, err := ore.AddsState(s.ctx, s.rsAPI)
|
eventsRes := &api.QueryEventsByIDResponse{}
|
||||||
if err != nil {
|
if len(ore.AddsStateEventIDs) > 0 {
|
||||||
return fmt.Errorf("ore.AddsState: %w", err)
|
eventsReq := &api.QueryEventsByIDRequest{
|
||||||
}
|
EventIDs: ore.AddsStateEventIDs,
|
||||||
stateEvents = append(stateEvents, ore.Event)
|
}
|
||||||
|
if err := s.rsAPI.QueryEventsByID(s.ctx, eventsReq, eventsRes); err != nil {
|
||||||
|
return fmt.Errorf("s.rsAPI.QueryEventsByID: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
addsJoinedHosts, err := joinedHostsFromEvents(gomatrixserverlib.UnwrapEventHeaders(stateEvents))
|
found := false
|
||||||
|
for _, event := range eventsRes.Events {
|
||||||
|
if event.EventID() == ore.Event.EventID() {
|
||||||
|
found = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !found {
|
||||||
|
eventsRes.Events = append(eventsRes.Events, ore.Event)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
addsJoinedHosts, err := joinedHostsFromEvents(gomatrixserverlib.UnwrapEventHeaders(eventsRes.Events))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -15,9 +15,6 @@
|
||||||
package api
|
package api
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -166,21 +163,6 @@ type OutputNewRoomEvent struct {
|
||||||
TransactionID *TransactionID `json:"transaction_id,omitempty"`
|
TransactionID *TransactionID `json:"transaction_id,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddsState asks the roomserver API for events specified in `adds_state_event_ids`.
|
|
||||||
// The slice returned contains the output room event itself in all cases.
|
|
||||||
func (o *OutputNewRoomEvent) AddsState(ctx context.Context, rsAPI RoomserverInternalAPI) ([]*gomatrixserverlib.HeaderedEvent, error) {
|
|
||||||
events := make([]*gomatrixserverlib.HeaderedEvent, 0, len(o.AddsStateEventIDs))
|
|
||||||
eventsReq := &QueryEventsByIDRequest{
|
|
||||||
EventIDs: o.AddsStateEventIDs,
|
|
||||||
}
|
|
||||||
eventsRes := &QueryEventsByIDResponse{}
|
|
||||||
if err := rsAPI.QueryEventsByID(ctx, eventsReq, eventsRes); err != nil {
|
|
||||||
return nil, fmt.Errorf("s.rsAPI.QueryEventsByID: %w", err)
|
|
||||||
}
|
|
||||||
events = append(events, eventsRes.Events...)
|
|
||||||
return events, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// An OutputOldRoomEvent is written when the roomserver receives an old event.
|
// An OutputOldRoomEvent is written when the roomserver receives an old event.
|
||||||
// This will typically happen as a result of getting either missing events
|
// This will typically happen as a result of getting either missing events
|
||||||
// or backfilling. Downstream components may wish to send these events to
|
// or backfilling. Downstream components may wish to send these events to
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue