Don't generate output events when rewriting forward extremities

This commit is contained in:
Neil Alexander 2020-09-09 11:17:55 +01:00
parent fdb3480cc2
commit 3bc10f9674
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
4 changed files with 37 additions and 24 deletions

View file

@ -33,6 +33,10 @@ const (
// KindBackfill event extend the contiguous graph going backwards. // KindBackfill event extend the contiguous graph going backwards.
// They always have state. // They always have state.
KindBackfill = 3 KindBackfill = 3
// KindRewrite events are used to rewrite the head of the graph.
// They are used in state, forward extremity and membership updates
// but are not sent as output events.
KindRewrite = 4
) )
// DoNotSendToOtherServers tells us not to send the event to other matrix // DoNotSendToOtherServers tells us not to send the event to other matrix

View file

@ -76,7 +76,7 @@ func SendEventWithState(
continue continue
} }
ires = append(ires, InputRoomEvent{ ires = append(ires, InputRoomEvent{
Kind: KindNew, Kind: KindRewrite,
Event: stateEvent.Headered(event.RoomVersion), Event: stateEvent.Headered(event.RoomVersion),
AuthEventIDs: stateEvent.AuthEventIDs(), AuthEventIDs: stateEvent.AuthEventIDs(),
HasState: true, HasState: true,

View file

@ -108,12 +108,13 @@ func (r *Inputer) processRoomEvent(
} }
if err = r.updateLatestEvents( if err = r.updateLatestEvents(
ctx, // context ctx, // context
roomInfo, // room info for the room being updated roomInfo, // room info for the room being updated
stateAtEvent, // state at event (below) stateAtEvent, // state at event (below)
event, // event event, // event
input.SendAsServer, // send as server input.SendAsServer, // send as server
input.TransactionID, // transaction ID input.TransactionID, // transaction ID
input.Kind == api.KindNew, // should we send output events?
); err != nil { ); err != nil {
return "", fmt.Errorf("r.updateLatestEvents: %w", err) return "", fmt.Errorf("r.updateLatestEvents: %w", err)
} }

View file

@ -54,6 +54,7 @@ func (r *Inputer) updateLatestEvents(
event gomatrixserverlib.Event, event gomatrixserverlib.Event,
sendAsServer string, sendAsServer string,
transactionID *api.TransactionID, transactionID *api.TransactionID,
sendOutput bool,
) (err error) { ) (err error) {
updater, err := r.DB.GetLatestEventsForUpdate(ctx, *roomInfo) updater, err := r.DB.GetLatestEventsForUpdate(ctx, *roomInfo)
if err != nil { if err != nil {
@ -71,6 +72,7 @@ func (r *Inputer) updateLatestEvents(
event: event, event: event,
sendAsServer: sendAsServer, sendAsServer: sendAsServer,
transactionID: transactionID, transactionID: transactionID,
sendOutput: sendOutput,
} }
if err = u.doUpdateLatestEvents(); err != nil { if err = u.doUpdateLatestEvents(); err != nil {
@ -93,6 +95,7 @@ type latestEventsUpdater struct {
stateAtEvent types.StateAtEvent stateAtEvent types.StateAtEvent
event gomatrixserverlib.Event event gomatrixserverlib.Event
transactionID *api.TransactionID transactionID *api.TransactionID
sendOutput bool
// Which server to send this event as. // Which server to send this event as.
sendAsServer string sendAsServer string
// The eventID of the event that was processed before this one. // The eventID of the event that was processed before this one.
@ -178,30 +181,35 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
return fmt.Errorf("u.api.updateMemberships: %w", err) return fmt.Errorf("u.api.updateMemberships: %w", err)
} }
update, err := u.makeOutputNewRoomEvent() if u.sendOutput {
if err != nil { var update *api.OutputEvent
return fmt.Errorf("u.makeOutputNewRoomEvent: %w", err) update, err = u.makeOutputNewRoomEvent()
} if err != nil {
updates = append(updates, *update) return fmt.Errorf("u.makeOutputNewRoomEvent: %w", err)
}
updates = append(updates, *update)
// Send the event to the output logs. // Send the event to the output logs.
// We do this inside the database transaction to ensure that we only mark an event as sent if we sent it. // We do this inside the database transaction to ensure that we only mark an event as sent if we sent it.
// (n.b. this means that it's possible that the same event will be sent twice if the transaction fails but // (n.b. this means that it's possible that the same event will be sent twice if the transaction fails but
// the write to the output log succeeds) // the write to the output log succeeds)
// TODO: This assumes that writing the event to the output log is synchronous. It should be possible to // TODO: This assumes that writing the event to the output log is synchronous. It should be possible to
// send the event asynchronously but we would need to ensure that 1) the events are written to the log in // send the event asynchronously but we would need to ensure that 1) the events are written to the log in
// the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the // the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the
// necessary bookkeeping we'll keep the event sending synchronous for now. // necessary bookkeeping we'll keep the event sending synchronous for now.
if err = u.api.WriteOutputEvents(u.event.RoomID(), updates); err != nil { if err = u.api.WriteOutputEvents(u.event.RoomID(), updates); err != nil {
return fmt.Errorf("u.api.WriteOutputEvents: %w", err) return fmt.Errorf("u.api.WriteOutputEvents: %w", err)
}
} }
if err = u.updater.SetLatestEvents(u.roomInfo.RoomNID, u.latest, u.stateAtEvent.EventNID, u.newStateNID); err != nil { if err = u.updater.SetLatestEvents(u.roomInfo.RoomNID, u.latest, u.stateAtEvent.EventNID, u.newStateNID); err != nil {
return fmt.Errorf("u.updater.SetLatestEvents: %w", err) return fmt.Errorf("u.updater.SetLatestEvents: %w", err)
} }
if err = u.updater.MarkEventAsSent(u.stateAtEvent.EventNID); err != nil { if u.sendOutput {
return fmt.Errorf("u.updater.MarkEventAsSent: %w", err) if err = u.updater.MarkEventAsSent(u.stateAtEvent.EventNID); err != nil {
return fmt.Errorf("u.updater.MarkEventAsSent: %w", err)
}
} }
return nil return nil