Signal to downstream components if an event has become a forward extremity

This commit is contained in:
Neil Alexander 2020-10-16 10:56:17 +01:00
parent a0a41b21bd
commit 8d80f9a131
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
3 changed files with 36 additions and 24 deletions

View file

@ -91,6 +91,9 @@ const (
type OutputNewRoomEvent struct {
// The Event.
Event gomatrixserverlib.HeaderedEvent `json:"event"`
// Is the event a forward extremity in the room at the time of the output event
// being generated?
IsForwardExtremity bool `json:"is_forward_extremity"`
// Does the event completely rewrite the room state? If so, then AddsStateEventIDs
// will contain the entire room state.
RewritesState bool `json:"rewrites_state"`

View file

@ -106,6 +106,8 @@ type latestEventsUpdater struct {
lastEventIDSent string
// The latest events in the room after processing this event.
latest []types.StateAtEventAndReference
// Is the event now a current forward extremity?
isForwardExtremity bool
// The state entries removed from and added to the current state of the
// room as a result of processing this event. They are sorted lists.
removed []types.StateEntry
@ -265,6 +267,8 @@ func (u *latestEventsUpdater) latestState() error {
return nil
}
// calculateLatest works out the new set of forward extremities. Returns
// true if the new event is included in those extremites, false otherwise.
func (u *latestEventsUpdater) calculateLatest(
oldLatest []types.StateAtEventAndReference,
newEvent types.StateAtEventAndReference,
@ -293,6 +297,7 @@ func (u *latestEventsUpdater) calculateLatest(
// We've already referenced this new event so we can just return
// the newly completed extremities at this point.
u.latest = newLatest
u.isForwardExtremity = true
return nil
}
}
@ -307,6 +312,7 @@ func (u *latestEventsUpdater) calculateLatest(
return fmt.Errorf("u.updater.IsReferenced (new): %w", err)
} else if !referenced || len(newLatest) == 0 {
newLatest = append(newLatest, newEvent)
u.isForwardExtremity = true
}
u.latest = newLatest
@ -321,37 +327,40 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
}
ore := api.OutputNewRoomEvent{
Event: u.event.Headered(u.roomInfo.RoomVersion),
RewritesState: u.rewritesState,
LastSentEventID: u.lastEventIDSent,
LatestEventIDs: latestEventIDs,
TransactionID: u.transactionID,
Event: u.event.Headered(u.roomInfo.RoomVersion),
IsForwardExtremity: u.isForwardExtremity,
RewritesState: u.rewritesState,
LastSentEventID: u.lastEventIDSent,
LatestEventIDs: latestEventIDs,
TransactionID: u.transactionID,
}
eventIDMap, err := u.stateEventMap()
if err != nil {
return nil, err
if u.isForwardExtremity {
eventIDMap, err := u.stateEventMap()
if err != nil {
return nil, err
}
for _, entry := range u.added {
ore.AddsStateEventIDs = append(ore.AddsStateEventIDs, eventIDMap[entry.EventNID])
}
for _, entry := range u.removed {
ore.RemovesStateEventIDs = append(ore.RemovesStateEventIDs, eventIDMap[entry.EventNID])
}
for _, entry := range u.stateBeforeEventRemoves {
ore.StateBeforeRemovesEventIDs = append(ore.StateBeforeRemovesEventIDs, eventIDMap[entry.EventNID])
}
for _, entry := range u.stateBeforeEventAdds {
ore.StateBeforeAddsEventIDs = append(ore.StateBeforeAddsEventIDs, eventIDMap[entry.EventNID])
}
}
for _, entry := range u.added {
ore.AddsStateEventIDs = append(ore.AddsStateEventIDs, eventIDMap[entry.EventNID])
}
for _, entry := range u.removed {
ore.RemovesStateEventIDs = append(ore.RemovesStateEventIDs, eventIDMap[entry.EventNID])
}
for _, entry := range u.stateBeforeEventRemoves {
ore.StateBeforeRemovesEventIDs = append(ore.StateBeforeRemovesEventIDs, eventIDMap[entry.EventNID])
}
for _, entry := range u.stateBeforeEventAdds {
ore.StateBeforeAddsEventIDs = append(ore.StateBeforeAddsEventIDs, eventIDMap[entry.EventNID])
}
ore.SendAsServer = u.sendAsServer
// include extra state events if they were added as nearly every downstream component will care about it
// and we'd rather not have them all hit QueryEventsByID at the same time!
if len(ore.AddsStateEventIDs) > 0 {
ore.AddStateEvents, err = u.extraEventsForIDs(u.roomInfo.RoomVersion, ore.AddsStateEventIDs)
if err != nil {
var err error
if ore.AddStateEvents, err = u.extraEventsForIDs(u.roomInfo.RoomVersion, ore.AddsStateEventIDs); err != nil {
return nil, fmt.Errorf("failed to load add_state_events from db: %w", err)
}
}

View file

@ -146,7 +146,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
}
}
if msg.RewritesState {
if msg.IsForwardExtremity && msg.RewritesState {
if err = s.db.PurgeRoom(ctx, ev.RoomID()); err != nil {
return fmt.Errorf("s.db.PurgeRoom: %w", err)
}
@ -159,7 +159,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
msg.AddsStateEventIDs,
msg.RemovesStateEventIDs,
msg.TransactionID,
false,
!msg.IsForwardExtremity,
)
if err != nil {
// panic rather than continue with an inconsistent database