diff --git a/roomserver/api/query.go b/roomserver/api/query.go index 76f8298ca..2e860137d 100644 --- a/roomserver/api/query.go +++ b/roomserver/api/query.go @@ -445,7 +445,7 @@ type QueryMembershipAtEventRequest struct { type QueryMembershipAtEventResponse struct { // Memberships is a map from eventID to a list of events (if any). Events that // do not have known state will return an empty array here. - Memberships map[string][]*gomatrixserverlib.HeaderedEvent `json:"memberships"` + Memberships map[string]*gomatrixserverlib.HeaderedEvent `json:"memberships"` } // QueryLeftUsersRequest is a request to calculate users that we (the server) don't share a diff --git a/roomserver/internal/query/query.go b/roomserver/internal/query/query.go index 69d841dda..177c1352d 100644 --- a/roomserver/internal/query/query.go +++ b/roomserver/internal/query/query.go @@ -216,7 +216,8 @@ func (r *Queryer) QueryMembershipAtEvent( request *api.QueryMembershipAtEventRequest, response *api.QueryMembershipAtEventResponse, ) error { - response.Memberships = make(map[string][]*gomatrixserverlib.HeaderedEvent) + response.Memberships = make(map[string]*gomatrixserverlib.HeaderedEvent) + info, err := r.DB.RoomInfo(ctx, request.RoomID) if err != nil { return fmt.Errorf("unable to get roomInfo: %w", err) @@ -225,7 +226,6 @@ func (r *Queryer) QueryMembershipAtEvent( return fmt.Errorf("no roomInfo found") } - // get the users stateKeyNID stateKeyNIDs, err := r.DB.EventStateKeyNIDs(ctx, []string{request.UserID}) if err != nil { return fmt.Errorf("unable to get stateKeyNIDs for %s: %w", request.UserID, err) @@ -234,59 +234,78 @@ func (r *Queryer) QueryMembershipAtEvent( return fmt.Errorf("requested stateKeyNID for %s was not found", request.UserID) } - stateEntries, err := helpers.MembershipAtEvent(ctx, r.DB, info, request.EventIDs, stateKeyNIDs[request.UserID]) - if err != nil { - return fmt.Errorf("unable to get state before event: %w", err) + response.Memberships, err = r.DB.GetMembershipForHistoryVisibility(ctx, stateKeyNIDs[request.UserID], info, request.EventIDs...) + switch err { + case nil: + return nil + //case tables.OptimisationNotSupportedError: // fallthrough + default: + return err } - // If we only have one or less state entries, we can short circuit the below - // loop and avoid hitting the database - allStateEventNIDs := make(map[types.EventNID]types.StateEntry) - for _, eventID := range request.EventIDs { - stateEntry := stateEntries[eventID] - for _, s := range stateEntry { - allStateEventNIDs[s.EventNID] = s + /* + // get the users stateKeyNID + stateKeyNIDs, err := r.DB.EventStateKeyNIDs(ctx, []string{request.UserID}) + if err != nil { + return fmt.Errorf("unable to get stateKeyNIDs for %s: %w", request.UserID, err) } - } - - var canShortCircuit bool - if len(allStateEventNIDs) <= 1 { - canShortCircuit = true - } - - var memberships []types.Event - for _, eventID := range request.EventIDs { - stateEntry, ok := stateEntries[eventID] - if !ok || len(stateEntry) == 0 { - response.Memberships[eventID] = []*gomatrixserverlib.HeaderedEvent{} - continue + if _, ok := stateKeyNIDs[request.UserID]; !ok { + return fmt.Errorf("requested stateKeyNID for %s was not found", request.UserID) } - // If we can short circuit, e.g. we only have 0 or 1 membership events, we only get the memberships - // once. If we have more than one membership event, we need to get the state for each state entry. - if canShortCircuit { - if len(memberships) == 0 { + stateEntries, err := helpers.MembershipAtEvent(ctx, r.DB, nil, request.EventIDs, stateKeyNIDs[request.UserID]) + if err != nil { + return fmt.Errorf("unable to get state before event: %w", err) + } + + // If we only have one or less state entries, we can short circuit the below + // loop and avoid hitting the database + allStateEventNIDs := make(map[types.EventNID]types.StateEntry) + for _, eventID := range request.EventIDs { + stateEntry := stateEntries[eventID] + for _, s := range stateEntry { + allStateEventNIDs[s.EventNID] = s + } + } + + var canShortCircuit bool + if len(allStateEventNIDs) <= 1 { + canShortCircuit = true + } + + var memberships []types.Event + for _, eventID := range request.EventIDs { + stateEntry, ok := stateEntries[eventID] + if !ok || len(stateEntry) == 0 { + response.Memberships[eventID] = nil + continue + } + + // If we can short circuit, e.g. we only have 0 or 1 membership events, we only get the memberships + // once. If we have more than one membership event, we need to get the state for each state entry. + if canShortCircuit { + if len(memberships) == 0 { + memberships, err = helpers.GetMembershipsAtState(ctx, r.DB, stateEntry, false) + } + } else { memberships, err = helpers.GetMembershipsAtState(ctx, r.DB, stateEntry, false) } - } else { - memberships, err = helpers.GetMembershipsAtState(ctx, r.DB, stateEntry, false) - } - if err != nil { - return fmt.Errorf("unable to get memberships at state: %w", err) - } - - res := make([]*gomatrixserverlib.HeaderedEvent, 0, len(memberships)) - - for i := range memberships { - ev := memberships[i] - if ev.Type() == gomatrixserverlib.MRoomMember && ev.StateKeyEquals(request.UserID) { - res = append(res, ev.Headered(info.RoomVersion)) + if err != nil { + return fmt.Errorf("unable to get memberships at state: %w", err) } - } - response.Memberships[eventID] = res - } - return nil + res := make([]*gomatrixserverlib.HeaderedEvent, 0, len(memberships)) + + for i := range memberships { + ev := memberships[i] + if ev.Type() == gomatrixserverlib.MRoomMember && ev.StateKeyEquals(request.UserID) { + res = append(res, ev.Headered(roomVersion)) + } + } + response.Memberships[eventID] = res + } + + return nil*/ } // QueryMembershipsForRoom implements api.RoomserverInternalAPI diff --git a/roomserver/storage/interface.go b/roomserver/storage/interface.go index e0b9c56b3..80c449623 100644 --- a/roomserver/storage/interface.go +++ b/roomserver/storage/interface.go @@ -175,4 +175,8 @@ type Database interface { GetLeftUsers(ctx context.Context, userIDs []string) ([]string, error) PurgeRoom(ctx context.Context, roomID string) error UpgradeRoom(ctx context.Context, oldRoomID, newRoomID, eventSender string) error + + GetMembershipForHistoryVisibility( + ctx context.Context, userNID types.EventStateKeyNID, info *types.RoomInfo, eventIDs ...string, + ) (map[string]*gomatrixserverlib.HeaderedEvent, error) } diff --git a/roomserver/storage/postgres/events_table.go b/roomserver/storage/postgres/events_table.go index 9b5ed6eda..841f29546 100644 --- a/roomserver/storage/postgres/events_table.go +++ b/roomserver/storage/postgres/events_table.go @@ -69,6 +69,9 @@ CREATE TABLE IF NOT EXISTS roomserver_events ( auth_event_nids BIGINT[] NOT NULL, is_rejected BOOLEAN NOT NULL DEFAULT FALSE ); + +-- Create an index which helps in resolving membership events (event_type_nid = 5) - (used for history visibility) +create index if not exists roomserver_events_history_visibility_idx on roomserver_events (room_nid, event_state_key_nid) where (event_type_nid = 5); ` const insertEventSQL = "" + diff --git a/roomserver/storage/postgres/state_snapshot_table.go b/roomserver/storage/postgres/state_snapshot_table.go index a00c026f4..0c5afec07 100644 --- a/roomserver/storage/postgres/state_snapshot_table.go +++ b/roomserver/storage/postgres/state_snapshot_table.go @@ -21,10 +21,10 @@ import ( "fmt" "github.com/lib/pq" + "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" "github.com/matrix-org/dendrite/internal/sqlutil" - "github.com/matrix-org/dendrite/roomserver/storage/tables" "github.com/matrix-org/dendrite/roomserver/types" ) @@ -99,10 +99,24 @@ const bulkSelectStateForHistoryVisibilitySQL = ` AND (event_type_nid = 7 OR event_state_key LIKE '%:' || $2); ` +const bulkSelectMembershipForHistoryVisibilitySQL = ` +SELECT re.event_id, re2.event_id, rej.event_json +FROM roomserver_events re +LEFT JOIN roomserver_state_snapshots rss on re.state_snapshot_nid = rss.state_snapshot_nid +CROSS JOIN unnest(rss.state_block_nids) AS blocks(block_nid) +LEFT JOIN roomserver_state_block rsb ON rsb.state_block_nid = blocks.block_nid +CROSS JOIN unnest(rsb.event_nids) AS rsb2(event_nid) +JOIN roomserver_events re2 ON re2.room_nid = $3 AND re2.event_type_nid = 5 AND re2.event_nid = rsb2.event_nid AND re2.event_state_key_nid = $1 +LEFT JOIN roomserver_event_json rej ON rej.event_nid = re2.event_nid +WHERE re.event_id = ANY($2) + +` + type stateSnapshotStatements struct { - insertStateStmt *sql.Stmt - bulkSelectStateBlockNIDsStmt *sql.Stmt - bulkSelectStateForHistoryVisibilityStmt *sql.Stmt + insertStateStmt *sql.Stmt + bulkSelectStateBlockNIDsStmt *sql.Stmt + bulkSelectStateForHistoryVisibilityStmt *sql.Stmt + bulktSelectMembershipForHistoryVisibilityStmt *sql.Stmt } func CreateStateSnapshotTable(db *sql.DB) error { @@ -110,13 +124,14 @@ func CreateStateSnapshotTable(db *sql.DB) error { return err } -func PrepareStateSnapshotTable(db *sql.DB) (tables.StateSnapshot, error) { +func PrepareStateSnapshotTable(db *sql.DB) (*stateSnapshotStatements, error) { s := &stateSnapshotStatements{} return s, sqlutil.StatementList{ {&s.insertStateStmt, insertStateSQL}, {&s.bulkSelectStateBlockNIDsStmt, bulkSelectStateBlockNIDsSQL}, {&s.bulkSelectStateForHistoryVisibilityStmt, bulkSelectStateForHistoryVisibilitySQL}, + {&s.bulktSelectMembershipForHistoryVisibilityStmt, bulkSelectMembershipForHistoryVisibilitySQL}, }.Prepare(db) } @@ -185,3 +200,45 @@ func (s *stateSnapshotStatements) BulkSelectStateForHistoryVisibility( } return results, rows.Err() } + +func (s *stateSnapshotStatements) BulkSelectMembershipForHistoryVisibility( + ctx context.Context, txn *sql.Tx, userNID types.EventStateKeyNID, roomInfo *types.RoomInfo, eventIDs ...string, +) (map[string]*gomatrixserverlib.HeaderedEvent, error) { + stmt := sqlutil.TxStmt(txn, s.bulktSelectMembershipForHistoryVisibilityStmt) + rows, err := stmt.QueryContext(ctx, userNID, pq.Array(eventIDs), roomInfo.RoomNID) + if err != nil { + return nil, err + } + defer rows.Close() // nolint: errcheck + result := make(map[string]*gomatrixserverlib.HeaderedEvent, len(eventIDs)) + var evJson []byte + var eventID string + var membershipEventID string + + knownEvents := make(map[string]*gomatrixserverlib.HeaderedEvent, len(eventIDs)) + + for rows.Next() { + if err = rows.Scan(&eventID, &membershipEventID, &evJson); err != nil { + return nil, err + } + if len(evJson) == 0 { + result[eventID] = &gomatrixserverlib.HeaderedEvent{} + continue + } + // If we already know this event, don't try to marshal the json again + if ev, ok := knownEvents[membershipEventID]; ok { + result[eventID] = ev + continue + } + event, err := gomatrixserverlib.NewEventFromTrustedJSON(evJson, false, roomInfo.RoomVersion) + if err != nil { + result[eventID] = &gomatrixserverlib.HeaderedEvent{} + // not fatal + continue + } + he := event.Headered(roomInfo.RoomVersion) + result[eventID] = he + knownEvents[membershipEventID] = he + } + return result, rows.Err() +} diff --git a/roomserver/storage/postgres/test_test.go b/roomserver/storage/postgres/test_test.go new file mode 100644 index 000000000..bf560bea2 --- /dev/null +++ b/roomserver/storage/postgres/test_test.go @@ -0,0 +1 @@ +package postgres diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go index 654b078d2..f8672496b 100644 --- a/roomserver/storage/shared/storage.go +++ b/roomserver/storage/shared/storage.go @@ -51,6 +51,12 @@ func (d *Database) SupportsConcurrentRoomInputs() bool { return true } +func (d *Database) GetMembershipForHistoryVisibility( + ctx context.Context, userNID types.EventStateKeyNID, roomInfo *types.RoomInfo, eventIDs ...string, +) (map[string]*gomatrixserverlib.HeaderedEvent, error) { + return d.StateSnapshotTable.BulkSelectMembershipForHistoryVisibility(ctx, nil, userNID, roomInfo, eventIDs...) +} + func (d *Database) EventTypeNIDs( ctx context.Context, eventTypes []string, ) (map[string]types.EventTypeNID, error) { diff --git a/roomserver/storage/sqlite3/state_snapshot_table.go b/roomserver/storage/sqlite3/state_snapshot_table.go index 930ad14dd..e57e1a4bf 100644 --- a/roomserver/storage/sqlite3/state_snapshot_table.go +++ b/roomserver/storage/sqlite3/state_snapshot_table.go @@ -26,6 +26,7 @@ import ( "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/roomserver/storage/tables" "github.com/matrix-org/dendrite/roomserver/types" + "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" ) @@ -152,6 +153,10 @@ func (s *stateSnapshotStatements) BulkSelectStateForHistoryVisibility( return nil, tables.OptimisationNotSupportedError } +func (s *stateSnapshotStatements) BulkSelectMembershipForHistoryVisibility(ctx context.Context, txn *sql.Tx, userNID types.EventStateKeyNID, roomInfo *types.RoomInfo, eventIDs ...string) (map[string]*gomatrixserverlib.HeaderedEvent, error) { + return nil, tables.OptimisationNotSupportedError +} + func (s *stateSnapshotStatements) selectStateBlockNIDsForRoomNID( ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, ) ([]types.StateBlockNID, error) { diff --git a/roomserver/storage/tables/interface.go b/roomserver/storage/tables/interface.go index 64145f83d..c7f1064db 100644 --- a/roomserver/storage/tables/interface.go +++ b/roomserver/storage/tables/interface.go @@ -91,6 +91,10 @@ type StateSnapshot interface { // which users are in a room faster than having to load the entire room state. In the // case of SQLite, this will return tables.OptimisationNotSupportedError. BulkSelectStateForHistoryVisibility(ctx context.Context, txn *sql.Tx, stateSnapshotNID types.StateSnapshotNID, domain string) ([]types.EventNID, error) + + BulkSelectMembershipForHistoryVisibility( + ctx context.Context, txn *sql.Tx, userNID types.EventStateKeyNID, roomInfo *types.RoomInfo, eventIDs ...string, + ) (map[string]*gomatrixserverlib.HeaderedEvent, error) } type StateBlock interface { diff --git a/syncapi/internal/history_visibility.go b/syncapi/internal/history_visibility.go index 71d7ddd15..7b5fa29b9 100644 --- a/syncapi/internal/history_visibility.go +++ b/syncapi/internal/history_visibility.go @@ -121,10 +121,7 @@ func ApplyHistoryVisibilityFilter( // Get the mapping from eventID -> eventVisibility eventsFiltered := make([]*gomatrixserverlib.HeaderedEvent, 0, len(events)) - visibilities, err := visibilityForEvents(ctx, rsAPI, events, userID, events[0].RoomID()) - if err != nil { - return eventsFiltered, err - } + visibilities := visibilityForEvents(ctx, rsAPI, events, userID, events[0].RoomID()) for _, ev := range events { evVis := visibilities[ev.EventID()] evVis.membershipCurrent = membershipCurrent @@ -175,7 +172,7 @@ func visibilityForEvents( rsAPI api.SyncRoomserverAPI, events []*gomatrixserverlib.HeaderedEvent, userID, roomID string, -) (map[string]eventVisibility, error) { +) map[string]eventVisibility { eventIDs := make([]string, len(events)) for i := range events { eventIDs[i] = events[i].EventID() @@ -185,6 +182,7 @@ func visibilityForEvents( // get the membership events for all eventIDs membershipResp := &api.QueryMembershipAtEventResponse{} + err := rsAPI.QueryMembershipAtEvent(ctx, &api.QueryMembershipAtEventRequest{ RoomID: roomID, EventIDs: eventIDs, @@ -201,19 +199,20 @@ func visibilityForEvents( membershipAtEvent: gomatrixserverlib.Leave, // default to leave, to not expose events by accident visibility: event.Visibility, } - membershipEvs, ok := membershipResp.Memberships[eventID] + ev, ok := membershipResp.Memberships[eventID] if !ok { result[eventID] = vis continue } - for _, ev := range membershipEvs { - membership, err := ev.Membership() - if err != nil { - return result, err - } - vis.membershipAtEvent = membership + + membership, err := ev.Membership() + if err != nil { + result[eventID] = vis + continue } + vis.membershipAtEvent = membership + result[eventID] = vis } - return result, nil + return result } diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index 6d7fb20f7..49dffade6 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -42,7 +42,7 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events ( -- This isn't a problem for us since we just want to order by this field. id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'), -- The event ID for the event - event_id TEXT NOT NULL CONSTRAINT syncapi_event_id_idx UNIQUE, + event_id TEXT NOT NULL CONSTRAINT syncapi_output_room_event_id_idx UNIQUE, -- The 'room_id' key for the event. room_id TEXT NOT NULL, -- The headered JSON for the event, containing potentially additional metadata such as @@ -83,7 +83,7 @@ const insertEventSQL = "" + "INSERT INTO syncapi_output_room_events (" + "room_id, event_id, headered_event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync, history_visibility" + ") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) " + - "ON CONFLICT ON CONSTRAINT syncapi_event_id_idx DO UPDATE SET exclude_from_sync = (excluded.exclude_from_sync AND $11) " + + "ON CONFLICT ON CONSTRAINT syncapi_output_room_event_id_idx DO UPDATE SET exclude_from_sync = (excluded.exclude_from_sync AND $11) " + "RETURNING id" const selectEventsSQL = "" + diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 9b69a5ca9..56c80fdc5 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -453,7 +453,7 @@ func applyHistoryVisibilityFilter( "room_id": roomID, "before": len(recentEvents), "after": len(events), - }).Trace("Applied history visibility (sync)") + }).Debugf("Applied history visibility (sync)") return events, nil }