mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-03-03 17:03:10 -06:00
Don't calculate state changes before the event (nothing uses them), set latest events before generating output events
This commit is contained in:
parent
350a5e5393
commit
117db97d9b
|
|
@ -102,27 +102,16 @@ type latestEventsUpdater struct {
|
||||||
stateAtEvent types.StateAtEvent
|
stateAtEvent types.StateAtEvent
|
||||||
event *gomatrixserverlib.Event
|
event *gomatrixserverlib.Event
|
||||||
transactionID *api.TransactionID
|
transactionID *api.TransactionID
|
||||||
rewritesState bool
|
rewritesState bool // Did this event rewrite the state entirely? (i.e. federated join)
|
||||||
// Which server to send this event as.
|
sendAsServer string // Which server to send this event as.
|
||||||
sendAsServer string
|
lastEventIDSent string // The eventID of the event that was processed before this one.
|
||||||
// The eventID of the event that was processed before this one.
|
oldLatest types.StateAtEventAndReferences // Forward extremities before this event.
|
||||||
lastEventIDSent string
|
latest types.StateAtEventAndReferences // Forward extremities after this event.
|
||||||
// The latest events in the room after processing this event.
|
removed []types.StateEntry // State events removed from the current room state.
|
||||||
oldLatest types.StateAtEventAndReferences
|
added []types.StateEntry // State events added to the current room state.
|
||||||
latest types.StateAtEventAndReferences
|
oldStateNID types.StateSnapshotNID // The old room current state snapshot NID.
|
||||||
// The state entries removed from and added to the current state of the
|
newStateNID types.StateSnapshotNID // The new room current state snapshot NID.
|
||||||
// room as a result of processing this event. They are sorted lists.
|
historyVisibility gomatrixserverlib.HistoryVisibility // The history visibility of the event itself (from state before).
|
||||||
removed []types.StateEntry
|
|
||||||
added []types.StateEntry
|
|
||||||
// The state entries that are removed and added to recover the state before
|
|
||||||
// the event being processed. They are sorted lists.
|
|
||||||
stateBeforeEventRemoves []types.StateEntry
|
|
||||||
stateBeforeEventAdds []types.StateEntry
|
|
||||||
// The snapshots of current state before and after processing this event
|
|
||||||
oldStateNID types.StateSnapshotNID
|
|
||||||
newStateNID types.StateSnapshotNID
|
|
||||||
// The history visibility of the event itself (from the state before the event).
|
|
||||||
historyVisibility gomatrixserverlib.HistoryVisibility
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u *latestEventsUpdater) doUpdateLatestEvents() error {
|
func (u *latestEventsUpdater) doUpdateLatestEvents() error {
|
||||||
|
|
@ -178,11 +167,9 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
|
||||||
u.newStateNID = u.oldStateNID
|
u.newStateNID = u.oldStateNID
|
||||||
}
|
}
|
||||||
|
|
||||||
update, err := u.makeOutputNewRoomEvent()
|
if err = u.updater.SetLatestEvents(u.roomInfo.RoomNID, u.latest, u.stateAtEvent.EventNID, u.newStateNID); err != nil {
|
||||||
if err != nil {
|
return fmt.Errorf("u.updater.SetLatestEvents: %w", err)
|
||||||
return fmt.Errorf("u.makeOutputNewRoomEvent: %w", err)
|
|
||||||
}
|
}
|
||||||
updates = append(updates, *update)
|
|
||||||
|
|
||||||
// Send the event to the output logs.
|
// Send the event to the output logs.
|
||||||
// We do this inside the database transaction to ensure that we only mark an event as sent if we sent it.
|
// We do this inside the database transaction to ensure that we only mark an event as sent if we sent it.
|
||||||
|
|
@ -192,14 +179,15 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
|
||||||
// send the event asynchronously but we would need to ensure that 1) the events are written to the log in
|
// send the event asynchronously but we would need to ensure that 1) the events are written to the log in
|
||||||
// the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the
|
// the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the
|
||||||
// necessary bookkeeping we'll keep the event sending synchronous for now.
|
// necessary bookkeeping we'll keep the event sending synchronous for now.
|
||||||
|
update, err := u.makeOutputNewRoomEvent()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("u.makeOutputNewRoomEvent: %w", err)
|
||||||
|
}
|
||||||
|
updates = append(updates, *update)
|
||||||
if err = u.api.OutputProducer.ProduceRoomEvents(u.event.RoomID(), updates); err != nil {
|
if err = u.api.OutputProducer.ProduceRoomEvents(u.event.RoomID(), updates); err != nil {
|
||||||
return fmt.Errorf("u.api.WriteOutputEvents: %w", err)
|
return fmt.Errorf("u.api.WriteOutputEvents: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = u.updater.SetLatestEvents(u.roomInfo.RoomNID, u.latest, u.stateAtEvent.EventNID, u.newStateNID); err != nil {
|
|
||||||
return fmt.Errorf("u.updater.SetLatestEvents: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = u.updater.MarkEventAsSent(u.stateAtEvent.EventNID); err != nil {
|
if err = u.updater.MarkEventAsSent(u.stateAtEvent.EventNID); err != nil {
|
||||||
return fmt.Errorf("u.updater.MarkEventAsSent: %w", err)
|
return fmt.Errorf("u.updater.MarkEventAsSent: %w", err)
|
||||||
}
|
}
|
||||||
|
|
@ -286,15 +274,6 @@ func (u *latestEventsUpdater) latestState() error {
|
||||||
}).Errorf("Unexpected state deletion (removing %d events)", removed)
|
}).Errorf("Unexpected state deletion (removing %d events)", removed)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Also work out the state before the event removes and the event
|
|
||||||
// adds.
|
|
||||||
u.stateBeforeEventRemoves, u.stateBeforeEventAdds, err = roomState.DifferenceBetweeenStateSnapshots(
|
|
||||||
ctx, u.newStateNID, u.stateAtEvent.BeforeStateSnapshotNID,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("roomState.DifferenceBetweeenStateSnapshots: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -388,12 +367,6 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
|
||||||
for _, entry := range u.removed {
|
for _, entry := range u.removed {
|
||||||
ore.RemovesStateEventIDs = append(ore.RemovesStateEventIDs, eventIDMap[entry.EventNID])
|
ore.RemovesStateEventIDs = append(ore.RemovesStateEventIDs, eventIDMap[entry.EventNID])
|
||||||
}
|
}
|
||||||
for _, entry := range u.stateBeforeEventRemoves {
|
|
||||||
ore.StateBeforeRemovesEventIDs = append(ore.StateBeforeRemovesEventIDs, eventIDMap[entry.EventNID])
|
|
||||||
}
|
|
||||||
for _, entry := range u.stateBeforeEventAdds {
|
|
||||||
ore.StateBeforeAddsEventIDs = append(ore.StateBeforeAddsEventIDs, eventIDMap[entry.EventNID])
|
|
||||||
}
|
|
||||||
|
|
||||||
return &api.OutputEvent{
|
return &api.OutputEvent{
|
||||||
Type: api.OutputTypeNewRoomEvent,
|
Type: api.OutputTypeNewRoomEvent,
|
||||||
|
|
@ -403,13 +376,11 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
|
||||||
|
|
||||||
// retrieve an event nid -> event ID map for all events that need updating
|
// retrieve an event nid -> event ID map for all events that need updating
|
||||||
func (u *latestEventsUpdater) stateEventMap() (map[types.EventNID]string, error) {
|
func (u *latestEventsUpdater) stateEventMap() (map[types.EventNID]string, error) {
|
||||||
cap := len(u.added) + len(u.removed) + len(u.stateBeforeEventRemoves) + len(u.stateBeforeEventAdds)
|
cap := len(u.added) + len(u.removed) // + len(u.stateBeforeEventRemoves) + len(u.stateBeforeEventAdds)
|
||||||
stateEventNIDs := make(types.EventNIDs, 0, cap)
|
stateEventNIDs := make(types.EventNIDs, 0, cap)
|
||||||
allStateEntries := make([]types.StateEntry, 0, cap)
|
allStateEntries := make([]types.StateEntry, 0, cap)
|
||||||
allStateEntries = append(allStateEntries, u.added...)
|
allStateEntries = append(allStateEntries, u.added...)
|
||||||
allStateEntries = append(allStateEntries, u.removed...)
|
allStateEntries = append(allStateEntries, u.removed...)
|
||||||
allStateEntries = append(allStateEntries, u.stateBeforeEventRemoves...)
|
|
||||||
allStateEntries = append(allStateEntries, u.stateBeforeEventAdds...)
|
|
||||||
for _, entry := range allStateEntries {
|
for _, entry := range allStateEntries {
|
||||||
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
|
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue