From 3bc10f9674a34b1a4f4471022157fba96f465de2 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 9 Sep 2020 11:17:55 +0100 Subject: [PATCH] Don't generate output events when rewriting forward extremities --- roomserver/api/input.go | 4 ++ roomserver/api/wrapper.go | 2 +- roomserver/internal/input/input_events.go | 13 +++--- .../internal/input/input_latest_events.go | 42 +++++++++++-------- 4 files changed, 37 insertions(+), 24 deletions(-) diff --git a/roomserver/api/input.go b/roomserver/api/input.go index 73c4994a7..de339eabf 100644 --- a/roomserver/api/input.go +++ b/roomserver/api/input.go @@ -33,6 +33,10 @@ const ( // KindBackfill event extend the contiguous graph going backwards. // They always have state. KindBackfill = 3 + // KindRewrite events are used to rewrite the head of the graph. + // They are used in state, forward extremity and membership updates + // but are not sent as output events. + KindRewrite = 4 ) // DoNotSendToOtherServers tells us not to send the event to other matrix diff --git a/roomserver/api/wrapper.go b/roomserver/api/wrapper.go index 5cc6c816e..d5e4b567c 100644 --- a/roomserver/api/wrapper.go +++ b/roomserver/api/wrapper.go @@ -76,7 +76,7 @@ func SendEventWithState( continue } ires = append(ires, InputRoomEvent{ - Kind: KindNew, + Kind: KindRewrite, Event: stateEvent.Headered(event.RoomVersion), AuthEventIDs: stateEvent.AuthEventIDs(), HasState: true, diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index fd9d057b4..875986904 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -108,12 +108,13 @@ func (r *Inputer) processRoomEvent( } if err = r.updateLatestEvents( - ctx, // context - roomInfo, // room info for the room being updated - stateAtEvent, // state at event (below) - event, // event - input.SendAsServer, // send as server - input.TransactionID, // transaction ID + ctx, // context + roomInfo, // room info for the room being updated + stateAtEvent, // state at event (below) + event, // event + input.SendAsServer, // send as server + input.TransactionID, // transaction ID + input.Kind == api.KindNew, // should we send output events? ); err != nil { return "", fmt.Errorf("r.updateLatestEvents: %w", err) } diff --git a/roomserver/internal/input/input_latest_events.go b/roomserver/internal/input/input_latest_events.go index 67a7d8a40..24d814f8c 100644 --- a/roomserver/internal/input/input_latest_events.go +++ b/roomserver/internal/input/input_latest_events.go @@ -54,6 +54,7 @@ func (r *Inputer) updateLatestEvents( event gomatrixserverlib.Event, sendAsServer string, transactionID *api.TransactionID, + sendOutput bool, ) (err error) { updater, err := r.DB.GetLatestEventsForUpdate(ctx, *roomInfo) if err != nil { @@ -71,6 +72,7 @@ func (r *Inputer) updateLatestEvents( event: event, sendAsServer: sendAsServer, transactionID: transactionID, + sendOutput: sendOutput, } if err = u.doUpdateLatestEvents(); err != nil { @@ -93,6 +95,7 @@ type latestEventsUpdater struct { stateAtEvent types.StateAtEvent event gomatrixserverlib.Event transactionID *api.TransactionID + sendOutput bool // Which server to send this event as. sendAsServer string // The eventID of the event that was processed before this one. @@ -178,30 +181,35 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error { return fmt.Errorf("u.api.updateMemberships: %w", err) } - update, err := u.makeOutputNewRoomEvent() - if err != nil { - return fmt.Errorf("u.makeOutputNewRoomEvent: %w", err) - } - updates = append(updates, *update) + if u.sendOutput { + var update *api.OutputEvent + update, err = u.makeOutputNewRoomEvent() + if err != nil { + return fmt.Errorf("u.makeOutputNewRoomEvent: %w", err) + } + updates = append(updates, *update) - // Send the event to the output logs. - // We do this inside the database transaction to ensure that we only mark an event as sent if we sent it. - // (n.b. this means that it's possible that the same event will be sent twice if the transaction fails but - // the write to the output log succeeds) - // TODO: This assumes that writing the event to the output log is synchronous. It should be possible to - // send the event asynchronously but we would need to ensure that 1) the events are written to the log in - // the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the - // necessary bookkeeping we'll keep the event sending synchronous for now. - if err = u.api.WriteOutputEvents(u.event.RoomID(), updates); err != nil { - return fmt.Errorf("u.api.WriteOutputEvents: %w", err) + // Send the event to the output logs. + // We do this inside the database transaction to ensure that we only mark an event as sent if we sent it. + // (n.b. this means that it's possible that the same event will be sent twice if the transaction fails but + // the write to the output log succeeds) + // TODO: This assumes that writing the event to the output log is synchronous. It should be possible to + // send the event asynchronously but we would need to ensure that 1) the events are written to the log in + // the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the + // necessary bookkeeping we'll keep the event sending synchronous for now. + if err = u.api.WriteOutputEvents(u.event.RoomID(), updates); err != nil { + return fmt.Errorf("u.api.WriteOutputEvents: %w", err) + } } if err = u.updater.SetLatestEvents(u.roomInfo.RoomNID, u.latest, u.stateAtEvent.EventNID, u.newStateNID); err != nil { return fmt.Errorf("u.updater.SetLatestEvents: %w", err) } - if err = u.updater.MarkEventAsSent(u.stateAtEvent.EventNID); err != nil { - return fmt.Errorf("u.updater.MarkEventAsSent: %w", err) + if u.sendOutput { + if err = u.updater.MarkEventAsSent(u.stateAtEvent.EventNID); err != nil { + return fmt.Errorf("u.updater.MarkEventAsSent: %w", err) + } } return nil