From a4e111cd4070e395bd8ed8dd00c197636ae72f27 Mon Sep 17 00:00:00 2001 From: Till Faelligen <2353100+S7evinK@users.noreply.github.com> Date: Thu, 23 Nov 2023 19:28:59 +0100 Subject: [PATCH] Let doUpdateLatestEvents return the updates to send to NATS --- .../internal/input/input_latest_events.go | 56 ++++++++++--------- 1 file changed, 30 insertions(+), 26 deletions(-) diff --git a/roomserver/internal/input/input_latest_events.go b/roomserver/internal/input/input_latest_events.go index ec03d6f13..dd0bd6049 100644 --- a/roomserver/internal/input/input_latest_events.go +++ b/roomserver/internal/input/input_latest_events.go @@ -83,10 +83,30 @@ func (r *Inputer) updateLatestEvents( historyVisibility: historyVisibility, } - if err = u.doUpdateLatestEvents(); err != nil { + var updates []api.OutputEvent + updates, err = u.doUpdateLatestEvents() + if err != nil { return fmt.Errorf("u.doUpdateLatestEvents: %w", err) } + // Send the event to the output logs. + // We do this inside the database transaction to ensure that we only mark an event as sent if we sent it. + // (n.b. this means that it's possible that the same event will be sent twice if the transaction fails but + // the write to the output log succeeds) + // TODO: This assumes that writing the event to the output log is synchronous. It should be possible to + // send the event asynchronously but we would need to ensure that 1) the events are written to the log in + // the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the + // necessary bookkeeping we'll keep the event sending synchronous for now. + if len(updates) > 0 { + if err = u.api.OutputProducer.ProduceRoomEvents(u.event.RoomID().String(), updates); err != nil { + return fmt.Errorf("u.api.WriteOutputEvents: %w", err) + } + + if err = u.updater.MarkEventAsSent(u.stateAtEvent.EventNID); err != nil { + return fmt.Errorf("u.updater.MarkEventAsSent: %w", err) + } + } + succeeded = true return } @@ -126,7 +146,7 @@ type latestEventsUpdater struct { historyVisibility gomatrixserverlib.HistoryVisibility } -func (u *latestEventsUpdater) doUpdateLatestEvents() error { +func (u *latestEventsUpdater) doUpdateLatestEvents() ([]api.OutputEvent, error) { u.lastEventIDSent = u.updater.LastEventIDSent() // If we are doing a regular event update then we will get the @@ -144,9 +164,9 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error { // If the event has already been written to the output log then we // don't need to do anything, as we've handled it already. if hasBeenSent, err := u.updater.HasEventBeenSent(u.stateAtEvent.EventNID); err != nil { - return fmt.Errorf("u.updater.HasEventBeenSent: %w", err) + return nil, fmt.Errorf("u.updater.HasEventBeenSent: %w", err) } else if hasBeenSent { - return nil + return nil, nil } // Work out what the latest events are. This will include the new @@ -159,7 +179,7 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error { }, ) if err != nil { - return fmt.Errorf("u.calculateLatest: %w", err) + return nil, fmt.Errorf("u.calculateLatest: %w", err) } // Now that we know what the latest events are, it's time to get the @@ -167,45 +187,29 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error { var updates []api.OutputEvent if extremitiesChanged || u.rewritesState { if err = u.latestState(); err != nil { - return fmt.Errorf("u.latestState: %w", err) + return nil, fmt.Errorf("u.latestState: %w", err) } // If we need to generate any output events then here's where we do it. // TODO: Move this! if updates, err = u.api.updateMemberships(u.ctx, u.updater, u.removed, u.added); err != nil { - return fmt.Errorf("u.api.updateMemberships: %w", err) + return nil, fmt.Errorf("u.api.updateMemberships: %w", err) } } else { u.newStateNID = u.oldStateNID } if err = u.updater.SetLatestEvents(u.roomInfo.RoomNID, u.latest, u.stateAtEvent.EventNID, u.newStateNID); err != nil { - return fmt.Errorf("u.updater.SetLatestEvents: %w", err) + return nil, fmt.Errorf("u.updater.SetLatestEvents: %w", err) } update, err := u.makeOutputNewRoomEvent() if err != nil { - return fmt.Errorf("u.makeOutputNewRoomEvent: %w", err) + return nil, fmt.Errorf("u.makeOutputNewRoomEvent: %w", err) } updates = append(updates, *update) - // Send the event to the output logs. - // We do this inside the database transaction to ensure that we only mark an event as sent if we sent it. - // (n.b. this means that it's possible that the same event will be sent twice if the transaction fails but - // the write to the output log succeeds) - // TODO: This assumes that writing the event to the output log is synchronous. It should be possible to - // send the event asynchronously but we would need to ensure that 1) the events are written to the log in - // the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the - // necessary bookkeeping we'll keep the event sending synchronous for now. - if err = u.api.OutputProducer.ProduceRoomEvents(u.event.RoomID().String(), updates); err != nil { - return fmt.Errorf("u.api.WriteOutputEvents: %w", err) - } - - if err = u.updater.MarkEventAsSent(u.stateAtEvent.EventNID); err != nil { - return fmt.Errorf("u.updater.MarkEventAsSent: %w", err) - } - - return nil + return updates, nil } func (u *latestEventsUpdater) latestState() error {