diff --git a/cmd/resolve-state/main.go b/cmd/resolve-state/main.go index d6db72436..9988a2411 100644 --- a/cmd/resolve-state/main.go +++ b/cmd/resolve-state/main.go @@ -32,6 +32,7 @@ import ( var roomVersion = flag.String("roomversion", "5", "the room version to parse events as") var filterType = flag.String("filtertype", "", "the event types to filter on") var difference = flag.Bool("difference", false, "whether to calculate the difference between snapshots") +var roomID = flag.String("room_id", "", "roomID to get the state for") // dummyQuerier implements QuerySenderIDAPI. Does **NOT** do any "magic" for pseudoID rooms // to avoid having to "start" a full roomserver API. @@ -58,8 +59,6 @@ func main() { args := flag.Args() - fmt.Println("Room version", *roomVersion) - snapshotNIDs := []types.StateSnapshotNID{} for _, arg := range args { if i, err := strconv.Atoi(arg); err == nil { @@ -89,6 +88,24 @@ func main() { roomInfo := &types.RoomInfo{ RoomVersion: gomatrixserverlib.RoomVersion(*roomVersion), } + if *roomID != "" { + roomInfo, err = roomserverDB.RoomInfo(ctx, *roomID) + if err != nil { + panic(err) + } + if roomInfo == nil { + panic("no room found") + } + + snapshotNIDs, err = roomserverDB.GetAllStateSnapshots(ctx, roomInfo.RoomNID) + if err != nil { + panic(err) + } + + } + + fmt.Println("Room version", roomInfo.RoomVersion) + stateres := state.NewStateResolution(roomserverDB, roomInfo, rsAPI) fmt.Println("Fetching", len(snapshotNIDs), "snapshot NIDs") @@ -152,7 +169,8 @@ func main() { } var stateEntries []types.StateEntry - for _, snapshotNID := range snapshotNIDs { + for i, snapshotNID := range snapshotNIDs { + fmt.Printf("\r \a %d of %d", i, len(snapshotNIDs)) var entries []types.StateEntry entries, err = stateres.LoadStateAtSnapshot(ctx, snapshotNID) if err != nil { @@ -160,6 +178,7 @@ func main() { } stateEntries = append(stateEntries, entries...) } + fmt.Println() eventNIDMap := map[types.EventNID]struct{}{} for _, entry := range stateEntries { diff --git a/roomserver/storage/interface.go b/roomserver/storage/interface.go index 0638252b2..4d758d2ca 100644 --- a/roomserver/storage/interface.go +++ b/roomserver/storage/interface.go @@ -30,6 +30,7 @@ import ( type Database interface { UserRoomKeys + GetAllStateSnapshots(ctx context.Context, roomNID types.RoomNID) ([]types.StateSnapshotNID, error) // Do we support processing input events for more than one room at a time? SupportsConcurrentRoomInputs() bool AssignRoomNID(ctx context.Context, roomID spec.RoomID, roomVersion gomatrixserverlib.RoomVersion) (roomNID types.RoomNID, err error) diff --git a/roomserver/storage/postgres/state_snapshot_table.go b/roomserver/storage/postgres/state_snapshot_table.go index 32ed06a13..f99615872 100644 --- a/roomserver/storage/postgres/state_snapshot_table.go +++ b/roomserver/storage/postgres/state_snapshot_table.go @@ -21,6 +21,7 @@ import ( "fmt" "github.com/lib/pq" + "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" @@ -114,11 +115,14 @@ WHERE re.event_id = ANY($2) ` +const getAllSnapshotsSQL = "SELECT state_snapshot_nid FROM roomserver_state_snapshots WHERE room_nid = $1" + type stateSnapshotStatements struct { insertStateStmt *sql.Stmt bulkSelectStateBlockNIDsStmt *sql.Stmt bulkSelectStateForHistoryVisibilityStmt *sql.Stmt bulktSelectMembershipForHistoryVisibilityStmt *sql.Stmt + getAllSnapshotsStmt *sql.Stmt } func CreateStateSnapshotTable(db *sql.DB) error { @@ -134,9 +138,32 @@ func PrepareStateSnapshotTable(db *sql.DB) (*stateSnapshotStatements, error) { {&s.bulkSelectStateBlockNIDsStmt, bulkSelectStateBlockNIDsSQL}, {&s.bulkSelectStateForHistoryVisibilityStmt, bulkSelectStateForHistoryVisibilitySQL}, {&s.bulktSelectMembershipForHistoryVisibilityStmt, bulkSelectMembershipForHistoryVisibilitySQL}, + {&s.getAllSnapshotsStmt, getAllSnapshotsSQL}, }.Prepare(db) } +func (s *stateSnapshotStatements) GetAllStateSnapshots(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) ([]types.StateSnapshotNID, error) { + stmt := sqlutil.TxStmt(txn, s.getAllSnapshotsStmt) + + rows, err := stmt.QueryContext(ctx, roomNID) + if err != nil { + return nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "failed to close rows") + + nids := make([]types.StateSnapshotNID, 0, 2000) + var nid types.StateSnapshotNID + + for rows.Next() { + if err := rows.Scan(&nid); err != nil { + return nil, err + } + nids = append(nids, nid) + } + + return nids, rows.Err() +} + func (s *stateSnapshotStatements) InsertState( ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, nids types.StateBlockNIDs, ) (stateNID types.StateSnapshotNID, err error) { diff --git a/roomserver/storage/postgres/storage.go b/roomserver/storage/postgres/storage.go index c5c206cfb..c2d35f43b 100644 --- a/roomserver/storage/postgres/storage.go +++ b/roomserver/storage/postgres/storage.go @@ -23,6 +23,7 @@ import ( // Import the postgres database driver. _ "github.com/lib/pq" + "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/sqlutil" @@ -36,6 +37,10 @@ type Database struct { shared.Database } +func (d *Database) GetAllStateSnapshots(ctx context.Context, roomNID types.RoomNID) ([]types.StateSnapshotNID, error) { + return d.StateSnapshotTable.GetAllStateSnapshots(ctx, nil, roomNID) +} + // Open a postgres database. func Open(ctx context.Context, conMan *sqlutil.Connections, dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches) (*Database, error) { var d Database diff --git a/roomserver/storage/sqlite3/state_snapshot_table.go b/roomserver/storage/sqlite3/state_snapshot_table.go index 2edff0ba8..c9c49b766 100644 --- a/roomserver/storage/sqlite3/state_snapshot_table.go +++ b/roomserver/storage/sqlite3/state_snapshot_table.go @@ -181,3 +181,7 @@ func (s *stateSnapshotStatements) selectStateBlockNIDsForRoomNID( return res, rows.Err() } + +func (s *stateSnapshotStatements) GetAllStateSnapshots(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) ([]types.StateSnapshotNID, error) { + return []types.StateSnapshotNID{}, fmt.Errorf("not implemented") +} diff --git a/roomserver/storage/sqlite3/storage.go b/roomserver/storage/sqlite3/storage.go index 98d88f923..6350a1237 100644 --- a/roomserver/storage/sqlite3/storage.go +++ b/roomserver/storage/sqlite3/storage.go @@ -35,6 +35,10 @@ type Database struct { shared.Database } +func (d *Database) GetAllStateSnapshots(ctx context.Context, roomNID types.RoomNID) ([]types.StateSnapshotNID, error) { + return []types.StateSnapshotNID{}, fmt.Errorf("not implemented") +} + // Open a sqlite database. func Open(ctx context.Context, conMan *sqlutil.Connections, dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches) (*Database, error) { var d Database diff --git a/roomserver/storage/tables/interface.go b/roomserver/storage/tables/interface.go index 0ae064e6b..a366a9c7b 100644 --- a/roomserver/storage/tables/interface.go +++ b/roomserver/storage/tables/interface.go @@ -96,6 +96,8 @@ type StateSnapshot interface { BulkSelectMembershipForHistoryVisibility( ctx context.Context, txn *sql.Tx, userNID types.EventStateKeyNID, roomInfo *types.RoomInfo, eventIDs ...string, ) (map[string]*types.HeaderedEvent, error) + + GetAllStateSnapshots(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) ([]types.StateSnapshotNID, error) } type StateBlock interface {