Let doUpdateLatestEvents return the updates to send to NATS
This commit is contained in:
parent
b8f91485b4
commit
a4e111cd40
|
@ -83,10 +83,30 @@ func (r *Inputer) updateLatestEvents(
|
||||||
historyVisibility: historyVisibility,
|
historyVisibility: historyVisibility,
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = u.doUpdateLatestEvents(); err != nil {
|
var updates []api.OutputEvent
|
||||||
|
updates, err = u.doUpdateLatestEvents()
|
||||||
|
if err != nil {
|
||||||
return fmt.Errorf("u.doUpdateLatestEvents: %w", err)
|
return fmt.Errorf("u.doUpdateLatestEvents: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Send the event to the output logs.
|
||||||
|
// We do this inside the database transaction to ensure that we only mark an event as sent if we sent it.
|
||||||
|
// (n.b. this means that it's possible that the same event will be sent twice if the transaction fails but
|
||||||
|
// the write to the output log succeeds)
|
||||||
|
// TODO: This assumes that writing the event to the output log is synchronous. It should be possible to
|
||||||
|
// send the event asynchronously but we would need to ensure that 1) the events are written to the log in
|
||||||
|
// the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the
|
||||||
|
// necessary bookkeeping we'll keep the event sending synchronous for now.
|
||||||
|
if len(updates) > 0 {
|
||||||
|
if err = u.api.OutputProducer.ProduceRoomEvents(u.event.RoomID().String(), updates); err != nil {
|
||||||
|
return fmt.Errorf("u.api.WriteOutputEvents: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = u.updater.MarkEventAsSent(u.stateAtEvent.EventNID); err != nil {
|
||||||
|
return fmt.Errorf("u.updater.MarkEventAsSent: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
succeeded = true
|
succeeded = true
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -126,7 +146,7 @@ type latestEventsUpdater struct {
|
||||||
historyVisibility gomatrixserverlib.HistoryVisibility
|
historyVisibility gomatrixserverlib.HistoryVisibility
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u *latestEventsUpdater) doUpdateLatestEvents() error {
|
func (u *latestEventsUpdater) doUpdateLatestEvents() ([]api.OutputEvent, error) {
|
||||||
u.lastEventIDSent = u.updater.LastEventIDSent()
|
u.lastEventIDSent = u.updater.LastEventIDSent()
|
||||||
|
|
||||||
// If we are doing a regular event update then we will get the
|
// If we are doing a regular event update then we will get the
|
||||||
|
@ -144,9 +164,9 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
|
||||||
// If the event has already been written to the output log then we
|
// If the event has already been written to the output log then we
|
||||||
// don't need to do anything, as we've handled it already.
|
// don't need to do anything, as we've handled it already.
|
||||||
if hasBeenSent, err := u.updater.HasEventBeenSent(u.stateAtEvent.EventNID); err != nil {
|
if hasBeenSent, err := u.updater.HasEventBeenSent(u.stateAtEvent.EventNID); err != nil {
|
||||||
return fmt.Errorf("u.updater.HasEventBeenSent: %w", err)
|
return nil, fmt.Errorf("u.updater.HasEventBeenSent: %w", err)
|
||||||
} else if hasBeenSent {
|
} else if hasBeenSent {
|
||||||
return nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Work out what the latest events are. This will include the new
|
// Work out what the latest events are. This will include the new
|
||||||
|
@ -159,7 +179,7 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("u.calculateLatest: %w", err)
|
return nil, fmt.Errorf("u.calculateLatest: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now that we know what the latest events are, it's time to get the
|
// Now that we know what the latest events are, it's time to get the
|
||||||
|
@ -167,45 +187,29 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
|
||||||
var updates []api.OutputEvent
|
var updates []api.OutputEvent
|
||||||
if extremitiesChanged || u.rewritesState {
|
if extremitiesChanged || u.rewritesState {
|
||||||
if err = u.latestState(); err != nil {
|
if err = u.latestState(); err != nil {
|
||||||
return fmt.Errorf("u.latestState: %w", err)
|
return nil, fmt.Errorf("u.latestState: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we need to generate any output events then here's where we do it.
|
// If we need to generate any output events then here's where we do it.
|
||||||
// TODO: Move this!
|
// TODO: Move this!
|
||||||
if updates, err = u.api.updateMemberships(u.ctx, u.updater, u.removed, u.added); err != nil {
|
if updates, err = u.api.updateMemberships(u.ctx, u.updater, u.removed, u.added); err != nil {
|
||||||
return fmt.Errorf("u.api.updateMemberships: %w", err)
|
return nil, fmt.Errorf("u.api.updateMemberships: %w", err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
u.newStateNID = u.oldStateNID
|
u.newStateNID = u.oldStateNID
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = u.updater.SetLatestEvents(u.roomInfo.RoomNID, u.latest, u.stateAtEvent.EventNID, u.newStateNID); err != nil {
|
if err = u.updater.SetLatestEvents(u.roomInfo.RoomNID, u.latest, u.stateAtEvent.EventNID, u.newStateNID); err != nil {
|
||||||
return fmt.Errorf("u.updater.SetLatestEvents: %w", err)
|
return nil, fmt.Errorf("u.updater.SetLatestEvents: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
update, err := u.makeOutputNewRoomEvent()
|
update, err := u.makeOutputNewRoomEvent()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("u.makeOutputNewRoomEvent: %w", err)
|
return nil, fmt.Errorf("u.makeOutputNewRoomEvent: %w", err)
|
||||||
}
|
}
|
||||||
updates = append(updates, *update)
|
updates = append(updates, *update)
|
||||||
|
|
||||||
// Send the event to the output logs.
|
return updates, nil
|
||||||
// We do this inside the database transaction to ensure that we only mark an event as sent if we sent it.
|
|
||||||
// (n.b. this means that it's possible that the same event will be sent twice if the transaction fails but
|
|
||||||
// the write to the output log succeeds)
|
|
||||||
// TODO: This assumes that writing the event to the output log is synchronous. It should be possible to
|
|
||||||
// send the event asynchronously but we would need to ensure that 1) the events are written to the log in
|
|
||||||
// the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the
|
|
||||||
// necessary bookkeeping we'll keep the event sending synchronous for now.
|
|
||||||
if err = u.api.OutputProducer.ProduceRoomEvents(u.event.RoomID().String(), updates); err != nil {
|
|
||||||
return fmt.Errorf("u.api.WriteOutputEvents: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = u.updater.MarkEventAsSent(u.stateAtEvent.EventNID); err != nil {
|
|
||||||
return fmt.Errorf("u.updater.MarkEventAsSent: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u *latestEventsUpdater) latestState() error {
|
func (u *latestEventsUpdater) latestState() error {
|
||||||
|
|
Loading…
Reference in a new issue