Don't generate output events for rewrites, but instead notify that state is rewritten on the final new event

This commit is contained in:
Neil Alexander 2020-09-11 12:39:37 +01:00
parent 9d89328480
commit ae998c17e2
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
6 changed files with 33 additions and 28 deletions

View file

@ -86,9 +86,9 @@ const (
type OutputNewRoomEvent struct { type OutputNewRoomEvent struct {
// The Event. // The Event.
Event gomatrixserverlib.HeaderedEvent `json:"event"` Event gomatrixserverlib.HeaderedEvent `json:"event"`
// Is the event a timeline event or a state event? Defaults to timeline // Does the event completely rewrite the room state? If so, then AddsStateEventIDs
// if not specified. // will contain the entire room state.
Type OutputRoomEventType `json:"type"` RewritesState bool `json:"rewrites_state"`
// The latest events in the room after this event. // The latest events in the room after this event.
// This can be used to set the prev events for new events in the room. // This can be used to set the prev events for new events in the room.
// This also can be used to get the full current state after this event. // This also can be used to get the full current state after this event.

View file

@ -86,7 +86,7 @@ func (r *Inputer) processRoomEvent(
"event_id": event.EventID(), "event_id": event.EventID(),
"type": event.Type(), "type": event.Type(),
"room": event.RoomID(), "room": event.RoomID(),
}).Info("Stored outlier") }).Debug("Stored outlier")
return event.EventID(), nil return event.EventID(), nil
} }
@ -107,14 +107,23 @@ func (r *Inputer) processRoomEvent(
} }
} }
if input.Kind == api.KindRewrite {
logrus.WithFields(logrus.Fields{
"event_id": event.EventID(),
"type": event.Type(),
"room": event.RoomID(),
}).Debug("Stored rewrite")
return event.EventID(), nil
}
if err = r.updateLatestEvents( if err = r.updateLatestEvents(
ctx, // context ctx, // context
roomInfo, // room info for the room being updated roomInfo, // room info for the room being updated
stateAtEvent, // state at event (below) stateAtEvent, // state at event (below)
event, // event event, // event
input.SendAsServer, // send as server input.SendAsServer, // send as server
input.TransactionID, // transaction ID input.TransactionID, // transaction ID
input.Kind == api.KindRewrite, // historical input.HasState, // rewrites state?
); err != nil { ); err != nil {
return "", fmt.Errorf("r.updateLatestEvents: %w", err) return "", fmt.Errorf("r.updateLatestEvents: %w", err)
} }

View file

@ -54,7 +54,7 @@ func (r *Inputer) updateLatestEvents(
event gomatrixserverlib.Event, event gomatrixserverlib.Event,
sendAsServer string, sendAsServer string,
transactionID *api.TransactionID, transactionID *api.TransactionID,
isHistorical bool, rewritesState bool,
) (err error) { ) (err error) {
updater, err := r.DB.GetLatestEventsForUpdate(ctx, *roomInfo) updater, err := r.DB.GetLatestEventsForUpdate(ctx, *roomInfo)
if err != nil { if err != nil {
@ -72,7 +72,7 @@ func (r *Inputer) updateLatestEvents(
event: event, event: event,
sendAsServer: sendAsServer, sendAsServer: sendAsServer,
transactionID: transactionID, transactionID: transactionID,
isHistorical: isHistorical, rewritesState: rewritesState,
} }
if err = u.doUpdateLatestEvents(); err != nil { if err = u.doUpdateLatestEvents(); err != nil {
@ -95,7 +95,7 @@ type latestEventsUpdater struct {
stateAtEvent types.StateAtEvent stateAtEvent types.StateAtEvent
event gomatrixserverlib.Event event gomatrixserverlib.Event
transactionID *api.TransactionID transactionID *api.TransactionID
isHistorical bool rewritesState bool
// Which server to send this event as. // Which server to send this event as.
sendAsServer string sendAsServer string
// The eventID of the event that was processed before this one. // The eventID of the event that was processed before this one.
@ -307,14 +307,9 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
latestEventIDs[i] = u.latest[i].EventID latestEventIDs[i] = u.latest[i].EventID
} }
var outputType api.OutputRoomEventType
if u.isHistorical {
outputType = api.OutputRoomState
}
ore := api.OutputNewRoomEvent{ ore := api.OutputNewRoomEvent{
Event: u.event.Headered(u.roomInfo.RoomVersion), Event: u.event.Headered(u.roomInfo.RoomVersion),
Type: outputType, RewritesState: u.rewritesState,
LastSentEventID: u.lastEventIDSent, LastSentEventID: u.lastEventIDSent,
LatestEventIDs: latestEventIDs, LatestEventIDs: latestEventIDs,
TransactionID: u.transactionID, TransactionID: u.transactionID,

View file

@ -144,7 +144,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
} }
} }
if msg.Type == api.OutputRoomState { if msg.RewritesState {
err = s.db.RewriteState( err = s.db.RewriteState(
ctx, ctx,
&ev, &ev,
@ -155,7 +155,6 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
if err != nil { if err != nil {
return fmt.Errorf("s.db.RewriteState: %w", err) return fmt.Errorf("s.db.RewriteState: %w", err)
} }
return nil
} }
pduPos, err := s.db.WriteEvent( pduPos, err := s.db.WriteEvent(

View file

@ -417,7 +417,7 @@ func (r *messagesReq) backfill(roomID string, backwardsExtremities map[string][]
[]gomatrixserverlib.HeaderedEvent{}, []gomatrixserverlib.HeaderedEvent{},
[]string{}, []string{},
[]string{}, []string{},
nil, true, nil, false,
) )
if err != nil { if err != nil {
return nil, err return nil, err

View file

@ -267,12 +267,14 @@ func (d *Database) RewriteState(
} }
} }
if err := d.handleBackwardExtremities(ctx, txn, ev); err != nil { /*
return fmt.Errorf("d.handleBackwardExtremities: %w", err) if err := d.handleBackwardExtremities(ctx, txn, ev); err != nil {
} return fmt.Errorf("d.handleBackwardExtremities: %w", err)
}
*/
// TODO: is there something better here that we can do instead of giving stream position 0? // TODO: is there something better here that we can do instead of giving stream position 0?
return d.updateRoomState(ctx, txn, []string{}, addStateEvents, types.StreamPosition(0)) //return d.updateRoomState(ctx, txn, []string{}, addStateEvents, types.StreamPosition(0))
return nil
}) })
} }