diff --git a/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go b/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go index 620b1dd6b..7582b1abe 100644 --- a/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go +++ b/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go @@ -34,6 +34,26 @@ func UpAddHistoryVisibilityColumnOutputRoomEvents(ctx context.Context, tx *sql.T return nil } +// UpSetHistoryVisibility sets the history visibility for already stored events. +func UpSetHistoryVisibility(ctx context.Context, tx *sql.Tx) error { + // get the current room history visibilities + historyVisibilities, err := currentHistoryVisibilities(ctx, tx) + if err != nil { + return err + } + + // update the history visibility + for roomID, hisVis := range historyVisibilities { + _, err = tx.ExecContext(ctx, `UPDATE syncapi_output_room_events SET history_visibility = $1 + WHERE type IN ('m.room.message', 'm.room.encrypted') AND room_id = $2 AND history_visibility <> $1`, hisVis, roomID) + if err != nil { + return fmt.Errorf("failed to update history visibility: %w", err) + } + } + + return nil +} + func UpAddHistoryVisibilityColumnCurrentRoomState(ctx context.Context, tx *sql.Tx) error { _, err := tx.ExecContext(ctx, ` ALTER TABLE syncapi_current_room_state ADD COLUMN IF NOT EXISTS history_visibility SMALLINT NOT NULL DEFAULT 2; @@ -43,13 +63,19 @@ func UpAddHistoryVisibilityColumnCurrentRoomState(ctx context.Context, tx *sql.T return fmt.Errorf("failed to execute upgrade: %w", err) } - // get the current room history visibility - rows, err := tx.Query(`SELECT DISTINCT room_id, headered_event_json FROM syncapi_current_room_state + return nil +} + +// currentHistoryVisibilities returns a map from roomID to current history visibility. +// If the history visibility was changed after room creation, defaults to joined. +func currentHistoryVisibilities(ctx context.Context, tx *sql.Tx) (map[string]gomatrixserverlib.HistoryVisibility, error) { + rows, err := tx.QueryContext(ctx, `SELECT DISTINCT room_id, headered_event_json FROM syncapi_current_room_state WHERE type = 'm.room.history_visibility' AND state_key = ''; `) if err != nil { - return fmt.Errorf("failed to query current room state: %w", err) + return nil, fmt.Errorf("failed to query current room state: %w", err) } + defer rows.Close() // nolint: errcheck var eventBytes []byte var roomID string var event gomatrixserverlib.HeaderedEvent @@ -57,27 +83,17 @@ func UpAddHistoryVisibilityColumnCurrentRoomState(ctx context.Context, tx *sql.T historyVisibilities := make(map[string]gomatrixserverlib.HistoryVisibility) for rows.Next() { if err = rows.Scan(&roomID, &eventBytes); err != nil { - return fmt.Errorf("failed to scan row: %w", err) + return nil, fmt.Errorf("failed to scan row: %w", err) } if err = json.Unmarshal(eventBytes, &event); err != nil { - return fmt.Errorf("failed to unmarshal event: %w", err) + return nil, fmt.Errorf("failed to unmarshal event: %w", err) } - historyVisibilities[roomID] = gomatrixserverlib.HistoryVisibilityShared - if hisVis, err = event.HistoryVisibility(); err == nil { + historyVisibilities[roomID] = gomatrixserverlib.HistoryVisibilityJoined + if hisVis, err = event.HistoryVisibility(); err == nil && event.Depth() < 10 { historyVisibilities[roomID] = hisVis } } - - // update the history visibility - for roomID, hisVis = range historyVisibilities { - _, err = tx.Exec(`UPDATE syncapi_output_room_events SET history_visibility = $1 - WHERE type IN ('m.room.message', 'm.room.encrypted') AND room_id = $2`, hisVis, roomID) - if err != nil { - return fmt.Errorf("failed to update history visibility: %w", err) - } - } - - return nil + return historyVisibilities, nil } func DownAddHistoryVisibilityColumn(ctx context.Context, tx *sql.Tx) error { diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index 34ff6700f..2a2c664f5 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -191,10 +191,16 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) { } m := sqlutil.NewMigrator(db) - m.AddMigrations(sqlutil.Migration{ - Version: "syncapi: add history visibility column (output_room_events)", - Up: deltas.UpAddHistoryVisibilityColumnOutputRoomEvents, - }) + m.AddMigrations( + sqlutil.Migration{ + Version: "syncapi: add history visibility column (output_room_events)", + Up: deltas.UpAddHistoryVisibilityColumnOutputRoomEvents, + }, + sqlutil.Migration{ + Version: "syncapi: set history visibility for existing events", + Up: deltas.UpSetHistoryVisibility, + }, + ) err = m.Up(context.Background()) if err != nil { return nil, err diff --git a/syncapi/storage/sqlite3/deltas/2022061412000000_history_visibility_column.go b/syncapi/storage/sqlite3/deltas/2022061412000000_history_visibility_column.go index 04ddd7930..3fa879db4 100644 --- a/syncapi/storage/sqlite3/deltas/2022061412000000_history_visibility_column.go +++ b/syncapi/storage/sqlite3/deltas/2022061412000000_history_visibility_column.go @@ -40,6 +40,26 @@ func UpAddHistoryVisibilityColumnOutputRoomEvents(ctx context.Context, tx *sql.T return nil } +// UpSetHistoryVisibility sets the history visibility for already stored events. +func UpSetHistoryVisibility(ctx context.Context, tx *sql.Tx) error { + // get the current room history visibilities + historyVisibilities, err := currentHistoryVisibilities(ctx, tx) + if err != nil { + return err + } + + // update the history visibility + for roomID, hisVis := range historyVisibilities { + _, err = tx.ExecContext(ctx, `UPDATE syncapi_output_room_events SET history_visibility = $1 + WHERE type IN ('m.room.message', 'm.room.encrypted') AND room_id = $2 AND history_visibility <> $1`, hisVis, roomID) + if err != nil { + return fmt.Errorf("failed to update history visibility: %w", err) + } + } + + return nil +} + func UpAddHistoryVisibilityColumnCurrentRoomState(ctx context.Context, tx *sql.Tx) error { // SQLite doesn't have "if exists", so check if the column exists. If the query doesn't return an error, it already exists. // Required for unit tests, as otherwise a duplicate column error will show up. @@ -55,13 +75,19 @@ func UpAddHistoryVisibilityColumnCurrentRoomState(ctx context.Context, tx *sql.T return fmt.Errorf("failed to execute upgrade: %w", err) } - // get the current room history visibility - rows, err := tx.Query(`SELECT DISTINCT room_id, headered_event_json FROM syncapi_current_room_state + return nil +} + +// currentHistoryVisibilities returns a map from roomID to current history visibility. +// If the history visibility was changed after room creation, defaults to joined. +func currentHistoryVisibilities(ctx context.Context, tx *sql.Tx) (map[string]gomatrixserverlib.HistoryVisibility, error) { + rows, err := tx.QueryContext(ctx, `SELECT DISTINCT room_id, headered_event_json FROM syncapi_current_room_state WHERE type = 'm.room.history_visibility' AND state_key = ''; `) if err != nil { - return fmt.Errorf("failed to query current room state: %w", err) + return nil, fmt.Errorf("failed to query current room state: %w", err) } + defer rows.Close() // nolint: errcheck var eventBytes []byte var roomID string var event gomatrixserverlib.HeaderedEvent @@ -69,27 +95,17 @@ func UpAddHistoryVisibilityColumnCurrentRoomState(ctx context.Context, tx *sql.T historyVisibilities := make(map[string]gomatrixserverlib.HistoryVisibility) for rows.Next() { if err = rows.Scan(&roomID, &eventBytes); err != nil { - return fmt.Errorf("failed to scan row: %w", err) + return nil, fmt.Errorf("failed to scan row: %w", err) } if err = json.Unmarshal(eventBytes, &event); err != nil { - return fmt.Errorf("failed to unmarshal event: %w", err) + return nil, fmt.Errorf("failed to unmarshal event: %w", err) } - historyVisibilities[roomID] = gomatrixserverlib.HistoryVisibilityShared - if hisVis, err = event.HistoryVisibility(); err == nil { + historyVisibilities[roomID] = gomatrixserverlib.HistoryVisibilityJoined + if hisVis, err = event.HistoryVisibility(); err == nil && event.Depth() < 10 { historyVisibilities[roomID] = hisVis } } - - // update the history visibility - for roomID, hisVis = range historyVisibilities { - _, err = tx.Exec(`UPDATE syncapi_output_room_events SET history_visibility = $1 - WHERE type IN ('m.room.message', 'm.room.encrypted') AND room_id = $2`, hisVis, roomID) - if err != nil { - return fmt.Errorf("failed to update history visibility: %w", err) - } - } - - return nil + return historyVisibilities, nil } func DownAddHistoryVisibilityColumn(ctx context.Context, tx *sql.Tx) error { diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go index de389fa9b..4f44d58b1 100644 --- a/syncapi/storage/sqlite3/output_room_events_table.go +++ b/syncapi/storage/sqlite3/output_room_events_table.go @@ -139,10 +139,16 @@ func NewSqliteEventsTable(db *sql.DB, streamID *StreamIDStatements) (tables.Even } m := sqlutil.NewMigrator(db) - m.AddMigrations(sqlutil.Migration{ - Version: "syncapi: add history visibility column (output_room_events)", - Up: deltas.UpAddHistoryVisibilityColumnOutputRoomEvents, - }) + m.AddMigrations( + sqlutil.Migration{ + Version: "syncapi: add history visibility column (output_room_events)", + Up: deltas.UpAddHistoryVisibilityColumnOutputRoomEvents, + }, + sqlutil.Migration{ + Version: "syncapi: set history visibility for existing events", + Up: deltas.UpSetHistoryVisibility, + }, + ) err = m.Up(context.Background()) if err != nil { return nil, err