Optimize getting history visibility events (postgres)

This commit is contained in:
Till Faelligen 2023-02-01 11:12:33 +01:00
parent c068c6eb62
commit 823540e861
No known key found for this signature in database
GPG key ID: ACCDC9606D472758
12 changed files with 166 additions and 68 deletions

View file

@ -445,7 +445,7 @@ type QueryMembershipAtEventRequest struct {
type QueryMembershipAtEventResponse struct {
// Memberships is a map from eventID to a list of events (if any). Events that
// do not have known state will return an empty array here.
Memberships map[string][]*gomatrixserverlib.HeaderedEvent `json:"memberships"`
Memberships map[string]*gomatrixserverlib.HeaderedEvent `json:"memberships"`
}
// QueryLeftUsersRequest is a request to calculate users that we (the server) don't share a

View file

@ -216,7 +216,8 @@ func (r *Queryer) QueryMembershipAtEvent(
request *api.QueryMembershipAtEventRequest,
response *api.QueryMembershipAtEventResponse,
) error {
response.Memberships = make(map[string][]*gomatrixserverlib.HeaderedEvent)
response.Memberships = make(map[string]*gomatrixserverlib.HeaderedEvent)
info, err := r.DB.RoomInfo(ctx, request.RoomID)
if err != nil {
return fmt.Errorf("unable to get roomInfo: %w", err)
@ -225,7 +226,6 @@ func (r *Queryer) QueryMembershipAtEvent(
return fmt.Errorf("no roomInfo found")
}
// get the users stateKeyNID
stateKeyNIDs, err := r.DB.EventStateKeyNIDs(ctx, []string{request.UserID})
if err != nil {
return fmt.Errorf("unable to get stateKeyNIDs for %s: %w", request.UserID, err)
@ -234,59 +234,78 @@ func (r *Queryer) QueryMembershipAtEvent(
return fmt.Errorf("requested stateKeyNID for %s was not found", request.UserID)
}
stateEntries, err := helpers.MembershipAtEvent(ctx, r.DB, info, request.EventIDs, stateKeyNIDs[request.UserID])
if err != nil {
return fmt.Errorf("unable to get state before event: %w", err)
response.Memberships, err = r.DB.GetMembershipForHistoryVisibility(ctx, stateKeyNIDs[request.UserID], info, request.EventIDs...)
switch err {
case nil:
return nil
//case tables.OptimisationNotSupportedError: // fallthrough
default:
return err
}
// If we only have one or less state entries, we can short circuit the below
// loop and avoid hitting the database
allStateEventNIDs := make(map[types.EventNID]types.StateEntry)
for _, eventID := range request.EventIDs {
stateEntry := stateEntries[eventID]
for _, s := range stateEntry {
allStateEventNIDs[s.EventNID] = s
/*
// get the users stateKeyNID
stateKeyNIDs, err := r.DB.EventStateKeyNIDs(ctx, []string{request.UserID})
if err != nil {
return fmt.Errorf("unable to get stateKeyNIDs for %s: %w", request.UserID, err)
}
}
var canShortCircuit bool
if len(allStateEventNIDs) <= 1 {
canShortCircuit = true
}
var memberships []types.Event
for _, eventID := range request.EventIDs {
stateEntry, ok := stateEntries[eventID]
if !ok || len(stateEntry) == 0 {
response.Memberships[eventID] = []*gomatrixserverlib.HeaderedEvent{}
continue
if _, ok := stateKeyNIDs[request.UserID]; !ok {
return fmt.Errorf("requested stateKeyNID for %s was not found", request.UserID)
}
// If we can short circuit, e.g. we only have 0 or 1 membership events, we only get the memberships
// once. If we have more than one membership event, we need to get the state for each state entry.
if canShortCircuit {
if len(memberships) == 0 {
stateEntries, err := helpers.MembershipAtEvent(ctx, r.DB, nil, request.EventIDs, stateKeyNIDs[request.UserID])
if err != nil {
return fmt.Errorf("unable to get state before event: %w", err)
}
// If we only have one or less state entries, we can short circuit the below
// loop and avoid hitting the database
allStateEventNIDs := make(map[types.EventNID]types.StateEntry)
for _, eventID := range request.EventIDs {
stateEntry := stateEntries[eventID]
for _, s := range stateEntry {
allStateEventNIDs[s.EventNID] = s
}
}
var canShortCircuit bool
if len(allStateEventNIDs) <= 1 {
canShortCircuit = true
}
var memberships []types.Event
for _, eventID := range request.EventIDs {
stateEntry, ok := stateEntries[eventID]
if !ok || len(stateEntry) == 0 {
response.Memberships[eventID] = nil
continue
}
// If we can short circuit, e.g. we only have 0 or 1 membership events, we only get the memberships
// once. If we have more than one membership event, we need to get the state for each state entry.
if canShortCircuit {
if len(memberships) == 0 {
memberships, err = helpers.GetMembershipsAtState(ctx, r.DB, stateEntry, false)
}
} else {
memberships, err = helpers.GetMembershipsAtState(ctx, r.DB, stateEntry, false)
}
} else {
memberships, err = helpers.GetMembershipsAtState(ctx, r.DB, stateEntry, false)
}
if err != nil {
return fmt.Errorf("unable to get memberships at state: %w", err)
}
res := make([]*gomatrixserverlib.HeaderedEvent, 0, len(memberships))
for i := range memberships {
ev := memberships[i]
if ev.Type() == gomatrixserverlib.MRoomMember && ev.StateKeyEquals(request.UserID) {
res = append(res, ev.Headered(info.RoomVersion))
if err != nil {
return fmt.Errorf("unable to get memberships at state: %w", err)
}
}
response.Memberships[eventID] = res
}
return nil
res := make([]*gomatrixserverlib.HeaderedEvent, 0, len(memberships))
for i := range memberships {
ev := memberships[i]
if ev.Type() == gomatrixserverlib.MRoomMember && ev.StateKeyEquals(request.UserID) {
res = append(res, ev.Headered(roomVersion))
}
}
response.Memberships[eventID] = res
}
return nil*/
}
// QueryMembershipsForRoom implements api.RoomserverInternalAPI

View file

@ -175,4 +175,8 @@ type Database interface {
GetLeftUsers(ctx context.Context, userIDs []string) ([]string, error)
PurgeRoom(ctx context.Context, roomID string) error
UpgradeRoom(ctx context.Context, oldRoomID, newRoomID, eventSender string) error
GetMembershipForHistoryVisibility(
ctx context.Context, userNID types.EventStateKeyNID, info *types.RoomInfo, eventIDs ...string,
) (map[string]*gomatrixserverlib.HeaderedEvent, error)
}

View file

@ -69,6 +69,9 @@ CREATE TABLE IF NOT EXISTS roomserver_events (
auth_event_nids BIGINT[] NOT NULL,
is_rejected BOOLEAN NOT NULL DEFAULT FALSE
);
-- Create an index which helps in resolving membership events (event_type_nid = 5) - (used for history visibility)
create index if not exists roomserver_events_history_visibility_idx on roomserver_events (room_nid, event_state_key_nid) where (event_type_nid = 5);
`
const insertEventSQL = "" +

View file

@ -21,10 +21,10 @@ import (
"fmt"
"github.com/lib/pq"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/storage/tables"
"github.com/matrix-org/dendrite/roomserver/types"
)
@ -99,10 +99,24 @@ const bulkSelectStateForHistoryVisibilitySQL = `
AND (event_type_nid = 7 OR event_state_key LIKE '%:' || $2);
`
const bulkSelectMembershipForHistoryVisibilitySQL = `
SELECT re.event_id, re2.event_id, rej.event_json
FROM roomserver_events re
LEFT JOIN roomserver_state_snapshots rss on re.state_snapshot_nid = rss.state_snapshot_nid
CROSS JOIN unnest(rss.state_block_nids) AS blocks(block_nid)
LEFT JOIN roomserver_state_block rsb ON rsb.state_block_nid = blocks.block_nid
CROSS JOIN unnest(rsb.event_nids) AS rsb2(event_nid)
JOIN roomserver_events re2 ON re2.room_nid = $3 AND re2.event_type_nid = 5 AND re2.event_nid = rsb2.event_nid AND re2.event_state_key_nid = $1
LEFT JOIN roomserver_event_json rej ON rej.event_nid = re2.event_nid
WHERE re.event_id = ANY($2)
`
type stateSnapshotStatements struct {
insertStateStmt *sql.Stmt
bulkSelectStateBlockNIDsStmt *sql.Stmt
bulkSelectStateForHistoryVisibilityStmt *sql.Stmt
insertStateStmt *sql.Stmt
bulkSelectStateBlockNIDsStmt *sql.Stmt
bulkSelectStateForHistoryVisibilityStmt *sql.Stmt
bulktSelectMembershipForHistoryVisibilityStmt *sql.Stmt
}
func CreateStateSnapshotTable(db *sql.DB) error {
@ -110,13 +124,14 @@ func CreateStateSnapshotTable(db *sql.DB) error {
return err
}
func PrepareStateSnapshotTable(db *sql.DB) (tables.StateSnapshot, error) {
func PrepareStateSnapshotTable(db *sql.DB) (*stateSnapshotStatements, error) {
s := &stateSnapshotStatements{}
return s, sqlutil.StatementList{
{&s.insertStateStmt, insertStateSQL},
{&s.bulkSelectStateBlockNIDsStmt, bulkSelectStateBlockNIDsSQL},
{&s.bulkSelectStateForHistoryVisibilityStmt, bulkSelectStateForHistoryVisibilitySQL},
{&s.bulktSelectMembershipForHistoryVisibilityStmt, bulkSelectMembershipForHistoryVisibilitySQL},
}.Prepare(db)
}
@ -185,3 +200,45 @@ func (s *stateSnapshotStatements) BulkSelectStateForHistoryVisibility(
}
return results, rows.Err()
}
func (s *stateSnapshotStatements) BulkSelectMembershipForHistoryVisibility(
ctx context.Context, txn *sql.Tx, userNID types.EventStateKeyNID, roomInfo *types.RoomInfo, eventIDs ...string,
) (map[string]*gomatrixserverlib.HeaderedEvent, error) {
stmt := sqlutil.TxStmt(txn, s.bulktSelectMembershipForHistoryVisibilityStmt)
rows, err := stmt.QueryContext(ctx, userNID, pq.Array(eventIDs), roomInfo.RoomNID)
if err != nil {
return nil, err
}
defer rows.Close() // nolint: errcheck
result := make(map[string]*gomatrixserverlib.HeaderedEvent, len(eventIDs))
var evJson []byte
var eventID string
var membershipEventID string
knownEvents := make(map[string]*gomatrixserverlib.HeaderedEvent, len(eventIDs))
for rows.Next() {
if err = rows.Scan(&eventID, &membershipEventID, &evJson); err != nil {
return nil, err
}
if len(evJson) == 0 {
result[eventID] = &gomatrixserverlib.HeaderedEvent{}
continue
}
// If we already know this event, don't try to marshal the json again
if ev, ok := knownEvents[membershipEventID]; ok {
result[eventID] = ev
continue
}
event, err := gomatrixserverlib.NewEventFromTrustedJSON(evJson, false, roomInfo.RoomVersion)
if err != nil {
result[eventID] = &gomatrixserverlib.HeaderedEvent{}
// not fatal
continue
}
he := event.Headered(roomInfo.RoomVersion)
result[eventID] = he
knownEvents[membershipEventID] = he
}
return result, rows.Err()
}

View file

@ -0,0 +1 @@
package postgres

View file

@ -51,6 +51,12 @@ func (d *Database) SupportsConcurrentRoomInputs() bool {
return true
}
func (d *Database) GetMembershipForHistoryVisibility(
ctx context.Context, userNID types.EventStateKeyNID, roomInfo *types.RoomInfo, eventIDs ...string,
) (map[string]*gomatrixserverlib.HeaderedEvent, error) {
return d.StateSnapshotTable.BulkSelectMembershipForHistoryVisibility(ctx, nil, userNID, roomInfo, eventIDs...)
}
func (d *Database) EventTypeNIDs(
ctx context.Context, eventTypes []string,
) (map[string]types.EventTypeNID, error) {

View file

@ -26,6 +26,7 @@ import (
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/storage/tables"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
@ -152,6 +153,10 @@ func (s *stateSnapshotStatements) BulkSelectStateForHistoryVisibility(
return nil, tables.OptimisationNotSupportedError
}
func (s *stateSnapshotStatements) BulkSelectMembershipForHistoryVisibility(ctx context.Context, txn *sql.Tx, userNID types.EventStateKeyNID, roomInfo *types.RoomInfo, eventIDs ...string) (map[string]*gomatrixserverlib.HeaderedEvent, error) {
return nil, tables.OptimisationNotSupportedError
}
func (s *stateSnapshotStatements) selectStateBlockNIDsForRoomNID(
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
) ([]types.StateBlockNID, error) {

View file

@ -91,6 +91,10 @@ type StateSnapshot interface {
// which users are in a room faster than having to load the entire room state. In the
// case of SQLite, this will return tables.OptimisationNotSupportedError.
BulkSelectStateForHistoryVisibility(ctx context.Context, txn *sql.Tx, stateSnapshotNID types.StateSnapshotNID, domain string) ([]types.EventNID, error)
BulkSelectMembershipForHistoryVisibility(
ctx context.Context, txn *sql.Tx, userNID types.EventStateKeyNID, roomInfo *types.RoomInfo, eventIDs ...string,
) (map[string]*gomatrixserverlib.HeaderedEvent, error)
}
type StateBlock interface {

View file

@ -121,10 +121,7 @@ func ApplyHistoryVisibilityFilter(
// Get the mapping from eventID -> eventVisibility
eventsFiltered := make([]*gomatrixserverlib.HeaderedEvent, 0, len(events))
visibilities, err := visibilityForEvents(ctx, rsAPI, events, userID, events[0].RoomID())
if err != nil {
return eventsFiltered, err
}
visibilities := visibilityForEvents(ctx, rsAPI, events, userID, events[0].RoomID())
for _, ev := range events {
evVis := visibilities[ev.EventID()]
evVis.membershipCurrent = membershipCurrent
@ -175,7 +172,7 @@ func visibilityForEvents(
rsAPI api.SyncRoomserverAPI,
events []*gomatrixserverlib.HeaderedEvent,
userID, roomID string,
) (map[string]eventVisibility, error) {
) map[string]eventVisibility {
eventIDs := make([]string, len(events))
for i := range events {
eventIDs[i] = events[i].EventID()
@ -185,6 +182,7 @@ func visibilityForEvents(
// get the membership events for all eventIDs
membershipResp := &api.QueryMembershipAtEventResponse{}
err := rsAPI.QueryMembershipAtEvent(ctx, &api.QueryMembershipAtEventRequest{
RoomID: roomID,
EventIDs: eventIDs,
@ -201,19 +199,20 @@ func visibilityForEvents(
membershipAtEvent: gomatrixserverlib.Leave, // default to leave, to not expose events by accident
visibility: event.Visibility,
}
membershipEvs, ok := membershipResp.Memberships[eventID]
ev, ok := membershipResp.Memberships[eventID]
if !ok {
result[eventID] = vis
continue
}
for _, ev := range membershipEvs {
membership, err := ev.Membership()
if err != nil {
return result, err
}
vis.membershipAtEvent = membership
membership, err := ev.Membership()
if err != nil {
result[eventID] = vis
continue
}
vis.membershipAtEvent = membership
result[eventID] = vis
}
return result, nil
return result
}

View file

@ -42,7 +42,7 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events (
-- This isn't a problem for us since we just want to order by this field.
id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'),
-- The event ID for the event
event_id TEXT NOT NULL CONSTRAINT syncapi_event_id_idx UNIQUE,
event_id TEXT NOT NULL CONSTRAINT syncapi_output_room_event_id_idx UNIQUE,
-- The 'room_id' key for the event.
room_id TEXT NOT NULL,
-- The headered JSON for the event, containing potentially additional metadata such as
@ -83,7 +83,7 @@ const insertEventSQL = "" +
"INSERT INTO syncapi_output_room_events (" +
"room_id, event_id, headered_event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync, history_visibility" +
") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) " +
"ON CONFLICT ON CONSTRAINT syncapi_event_id_idx DO UPDATE SET exclude_from_sync = (excluded.exclude_from_sync AND $11) " +
"ON CONFLICT ON CONSTRAINT syncapi_output_room_event_id_idx DO UPDATE SET exclude_from_sync = (excluded.exclude_from_sync AND $11) " +
"RETURNING id"
const selectEventsSQL = "" +

View file

@ -453,7 +453,7 @@ func applyHistoryVisibilityFilter(
"room_id": roomID,
"before": len(recentEvents),
"after": len(events),
}).Trace("Applied history visibility (sync)")
}).Debugf("Applied history visibility (sync)")
return events, nil
}