diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 5a036d889..fc09d7a7c 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -67,7 +67,9 @@ type Database interface { // when generating the sync stream position for this event. Returns the sync stream position for the inserted event. // Returns an error if there was a problem inserting this event. WriteEvent(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent, addStateEvents []*gomatrixserverlib.HeaderedEvent, - addStateEventIDs []string, removeStateEventIDs []string, transactionID *api.TransactionID, excludeFromSync bool) (types.StreamPosition, error) + addStateEventIDs []string, removeStateEventIDs []string, transactionID *api.TransactionID, excludeFromSync bool, + historyVisibility gomatrixserverlib.HistoryVisibility, + ) (types.StreamPosition, error) // PurgeRoomState completely purges room state from the sync API. This is done when // receiving an output event that completely resets the state. PurgeRoomState(ctx context.Context, roomID string) error diff --git a/syncapi/storage/postgres/current_room_state_table.go b/syncapi/storage/postgres/current_room_state_table.go index 8ee387b39..0be78ce45 100644 --- a/syncapi/storage/postgres/current_room_state_table.go +++ b/syncapi/storage/postgres/current_room_state_table.go @@ -104,7 +104,7 @@ const selectEventsWithEventIDsSQL = "" + // the rowsToStreamEvents expects there to be exactly six columns. We need to // figure out if these really need to be in the DB, and if so, we need a // better permanent fix for this. - neilalexander, 2 Jan 2020 - "SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id" + + "SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id, 3 as history_visibility" + " FROM syncapi_current_room_state WHERE event_id = ANY($1)" type currentRoomStateStatements struct { diff --git a/syncapi/storage/postgres/deltas/20220614120000_history_visibility_column.go b/syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go similarity index 100% rename from syncapi/storage/postgres/deltas/20220614120000_history_visibility_column.go rename to syncapi/storage/postgres/deltas/2022061412000000_history_visibility_column.go diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index f7f9d2d65..64b4f2a2f 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -68,8 +68,8 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events ( -- that relates to the moment these were retrieved rather than the moment these -- were emitted. exclude_from_sync BOOL DEFAULT FALSE, - -- The history visibility before this event (0 - world_readable; 1 - shared; 2 - invited; 3 - joined) - history_visibility SMALLINT NOT NULL DEFAULT 3 + -- The history visibility before this event (1 - world_readable; 2 - shared; 3 - invited; 4 - joined) + history_visibility SMALLINT NOT NULL DEFAULT 2 ); CREATE INDEX IF NOT EXISTS syncapi_output_room_events_type_idx ON syncapi_output_room_events (type); @@ -80,16 +80,16 @@ CREATE INDEX IF NOT EXISTS syncapi_output_room_events_exclude_from_sync_idx ON s const insertEventSQL = "" + "INSERT INTO syncapi_output_room_events (" + - "room_id, event_id, headered_event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync" + - ") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) " + + "room_id, event_id, headered_event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync, history_visibility" + + ") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) " + "ON CONFLICT ON CONSTRAINT syncapi_event_id_idx DO UPDATE SET exclude_from_sync = (excluded.exclude_from_sync AND $11) " + "RETURNING id" const selectEventsSQL = "" + - "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events WHERE event_id = ANY($1)" + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events WHERE event_id = ANY($1)" const selectEventsWithFilterSQL = "" + - "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events WHERE event_id = ANY($1)" + + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events WHERE event_id = ANY($1)" + " AND ( $2::text[] IS NULL OR sender = ANY($2) )" + " AND ( $3::text[] IS NULL OR NOT(sender = ANY($3)) )" + " AND ( $4::text[] IS NULL OR type LIKE ANY($4) )" + @@ -98,7 +98,7 @@ const selectEventsWithFilterSQL = "" + " LIMIT $7" const selectRecentEventsSQL = "" + - "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" + " WHERE room_id = $1 AND id > $2 AND id <= $3" + " AND ( $4::text[] IS NULL OR sender = ANY($4) )" + " AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" + @@ -107,7 +107,7 @@ const selectRecentEventsSQL = "" + " ORDER BY id DESC LIMIT $8" const selectRecentEventsForSyncSQL = "" + - "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" + " WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE" + " AND ( $4::text[] IS NULL OR sender = ANY($4) )" + " AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" + @@ -116,7 +116,7 @@ const selectRecentEventsForSyncSQL = "" + " ORDER BY id DESC LIMIT $8" const selectEarlyEventsSQL = "" + - "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" + " WHERE room_id = $1 AND id > $2 AND id <= $3" + " AND ( $4::text[] IS NULL OR sender = ANY($4) )" + " AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" + @@ -132,7 +132,7 @@ const updateEventJSONSQL = "" + // In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id). const selectStateInRangeSQL = "" + - "SELECT event_id, id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" + + "SELECT event_id, id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids, history_visibility" + " FROM syncapi_output_room_events" + " WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" + " AND room_id = ANY($3)" + @@ -148,10 +148,10 @@ const deleteEventsForRoomSQL = "" + "DELETE FROM syncapi_output_room_events WHERE room_id = $1" const selectContextEventSQL = "" + - "SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND event_id = $2" + "SELECT id, headered_event_json, history_visibility FROM syncapi_output_room_events WHERE room_id = $1 AND event_id = $2" const selectContextBeforeEventSQL = "" + - "SELECT headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id < $2" + + "SELECT headered_event_json, history_visibility FROM syncapi_output_room_events WHERE room_id = $1 AND id < $2" + " AND ( $4::text[] IS NULL OR sender = ANY($4) )" + " AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" + " AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" + @@ -159,7 +159,7 @@ const selectContextBeforeEventSQL = "" + " ORDER BY id DESC LIMIT $3" const selectContextAfterEventSQL = "" + - "SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2" + + "SELECT id, headered_event_json, history_visibility FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2" + " AND ( $4::text[] IS NULL OR sender = ANY($4) )" + " AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" + " AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" + @@ -248,14 +248,15 @@ func (s *outputRoomEventsStatements) SelectStateInRange( for rows.Next() { var ( - eventID string - streamPos types.StreamPosition - eventBytes []byte - excludeFromSync bool - addIDs pq.StringArray - delIDs pq.StringArray + eventID string + streamPos types.StreamPosition + eventBytes []byte + excludeFromSync bool + addIDs pq.StringArray + delIDs pq.StringArray + historyVisibility uint8 ) - if err := rows.Scan(&eventID, &streamPos, &eventBytes, &excludeFromSync, &addIDs, &delIDs); err != nil { + if err := rows.Scan(&eventID, &streamPos, &eventBytes, &excludeFromSync, &addIDs, &delIDs, &historyVisibility); err != nil { return nil, nil, err } // Sanity check for deleted state and whine if we see it. We don't need to do anything @@ -285,6 +286,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange( needSet[id] = true } stateNeeded[ev.RoomID()] = needSet + ev.Visibility = gomatrixserverlib.HistoryVisibilityFromInt(historyVisibility) eventIDToEvent[eventID] = types.StreamEvent{ HeaderedEvent: &ev, @@ -316,7 +318,7 @@ func (s *outputRoomEventsStatements) SelectMaxEventID( func (s *outputRoomEventsStatements) InsertEvent( ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, addState, removeState []string, - transactionID *api.TransactionID, excludeFromSync bool, + transactionID *api.TransactionID, excludeFromSync bool, historyVisibility uint8, ) (streamPos types.StreamPosition, err error) { var txnID *string var sessionID *int64 @@ -353,6 +355,7 @@ func (s *outputRoomEventsStatements) InsertEvent( sessionID, txnID, excludeFromSync, + historyVisibility, ).Scan(&streamPos) return } @@ -506,13 +509,15 @@ func (s *outputRoomEventsStatements) SelectContextEvent(ctx context.Context, txn row := sqlutil.TxStmt(txn, s.selectContextEventStmt).QueryRowContext(ctx, roomID, eventID) var eventAsString string - if err = row.Scan(&id, &eventAsString); err != nil { + var historyVisibility uint8 + if err = row.Scan(&id, &eventAsString, &historyVisibility); err != nil { return 0, evt, err } if err = json.Unmarshal([]byte(eventAsString), &evt); err != nil { return 0, evt, err } + evt.Visibility = gomatrixserverlib.HistoryVisibilityFromInt(historyVisibility) return id, evt, nil } @@ -534,15 +539,17 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent( for rows.Next() { var ( - eventBytes []byte - evt *gomatrixserverlib.HeaderedEvent + eventBytes []byte + evt *gomatrixserverlib.HeaderedEvent + historyVisibility uint8 ) - if err = rows.Scan(&eventBytes); err != nil { + if err = rows.Scan(&eventBytes, &historyVisibility); err != nil { return evts, err } if err = json.Unmarshal(eventBytes, &evt); err != nil { return evts, err } + evt.Visibility = gomatrixserverlib.HistoryVisibilityFromInt(historyVisibility) evts = append(evts, evt) } @@ -567,15 +574,17 @@ func (s *outputRoomEventsStatements) SelectContextAfterEvent( for rows.Next() { var ( - eventBytes []byte - evt *gomatrixserverlib.HeaderedEvent + eventBytes []byte + evt *gomatrixserverlib.HeaderedEvent + historyVisibility uint8 ) - if err = rows.Scan(&lastID, &eventBytes); err != nil { + if err = rows.Scan(&lastID, &eventBytes, &historyVisibility); err != nil { return 0, evts, err } if err = json.Unmarshal(eventBytes, &evt); err != nil { return 0, evts, err } + evt.Visibility = gomatrixserverlib.HistoryVisibilityFromInt(historyVisibility) evts = append(evts, evt) } @@ -586,15 +595,16 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) { var result []types.StreamEvent for rows.Next() { var ( - eventID string - streamPos types.StreamPosition - eventBytes []byte - excludeFromSync bool - sessionID *int64 - txnID *string - transactionID *api.TransactionID + eventID string + streamPos types.StreamPosition + eventBytes []byte + excludeFromSync bool + sessionID *int64 + txnID *string + transactionID *api.TransactionID + historyVisibility uint8 ) - if err := rows.Scan(&eventID, &streamPos, &eventBytes, &sessionID, &excludeFromSync, &txnID); err != nil { + if err := rows.Scan(&eventID, &streamPos, &eventBytes, &sessionID, &excludeFromSync, &txnID, &historyVisibility); err != nil { return nil, err } // TODO: Handle redacted events @@ -609,7 +619,7 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) { TransactionID: *txnID, } } - + ev.Visibility = gomatrixserverlib.HistoryVisibilityFromInt(historyVisibility) result = append(result, types.StreamEvent{ HeaderedEvent: &ev, StreamPosition: streamPos, diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index 4e00f22e2..2da05d1c0 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -25,6 +25,7 @@ import ( "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas" "github.com/matrix-org/dendrite/syncapi/storage/shared" + "github.com/sirupsen/logrus" ) // SyncServerDatasource represents a sync server datasource which manages @@ -42,9 +43,8 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) if d.db, d.writer, err = base.DatabaseConnection(dbProperties, sqlutil.NewDummyWriter()); err != nil { return nil, err } - accountData, err := NewPostgresAccountDataTable(d.db) - if err != nil { - return nil, err + if _, err = d.db.Exec(outputRoomEventsSchema); err != nil { + logrus.Fatalf("unable to create table: %s", err) } events, err := NewPostgresEventsTable(d.db) if err != nil { @@ -105,6 +105,10 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) if err = m.RunDeltas(d.db, dbProperties); err != nil { return nil, err } + accountData, err := NewPostgresAccountDataTable(d.db) + if err != nil { + return nil, err + } d.Database = shared.Database{ DB: d.db, Writer: d.writer, diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index ec5edd355..8868f0e87 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -364,11 +364,12 @@ func (d *Database) WriteEvent( addStateEvents []*gomatrixserverlib.HeaderedEvent, addStateEventIDs, removeStateEventIDs []string, transactionID *api.TransactionID, excludeFromSync bool, + historyVisibility gomatrixserverlib.HistoryVisibility, ) (pduPosition types.StreamPosition, returnErr error) { returnErr = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { var err error pos, err := d.OutputEvents.InsertEvent( - ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID, excludeFromSync, + ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID, excludeFromSync, historyVisibility.NumericValue(), ) if err != nil { return fmt.Errorf("d.OutputEvents.InsertEvent: %w", err) diff --git a/syncapi/storage/sqlite3/current_room_state_table.go b/syncapi/storage/sqlite3/current_room_state_table.go index f0a1c7bb7..1f49bea3a 100644 --- a/syncapi/storage/sqlite3/current_room_state_table.go +++ b/syncapi/storage/sqlite3/current_room_state_table.go @@ -88,7 +88,7 @@ const selectEventsWithEventIDsSQL = "" + // the rowsToStreamEvents expects there to be exactly six columns. We need to // figure out if these really need to be in the DB, and if so, we need a // better permanent fix for this. - neilalexander, 2 Jan 2020 - "SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id" + + "SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id, 3 AS history_visibility" + " FROM syncapi_current_room_state WHERE event_id IN ($1)" type currentRoomStateStatements struct { diff --git a/syncapi/storage/sqlite3/deltas/20220614120000_history_visibility_column.go b/syncapi/storage/sqlite3/deltas/2022061412000000_history_visibility_column.go similarity index 100% rename from syncapi/storage/sqlite3/deltas/20220614120000_history_visibility_column.go rename to syncapi/storage/sqlite3/deltas/2022061412000000_history_visibility_column.go diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go index 8660f3e48..da1e2221f 100644 --- a/syncapi/storage/sqlite3/output_room_events_table.go +++ b/syncapi/storage/sqlite3/output_room_events_table.go @@ -48,7 +48,7 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events ( session_id BIGINT, transaction_id TEXT, exclude_from_sync BOOL NOT NULL DEFAULT FALSE, - history_visibility SMALLINT NOT NULL DEFAULT 3 -- The history visibility before this event (0 - world_readable; 1 - shared; 2 - invited; 3 - joined) + history_visibility SMALLINT NOT NULL DEFAULT 2 -- The history visibility before this event (1 - world_readable; 2 - shared; 3 - invited; 4 - joined) ); CREATE INDEX IF NOT EXISTS syncapi_output_room_events_type_idx ON syncapi_output_room_events (type); @@ -59,27 +59,27 @@ CREATE INDEX IF NOT EXISTS syncapi_output_room_events_exclude_from_sync_idx ON s const insertEventSQL = "" + "INSERT INTO syncapi_output_room_events (" + - "id, room_id, event_id, headered_event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync" + - ") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) " + - "ON CONFLICT (event_id) DO UPDATE SET exclude_from_sync = (excluded.exclude_from_sync AND $13)" + "id, room_id, event_id, headered_event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync, history_visibility" + + ") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) " + + "ON CONFLICT (event_id) DO UPDATE SET exclude_from_sync = (excluded.exclude_from_sync AND $14)" const selectEventsSQL = "" + - "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events WHERE event_id IN ($1)" + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events WHERE event_id IN ($1)" const selectRecentEventsSQL = "" + - "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" + " WHERE room_id = $1 AND id > $2 AND id <= $3" // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters const selectRecentEventsForSyncSQL = "" + - "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" + " WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE" // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters const selectEarlyEventsSQL = "" + - "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" + " WHERE room_id = $1 AND id > $2 AND id <= $3" // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters @@ -91,7 +91,7 @@ const updateEventJSONSQL = "" + "UPDATE syncapi_output_room_events SET headered_event_json=$1 WHERE event_id=$2" const selectStateInRangeSQL = "" + - "SELECT event_id, id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" + + "SELECT event_id, id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids, history_visibility" + " FROM syncapi_output_room_events" + " WHERE (id > $1 AND id <= $2)" + " AND room_id IN ($3)" + @@ -103,15 +103,15 @@ const deleteEventsForRoomSQL = "" + "DELETE FROM syncapi_output_room_events WHERE room_id = $1" const selectContextEventSQL = "" + - "SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND event_id = $2" + "SELECT id, headered_event_json, history_visibility FROM syncapi_output_room_events WHERE room_id = $1 AND event_id = $2" const selectContextBeforeEventSQL = "" + - "SELECT headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id < $2" + "SELECT headered_event_json, history_visibility FROM syncapi_output_room_events WHERE room_id = $1 AND id < $2" // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters const selectContextAfterEventSQL = "" + - "SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2" + "SELECT id, headered_event_json, history_visibility FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2" // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters @@ -197,14 +197,15 @@ func (s *outputRoomEventsStatements) SelectStateInRange( for rows.Next() { var ( - eventID string - streamPos types.StreamPosition - eventBytes []byte - excludeFromSync bool - addIDsJSON string - delIDsJSON string + eventID string + streamPos types.StreamPosition + eventBytes []byte + excludeFromSync bool + addIDsJSON string + delIDsJSON string + historyVisibility uint8 ) - if err := rows.Scan(&eventID, &streamPos, &eventBytes, &excludeFromSync, &addIDsJSON, &delIDsJSON); err != nil { + if err := rows.Scan(&eventID, &streamPos, &eventBytes, &excludeFromSync, &addIDsJSON, &delIDsJSON, &historyVisibility); err != nil { return nil, nil, err } @@ -240,6 +241,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange( needSet[id] = true } stateNeeded[ev.RoomID()] = needSet + ev.Visibility = gomatrixserverlib.HistoryVisibilityFromInt(historyVisibility) eventIDToEvent[eventID] = types.StreamEvent{ HeaderedEvent: &ev, @@ -271,7 +273,7 @@ func (s *outputRoomEventsStatements) SelectMaxEventID( func (s *outputRoomEventsStatements) InsertEvent( ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, addState, removeState []string, - transactionID *api.TransactionID, excludeFromSync bool, + transactionID *api.TransactionID, excludeFromSync bool, historyVisibility uint8, ) (types.StreamPosition, error) { var txnID *string var sessionID *int64 @@ -327,6 +329,7 @@ func (s *outputRoomEventsStatements) InsertEvent( sessionID, txnID, excludeFromSync, + historyVisibility, excludeFromSync, ) return streamPos, err @@ -482,15 +485,16 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) { var result []types.StreamEvent for rows.Next() { var ( - eventID string - streamPos types.StreamPosition - eventBytes []byte - excludeFromSync bool - sessionID *int64 - txnID *string - transactionID *api.TransactionID + eventID string + streamPos types.StreamPosition + eventBytes []byte + excludeFromSync bool + sessionID *int64 + txnID *string + transactionID *api.TransactionID + historyVisibility uint8 ) - if err := rows.Scan(&eventID, &streamPos, &eventBytes, &sessionID, &excludeFromSync, &txnID); err != nil { + if err := rows.Scan(&eventID, &streamPos, &eventBytes, &sessionID, &excludeFromSync, &txnID, &historyVisibility); err != nil { return nil, err } // TODO: Handle redacted events @@ -506,6 +510,8 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) { } } + ev.Visibility = gomatrixserverlib.HistoryVisibilityFromInt(historyVisibility) + result = append(result, types.StreamEvent{ HeaderedEvent: &ev, StreamPosition: streamPos, @@ -520,13 +526,15 @@ func (s *outputRoomEventsStatements) SelectContextEvent( ) (id int, evt gomatrixserverlib.HeaderedEvent, err error) { row := sqlutil.TxStmt(txn, s.selectContextEventStmt).QueryRowContext(ctx, roomID, eventID) var eventAsString string - if err = row.Scan(&id, &eventAsString); err != nil { + var historyVisibility uint8 + if err = row.Scan(&id, &eventAsString, &historyVisibility); err != nil { return 0, evt, err } if err = json.Unmarshal([]byte(eventAsString), &evt); err != nil { return 0, evt, err } + evt.Visibility = gomatrixserverlib.HistoryVisibilityFromInt(historyVisibility) return id, evt, nil } @@ -551,15 +559,17 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent( for rows.Next() { var ( - eventBytes []byte - evt *gomatrixserverlib.HeaderedEvent + eventBytes []byte + evt *gomatrixserverlib.HeaderedEvent + historyVisibility uint8 ) - if err = rows.Scan(&eventBytes); err != nil { + if err = rows.Scan(&eventBytes, &historyVisibility); err != nil { return evts, err } if err = json.Unmarshal(eventBytes, &evt); err != nil { return evts, err } + evt.Visibility = gomatrixserverlib.HistoryVisibilityFromInt(historyVisibility) evts = append(evts, evt) } @@ -587,15 +597,17 @@ func (s *outputRoomEventsStatements) SelectContextAfterEvent( for rows.Next() { var ( - eventBytes []byte - evt *gomatrixserverlib.HeaderedEvent + eventBytes []byte + evt *gomatrixserverlib.HeaderedEvent + historyVisibility uint8 ) - if err = rows.Scan(&lastID, &eventBytes); err != nil { + if err = rows.Scan(&lastID, &eventBytes, &historyVisibility); err != nil { return 0, evts, err } if err = json.Unmarshal(eventBytes, &evt); err != nil { return 0, evts, err } + evt.Visibility = gomatrixserverlib.HistoryVisibilityFromInt(historyVisibility) evts = append(evts, evt) } return lastID, evts, rows.Err() diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index 16ae92bbe..69fa4ef74 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -23,6 +23,7 @@ import ( "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/storage/shared" "github.com/matrix-org/dendrite/syncapi/storage/sqlite3/deltas" + "github.com/sirupsen/logrus" ) // SyncServerDatasource represents a sync server datasource which manages @@ -52,11 +53,10 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er if err = d.streamID.Prepare(d.db); err != nil { return err } - accountData, err := NewSqliteAccountDataTable(d.db, &d.streamID) - if err != nil { - return err + if _, err = d.db.Exec(outputRoomEventsSchema); err != nil { + logrus.Fatalf("unable to create table: %s", err) } - events, err := NewSqliteEventsTable(d.db, &d.streamID) + accountData, err := NewSqliteAccountDataTable(d.db, &d.streamID) if err != nil { return err } @@ -115,6 +115,10 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er if err = m.RunDeltas(d.db, dbProperties); err != nil { return err } + events, err := NewSqliteEventsTable(d.db, &d.streamID) + if err != nil { + return err + } d.Database = shared.Database{ DB: d.db, Writer: d.writer, diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go index 563c92e34..d72592a24 100644 --- a/syncapi/storage/storage_test.go +++ b/syncapi/storage/storage_test.go @@ -35,7 +35,7 @@ func MustWriteEvents(t *testing.T, db storage.Database, events []*gomatrixserver addStateEvents = append(addStateEvents, ev) addStateEventIDs = append(addStateEventIDs, ev.EventID()) } - pos, err := db.WriteEvent(ctx, ev, addStateEvents, addStateEventIDs, removeStateEventIDs, nil, false) + pos, err := db.WriteEvent(ctx, ev, addStateEvents, addStateEventIDs, removeStateEventIDs, nil, false, gomatrixserverlib.HistoryVisibilityShared) if err != nil { t.Fatalf("WriteEvent failed: %s", err) } diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index ccdebfdbd..ea43ef941 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -52,7 +52,14 @@ type Peeks interface { type Events interface { SelectStateInRange(ctx context.Context, txn *sql.Tx, r types.Range, stateFilter *gomatrixserverlib.StateFilter, roomIDs []string) (map[string]map[string]bool, map[string]types.StreamEvent, error) SelectMaxEventID(ctx context.Context, txn *sql.Tx) (id int64, err error) - InsertEvent(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, addState, removeState []string, transactionID *api.TransactionID, excludeFromSync bool) (streamPos types.StreamPosition, err error) + InsertEvent( + ctx context.Context, txn *sql.Tx, + event *gomatrixserverlib.HeaderedEvent, + addState, removeState []string, + transactionID *api.TransactionID, + excludeFromSync bool, + historyVisibility uint8, + ) (streamPos types.StreamPosition, err error) // SelectRecentEvents returns events between the two stream positions: exclusive of low and inclusive of high. // If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude from sync. // Returns up to `limit` events. Returns `limited=true` if there are more events in this range but we hit the `limit`. diff --git a/syncapi/storage/tables/output_room_events_test.go b/syncapi/storage/tables/output_room_events_test.go index 69bbd04c9..ab7d85414 100644 --- a/syncapi/storage/tables/output_room_events_test.go +++ b/syncapi/storage/tables/output_room_events_test.go @@ -53,7 +53,7 @@ func TestOutputRoomEventsTable(t *testing.T) { events := room.Events() err := sqlutil.WithTransaction(db, func(txn *sql.Tx) error { for _, ev := range events { - _, err := tab.InsertEvent(ctx, txn, ev, nil, nil, nil, false) + _, err := tab.InsertEvent(ctx, txn, ev, nil, nil, nil, false, 2) if err != nil { return fmt.Errorf("failed to InsertEvent: %s", err) } @@ -79,7 +79,7 @@ func TestOutputRoomEventsTable(t *testing.T) { "body": "test.txt", "url": "mxc://test.txt", }) - if _, err = tab.InsertEvent(ctx, txn, urlEv, nil, nil, nil, false); err != nil { + if _, err = tab.InsertEvent(ctx, txn, urlEv, nil, nil, nil, false, 2); err != nil { return fmt.Errorf("failed to InsertEvent: %s", err) } wantEventID := []string{urlEv.EventID()}