From 8d80f9a1317fafd1bd38f2c8f5dc8e203a054f75 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 16 Oct 2020 10:56:17 +0100 Subject: [PATCH] Signal to downstream components if an event has become a forward extremity --- roomserver/api/output.go | 3 ++ .../internal/input/input_latest_events.go | 53 +++++++++++-------- syncapi/consumers/roomserver.go | 4 +- 3 files changed, 36 insertions(+), 24 deletions(-) diff --git a/roomserver/api/output.go b/roomserver/api/output.go index d57f3b04c..eae0d8627 100644 --- a/roomserver/api/output.go +++ b/roomserver/api/output.go @@ -91,6 +91,9 @@ const ( type OutputNewRoomEvent struct { // The Event. Event gomatrixserverlib.HeaderedEvent `json:"event"` + // Is the event a forward extremity in the room at the time of the output event + // being generated? + IsForwardExtremity bool `json:"is_forward_extremity"` // Does the event completely rewrite the room state? If so, then AddsStateEventIDs // will contain the entire room state. RewritesState bool `json:"rewrites_state"` diff --git a/roomserver/internal/input/input_latest_events.go b/roomserver/internal/input/input_latest_events.go index ad6ec8b59..5ef5b93b2 100644 --- a/roomserver/internal/input/input_latest_events.go +++ b/roomserver/internal/input/input_latest_events.go @@ -106,6 +106,8 @@ type latestEventsUpdater struct { lastEventIDSent string // The latest events in the room after processing this event. latest []types.StateAtEventAndReference + // Is the event now a current forward extremity? + isForwardExtremity bool // The state entries removed from and added to the current state of the // room as a result of processing this event. They are sorted lists. removed []types.StateEntry @@ -265,6 +267,8 @@ func (u *latestEventsUpdater) latestState() error { return nil } +// calculateLatest works out the new set of forward extremities. Returns +// true if the new event is included in those extremites, false otherwise. func (u *latestEventsUpdater) calculateLatest( oldLatest []types.StateAtEventAndReference, newEvent types.StateAtEventAndReference, @@ -293,6 +297,7 @@ func (u *latestEventsUpdater) calculateLatest( // We've already referenced this new event so we can just return // the newly completed extremities at this point. u.latest = newLatest + u.isForwardExtremity = true return nil } } @@ -307,6 +312,7 @@ func (u *latestEventsUpdater) calculateLatest( return fmt.Errorf("u.updater.IsReferenced (new): %w", err) } else if !referenced || len(newLatest) == 0 { newLatest = append(newLatest, newEvent) + u.isForwardExtremity = true } u.latest = newLatest @@ -321,37 +327,40 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) } ore := api.OutputNewRoomEvent{ - Event: u.event.Headered(u.roomInfo.RoomVersion), - RewritesState: u.rewritesState, - LastSentEventID: u.lastEventIDSent, - LatestEventIDs: latestEventIDs, - TransactionID: u.transactionID, + Event: u.event.Headered(u.roomInfo.RoomVersion), + IsForwardExtremity: u.isForwardExtremity, + RewritesState: u.rewritesState, + LastSentEventID: u.lastEventIDSent, + LatestEventIDs: latestEventIDs, + TransactionID: u.transactionID, } - eventIDMap, err := u.stateEventMap() - if err != nil { - return nil, err + if u.isForwardExtremity { + eventIDMap, err := u.stateEventMap() + if err != nil { + return nil, err + } + for _, entry := range u.added { + ore.AddsStateEventIDs = append(ore.AddsStateEventIDs, eventIDMap[entry.EventNID]) + } + for _, entry := range u.removed { + ore.RemovesStateEventIDs = append(ore.RemovesStateEventIDs, eventIDMap[entry.EventNID]) + } + for _, entry := range u.stateBeforeEventRemoves { + ore.StateBeforeRemovesEventIDs = append(ore.StateBeforeRemovesEventIDs, eventIDMap[entry.EventNID]) + } + for _, entry := range u.stateBeforeEventAdds { + ore.StateBeforeAddsEventIDs = append(ore.StateBeforeAddsEventIDs, eventIDMap[entry.EventNID]) + } } - for _, entry := range u.added { - ore.AddsStateEventIDs = append(ore.AddsStateEventIDs, eventIDMap[entry.EventNID]) - } - for _, entry := range u.removed { - ore.RemovesStateEventIDs = append(ore.RemovesStateEventIDs, eventIDMap[entry.EventNID]) - } - for _, entry := range u.stateBeforeEventRemoves { - ore.StateBeforeRemovesEventIDs = append(ore.StateBeforeRemovesEventIDs, eventIDMap[entry.EventNID]) - } - for _, entry := range u.stateBeforeEventAdds { - ore.StateBeforeAddsEventIDs = append(ore.StateBeforeAddsEventIDs, eventIDMap[entry.EventNID]) - } ore.SendAsServer = u.sendAsServer // include extra state events if they were added as nearly every downstream component will care about it // and we'd rather not have them all hit QueryEventsByID at the same time! if len(ore.AddsStateEventIDs) > 0 { - ore.AddStateEvents, err = u.extraEventsForIDs(u.roomInfo.RoomVersion, ore.AddsStateEventIDs) - if err != nil { + var err error + if ore.AddStateEvents, err = u.extraEventsForIDs(u.roomInfo.RoomVersion, ore.AddsStateEventIDs); err != nil { return nil, fmt.Errorf("failed to load add_state_events from db: %w", err) } } diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index ca48c8300..a1f94441e 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -146,7 +146,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( } } - if msg.RewritesState { + if msg.IsForwardExtremity && msg.RewritesState { if err = s.db.PurgeRoom(ctx, ev.RoomID()); err != nil { return fmt.Errorf("s.db.PurgeRoom: %w", err) } @@ -159,7 +159,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( msg.AddsStateEventIDs, msg.RemovesStateEventIDs, msg.TransactionID, - false, + !msg.IsForwardExtremity, ) if err != nil { // panic rather than continue with an inconsistent database