Historical output events

This commit is contained in:
Neil Alexander 2020-09-09 12:07:05 +01:00
parent 29d8da614b
commit 54d1228609
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
5 changed files with 33 additions and 25 deletions

View file

@ -110,6 +110,10 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
// processMessage updates the list of currently joined hosts in the room // processMessage updates the list of currently joined hosts in the room
// and then sends the event to the hosts that were joined before the event. // and then sends the event to the hosts that were joined before the event.
func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error { func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error {
if ore.Historical {
return nil
}
addsJoinedHosts, err := joinedHostsFromEvents(gomatrixserverlib.UnwrapEventHeaders(ore.AddsState())) addsJoinedHosts, err := joinedHostsFromEvents(gomatrixserverlib.UnwrapEventHeaders(ore.AddsState()))
if err != nil { if err != nil {
return err return err

View file

@ -75,6 +75,9 @@ type OutputEvent struct {
type OutputNewRoomEvent struct { type OutputNewRoomEvent struct {
// The Event. // The Event.
Event gomatrixserverlib.HeaderedEvent `json:"event"` Event gomatrixserverlib.HeaderedEvent `json:"event"`
// Is the event historical? If so, then downstream components should not treat the
// event as if it just arrived.
Historical bool `json:"historical"`
// The latest events in the room after this event. // The latest events in the room after this event.
// This can be used to set the prev events for new events in the room. // This can be used to set the prev events for new events in the room.
// This also can be used to get the full current state after this event. // This also can be used to get the full current state after this event.

View file

@ -114,7 +114,7 @@ func (r *Inputer) processRoomEvent(
event, // event event, // event
input.SendAsServer, // send as server input.SendAsServer, // send as server
input.TransactionID, // transaction ID input.TransactionID, // transaction ID
input.Kind != api.KindRewrite, // should we send output events? input.Kind == api.KindRewrite, // historical
); err != nil { ); err != nil {
return "", fmt.Errorf("r.updateLatestEvents: %w", err) return "", fmt.Errorf("r.updateLatestEvents: %w", err)
} }

View file

@ -72,7 +72,7 @@ func (r *Inputer) updateLatestEvents(
event: event, event: event,
sendAsServer: sendAsServer, sendAsServer: sendAsServer,
transactionID: transactionID, transactionID: transactionID,
sendOutput: sendOutput, isHistorical: sendOutput,
} }
if err = u.doUpdateLatestEvents(); err != nil { if err = u.doUpdateLatestEvents(); err != nil {
@ -95,7 +95,7 @@ type latestEventsUpdater struct {
stateAtEvent types.StateAtEvent stateAtEvent types.StateAtEvent
event gomatrixserverlib.Event event gomatrixserverlib.Event
transactionID *api.TransactionID transactionID *api.TransactionID
sendOutput bool isHistorical bool
// Which server to send this event as. // Which server to send this event as.
sendAsServer string sendAsServer string
// The eventID of the event that was processed before this one. // The eventID of the event that was processed before this one.
@ -181,35 +181,31 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
return fmt.Errorf("u.api.updateMemberships: %w", err) return fmt.Errorf("u.api.updateMemberships: %w", err)
} }
if u.sendOutput { var update *api.OutputEvent
var update *api.OutputEvent update, err = u.makeOutputNewRoomEvent()
update, err = u.makeOutputNewRoomEvent() if err != nil {
if err != nil { return fmt.Errorf("u.makeOutputNewRoomEvent: %w", err)
return fmt.Errorf("u.makeOutputNewRoomEvent: %w", err) }
} updates = append(updates, *update)
updates = append(updates, *update)
// Send the event to the output logs. // Send the event to the output logs.
// We do this inside the database transaction to ensure that we only mark an event as sent if we sent it. // We do this inside the database transaction to ensure that we only mark an event as sent if we sent it.
// (n.b. this means that it's possible that the same event will be sent twice if the transaction fails but // (n.b. this means that it's possible that the same event will be sent twice if the transaction fails but
// the write to the output log succeeds) // the write to the output log succeeds)
// TODO: This assumes that writing the event to the output log is synchronous. It should be possible to // TODO: This assumes that writing the event to the output log is synchronous. It should be possible to
// send the event asynchronously but we would need to ensure that 1) the events are written to the log in // send the event asynchronously but we would need to ensure that 1) the events are written to the log in
// the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the // the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the
// necessary bookkeeping we'll keep the event sending synchronous for now. // necessary bookkeeping we'll keep the event sending synchronous for now.
if err = u.api.WriteOutputEvents(u.event.RoomID(), updates); err != nil { if err = u.api.WriteOutputEvents(u.event.RoomID(), updates); err != nil {
return fmt.Errorf("u.api.WriteOutputEvents: %w", err) return fmt.Errorf("u.api.WriteOutputEvents: %w", err)
}
} }
if err = u.updater.SetLatestEvents(u.roomInfo.RoomNID, u.latest, u.stateAtEvent.EventNID, u.newStateNID); err != nil { if err = u.updater.SetLatestEvents(u.roomInfo.RoomNID, u.latest, u.stateAtEvent.EventNID, u.newStateNID); err != nil {
return fmt.Errorf("u.updater.SetLatestEvents: %w", err) return fmt.Errorf("u.updater.SetLatestEvents: %w", err)
} }
if u.sendOutput { if err = u.updater.MarkEventAsSent(u.stateAtEvent.EventNID); err != nil {
if err = u.updater.MarkEventAsSent(u.stateAtEvent.EventNID); err != nil { return fmt.Errorf("u.updater.MarkEventAsSent: %w", err)
return fmt.Errorf("u.updater.MarkEventAsSent: %w", err)
}
} }
return nil return nil
@ -313,6 +309,7 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
ore := api.OutputNewRoomEvent{ ore := api.OutputNewRoomEvent{
Event: u.event.Headered(u.roomInfo.RoomVersion), Event: u.event.Headered(u.roomInfo.RoomVersion),
Historical: u.isHistorical,
LastSentEventID: u.lastEventIDSent, LastSentEventID: u.lastEventIDSent,
LatestEventIDs: latestEventIDs, LatestEventIDs: latestEventIDs,
TransactionID: u.transactionID, TransactionID: u.transactionID,

View file

@ -143,6 +143,10 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
} }
} }
if msg.Historical {
return nil
}
pduPos, err := s.db.WriteEvent( pduPos, err := s.db.WriteEvent(
ctx, ctx,
&ev, &ev,