diff --git a/roomserver/api/input.go b/roomserver/api/input.go index a72e2d9a2..dd693203b 100644 --- a/roomserver/api/input.go +++ b/roomserver/api/input.go @@ -32,9 +32,6 @@ const ( // there was a new event that references an event that we don't // have a copy of. KindNew = 2 - // KindBackfill event extend the contiguous graph going backwards. - // They always have state. - KindBackfill = 3 ) // DoNotSendToOtherServers tells us not to send the event to other matrix diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index 810d8cdaf..113341591 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -54,7 +54,7 @@ func (r *Inputer) processRoomEvent( } var softfail bool - if input.Kind == api.KindBackfill || input.Kind == api.KindNew { + if input.Kind == api.KindNew { // Check that the event passes authentication checks based on the // current room state. softfail, err = helpers.CheckForSoftFail(ctx, r.DB, headered, input.StateEventIDs) diff --git a/roomserver/internal/input/input_latest_events.go b/roomserver/internal/input/input_latest_events.go index 2e9f3b4e4..883669538 100644 --- a/roomserver/internal/input/input_latest_events.go +++ b/roomserver/internal/input/input_latest_events.go @@ -28,6 +28,7 @@ import ( "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" + "github.com/sirupsen/logrus" ) // updateLatestEvents updates the list of latest events for this room in the database and writes the @@ -146,24 +147,12 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error { return fmt.Errorf("u.updater.StorePreviousEvents: %w", err) } - // Get the event reference for our new event. This will be used when - // determining if the event is referenced by an existing event. - eventReference := u.event.EventReference() - - // Check if our new event is already referenced by an existing event - // in the room. If it is then it isn't a latest event. - alreadyReferenced, err := u.updater.IsReferenced(eventReference) - if err != nil { - return fmt.Errorf("u.updater.IsReferenced: %w", err) - } - - // Work out what the latest events are. - u.latest = calculateLatest( + // Work out what the latest events are. This will include the new + // event if it is not already referenced. + u.calculateLatest( oldLatest, - alreadyReferenced, - prevEvents, types.StateAtEventAndReference{ - EventReference: eventReference, + EventReference: u.event.EventReference(), StateAtEvent: u.stateAtEvent, }, ) @@ -215,27 +204,12 @@ func (u *latestEventsUpdater) latestState() error { var err error roomState := state.NewStateResolution(u.api.DB, *u.roomInfo) - // Get a list of the current room state events if available. - var currentState []types.StateEntry - if u.roomInfo.StateSnapshotNID != 0 { - currentState, _ = roomState.LoadStateAtSnapshot(u.ctx, u.roomInfo.StateSnapshotNID) - } - - // Get a list of the current latest events. This will include both - // the current room state and the latest events after the input event. - // The idea is that we will perform state resolution on this set and - // any conflicting events will be resolved properly. - latestStateAtEvents := make([]types.StateAtEvent, len(u.latest)+len(currentState)) - offset := 0 - for i := range currentState { - latestStateAtEvents[i] = types.StateAtEvent{ - BeforeStateSnapshotNID: u.roomInfo.StateSnapshotNID, - StateEntry: currentState[i], - } - offset++ - } + // Get a list of the current latest events. This may or may not + // include the new event from the input path, depending on whether + // it is a forward extremity or not. + latestStateAtEvents := make([]types.StateAtEvent, len(u.latest)) for i := range u.latest { - latestStateAtEvents[offset+i] = u.latest[i].StateAtEvent + latestStateAtEvents[i] = u.latest[i].StateAtEvent } // Takes the NIDs of the latest events and creates a state snapshot @@ -266,6 +240,22 @@ func (u *latestEventsUpdater) latestState() error { if err != nil { return fmt.Errorf("roomState.DifferenceBetweenStateSnapshots: %w", err) } + if len(u.removed) > len(u.added) { + logrus.Infof("INVALID STATE DELTA (%d removes, %d adds)", len(u.removed), len(u.added)) + logrus.Infof("State snapshot before: %d, after: %d", u.oldStateNID, u.newStateNID) + logrus.Infof("Want to remove:") + for _, r := range u.removed { + logrus.Infof("* %+v", r) + } + logrus.Infof("Want to add:") + for _, a := range u.added { + logrus.Infof("* %+v", a) + } + return fmt.Errorf( + "event wants to remove %d state but only add %d state - this should be impossible", + len(u.removed), len(u.added), + ) + } // Also work out the state before the event removes and the event // adds. @@ -279,42 +269,46 @@ func (u *latestEventsUpdater) latestState() error { return nil } -func calculateLatest( +func (u *latestEventsUpdater) calculateLatest( oldLatest []types.StateAtEventAndReference, - alreadyReferenced bool, - prevEvents []gomatrixserverlib.EventReference, newEvent types.StateAtEventAndReference, -) []types.StateAtEventAndReference { - var alreadyInLatest bool +) { var newLatest []types.StateAtEventAndReference + + // First of all, let's see if any of the existing forward extremities + // now have entries in the previous events table. If they do then they + // are no longer forward extremities. for _, l := range oldLatest { - keep := true - for _, prevEvent := range prevEvents { - if l.EventID == prevEvent.EventID && bytes.Equal(l.EventSHA256, prevEvent.EventSHA256) { - // This event can be removed from the latest events cause we've found an event that references it. - // (If an event is referenced by another event then it can't be one of the latest events in the room - // because we have an event that comes after it) - keep = false - break - } + referenced, err := u.api.DB.EventIsReferenced(u.ctx, l.EventReference) + if err != nil { + logrus.WithError(err).Errorf("Failed to retrieve event reference for %q", l.EventID) } - if l.EventNID == newEvent.EventNID { - alreadyInLatest = true - } - if keep { - // Keep the event in the latest events. + if !referenced { newLatest = append(newLatest, l) } } - if !alreadyReferenced && !alreadyInLatest { - // This event is not referenced by any of the events in the room - // and the event is not already in the latest events. - // Add it to the latest events + // Then check and see if our new event is already included in that set. + for _, l := range newLatest { + if l.EventReference.EventID == newEvent.EventReference.EventID && bytes.Equal(l.EventReference.EventSHA256, newEvent.EventReference.EventSHA256) { + // We've already referenced this event so we can just return + // the newly completed extremities at this point. + u.latest = newLatest + return + } + } + + // If the new event isn't already in the set then we'll check if it + // really should be. + referenced, err := u.api.DB.EventIsReferenced(u.ctx, newEvent.EventReference) + if err != nil { + logrus.WithError(err).Errorf("Failed to retrieve event reference for %q", newEvent.EventReference.EventID) + } + if !referenced { newLatest = append(newLatest, newEvent) } - return newLatest + u.latest = newLatest } func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) { diff --git a/roomserver/storage/interface.go b/roomserver/storage/interface.go index 10a380e85..ae51a4a41 100644 --- a/roomserver/storage/interface.go +++ b/roomserver/storage/interface.go @@ -87,6 +87,11 @@ type Database interface { // Lookup the event IDs for a batch of event numeric IDs. // Returns an error if the retrieval went wrong. EventIDs(ctx context.Context, eventNIDs []types.EventNID) (map[types.EventNID]string, error) + // EventIsReferenced returns true if the event is referenced by another event and false otherwise. + // This is used when working out if an event is a new forward extremity or not. + EventIsReferenced( + ctx context.Context, eventRef gomatrixserverlib.EventReference, + ) (bool, error) // Look up the latest events in a room in preparation for an update. // The RoomRecentEventsUpdater must have Commit or Rollback called on it if this doesn't return an error. // Returns the latest events in the room and the last eventID sent to the log along with an updater. diff --git a/roomserver/storage/shared/latest_events_updater.go b/roomserver/storage/shared/latest_events_updater.go index 29eab0c98..cd01b1271 100644 --- a/roomserver/storage/shared/latest_events_updater.go +++ b/roomserver/storage/shared/latest_events_updater.go @@ -75,18 +75,6 @@ func (u *LatestEventsUpdater) StorePreviousEvents(eventNID types.EventNID, previ }) } -// IsReferenced implements types.RoomRecentEventsUpdater -func (u *LatestEventsUpdater) IsReferenced(eventReference gomatrixserverlib.EventReference) (bool, error) { - err := u.d.PrevEventsTable.SelectPreviousEventExists(u.ctx, u.txn, eventReference.EventID, eventReference.EventSHA256) - if err == nil { - return true, nil - } - if err == sql.ErrNoRows { - return false, nil - } - return false, fmt.Errorf("u.d.PrevEventsTable.SelectPreviousEventExists: %w", err) -} - // SetLatestEvents implements types.RoomRecentEventsUpdater func (u *LatestEventsUpdater) SetLatestEvents( roomNID types.RoomNID, latest []types.StateAtEventAndReference, lastEventNIDSent types.EventNID, diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go index f8e733ab7..0dbc0186a 100644 --- a/roomserver/storage/shared/storage.go +++ b/roomserver/storage/shared/storage.go @@ -186,6 +186,16 @@ func (d *Database) EventIDs( return d.EventsTable.BulkSelectEventID(ctx, eventNIDs) } +func (d *Database) EventIsReferenced( + ctx context.Context, eventRef gomatrixserverlib.EventReference, +) (bool, error) { + err := d.PrevEventsTable.SelectPreviousEventExists(ctx, nil, eventRef.EventID, eventRef.EventSHA256) + if err != nil && err != sql.ErrNoRows { + return false, err + } + return err != sql.ErrNoRows, nil +} + func (d *Database) EventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error) { nidMap, err := d.EventNIDs(ctx, eventIDs) if err != nil {