// dendrite/cmd/resolve-state/main.go (367 lines, 11 KiB, Go)
package main
import (
"context"
"flag"
"fmt"
2022-05-30 08:38:50 -05:00
"sort"
"strconv"
2022-05-30 08:38:50 -05:00
"strings"
Ristretto cache (#2563) * Try Ristretto cache * Tweak * It's beautiful * Update GMSL * More strict keyable interface * Fix that some more * Make less panicky * Don't enforce mutability checks for now * Determine mutability using deep equality * Tweaks * Namespace keys * Make federation caches mutable * Update cost estimation, add metric * Update GMSL * Estimate cost for metrics better * Reduce counters a bit * Try caching events * Some guards * Try again * Try this * Use separate caches for hopefully better hash distribution * Fix bug with admitting events into cache * Try to fix bugs * Check nil * Try that again * Preserve order jeezo this is messy * thanks VS Code for doing exactly the wrong thing * Try this again * Be more specific * aaaaargh * One more time * That might be better * Stronger sorting * Cache expiries, async publishing of EDUs * Put it back * Use a shared cache again * Cost estimation fixes * Update ristretto * Reduce counters a bit * Clean up a bit * Update GMSL * 1GB * Configurable cache sizees * Tweaks * Add `config.DataUnit` for specifying friendly cache sizes * Various tweaks * Update GMSL * Add back some lazy loading caching * Include key in cost * Include key in cost * Tweak max age handling, config key name * Only register prometheus metrics if requested * Review comments @S7evinK * Don't return errors when creating caches (it is better just to crash since otherwise we'll `nil`-pointer exception everywhere) * Review comments * Update sample configs * Update GHA Workflow * Update Complement images to Go 1.18 * Remove the cache test from the federation API as we no longer guarantee immediate cache admission * Don't check the caches in the renewal test * Possibly fix the upgrade tests * Update to matrix-org/gomatrixserverlib#322 * Update documentation to refer to Go 1.18
2022-07-11 08:31:31 -05:00
"time"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/sqlutil"
2022-05-30 09:11:48 -05:00
"github.com/matrix-org/dendrite/roomserver/state"
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/setup"
2022-05-25 07:37:15 -05:00
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/gomatrixserverlib/spec"
)
// resolve-state is a utility for inspecting state snapshots and running state
// resolution against real snapshots in an actual database.
//
// It accepts one or more state snapshot NIDs as positional arguments, plus a
// room version used when unmarshalling events, and prints the resolved state.
// Alternatively, -room_id resolves across every state snapshot stored for that
// room and ignores any snapshot NIDs given on the command line.
//
// Usage: ./resolve-state --roomversion=version snapshot [snapshot ...]
// e.g. ./resolve-state --roomversion=5 1254 1235 1282
// e.g. ./resolve-state -room_id '!abc:localhost'
//
// Further flags:
//   - -difference: print the state removed/added between exactly two snapshots
//   - -filtertype: only print resolved events of the given event type
//   - -fix: write the resolved state back to the database as a new snapshot
var (
	roomVersion = flag.String("roomversion", "5", "the room version to parse events as")
	filterType  = flag.String("filtertype", "", "the event types to filter on")
	difference  = flag.Bool("difference", false, "whether to calculate the difference between snapshots")
	roomID      = flag.String("room_id", "", "roomID to get the state for, using this flag ignores any passed snapshot NIDs and calculates the resolved state using ALL state snapshots")
	fixState    = flag.Bool("fix", false, "attempt to fix the room state")
)
// dummyQuerier provides the QuerySenderIDAPI methods needed by state
// resolution without having to "start" a full roomserver API. It converts
// between user IDs and sender IDs directly and therefore does **NOT** do any
// "magic" for pseudoID rooms.
type dummyQuerier struct{}

// QuerySenderIDForUser derives the sender ID straight from userID. The room ID
// is ignored and the returned error is always nil.
func (dummyQuerier) QuerySenderIDForUser(_ context.Context, _ spec.RoomID, userID spec.UserID) (*spec.SenderID, error) {
	senderID := spec.SenderIDFromUserID(userID)
	return &senderID, nil
}

// QueryUserIDForSender returns senderID.ToUserID() as-is. The room ID is
// ignored and the returned error is always nil.
func (dummyQuerier) QueryUserIDForSender(_ context.Context, _ spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
	userID := senderID.ToUserID()
	return userID, nil
}
// nolint:gocyclo
func main() {
ctx := context.Background()
cfg := setup.ParseFlags(true)
2022-05-25 07:37:15 -05:00
cfg.Logging = append(cfg.Logging[:0], config.LogrusHook{
Type: "std",
Level: "error",
})
2022-06-13 04:46:59 -05:00
cfg.ClientAPI.RegistrationDisabled = true
2022-05-25 07:37:15 -05:00
args := flag.Args()
snapshotNIDs := []types.StateSnapshotNID{}
for _, arg := range args {
if i, err := strconv.Atoi(arg); err == nil {
snapshotNIDs = append(snapshotNIDs, types.StateSnapshotNID(i))
}
}
processCtx := process.NewProcessContext()
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
dbOpts := cfg.RoomServer.Database
if dbOpts.ConnectionString == "" {
dbOpts = cfg.Global.DatabaseOptions
}
fmt.Println("Opening database")
Ristretto cache (#2563) * Try Ristretto cache * Tweak * It's beautiful * Update GMSL * More strict keyable interface * Fix that some more * Make less panicky * Don't enforce mutability checks for now * Determine mutability using deep equality * Tweaks * Namespace keys * Make federation caches mutable * Update cost estimation, add metric * Update GMSL * Estimate cost for metrics better * Reduce counters a bit * Try caching events * Some guards * Try again * Try this * Use separate caches for hopefully better hash distribution * Fix bug with admitting events into cache * Try to fix bugs * Check nil * Try that again * Preserve order jeezo this is messy * thanks VS Code for doing exactly the wrong thing * Try this again * Be more specific * aaaaargh * One more time * That might be better * Stronger sorting * Cache expiries, async publishing of EDUs * Put it back * Use a shared cache again * Cost estimation fixes * Update ristretto * Reduce counters a bit * Clean up a bit * Update GMSL * 1GB * Configurable cache sizees * Tweaks * Add `config.DataUnit` for specifying friendly cache sizes * Various tweaks * Update GMSL * Add back some lazy loading caching * Include key in cost * Include key in cost * Tweak max age handling, config key name * Only register prometheus metrics if requested * Review comments @S7evinK * Don't return errors when creating caches (it is better just to crash since otherwise we'll `nil`-pointer exception everywhere) * Review comments * Update sample configs * Update GHA Workflow * Update Complement images to Go 1.18 * Remove the cache test from the federation API as we no longer guarantee immediate cache admission * Don't check the caches in the renewal test * Possibly fix the upgrade tests * Update to matrix-org/gomatrixserverlib#322 * Update documentation to refer to Go 1.18
2022-07-11 08:31:31 -05:00
roomserverDB, err := storage.Open(
processCtx.Context(), cm, &dbOpts,
caching.NewRistrettoCache(8*1024*1024, time.Minute*5, caching.DisableMetrics),
Ristretto cache (#2563) * Try Ristretto cache * Tweak * It's beautiful * Update GMSL * More strict keyable interface * Fix that some more * Make less panicky * Don't enforce mutability checks for now * Determine mutability using deep equality * Tweaks * Namespace keys * Make federation caches mutable * Update cost estimation, add metric * Update GMSL * Estimate cost for metrics better * Reduce counters a bit * Try caching events * Some guards * Try again * Try this * Use separate caches for hopefully better hash distribution * Fix bug with admitting events into cache * Try to fix bugs * Check nil * Try that again * Preserve order jeezo this is messy * thanks VS Code for doing exactly the wrong thing * Try this again * Be more specific * aaaaargh * One more time * That might be better * Stronger sorting * Cache expiries, async publishing of EDUs * Put it back * Use a shared cache again * Cost estimation fixes * Update ristretto * Reduce counters a bit * Clean up a bit * Update GMSL * 1GB * Configurable cache sizees * Tweaks * Add `config.DataUnit` for specifying friendly cache sizes * Various tweaks * Update GMSL * Add back some lazy loading caching * Include key in cost * Include key in cost * Tweak max age handling, config key name * Only register prometheus metrics if requested * Review comments @S7evinK * Don't return errors when creating caches (it is better just to crash since otherwise we'll `nil`-pointer exception everywhere) * Review comments * Update sample configs * Update GHA Workflow * Update Complement images to Go 1.18 * Remove the cache test from the federation API as we no longer guarantee immediate cache admission * Don't check the caches in the renewal test * Possibly fix the upgrade tests * Update to matrix-org/gomatrixserverlib#322 * Update documentation to refer to Go 1.18
2022-07-11 08:31:31 -05:00
)
if err != nil {
panic(err)
}
rsAPI := dummyQuerier{}
roomInfo := &types.RoomInfo{
2022-05-30 09:11:48 -05:00
RoomVersion: gomatrixserverlib.RoomVersion(*roomVersion),
}
if *roomID != "" {
roomInfo, err = roomserverDB.RoomInfo(ctx, *roomID)
if err != nil {
panic(err)
}
if roomInfo == nil {
panic("no room found")
}
snapshotNIDs, err = roomserverDB.GetAllStateSnapshots(ctx, roomInfo.RoomNID)
if err != nil {
panic(err)
}
}
fmt.Println("Room version", roomInfo.RoomVersion)
stateres := state.NewStateResolution(roomserverDB, roomInfo, rsAPI)
fmt.Println("Fetching", len(snapshotNIDs), "snapshot NIDs")
if *difference {
if len(snapshotNIDs) != 2 {
panic("need exactly two state snapshot NIDs to calculate difference")
}
2022-06-13 04:46:59 -05:00
var removed, added []types.StateEntry
removed, added, err = stateres.DifferenceBetweeenStateSnapshots(ctx, snapshotNIDs[0], snapshotNIDs[1])
if err != nil {
panic(err)
}
2022-09-05 08:17:04 -05:00
eventNIDMap := map[types.EventNID]struct{}{}
for _, entry := range append(removed, added...) {
2022-09-05 08:17:04 -05:00
eventNIDMap[entry.EventNID] = struct{}{}
}
eventNIDs := make([]types.EventNID, 0, len(eventNIDMap))
for eventNID := range eventNIDMap {
eventNIDs = append(eventNIDs, eventNID)
}
2022-06-13 04:46:59 -05:00
var eventEntries []types.Event
eventEntries, err = roomserverDB.Events(ctx, roomInfo.RoomVersion, eventNIDs)
if err != nil {
panic(err)
}
events := make(map[types.EventNID]gomatrixserverlib.PDU, len(eventEntries))
for _, entry := range eventEntries {
events[entry.EventNID] = entry.PDU
}
if len(removed) > 0 {
fmt.Println("Removed:")
for _, r := range removed {
event := events[r.EventNID]
fmt.Println()
fmt.Printf("* %s %s %q\n", event.EventID(), event.Type(), *event.StateKey())
fmt.Printf(" %s\n", string(event.Content()))
}
}
if len(removed) > 0 && len(added) > 0 {
fmt.Println()
}
if len(added) > 0 {
fmt.Println("Added:")
for _, a := range added {
event := events[a.EventNID]
fmt.Println()
fmt.Printf("* %s %s %q\n", event.EventID(), event.Type(), *event.StateKey())
fmt.Printf(" %s\n", string(event.Content()))
}
}
return
}
2022-05-30 09:11:48 -05:00
var stateEntries []types.StateEntry
2023-10-30 02:53:02 -05:00
for i, snapshotNID := range snapshotNIDs {
fmt.Printf("\r \a %d of %d", i, len(snapshotNIDs))
2022-05-30 09:11:48 -05:00
var entries []types.StateEntry
entries, err = stateres.LoadStateAtSnapshot(ctx, snapshotNID)
if err != nil {
panic(err)
}
stateEntries = append(stateEntries, entries...)
}
fmt.Println()
2023-10-30 02:53:02 -05:00
eventNIDMap := map[types.EventNID]types.StateEntry{}
for _, entry := range stateEntries {
2023-10-30 02:53:02 -05:00
eventNIDMap[entry.EventNID] = entry
2022-09-05 08:17:04 -05:00
}
eventNIDs := make([]types.EventNID, 0, len(eventNIDMap))
for eventNID := range eventNIDMap {
eventNIDs = append(eventNIDs, eventNID)
}
2022-09-05 08:17:04 -05:00
fmt.Println("Fetching", len(eventNIDMap), "state events")
eventEntries, err := roomserverDB.Events(ctx, roomInfo.RoomVersion, eventNIDs)
if err != nil {
panic(err)
}
authEventIDMap := make(map[string]struct{})
events := make([]gomatrixserverlib.PDU, len(eventEntries))
2023-10-30 02:53:02 -05:00
eventIDNIDMap := make(map[string]types.EventNID)
for i := range eventEntries {
2023-10-30 02:53:02 -05:00
eventIDNIDMap[eventEntries[i].EventID()] = eventEntries[i].EventNID
events[i] = eventEntries[i].PDU
for _, authEventID := range eventEntries[i].AuthEventIDs() {
authEventIDMap[authEventID] = struct{}{}
}
}
authEventIDs := make([]string, 0, len(authEventIDMap))
for authEventID := range authEventIDMap {
authEventIDs = append(authEventIDs, authEventID)
}
fmt.Println("Fetching", len(authEventIDs), "auth events")
authEventEntries, err := roomserverDB.EventsFromIDs(ctx, roomInfo, authEventIDs)
if err != nil {
panic(err)
}
authEvents := make([]gomatrixserverlib.PDU, len(authEventEntries))
for i := range authEventEntries {
authEvents[i] = authEventEntries[i].PDU
}
// Get the roomNID
roomInfo, err = roomserverDB.RoomInfo(ctx, authEvents[0].RoomID().String())
if err != nil {
panic(err)
}
fmt.Println("Resolving state")
stateResStart := time.Now()
2022-05-30 08:38:50 -05:00
var resolved Events
resolved, err = gomatrixserverlib.ResolveConflicts(
gomatrixserverlib.RoomVersion(*roomVersion), events, authEvents, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
},
func(eventID string) bool {
isRejected, rejectedErr := roomserverDB.IsEventRejected(ctx, roomInfo.RoomNID, eventID)
if rejectedErr != nil {
return true
}
return isRejected
},
)
if err != nil {
panic(err)
}
fmt.Printf("Resolved state contains %d events (resolution took %s)\n", len(resolved), time.Since(stateResStart))
2022-05-30 08:38:50 -05:00
sort.Sort(resolved)
2022-05-25 07:37:15 -05:00
filteringEventType := *filterType
count := 0
for _, event := range resolved {
2022-05-25 07:37:15 -05:00
if filteringEventType != "" && event.Type() != filteringEventType {
continue
}
count++
fmt.Println()
fmt.Printf("* %s %s %q\n", event.EventID(), event.Type(), *event.StateKey())
fmt.Printf(" %s\n", string(event.Content()))
}
2022-05-25 07:37:15 -05:00
fmt.Println()
fmt.Println("Returned", count, "state events after filtering")
2023-10-30 02:53:02 -05:00
if !*fixState {
return
}
fmt.Println()
fmt.Printf("\t\t !!! WARNING !!!\n")
fmt.Println("Attempting to fix the state of a room can make things even worse.")
fmt.Println("For the best result, please shut down Dendrite to avoid concurrent database changes.")
fmt.Println("If you have missing state events (e.g. users not in a room, missing power levels")
fmt.Println("make sure they would be added by checking the resolved state events above first (or by running without -fix).")
fmt.Println("If you are sure everything looks fine, press Return, if not, press CTRL+c.")
fmt.Scanln()
fmt.Println("Attempting to fix state")
stateEntriesResolved := make([]types.StateEntry, len(resolved))
for i := range resolved {
eventNID := eventIDNIDMap[resolved[i].EventID()]
stateEntriesResolved[i] = eventNIDMap[eventNID]
}
var succeeded bool
2023-10-30 02:53:02 -05:00
roomUpdater, err := roomserverDB.GetRoomUpdater(ctx, roomInfo)
if err != nil {
panic(err)
}
defer sqlutil.EndTransactionWithCheck(roomUpdater, &succeeded, &err)
2023-10-30 02:53:02 -05:00
latestEvents := make([]types.StateAtEventAndReference, 0, len(roomUpdater.LatestEvents()))
for _, event := range roomUpdater.LatestEvents() {
// SetLatestEvents only uses the EventNID, so populate that
latestEvents = append(latestEvents, types.StateAtEventAndReference{
StateAtEvent: types.StateAtEvent{
StateEntry: types.StateEntry{
EventNID: event.EventNID,
},
},
})
}
var lastEventSent []types.Event
lastEventSent, err = roomUpdater.EventsFromIDs(ctx, roomInfo, []string{roomUpdater.LastEventIDSent()})
2023-10-30 02:53:02 -05:00
if err != nil {
fmt.Printf("Error: %s", err)
return
2023-10-30 02:53:02 -05:00
}
if len(lastEventSent) != 1 {
fmt.Printf("Error: expected to get one event from the database but didn't, got %d", len(lastEventSent))
return
2023-10-30 02:53:02 -05:00
}
var newSnapshotNID types.StateSnapshotNID
newSnapshotNID, err = roomUpdater.AddState(ctx, roomInfo.RoomNID, nil, stateEntriesResolved)
2023-10-30 02:53:02 -05:00
if err != nil {
fmt.Printf("Error: %s", err)
return
2023-10-30 02:53:02 -05:00
}
if err = roomUpdater.SetLatestEvents(roomInfo.RoomNID, latestEvents, lastEventSent[0].EventNID, newSnapshotNID); err != nil {
fmt.Printf("Error: %s", err)
return
2023-10-30 02:53:02 -05:00
}
for _, latestEvent := range roomUpdater.LatestEvents() {
if err = roomUpdater.SetState(ctx, latestEvent.EventNID, newSnapshotNID); err != nil {
fmt.Printf("Error: %s", err)
return
}
}
succeeded = true
2023-10-30 02:53:02 -05:00
fmt.Printf("Successfully set new snapshot NID %d containing %d state events", newSnapshotNID, len(stateEntriesResolved))
}
2022-05-30 08:38:50 -05:00
type Events []gomatrixserverlib.PDU
2022-05-30 08:38:50 -05:00
func (e Events) Len() int {
return len(e)
}
func (e Events) Swap(i, j int) {
e[i], e[j] = e[j], e[i]
}
func (e Events) Less(i, j int) bool {
typeDelta := strings.Compare(e[i].Type(), e[j].Type())
if typeDelta < 0 {
return true
}
if typeDelta > 0 {
return false
}
stateKeyDelta := strings.Compare(*e[i].StateKey(), *e[j].StateKey())
return stateKeyDelta < 0
}