From 54d1228609a0dd541e8f457162643fbf450b6843 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 9 Sep 2020 12:07:05 +0100 Subject: [PATCH] Historical output events --- federationsender/consumers/roomserver.go | 4 ++ roomserver/api/output.go | 3 ++ roomserver/internal/input/input_events.go | 2 +- .../internal/input/input_latest_events.go | 45 +++++++++---------- syncapi/consumers/roomserver.go | 4 ++ 5 files changed, 33 insertions(+), 25 deletions(-) diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go index efeb53fa6..1a677faa8 100644 --- a/federationsender/consumers/roomserver.go +++ b/federationsender/consumers/roomserver.go @@ -110,6 +110,10 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { // processMessage updates the list of currently joined hosts in the room // and then sends the event to the hosts that were joined before the event. func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error { + if ore.Historical { + return nil + } + addsJoinedHosts, err := joinedHostsFromEvents(gomatrixserverlib.UnwrapEventHeaders(ore.AddsState())) if err != nil { return err diff --git a/roomserver/api/output.go b/roomserver/api/output.go index d6c09f9e8..fc9595310 100644 --- a/roomserver/api/output.go +++ b/roomserver/api/output.go @@ -75,6 +75,9 @@ type OutputEvent struct { type OutputNewRoomEvent struct { // The Event. Event gomatrixserverlib.HeaderedEvent `json:"event"` + // Is the event historical? If so, then downstream components should not treat the + // event as if it just arrived. + Historical bool `json:"historical"` // The latest events in the room after this event. // This can be used to set the prev events for new events in the room. // This also can be used to get the full current state after this event. diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index ffda04581..19089cbd3 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -114,7 +114,7 @@ func (r *Inputer) processRoomEvent( event, // event input.SendAsServer, // send as server input.TransactionID, // transaction ID - input.Kind != api.KindRewrite, // should we send output events? + input.Kind == api.KindRewrite, // historical ); err != nil { return "", fmt.Errorf("r.updateLatestEvents: %w", err) } diff --git a/roomserver/internal/input/input_latest_events.go b/roomserver/internal/input/input_latest_events.go index 24d814f8c..54e18bc68 100644 --- a/roomserver/internal/input/input_latest_events.go +++ b/roomserver/internal/input/input_latest_events.go @@ -72,7 +72,7 @@ func (r *Inputer) updateLatestEvents( event: event, sendAsServer: sendAsServer, transactionID: transactionID, - sendOutput: sendOutput, + isHistorical: sendOutput, } if err = u.doUpdateLatestEvents(); err != nil { @@ -95,7 +95,7 @@ type latestEventsUpdater struct { stateAtEvent types.StateAtEvent event gomatrixserverlib.Event transactionID *api.TransactionID - sendOutput bool + isHistorical bool // Which server to send this event as. sendAsServer string // The eventID of the event that was processed before this one. @@ -181,35 +181,31 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error { return fmt.Errorf("u.api.updateMemberships: %w", err) } - if u.sendOutput { - var update *api.OutputEvent - update, err = u.makeOutputNewRoomEvent() - if err != nil { - return fmt.Errorf("u.makeOutputNewRoomEvent: %w", err) - } - updates = append(updates, *update) + var update *api.OutputEvent + update, err = u.makeOutputNewRoomEvent() + if err != nil { + return fmt.Errorf("u.makeOutputNewRoomEvent: %w", err) + } + updates = append(updates, *update) - // Send the event to the output logs. - // We do this inside the database transaction to ensure that we only mark an event as sent if we sent it. - // (n.b. this means that it's possible that the same event will be sent twice if the transaction fails but - // the write to the output log succeeds) - // TODO: This assumes that writing the event to the output log is synchronous. It should be possible to - // send the event asynchronously but we would need to ensure that 1) the events are written to the log in - // the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the - // necessary bookkeeping we'll keep the event sending synchronous for now. - if err = u.api.WriteOutputEvents(u.event.RoomID(), updates); err != nil { - return fmt.Errorf("u.api.WriteOutputEvents: %w", err) - } + // Send the event to the output logs. + // We do this inside the database transaction to ensure that we only mark an event as sent if we sent it. + // (n.b. this means that it's possible that the same event will be sent twice if the transaction fails but + // the write to the output log succeeds) + // TODO: This assumes that writing the event to the output log is synchronous. It should be possible to + // send the event asynchronously but we would need to ensure that 1) the events are written to the log in + // the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the + // necessary bookkeeping we'll keep the event sending synchronous for now. + if err = u.api.WriteOutputEvents(u.event.RoomID(), updates); err != nil { + return fmt.Errorf("u.api.WriteOutputEvents: %w", err) } if err = u.updater.SetLatestEvents(u.roomInfo.RoomNID, u.latest, u.stateAtEvent.EventNID, u.newStateNID); err != nil { return fmt.Errorf("u.updater.SetLatestEvents: %w", err) } - if u.sendOutput { - if err = u.updater.MarkEventAsSent(u.stateAtEvent.EventNID); err != nil { - return fmt.Errorf("u.updater.MarkEventAsSent: %w", err) - } + if err = u.updater.MarkEventAsSent(u.stateAtEvent.EventNID); err != nil { + return fmt.Errorf("u.updater.MarkEventAsSent: %w", err) } return nil @@ -313,6 +309,7 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) ore := api.OutputNewRoomEvent{ Event: u.event.Headered(u.roomInfo.RoomVersion), + Historical: u.isHistorical, LastSentEventID: u.lastEventIDSent, LatestEventIDs: latestEventIDs, TransactionID: u.transactionID, diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 67e656c9a..f43ea58bf 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -143,6 +143,10 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( } } + if msg.Historical { + return nil + } + pduPos, err := s.db.WriteEvent( ctx, &ev,