From ae998c17e2b215f0aaa10c96542f867825101778 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 11 Sep 2020 12:39:37 +0100 Subject: [PATCH] Don't generate output events for rewrites, but instead notify that state is rewritten on the final new event --- roomserver/api/output.go | 6 ++--- roomserver/internal/input/input_events.go | 25 +++++++++++++------ .../internal/input/input_latest_events.go | 13 +++------- syncapi/consumers/roomserver.go | 3 +-- syncapi/routing/messages.go | 2 +- syncapi/storage/shared/syncserver.go | 12 +++++---- 6 files changed, 33 insertions(+), 28 deletions(-) diff --git a/roomserver/api/output.go b/roomserver/api/output.go index 0eaba2237..09525a20b 100644 --- a/roomserver/api/output.go +++ b/roomserver/api/output.go @@ -86,9 +86,9 @@ const ( type OutputNewRoomEvent struct { // The Event. Event gomatrixserverlib.HeaderedEvent `json:"event"` - // Is the event a timeline event or a state event? Defaults to timeline - // if not specified. - Type OutputRoomEventType `json:"type"` + // Does the event completely rewrite the room state? If so, then AddsStateEventIDs + // will contain the entire room state. + RewritesState bool `json:"rewrites_state"` // The latest events in the room after this event. // This can be used to set the prev events for new events in the room. // This also can be used to get the full current state after this event. diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index ec429b2dd..daf1afcd3 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -86,7 +86,7 @@ func (r *Inputer) processRoomEvent( "event_id": event.EventID(), "type": event.Type(), "room": event.RoomID(), - }).Info("Stored outlier") + }).Debug("Stored outlier") return event.EventID(), nil } @@ -107,14 +107,23 @@ func (r *Inputer) processRoomEvent( } } + if input.Kind == api.KindRewrite { + logrus.WithFields(logrus.Fields{ + "event_id": event.EventID(), + "type": event.Type(), + "room": event.RoomID(), + }).Debug("Stored rewrite") + return event.EventID(), nil + } + if err = r.updateLatestEvents( - ctx, // context - roomInfo, // room info for the room being updated - stateAtEvent, // state at event (below) - event, // event - input.SendAsServer, // send as server - input.TransactionID, // transaction ID - input.Kind == api.KindRewrite, // historical + ctx, // context + roomInfo, // room info for the room being updated + stateAtEvent, // state at event (below) + event, // event + input.SendAsServer, // send as server + input.TransactionID, // transaction ID + input.HasState, // rewrites state? ); err != nil { return "", fmt.Errorf("r.updateLatestEvents: %w", err) } diff --git a/roomserver/internal/input/input_latest_events.go b/roomserver/internal/input/input_latest_events.go index ce0f4ebaf..68c1d577a 100644 --- a/roomserver/internal/input/input_latest_events.go +++ b/roomserver/internal/input/input_latest_events.go @@ -54,7 +54,7 @@ func (r *Inputer) updateLatestEvents( event gomatrixserverlib.Event, sendAsServer string, transactionID *api.TransactionID, - isHistorical bool, + rewritesState bool, ) (err error) { updater, err := r.DB.GetLatestEventsForUpdate(ctx, *roomInfo) if err != nil { @@ -72,7 +72,7 @@ func (r *Inputer) updateLatestEvents( event: event, sendAsServer: sendAsServer, transactionID: transactionID, - isHistorical: isHistorical, + rewritesState: rewritesState, } if err = u.doUpdateLatestEvents(); err != nil { @@ -95,7 +95,7 @@ type latestEventsUpdater struct { stateAtEvent types.StateAtEvent event gomatrixserverlib.Event transactionID *api.TransactionID - isHistorical bool + rewritesState bool // Which server to send this event as. sendAsServer string // The eventID of the event that was processed before this one. @@ -307,14 +307,9 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) latestEventIDs[i] = u.latest[i].EventID } - var outputType api.OutputRoomEventType - if u.isHistorical { - outputType = api.OutputRoomState - } - ore := api.OutputNewRoomEvent{ Event: u.event.Headered(u.roomInfo.RoomVersion), - Type: outputType, + RewritesState: u.rewritesState, LastSentEventID: u.lastEventIDSent, LatestEventIDs: latestEventIDs, TransactionID: u.transactionID, diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 21122abdb..ca697e425 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -144,7 +144,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( } } - if msg.Type == api.OutputRoomState { + if msg.RewritesState { err = s.db.RewriteState( ctx, &ev, @@ -155,7 +155,6 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( if err != nil { return fmt.Errorf("s.db.RewriteState: %w", err) } - return nil } pduPos, err := s.db.WriteEvent( diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index 0999d3e8c..fbe66dbd9 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -417,7 +417,7 @@ func (r *messagesReq) backfill(roomID string, backwardsExtremities map[string][] []gomatrixserverlib.HeaderedEvent{}, []string{}, []string{}, - nil, true, + nil, false, ) if err != nil { return nil, err diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 18f9beba0..d171eba4f 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -267,12 +267,14 @@ func (d *Database) RewriteState( } } - if err := d.handleBackwardExtremities(ctx, txn, ev); err != nil { - return fmt.Errorf("d.handleBackwardExtremities: %w", err) - } - + /* + if err := d.handleBackwardExtremities(ctx, txn, ev); err != nil { + return fmt.Errorf("d.handleBackwardExtremities: %w", err) + } + */ // TODO: is there something better here that we can do instead of giving stream position 0? - return d.updateRoomState(ctx, txn, []string{}, addStateEvents, types.StreamPosition(0)) + //return d.updateRoomState(ctx, txn, []string{}, addStateEvents, types.StreamPosition(0)) + return nil }) }