Fix bugs related to state resolution

This commit is contained in:
Neil Alexander 2022-05-31 15:40:00 +01:00
parent ae7b6dd516
commit 5095457a46
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
6 changed files with 105 additions and 37 deletions

View file

@ -28,6 +28,7 @@ import (
var roomVersion = flag.String("roomversion", "5", "the room version to parse events as")
var filterType = flag.String("filtertype", "", "the event types to filter on")
var difference = flag.Bool("difference", false, "whether to calculate the difference between snapshots")
func main() {
ctx := context.Background()
@ -64,6 +65,58 @@ func main() {
RoomVersion: gomatrixserverlib.RoomVersion(*roomVersion),
})
if *difference {
if len(snapshotNIDs) != 2 {
panic("need exactly two state snapshot NIDs to calculate difference")
}
removed, added, err := stateres.DifferenceBetweeenStateSnapshots(ctx, snapshotNIDs[0], snapshotNIDs[1])
if err != nil {
panic(err)
}
var eventNIDs []types.EventNID
for _, entry := range append(removed, added...) {
eventNIDs = append(eventNIDs, entry.EventNID)
}
eventEntries, err := roomserverDB.Events(ctx, eventNIDs)
if err != nil {
panic(err)
}
events := make(map[types.EventNID]*gomatrixserverlib.Event, len(eventEntries))
for _, entry := range eventEntries {
events[entry.EventNID] = entry.Event
}
if len(removed) > 0 {
fmt.Println("Removed:")
for _, r := range removed {
event := events[r.EventNID]
fmt.Println()
fmt.Printf("* %s %s %q\n", event.EventID(), event.Type(), *event.StateKey())
fmt.Printf(" %s\n", string(event.Content()))
}
}
if len(removed) > 0 && len(added) > 0 {
fmt.Println()
}
if len(added) > 0 {
fmt.Println("Added:")
for _, a := range added {
event := events[a.EventNID]
fmt.Println()
fmt.Printf("* %s %s %q\n", event.EventID(), event.Type(), *event.StateKey())
fmt.Printf(" %s\n", string(event.Content()))
}
}
return
}
var stateEntries []types.StateEntry
for _, snapshotNID := range snapshotNIDs {
var entries []types.StateEntry

2
go.mod
View file

@ -34,7 +34,7 @@ require (
github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e
github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16
github.com/matrix-org/gomatrixserverlib v0.0.0-20220530084946-3a4b148706bc
github.com/matrix-org/gomatrixserverlib v0.0.0-20220531143527-a7d16af5affc
github.com/matrix-org/pinecone v0.0.0-20220408153826-2999ea29ed48
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
github.com/mattn/go-sqlite3 v1.14.13

4
go.sum
View file

@ -418,8 +418,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4=
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
github.com/matrix-org/gomatrixserverlib v0.0.0-20220530084946-3a4b148706bc h1:58tT3VznINdRWimb3yYb8QWmTAHX9AAuyOwzdmrp9q4=
github.com/matrix-org/gomatrixserverlib v0.0.0-20220530084946-3a4b148706bc/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk=
github.com/matrix-org/gomatrixserverlib v0.0.0-20220531143527-a7d16af5affc h1:TkWOX+RUAEduShhRxmB+tyCg+qr/SK7CVaYqp/3vRTw=
github.com/matrix-org/gomatrixserverlib v0.0.0-20220531143527-a7d16af5affc/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk=
github.com/matrix-org/pinecone v0.0.0-20220408153826-2999ea29ed48 h1:W0sjjC6yjskHX4mb0nk3p0fXAlbU5bAFUFeEtlrPASE=
github.com/matrix-org/pinecone v0.0.0-20220408153826-2999ea29ed48/go.mod h1:ulJzsVOTssIVp1j/m5eI//4VpAGDkMt5NrRuAVX7wpc=
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=

View file

@ -206,7 +206,7 @@ func (u *latestEventsUpdater) latestState() error {
// Work out if the state at the extremities has actually changed
// or not. If they haven't then we won't bother doing all of the
// hard work.
if u.event.StateKey() == nil {
if !u.stateAtEvent.IsStateEvent() {
stateChanged := false
oldStateNIDs := make([]types.StateSnapshotNID, 0, len(u.oldLatest))
newStateNIDs := make([]types.StateSnapshotNID, 0, len(u.latest))

View file

@ -39,6 +39,7 @@ type StateResolutionStorage interface {
StateAtEventIDs(ctx context.Context, eventIDs []string) ([]types.StateAtEvent, error)
AddState(ctx context.Context, roomNID types.RoomNID, stateBlockNIDs []types.StateBlockNID, state []types.StateEntry) (types.StateSnapshotNID, error)
Events(ctx context.Context, eventNIDs []types.EventNID) ([]types.Event, error)
EventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error)
}
type StateResolution struct {
@ -659,15 +660,13 @@ func (v *StateResolution) calculateStateAfterManyEvents(
}
// Collect all the entries with the same type and key together.
// We don't care about the order here because the conflict resolution
// algorithm doesn't depend on the order of the prev events.
// Remove duplicate entires.
// This is done so findDuplicateStateKeys can work in groups.
// We remove duplicates (same type, state key and event NID) too.
combined = combined[:util.SortAndUnique(stateEntrySorter(combined))]
// Find the conflicts
conflicts := findDuplicateStateKeys(combined)
if len(conflicts) > 0 {
if conflicts := findDuplicateStateKeys(combined); len(conflicts) > 0 {
conflictMap := stateEntryMap(conflicts)
conflictLength = len(conflicts)
// 5) There are conflicting state events, for each conflict workout
@ -676,7 +675,7 @@ func (v *StateResolution) calculateStateAfterManyEvents(
// Work out which entries aren't conflicted.
var notConflicted []types.StateEntry
for _, entry := range combined {
if _, ok := stateEntryMap(conflicts).lookup(entry.StateKeyTuple); !ok {
if _, ok := conflictMap.lookup(entry.StateKeyTuple); !ok {
notConflicted = append(notConflicted, entry)
}
}
@ -689,7 +688,7 @@ func (v *StateResolution) calculateStateAfterManyEvents(
return
}
algorithm = "full_state_with_conflicts"
state = resolved[:util.SortAndUnique(stateEntrySorter(resolved))]
state = resolved
} else {
algorithm = "full_state_no_conflicts"
// 6) There weren't any conflicts
@ -818,36 +817,12 @@ func (v *StateResolution) resolveConflictsV2(
authDifference := make([]*gomatrixserverlib.Event, 0, estimate)
// For each conflicted event, let's try and get the needed auth events.
neededStateKeys := make([]string, 16)
authEntries := make([]types.StateEntry, 16)
for _, conflictedEvent := range conflictedEvents {
// Work out which auth events we need to load.
key := conflictedEvent.EventID()
needed := gomatrixserverlib.StateNeededForAuth([]*gomatrixserverlib.Event{conflictedEvent})
// Find the numeric IDs for the necessary state keys.
neededStateKeys = neededStateKeys[:0]
neededStateKeys = append(neededStateKeys, needed.Member...)
neededStateKeys = append(neededStateKeys, needed.ThirdPartyInvite...)
stateKeyNIDMap, err := v.db.EventStateKeyNIDs(ctx, neededStateKeys)
if err != nil {
return nil, err
}
// Load the necessary auth events.
tuplesNeeded := v.stateKeyTuplesNeeded(stateKeyNIDMap, needed)
authEntries = authEntries[:0]
for _, tuple := range tuplesNeeded {
if eventNID, ok := stateEntryMap(notConflicted).lookup(tuple); ok {
authEntries = append(authEntries, types.StateEntry{
StateKeyTuple: tuple,
EventNID: eventNID,
})
}
}
// Store the newly found auth events in the auth set for this event.
authSets[key], _, err = v.loadStateEvents(ctx, authEntries)
authSets[key], err = v.loadAuthEvents(ctx, conflictedEvent)
if err != nil {
return nil, err
}
@ -996,6 +971,42 @@ func (v *StateResolution) loadStateEvents(
return result, eventIDMap, nil
}
// loadAuthEvents loads all of the auth events for a given event recursively.
func (v *StateResolution) loadAuthEvents(
ctx context.Context, event *gomatrixserverlib.Event,
) ([]*gomatrixserverlib.Event, error) {
eventMap := map[string]struct{}{}
var getEvents func(eventIDs []string) ([]*gomatrixserverlib.Event, error)
getEvents = func(eventIDs []string) ([]*gomatrixserverlib.Event, error) {
lookup := make([]string, 0, len(event.AuthEventIDs()))
for _, eventID := range eventIDs {
if _, ok := eventMap[eventID]; ok {
continue
}
lookup = append(lookup, eventID)
}
if len(lookup) == 0 {
return nil, nil
}
events, err := v.db.EventsFromIDs(ctx, lookup)
if err != nil {
return nil, fmt.Errorf("v.db.EventsFromIDs: %w", err)
}
result := make([]*gomatrixserverlib.Event, 0, len(events))
for _, event := range events {
result = append(result, event.Event)
eventMap[event.EventID()] = struct{}{}
next, err := getEvents(event.AuthEventIDs())
if err != nil {
return nil, err
}
result = append(result, next...)
}
return result, nil
}
return getEvents(event.AuthEventIDs())
}
// findDuplicateStateKeys finds the state entries where the state key tuple appears more than once in a sorted list.
// Returns a sorted list of those state entries.
func findDuplicateStateKeys(a []types.StateEntry) []types.StateEntry {

View file

@ -192,6 +192,10 @@ func (u *RoomUpdater) StateAtEventIDs(
return u.d.EventsTable.BulkSelectStateAtEventByID(ctx, u.txn, eventIDs)
}
func (u *RoomUpdater) EventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error) {
return u.d.eventsFromIDs(ctx, u.txn, eventIDs, false)
}
func (u *RoomUpdater) UnsentEventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error) {
return u.d.eventsFromIDs(ctx, u.txn, eventIDs, true)
}