mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-23 14:53:10 -06:00
Deep forward extremity calculation
This commit is contained in:
parent
f7c15071de
commit
cc4f5fb531
|
|
@ -32,9 +32,6 @@ const (
|
||||||
// there was a new event that references an event that we don't
|
// there was a new event that references an event that we don't
|
||||||
// have a copy of.
|
// have a copy of.
|
||||||
KindNew = 2
|
KindNew = 2
|
||||||
// KindBackfill event extend the contiguous graph going backwards.
|
|
||||||
// They always have state.
|
|
||||||
KindBackfill = 3
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// DoNotSendToOtherServers tells us not to send the event to other matrix
|
// DoNotSendToOtherServers tells us not to send the event to other matrix
|
||||||
|
|
|
||||||
|
|
@ -54,7 +54,7 @@ func (r *Inputer) processRoomEvent(
|
||||||
}
|
}
|
||||||
|
|
||||||
var softfail bool
|
var softfail bool
|
||||||
if input.Kind == api.KindBackfill || input.Kind == api.KindNew {
|
if input.Kind == api.KindNew {
|
||||||
// Check that the event passes authentication checks based on the
|
// Check that the event passes authentication checks based on the
|
||||||
// current room state.
|
// current room state.
|
||||||
softfail, err = helpers.CheckForSoftFail(ctx, r.DB, headered, input.StateEventIDs)
|
softfail, err = helpers.CheckForSoftFail(ctx, r.DB, headered, input.StateEventIDs)
|
||||||
|
|
|
||||||
|
|
@ -28,6 +28,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/roomserver/types"
|
"github.com/matrix-org/dendrite/roomserver/types"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
// updateLatestEvents updates the list of latest events for this room in the database and writes the
|
// updateLatestEvents updates the list of latest events for this room in the database and writes the
|
||||||
|
|
@ -146,24 +147,12 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
|
||||||
return fmt.Errorf("u.updater.StorePreviousEvents: %w", err)
|
return fmt.Errorf("u.updater.StorePreviousEvents: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get the event reference for our new event. This will be used when
|
// Work out what the latest events are. This will include the new
|
||||||
// determining if the event is referenced by an existing event.
|
// event if it is not already referenced.
|
||||||
eventReference := u.event.EventReference()
|
u.calculateLatest(
|
||||||
|
|
||||||
// Check if our new event is already referenced by an existing event
|
|
||||||
// in the room. If it is then it isn't a latest event.
|
|
||||||
alreadyReferenced, err := u.updater.IsReferenced(eventReference)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("u.updater.IsReferenced: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Work out what the latest events are.
|
|
||||||
u.latest = calculateLatest(
|
|
||||||
oldLatest,
|
oldLatest,
|
||||||
alreadyReferenced,
|
|
||||||
prevEvents,
|
|
||||||
types.StateAtEventAndReference{
|
types.StateAtEventAndReference{
|
||||||
EventReference: eventReference,
|
EventReference: u.event.EventReference(),
|
||||||
StateAtEvent: u.stateAtEvent,
|
StateAtEvent: u.stateAtEvent,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
@ -215,27 +204,12 @@ func (u *latestEventsUpdater) latestState() error {
|
||||||
var err error
|
var err error
|
||||||
roomState := state.NewStateResolution(u.api.DB, *u.roomInfo)
|
roomState := state.NewStateResolution(u.api.DB, *u.roomInfo)
|
||||||
|
|
||||||
// Get a list of the current room state events if available.
|
// Get a list of the current latest events. This may or may not
|
||||||
var currentState []types.StateEntry
|
// include the new event from the input path, depending on whether
|
||||||
if u.roomInfo.StateSnapshotNID != 0 {
|
// it is a forward extremity or not.
|
||||||
currentState, _ = roomState.LoadStateAtSnapshot(u.ctx, u.roomInfo.StateSnapshotNID)
|
latestStateAtEvents := make([]types.StateAtEvent, len(u.latest))
|
||||||
}
|
|
||||||
|
|
||||||
// Get a list of the current latest events. This will include both
|
|
||||||
// the current room state and the latest events after the input event.
|
|
||||||
// The idea is that we will perform state resolution on this set and
|
|
||||||
// any conflicting events will be resolved properly.
|
|
||||||
latestStateAtEvents := make([]types.StateAtEvent, len(u.latest)+len(currentState))
|
|
||||||
offset := 0
|
|
||||||
for i := range currentState {
|
|
||||||
latestStateAtEvents[i] = types.StateAtEvent{
|
|
||||||
BeforeStateSnapshotNID: u.roomInfo.StateSnapshotNID,
|
|
||||||
StateEntry: currentState[i],
|
|
||||||
}
|
|
||||||
offset++
|
|
||||||
}
|
|
||||||
for i := range u.latest {
|
for i := range u.latest {
|
||||||
latestStateAtEvents[offset+i] = u.latest[i].StateAtEvent
|
latestStateAtEvents[i] = u.latest[i].StateAtEvent
|
||||||
}
|
}
|
||||||
|
|
||||||
// Takes the NIDs of the latest events and creates a state snapshot
|
// Takes the NIDs of the latest events and creates a state snapshot
|
||||||
|
|
@ -266,6 +240,22 @@ func (u *latestEventsUpdater) latestState() error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("roomState.DifferenceBetweenStateSnapshots: %w", err)
|
return fmt.Errorf("roomState.DifferenceBetweenStateSnapshots: %w", err)
|
||||||
}
|
}
|
||||||
|
if len(u.removed) > len(u.added) {
|
||||||
|
logrus.Infof("INVALID STATE DELTA (%d removes, %d adds)", len(u.removed), len(u.added))
|
||||||
|
logrus.Infof("State snapshot before: %d, after: %d", u.oldStateNID, u.newStateNID)
|
||||||
|
logrus.Infof("Want to remove:")
|
||||||
|
for _, r := range u.removed {
|
||||||
|
logrus.Infof("* %+v", r)
|
||||||
|
}
|
||||||
|
logrus.Infof("Want to add:")
|
||||||
|
for _, a := range u.added {
|
||||||
|
logrus.Infof("* %+v", a)
|
||||||
|
}
|
||||||
|
return fmt.Errorf(
|
||||||
|
"event wants to remove %d state but only add %d state - this should be impossible",
|
||||||
|
len(u.removed), len(u.added),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
// Also work out the state before the event removes and the event
|
// Also work out the state before the event removes and the event
|
||||||
// adds.
|
// adds.
|
||||||
|
|
@ -279,42 +269,46 @@ func (u *latestEventsUpdater) latestState() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func calculateLatest(
|
func (u *latestEventsUpdater) calculateLatest(
|
||||||
oldLatest []types.StateAtEventAndReference,
|
oldLatest []types.StateAtEventAndReference,
|
||||||
alreadyReferenced bool,
|
|
||||||
prevEvents []gomatrixserverlib.EventReference,
|
|
||||||
newEvent types.StateAtEventAndReference,
|
newEvent types.StateAtEventAndReference,
|
||||||
) []types.StateAtEventAndReference {
|
) {
|
||||||
var alreadyInLatest bool
|
|
||||||
var newLatest []types.StateAtEventAndReference
|
var newLatest []types.StateAtEventAndReference
|
||||||
|
|
||||||
|
// First of all, let's see if any of the existing forward extremities
|
||||||
|
// now have entries in the previous events table. If they do then they
|
||||||
|
// are no longer forward extremities.
|
||||||
for _, l := range oldLatest {
|
for _, l := range oldLatest {
|
||||||
keep := true
|
referenced, err := u.api.DB.EventIsReferenced(u.ctx, l.EventReference)
|
||||||
for _, prevEvent := range prevEvents {
|
if err != nil {
|
||||||
if l.EventID == prevEvent.EventID && bytes.Equal(l.EventSHA256, prevEvent.EventSHA256) {
|
logrus.WithError(err).Errorf("Failed to retrieve event reference for %q", l.EventID)
|
||||||
// This event can be removed from the latest events cause we've found an event that references it.
|
|
||||||
// (If an event is referenced by another event then it can't be one of the latest events in the room
|
|
||||||
// because we have an event that comes after it)
|
|
||||||
keep = false
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if l.EventNID == newEvent.EventNID {
|
if !referenced {
|
||||||
alreadyInLatest = true
|
|
||||||
}
|
|
||||||
if keep {
|
|
||||||
// Keep the event in the latest events.
|
|
||||||
newLatest = append(newLatest, l)
|
newLatest = append(newLatest, l)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if !alreadyReferenced && !alreadyInLatest {
|
// Then check and see if our new event is already included in that set.
|
||||||
// This event is not referenced by any of the events in the room
|
for _, l := range newLatest {
|
||||||
// and the event is not already in the latest events.
|
if l.EventReference.EventID == newEvent.EventReference.EventID && bytes.Equal(l.EventReference.EventSHA256, newEvent.EventReference.EventSHA256) {
|
||||||
// Add it to the latest events
|
// We've already referenced this event so we can just return
|
||||||
|
// the newly completed extremities at this point.
|
||||||
|
u.latest = newLatest
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the new event isn't already in the set then we'll check if it
|
||||||
|
// really should be.
|
||||||
|
referenced, err := u.api.DB.EventIsReferenced(u.ctx, newEvent.EventReference)
|
||||||
|
if err != nil {
|
||||||
|
logrus.WithError(err).Errorf("Failed to retrieve event reference for %q", newEvent.EventReference.EventID)
|
||||||
|
}
|
||||||
|
if !referenced {
|
||||||
newLatest = append(newLatest, newEvent)
|
newLatest = append(newLatest, newEvent)
|
||||||
}
|
}
|
||||||
|
|
||||||
return newLatest
|
u.latest = newLatest
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) {
|
func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) {
|
||||||
|
|
|
||||||
|
|
@ -87,6 +87,11 @@ type Database interface {
|
||||||
// Lookup the event IDs for a batch of event numeric IDs.
|
// Lookup the event IDs for a batch of event numeric IDs.
|
||||||
// Returns an error if the retrieval went wrong.
|
// Returns an error if the retrieval went wrong.
|
||||||
EventIDs(ctx context.Context, eventNIDs []types.EventNID) (map[types.EventNID]string, error)
|
EventIDs(ctx context.Context, eventNIDs []types.EventNID) (map[types.EventNID]string, error)
|
||||||
|
// EventIsReferenced returns true if the event is referenced by another event and false otherwise.
|
||||||
|
// This is used when working out if an event is a new forward extremity or not.
|
||||||
|
EventIsReferenced(
|
||||||
|
ctx context.Context, eventRef gomatrixserverlib.EventReference,
|
||||||
|
) (bool, error)
|
||||||
// Look up the latest events in a room in preparation for an update.
|
// Look up the latest events in a room in preparation for an update.
|
||||||
// The RoomRecentEventsUpdater must have Commit or Rollback called on it if this doesn't return an error.
|
// The RoomRecentEventsUpdater must have Commit or Rollback called on it if this doesn't return an error.
|
||||||
// Returns the latest events in the room and the last eventID sent to the log along with an updater.
|
// Returns the latest events in the room and the last eventID sent to the log along with an updater.
|
||||||
|
|
|
||||||
|
|
@ -75,18 +75,6 @@ func (u *LatestEventsUpdater) StorePreviousEvents(eventNID types.EventNID, previ
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsReferenced implements types.RoomRecentEventsUpdater
|
|
||||||
func (u *LatestEventsUpdater) IsReferenced(eventReference gomatrixserverlib.EventReference) (bool, error) {
|
|
||||||
err := u.d.PrevEventsTable.SelectPreviousEventExists(u.ctx, u.txn, eventReference.EventID, eventReference.EventSHA256)
|
|
||||||
if err == nil {
|
|
||||||
return true, nil
|
|
||||||
}
|
|
||||||
if err == sql.ErrNoRows {
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
return false, fmt.Errorf("u.d.PrevEventsTable.SelectPreviousEventExists: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetLatestEvents implements types.RoomRecentEventsUpdater
|
// SetLatestEvents implements types.RoomRecentEventsUpdater
|
||||||
func (u *LatestEventsUpdater) SetLatestEvents(
|
func (u *LatestEventsUpdater) SetLatestEvents(
|
||||||
roomNID types.RoomNID, latest []types.StateAtEventAndReference, lastEventNIDSent types.EventNID,
|
roomNID types.RoomNID, latest []types.StateAtEventAndReference, lastEventNIDSent types.EventNID,
|
||||||
|
|
|
||||||
|
|
@ -186,6 +186,16 @@ func (d *Database) EventIDs(
|
||||||
return d.EventsTable.BulkSelectEventID(ctx, eventNIDs)
|
return d.EventsTable.BulkSelectEventID(ctx, eventNIDs)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *Database) EventIsReferenced(
|
||||||
|
ctx context.Context, eventRef gomatrixserverlib.EventReference,
|
||||||
|
) (bool, error) {
|
||||||
|
err := d.PrevEventsTable.SelectPreviousEventExists(ctx, nil, eventRef.EventID, eventRef.EventSHA256)
|
||||||
|
if err != nil && err != sql.ErrNoRows {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
return err != sql.ErrNoRows, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (d *Database) EventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error) {
|
func (d *Database) EventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error) {
|
||||||
nidMap, err := d.EventNIDs(ctx, eventIDs)
|
nidMap, err := d.EventNIDs(ctx, eventIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue