Don't send adds_state_events
in roomserver output events anymore (#2258)
* Don't send `adds_state_events` in roomserver output events anymore * Set `omitempty` on some output fields that aren't always set * Add `AddsState` helper function * No-op if no added state event IDs * Revert "No-op if no added state event IDs" This reverts commit71a0ef3df1
. * Revert "Add `AddsState` helper function" This reverts commitc9fbe45475
.
This commit is contained in:
parent
05fa66c9c8
commit
67de4dbd0c
|
@ -88,7 +88,16 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msg *nats.Msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
events := []*gomatrixserverlib.HeaderedEvent{output.NewRoomEvent.Event}
|
events := []*gomatrixserverlib.HeaderedEvent{output.NewRoomEvent.Event}
|
||||||
events = append(events, output.NewRoomEvent.AddStateEvents...)
|
if len(output.NewRoomEvent.AddsStateEventIDs) > 0 {
|
||||||
|
eventsReq := &api.QueryEventsByIDRequest{
|
||||||
|
EventIDs: output.NewRoomEvent.AddsStateEventIDs,
|
||||||
|
}
|
||||||
|
eventsRes := &api.QueryEventsByIDResponse{}
|
||||||
|
if err := s.rsAPI.QueryEventsByID(s.ctx, eventsReq, eventsRes); err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
events = append(events, eventsRes.Events...)
|
||||||
|
}
|
||||||
|
|
||||||
// Send event to any relevant application services
|
// Send event to any relevant application services
|
||||||
if err := s.filterRoomserverEvents(context.TODO(), events); err != nil {
|
if err := s.filterRoomserverEvents(context.TODO(), events); err != nil {
|
||||||
|
|
|
@ -146,7 +146,28 @@ func (s *OutputRoomEventConsumer) processInboundPeek(orp api.OutputNewInboundPee
|
||||||
// processMessage updates the list of currently joined hosts in the room
|
// processMessage updates the list of currently joined hosts in the room
|
||||||
// and then sends the event to the hosts that were joined before the event.
|
// and then sends the event to the hosts that were joined before the event.
|
||||||
func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error {
|
func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error {
|
||||||
addsJoinedHosts, err := joinedHostsFromEvents(gomatrixserverlib.UnwrapEventHeaders(ore.AddsState()))
|
eventsRes := &api.QueryEventsByIDResponse{}
|
||||||
|
if len(ore.AddsStateEventIDs) > 0 {
|
||||||
|
eventsReq := &api.QueryEventsByIDRequest{
|
||||||
|
EventIDs: ore.AddsStateEventIDs,
|
||||||
|
}
|
||||||
|
if err := s.rsAPI.QueryEventsByID(s.ctx, eventsReq, eventsRes); err != nil {
|
||||||
|
return fmt.Errorf("s.rsAPI.QueryEventsByID: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
found := false
|
||||||
|
for _, event := range eventsRes.Events {
|
||||||
|
if event.EventID() == ore.Event.EventID() {
|
||||||
|
found = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !found {
|
||||||
|
eventsRes.Events = append(eventsRes.Events, ore.Event)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
addsJoinedHosts, err := joinedHostsFromEvents(gomatrixserverlib.UnwrapEventHeaders(eventsRes.Events))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -105,7 +105,7 @@ type OutputNewRoomEvent struct {
|
||||||
Event *gomatrixserverlib.HeaderedEvent `json:"event"`
|
Event *gomatrixserverlib.HeaderedEvent `json:"event"`
|
||||||
// Does the event completely rewrite the room state? If so, then AddsStateEventIDs
|
// Does the event completely rewrite the room state? If so, then AddsStateEventIDs
|
||||||
// will contain the entire room state.
|
// will contain the entire room state.
|
||||||
RewritesState bool `json:"rewrites_state"`
|
RewritesState bool `json:"rewrites_state,omitempty"`
|
||||||
// The latest events in the room after this event.
|
// The latest events in the room after this event.
|
||||||
// This can be used to set the prev events for new events in the room.
|
// This can be used to set the prev events for new events in the room.
|
||||||
// This also can be used to get the full current state after this event.
|
// This also can be used to get the full current state after this event.
|
||||||
|
@ -113,16 +113,9 @@ type OutputNewRoomEvent struct {
|
||||||
// The state event IDs that were added to the state of the room by this event.
|
// The state event IDs that were added to the state of the room by this event.
|
||||||
// Together with RemovesStateEventIDs this allows the receiver to keep an up to date
|
// Together with RemovesStateEventIDs this allows the receiver to keep an up to date
|
||||||
// view of the current state of the room.
|
// view of the current state of the room.
|
||||||
AddsStateEventIDs []string `json:"adds_state_event_ids"`
|
AddsStateEventIDs []string `json:"adds_state_event_ids,omitempty"`
|
||||||
// All extra newly added state events. This is only set if there are *extra* events
|
|
||||||
// other than `Event`. This can happen when forks get merged because state resolution
|
|
||||||
// may decide a bunch of state events on one branch are now valid, so they will be
|
|
||||||
// present in this list. This is useful when trying to maintain the current state of a room
|
|
||||||
// as to do so you need to include both these events and `Event`.
|
|
||||||
AddStateEvents []*gomatrixserverlib.HeaderedEvent `json:"adds_state_events"`
|
|
||||||
|
|
||||||
// The state event IDs that were removed from the state of the room by this event.
|
// The state event IDs that were removed from the state of the room by this event.
|
||||||
RemovesStateEventIDs []string `json:"removes_state_event_ids"`
|
RemovesStateEventIDs []string `json:"removes_state_event_ids,omitempty"`
|
||||||
// The ID of the event that was output before this event.
|
// The ID of the event that was output before this event.
|
||||||
// Or the empty string if this is the first event output for this room.
|
// Or the empty string if this is the first event output for this room.
|
||||||
// This is used by consumers to check if they can safely update their
|
// This is used by consumers to check if they can safely update their
|
||||||
|
@ -145,10 +138,10 @@ type OutputNewRoomEvent struct {
|
||||||
//
|
//
|
||||||
// The state is given as a delta against the current state because they are
|
// The state is given as a delta against the current state because they are
|
||||||
// usually either the same state, or differ by just a couple of events.
|
// usually either the same state, or differ by just a couple of events.
|
||||||
StateBeforeAddsEventIDs []string `json:"state_before_adds_event_ids"`
|
StateBeforeAddsEventIDs []string `json:"state_before_adds_event_ids,omitempty"`
|
||||||
// The state event IDs that are part of the current state, but not part
|
// The state event IDs that are part of the current state, but not part
|
||||||
// of the state at the event.
|
// of the state at the event.
|
||||||
StateBeforeRemovesEventIDs []string `json:"state_before_removes_event_ids"`
|
StateBeforeRemovesEventIDs []string `json:"state_before_removes_event_ids,omitempty"`
|
||||||
// The server name to use to push this event to other servers.
|
// The server name to use to push this event to other servers.
|
||||||
// Or empty if this event shouldn't be pushed to other servers.
|
// Or empty if this event shouldn't be pushed to other servers.
|
||||||
//
|
//
|
||||||
|
@ -167,27 +160,7 @@ type OutputNewRoomEvent struct {
|
||||||
SendAsServer string `json:"send_as_server"`
|
SendAsServer string `json:"send_as_server"`
|
||||||
// The transaction ID of the send request if sent by a local user and one
|
// The transaction ID of the send request if sent by a local user and one
|
||||||
// was specified
|
// was specified
|
||||||
TransactionID *TransactionID `json:"transaction_id"`
|
TransactionID *TransactionID `json:"transaction_id,omitempty"`
|
||||||
}
|
|
||||||
|
|
||||||
// AddsState returns all added state events from this event.
|
|
||||||
//
|
|
||||||
// This function is needed because `AddStateEvents` will not include a copy of
|
|
||||||
// the original event to save space, so you cannot use that slice alone.
|
|
||||||
// Instead, use this function which will add the original event if it is present
|
|
||||||
// in `AddsStateEventIDs`.
|
|
||||||
func (ore *OutputNewRoomEvent) AddsState() []*gomatrixserverlib.HeaderedEvent {
|
|
||||||
includeOutputEvent := false
|
|
||||||
for _, id := range ore.AddsStateEventIDs {
|
|
||||||
if id == ore.Event.EventID() {
|
|
||||||
includeOutputEvent = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !includeOutputEvent {
|
|
||||||
return ore.AddStateEvents
|
|
||||||
}
|
|
||||||
return append(ore.AddStateEvents, ore.Event)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// An OutputOldRoomEvent is written when the roomserver receives an old event.
|
// An OutputOldRoomEvent is written when the roomserver receives an old event.
|
||||||
|
|
|
@ -365,6 +365,7 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
|
||||||
LastSentEventID: u.lastEventIDSent,
|
LastSentEventID: u.lastEventIDSent,
|
||||||
LatestEventIDs: latestEventIDs,
|
LatestEventIDs: latestEventIDs,
|
||||||
TransactionID: u.transactionID,
|
TransactionID: u.transactionID,
|
||||||
|
SendAsServer: u.sendAsServer,
|
||||||
}
|
}
|
||||||
|
|
||||||
eventIDMap, err := u.stateEventMap()
|
eventIDMap, err := u.stateEventMap()
|
||||||
|
@ -384,51 +385,17 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
|
||||||
ore.StateBeforeAddsEventIDs = append(ore.StateBeforeAddsEventIDs, eventIDMap[entry.EventNID])
|
ore.StateBeforeAddsEventIDs = append(ore.StateBeforeAddsEventIDs, eventIDMap[entry.EventNID])
|
||||||
}
|
}
|
||||||
|
|
||||||
ore.SendAsServer = u.sendAsServer
|
|
||||||
|
|
||||||
// include extra state events if they were added as nearly every downstream component will care about it
|
|
||||||
// and we'd rather not have them all hit QueryEventsByID at the same time!
|
|
||||||
if len(ore.AddsStateEventIDs) > 0 {
|
|
||||||
var err error
|
|
||||||
if ore.AddStateEvents, err = u.extraEventsForIDs(u.roomInfo.RoomVersion, ore.AddsStateEventIDs); err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to load add_state_events from db: %w", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return &api.OutputEvent{
|
return &api.OutputEvent{
|
||||||
Type: api.OutputTypeNewRoomEvent,
|
Type: api.OutputTypeNewRoomEvent,
|
||||||
NewRoomEvent: &ore,
|
NewRoomEvent: &ore,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// extraEventsForIDs returns the full events for the event IDs given, but does not include the current event being
|
|
||||||
// updated.
|
|
||||||
func (u *latestEventsUpdater) extraEventsForIDs(roomVersion gomatrixserverlib.RoomVersion, eventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error) {
|
|
||||||
var extraEventIDs []string
|
|
||||||
for _, e := range eventIDs {
|
|
||||||
if e == u.event.EventID() {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
extraEventIDs = append(extraEventIDs, e)
|
|
||||||
}
|
|
||||||
if len(extraEventIDs) == 0 {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
extraEvents, err := u.updater.UnsentEventsFromIDs(u.ctx, extraEventIDs)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
var h []*gomatrixserverlib.HeaderedEvent
|
|
||||||
for _, e := range extraEvents {
|
|
||||||
h = append(h, e.Headered(roomVersion))
|
|
||||||
}
|
|
||||||
return h, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// retrieve an event nid -> event ID map for all events that need updating
|
// retrieve an event nid -> event ID map for all events that need updating
|
||||||
func (u *latestEventsUpdater) stateEventMap() (map[types.EventNID]string, error) {
|
func (u *latestEventsUpdater) stateEventMap() (map[types.EventNID]string, error) {
|
||||||
var stateEventNIDs []types.EventNID
|
cap := len(u.added) + len(u.removed) + len(u.stateBeforeEventRemoves) + len(u.stateBeforeEventAdds)
|
||||||
var allStateEntries []types.StateEntry
|
stateEventNIDs := make(types.EventNIDs, 0, cap)
|
||||||
|
allStateEntries := make([]types.StateEntry, 0, cap)
|
||||||
allStateEntries = append(allStateEntries, u.added...)
|
allStateEntries = append(allStateEntries, u.added...)
|
||||||
allStateEntries = append(allStateEntries, u.removed...)
|
allStateEntries = append(allStateEntries, u.removed...)
|
||||||
allStateEntries = append(allStateEntries, u.stateBeforeEventRemoves...)
|
allStateEntries = append(allStateEntries, u.stateBeforeEventRemoves...)
|
||||||
|
@ -436,12 +403,6 @@ func (u *latestEventsUpdater) stateEventMap() (map[types.EventNID]string, error)
|
||||||
for _, entry := range allStateEntries {
|
for _, entry := range allStateEntries {
|
||||||
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
|
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
|
||||||
}
|
}
|
||||||
stateEventNIDs = stateEventNIDs[:util.SortAndUnique(eventNIDSorter(stateEventNIDs))]
|
stateEventNIDs = stateEventNIDs[:util.SortAndUnique(stateEventNIDs)]
|
||||||
return u.updater.EventIDs(u.ctx, stateEventNIDs)
|
return u.updater.EventIDs(u.ctx, stateEventNIDs)
|
||||||
}
|
}
|
||||||
|
|
||||||
type eventNIDSorter []types.EventNID
|
|
||||||
|
|
||||||
func (s eventNIDSorter) Len() int { return len(s) }
|
|
||||||
func (s eventNIDSorter) Less(i, j int) bool { return s[i] < s[j] }
|
|
||||||
func (s eventNIDSorter) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
|
|
||||||
|
|
|
@ -154,7 +154,42 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
|
||||||
ctx context.Context, msg api.OutputNewRoomEvent,
|
ctx context.Context, msg api.OutputNewRoomEvent,
|
||||||
) error {
|
) error {
|
||||||
ev := msg.Event
|
ev := msg.Event
|
||||||
addsStateEvents := msg.AddsState()
|
|
||||||
|
addsStateEvents := []*gomatrixserverlib.HeaderedEvent{}
|
||||||
|
foundEventIDs := map[string]bool{}
|
||||||
|
if len(msg.AddsStateEventIDs) > 0 {
|
||||||
|
for _, eventID := range msg.AddsStateEventIDs {
|
||||||
|
foundEventIDs[eventID] = false
|
||||||
|
}
|
||||||
|
foundEvents, err := s.db.Events(ctx, msg.AddsStateEventIDs)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("s.db.Events: %w", err)
|
||||||
|
}
|
||||||
|
for _, event := range foundEvents {
|
||||||
|
foundEventIDs[event.EventID()] = true
|
||||||
|
}
|
||||||
|
eventsReq := &api.QueryEventsByIDRequest{}
|
||||||
|
eventsRes := &api.QueryEventsByIDResponse{}
|
||||||
|
for eventID, found := range foundEventIDs {
|
||||||
|
if !found {
|
||||||
|
eventsReq.EventIDs = append(eventsReq.EventIDs, eventID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err = s.rsAPI.QueryEventsByID(ctx, eventsReq, eventsRes); err != nil {
|
||||||
|
return fmt.Errorf("s.rsAPI.QueryEventsByID: %w", err)
|
||||||
|
}
|
||||||
|
for _, event := range eventsRes.Events {
|
||||||
|
eventID := event.EventID()
|
||||||
|
foundEvents = append(foundEvents, event)
|
||||||
|
foundEventIDs[eventID] = true
|
||||||
|
}
|
||||||
|
for eventID, found := range foundEventIDs {
|
||||||
|
if !found {
|
||||||
|
return fmt.Errorf("event %s is missing", eventID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
addsStateEvents = foundEvents
|
||||||
|
}
|
||||||
|
|
||||||
ev, err := s.updateStateEvent(ev)
|
ev, err := s.updateStateEvent(ev)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
Loading…
Reference in a new issue