Optimize history visibility

This commit is contained in:
Till Faelligen 2022-11-01 11:57:09 +01:00
parent 869bf4d0ac
commit 88fcb1f1d6
No known key found for this signature in database
GPG key ID: 3DF82D8AB9211D4E
9 changed files with 152 additions and 33 deletions

View file

@ -5,6 +5,7 @@ import (
"database/sql" "database/sql"
"errors" "errors"
"fmt" "fmt"
"sort"
"strings" "strings"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -159,7 +160,7 @@ func GetMembershipsAtState(
ctx context.Context, db storage.Database, stateEntries []types.StateEntry, joinedOnly bool, ctx context.Context, db storage.Database, stateEntries []types.StateEntry, joinedOnly bool,
) ([]types.Event, error) { ) ([]types.Event, error) {
var eventNIDs []types.EventNID var eventNIDs types.EventNIDs
for _, entry := range stateEntries { for _, entry := range stateEntries {
// Filter the events to retrieve to only keep the membership events // Filter the events to retrieve to only keep the membership events
if entry.EventTypeNID == types.MRoomMemberNID { if entry.EventTypeNID == types.MRoomMemberNID {
@ -167,6 +168,14 @@ func GetMembershipsAtState(
} }
} }
// There are no events to get, don't bother asking the database
if len(eventNIDs) == 0 {
return []types.Event{}, nil
}
sort.Sort(eventNIDs)
util.Unique(eventNIDs)
// Get all of the events in this state // Get all of the events in this state
stateEvents, err := db.Events(ctx, eventNIDs) stateEvents, err := db.Events(ctx, eventNIDs)
if err != nil { if err != nil {

View file

@ -20,6 +20,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"time"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
@ -239,6 +240,7 @@ func (r *Queryer) QueryMembershipAtEvent(
return fmt.Errorf("unable to get state before event: %w", err) return fmt.Errorf("unable to get state before event: %w", err)
} }
start := time.Now()
for _, eventID := range request.EventIDs { for _, eventID := range request.EventIDs {
stateEntry, ok := stateEntries[eventID] stateEntry, ok := stateEntries[eventID]
if !ok { if !ok {
@ -259,6 +261,7 @@ func (r *Queryer) QueryMembershipAtEvent(
} }
response.Memberships[eventID] = res response.Memberships[eventID] = res
} }
logrus.Debugf("XXX: GetMembershipsAtState duration: %s", time.Since(start))
return nil return nil
} }

View file

@ -18,17 +18,18 @@ package state
import ( import (
"context" "context"
"database/sql"
"fmt" "fmt"
"sort" "sort"
"sync" "sync"
"time" "time"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
"github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/roomserver/types"
) )
type StateResolutionStorage interface { type StateResolutionStorage interface {
@ -37,6 +38,7 @@ type StateResolutionStorage interface {
StateBlockNIDs(ctx context.Context, stateNIDs []types.StateSnapshotNID) ([]types.StateBlockNIDList, error) StateBlockNIDs(ctx context.Context, stateNIDs []types.StateSnapshotNID) ([]types.StateBlockNIDList, error)
StateEntries(ctx context.Context, stateBlockNIDs []types.StateBlockNID) ([]types.StateEntryList, error) StateEntries(ctx context.Context, stateBlockNIDs []types.StateBlockNID) ([]types.StateEntryList, error)
SnapshotNIDFromEventID(ctx context.Context, eventID string) (types.StateSnapshotNID, error) SnapshotNIDFromEventID(ctx context.Context, eventID string) (types.StateSnapshotNID, error)
BulkSelectSnapshotsFromEventIDs(ctx context.Context, eventIDs []string) (map[types.StateSnapshotNID][]string, error)
StateEntriesForTuples(ctx context.Context, stateBlockNIDs []types.StateBlockNID, stateKeyTuples []types.StateKeyTuple) ([]types.StateEntryList, error) StateEntriesForTuples(ctx context.Context, stateBlockNIDs []types.StateBlockNID, stateKeyTuples []types.StateKeyTuple) ([]types.StateEntryList, error)
StateAtEventIDs(ctx context.Context, eventIDs []string) ([]types.StateAtEvent, error) StateAtEventIDs(ctx context.Context, eventIDs []string) ([]types.StateAtEvent, error)
AddState(ctx context.Context, roomNID types.RoomNID, stateBlockNIDs []types.StateBlockNID, state []types.StateEntry) (types.StateSnapshotNID, error) AddState(ctx context.Context, roomNID types.RoomNID, stateBlockNIDs []types.StateBlockNID, state []types.StateEntry) (types.StateSnapshotNID, error)
@ -131,21 +133,12 @@ func (v *StateResolution) LoadMembershipAtEvent(
defer span.Finish() defer span.Finish()
// De-dupe snapshotNIDs // De-dupe snapshotNIDs
snapshotNIDMap := make(map[types.StateSnapshotNID][]string) // map from snapshot NID to eventIDs start := time.Now()
for i := range eventIDs { snapshotNIDMap, err := v.db.BulkSelectSnapshotsFromEventIDs(ctx, eventIDs)
eventID := eventIDs[i] if err != nil {
snapshotNID, err := v.db.SnapshotNIDFromEventID(ctx, eventID) return nil, err
if err != nil && err != sql.ErrNoRows {
return nil, fmt.Errorf("LoadStateAtEvent.SnapshotNIDFromEventID failed for event %s : %w", eventID, err)
}
if snapshotNID == 0 {
// If we don't know a state snapshot for this event then we can't calculate
// memberships at the time of the event, so skip over it. This means that
// it isn't guaranteed that the response map will contain every single event.
continue
}
snapshotNIDMap[snapshotNID] = append(snapshotNIDMap[snapshotNID], eventID)
} }
logrus.Debugf("XXX: duration to lookup snapshot nids: %s", time.Since(start))
snapshotNIDs := make([]types.StateSnapshotNID, 0, len(snapshotNIDMap)) snapshotNIDs := make([]types.StateSnapshotNID, 0, len(snapshotNIDMap))
for nid := range snapshotNIDMap { for nid := range snapshotNIDMap {
@ -157,27 +150,52 @@ func (v *StateResolution) LoadMembershipAtEvent(
return nil, err return nil, err
} }
var wantStateBlocks []types.StateBlockNID
for _, x := range stateBlockNIDLists {
wantStateBlocks = append(wantStateBlocks, x.StateBlockNIDs...)
}
start = time.Now()
stateEntryLists, err := v.db.StateEntriesForTuples(ctx, uniqueStateBlockNIDs(wantStateBlocks), []types.StateKeyTuple{
{
EventTypeNID: types.MRoomMemberNID,
EventStateKeyNID: stateKeyNID,
},
})
if err != nil {
return nil, err
}
logrus.Debugf("XXX: duration to lookup StateEntriesForTuples: %s", time.Since(start))
stateBlockNIDsMap := stateBlockNIDListMap(stateBlockNIDLists)
stateEntriesMap := stateEntryListMap(stateEntryLists)
result := make(map[string][]types.StateEntry) result := make(map[string][]types.StateEntry)
start = time.Now()
for _, stateBlockNIDList := range stateBlockNIDLists { for _, stateBlockNIDList := range stateBlockNIDLists {
// Query the membership event for the user at the given stateblocks stateBlockNIDs, ok := stateBlockNIDsMap.lookup(stateBlockNIDList.StateSnapshotNID)
stateEntryLists, err := v.db.StateEntriesForTuples(ctx, stateBlockNIDList.StateBlockNIDs, []types.StateKeyTuple{ if !ok {
{ // This should only get hit if the database is corrupt.
EventTypeNID: types.MRoomMemberNID, // It should be impossible for an event to reference a NID that doesn't exist
EventStateKeyNID: stateKeyNID, panic(fmt.Errorf("corrupt DB: Missing state snapshot numeric ID %d", stateBlockNIDList.StateSnapshotNID))
},
})
if err != nil {
return nil, err
} }
evIDs := snapshotNIDMap[stateBlockNIDList.StateSnapshotNID] for _, stateBlockNID := range stateBlockNIDs {
entries, ok := stateEntriesMap.lookup(stateBlockNID)
if !ok {
// This should only get hit if the database is corrupt.
// It should be impossible for an event to reference a NID that doesn't exist
panic(fmt.Errorf("corrupt DB: Missing state block numeric ID %d", stateBlockNID))
}
for _, evID := range evIDs { evIDs := snapshotNIDMap[stateBlockNIDList.StateSnapshotNID]
for _, x := range stateEntryLists {
result[evID] = append(result[evID], x.StateEntries...) for _, evID := range evIDs {
result[evID] = append(result[evID], entries...)
} }
} }
} }
logrus.Debugf("XXX: duration to generate list: %s", time.Since(start))
return result, nil return result, nil
} }

View file

@ -72,6 +72,7 @@ type Database interface {
Events(ctx context.Context, eventNIDs []types.EventNID) ([]types.Event, error) Events(ctx context.Context, eventNIDs []types.EventNID) ([]types.Event, error)
// Look up snapshot NID for an event ID string // Look up snapshot NID for an event ID string
SnapshotNIDFromEventID(ctx context.Context, eventID string) (types.StateSnapshotNID, error) SnapshotNIDFromEventID(ctx context.Context, eventID string) (types.StateSnapshotNID, error)
BulkSelectSnapshotsFromEventIDs(ctx context.Context, eventIDs []string) (map[types.StateSnapshotNID][]string, error)
// Stores a matrix room event in the database. Returns the room NID, the state snapshot and the redacted event ID if any, or an error. // Stores a matrix room event in the database. Returns the room NID, the state snapshot and the redacted event ID if any, or an error.
StoreEvent( StoreEvent(
ctx context.Context, event *gomatrixserverlib.Event, authEventNIDs []types.EventNID, ctx context.Context, event *gomatrixserverlib.Event, authEventNIDs []types.EventNID,

View file

@ -22,11 +22,12 @@ import (
"sort" "sort"
"github.com/lib/pq" "github.com/lib/pq"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/storage/tables" "github.com/matrix-org/dendrite/roomserver/storage/tables"
"github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
) )
const eventsSchema = ` const eventsSchema = `
@ -80,6 +81,9 @@ const insertEventSQL = "" +
const selectEventSQL = "" + const selectEventSQL = "" +
"SELECT event_nid, state_snapshot_nid FROM roomserver_events WHERE event_id = $1" "SELECT event_nid, state_snapshot_nid FROM roomserver_events WHERE event_id = $1"
const bulkSelectSnapshotsForEventIDsSQL = "" +
"SELECT event_id, state_snapshot_nid FROM roomserver_events WHERE event_id = ANY($1)"
// Bulk lookup of events by string ID. // Bulk lookup of events by string ID.
// Sort by the numeric IDs for event type and state key. // Sort by the numeric IDs for event type and state key.
// This means we can use binary search to lookup entries by type and state key. // This means we can use binary search to lookup entries by type and state key.
@ -150,6 +154,7 @@ const selectEventRejectedSQL = "" +
type eventStatements struct { type eventStatements struct {
insertEventStmt *sql.Stmt insertEventStmt *sql.Stmt
selectEventStmt *sql.Stmt selectEventStmt *sql.Stmt
bulkSelectSnapshotsForEventIDsStmt *sql.Stmt
bulkSelectStateEventByIDStmt *sql.Stmt bulkSelectStateEventByIDStmt *sql.Stmt
bulkSelectStateEventByIDExcludingRejectedStmt *sql.Stmt bulkSelectStateEventByIDExcludingRejectedStmt *sql.Stmt
bulkSelectStateEventByNIDStmt *sql.Stmt bulkSelectStateEventByNIDStmt *sql.Stmt
@ -179,6 +184,7 @@ func PrepareEventsTable(db *sql.DB) (tables.Events, error) {
return s, sqlutil.StatementList{ return s, sqlutil.StatementList{
{&s.insertEventStmt, insertEventSQL}, {&s.insertEventStmt, insertEventSQL},
{&s.selectEventStmt, selectEventSQL}, {&s.selectEventStmt, selectEventSQL},
{&s.bulkSelectSnapshotsForEventIDsStmt, bulkSelectSnapshotsForEventIDsSQL},
{&s.bulkSelectStateEventByIDStmt, bulkSelectStateEventByIDSQL}, {&s.bulkSelectStateEventByIDStmt, bulkSelectStateEventByIDSQL},
{&s.bulkSelectStateEventByIDExcludingRejectedStmt, bulkSelectStateEventByIDExcludingRejectedSQL}, {&s.bulkSelectStateEventByIDExcludingRejectedStmt, bulkSelectStateEventByIDExcludingRejectedSQL},
{&s.bulkSelectStateEventByNIDStmt, bulkSelectStateEventByNIDSQL}, {&s.bulkSelectStateEventByNIDStmt, bulkSelectStateEventByNIDSQL},
@ -230,6 +236,29 @@ func (s *eventStatements) SelectEvent(
return types.EventNID(eventNID), types.StateSnapshotNID(stateNID), err return types.EventNID(eventNID), types.StateSnapshotNID(stateNID), err
} }
func (s *eventStatements) BulkSelectSnapshotsFromEventIDs(
ctx context.Context, txn *sql.Tx, eventIDs []string,
) (map[types.StateSnapshotNID][]string, error) {
stmt := sqlutil.TxStmt(txn, s.bulkSelectSnapshotsForEventIDsStmt)
rows, err := stmt.QueryContext(ctx, pq.Array(eventIDs))
if err != nil {
return nil, err
}
var eventID string
var stateNID types.StateSnapshotNID
result := make(map[types.StateSnapshotNID][]string)
for rows.Next() {
if err := rows.Scan(&eventID, &stateNID); err != nil {
return nil, err
}
result[stateNID] = append(result[stateNID], eventID)
}
return result, rows.Err()
}
// bulkSelectStateEventByID lookups a list of state events by event ID. // bulkSelectStateEventByID lookups a list of state events by event ID.
// If not excluding rejected events, and any of the requested events are missing from // If not excluding rejected events, and any of the requested events are missing from
// the database it returns a types.MissingEventError. If excluding rejected events, // the database it returns a types.MissingEventError. If excluding rejected events,

View file

@ -5,8 +5,9 @@ import (
"database/sql" "database/sql"
"fmt" "fmt"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/roomserver/types"
) )
type RoomUpdater struct { type RoomUpdater struct {
@ -186,6 +187,10 @@ func (u *RoomUpdater) EventIDs(
return u.d.EventsTable.BulkSelectEventID(ctx, u.txn, eventNIDs) return u.d.EventsTable.BulkSelectEventID(ctx, u.txn, eventNIDs)
} }
func (u *RoomUpdater) BulkSelectSnapshotsFromEventIDs(ctx context.Context, eventIDs []string) (map[types.StateSnapshotNID][]string, error) {
return u.d.EventsTable.BulkSelectSnapshotsFromEventIDs(ctx, u.txn, eventIDs)
}
func (u *RoomUpdater) StateAtEventIDs( func (u *RoomUpdater) StateAtEventIDs(
ctx context.Context, eventIDs []string, ctx context.Context, eventIDs []string,
) ([]types.StateAtEvent, error) { ) ([]types.StateAtEvent, error) {

View file

@ -469,6 +469,23 @@ func (d *Database) events(
eventNIDs = append(eventNIDs, nid) eventNIDs = append(eventNIDs, nid)
} }
} }
// If we don't need to get any events from the database, short circuit now
if len(eventNIDs) == 0 {
results := make([]types.Event, 0, len(inputEventNIDs))
for _, nid := range inputEventNIDs {
event, ok := events[nid]
if !ok || event == nil {
return nil, fmt.Errorf("event %d missing", nid)
}
results = append(results, types.Event{
EventNID: nid,
Event: event,
})
}
if !redactionsArePermanent {
d.applyRedactions(results)
}
}
eventJSONs, err := d.EventJSONTable.BulkSelectEventJSON(ctx, txn, eventNIDs) eventJSONs, err := d.EventJSONTable.BulkSelectEventJSON(ctx, txn, eventNIDs)
if err != nil { if err != nil {
return nil, err return nil, err
@ -534,6 +551,12 @@ func (d *Database) events(
return results, nil return results, nil
} }
func (d *Database) BulkSelectSnapshotsFromEventIDs(
ctx context.Context, eventIDs []string,
) (map[types.StateSnapshotNID][]string, error) {
return d.EventsTable.BulkSelectSnapshotsFromEventIDs(ctx, nil, eventIDs)
}
func (d *Database) MembershipUpdater( func (d *Database) MembershipUpdater(
ctx context.Context, roomID, targetUserID string, ctx context.Context, roomID, targetUserID string,
targetLocal bool, roomVersion gomatrixserverlib.RoomVersion, targetLocal bool, roomVersion gomatrixserverlib.RoomVersion,

View file

@ -23,11 +23,13 @@ import (
"sort" "sort"
"strings" "strings"
"github.com/lib/pq"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/storage/tables" "github.com/matrix-org/dendrite/roomserver/storage/tables"
"github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
) )
const eventsSchema = ` const eventsSchema = `
@ -57,6 +59,9 @@ const insertEventSQL = `
const selectEventSQL = "" + const selectEventSQL = "" +
"SELECT event_nid, state_snapshot_nid FROM roomserver_events WHERE event_id = $1" "SELECT event_nid, state_snapshot_nid FROM roomserver_events WHERE event_id = $1"
const bulkSelectSnapshotsForEventIDsSQL = "" +
"SELECT event_id, state_snapshot_nid FROM roomserver_events WHERE event_id = ANY($1)"
// Bulk lookup of events by string ID. // Bulk lookup of events by string ID.
// Sort by the numeric IDs for event type and state key. // Sort by the numeric IDs for event type and state key.
// This means we can use binary search to lookup entries by type and state key. // This means we can use binary search to lookup entries by type and state key.
@ -124,6 +129,7 @@ type eventStatements struct {
db *sql.DB db *sql.DB
insertEventStmt *sql.Stmt insertEventStmt *sql.Stmt
selectEventStmt *sql.Stmt selectEventStmt *sql.Stmt
bulkSelectSnapshotsForEventIDsStmt *sql.Stmt
bulkSelectStateEventByIDStmt *sql.Stmt bulkSelectStateEventByIDStmt *sql.Stmt
bulkSelectStateEventByIDExcludingRejectedStmt *sql.Stmt bulkSelectStateEventByIDExcludingRejectedStmt *sql.Stmt
bulkSelectStateAtEventByIDStmt *sql.Stmt bulkSelectStateAtEventByIDStmt *sql.Stmt
@ -153,6 +159,7 @@ func PrepareEventsTable(db *sql.DB) (tables.Events, error) {
return s, sqlutil.StatementList{ return s, sqlutil.StatementList{
{&s.insertEventStmt, insertEventSQL}, {&s.insertEventStmt, insertEventSQL},
{&s.selectEventStmt, selectEventSQL}, {&s.selectEventStmt, selectEventSQL},
{&s.bulkSelectSnapshotsForEventIDsStmt, bulkSelectSnapshotsForEventIDsSQL},
{&s.bulkSelectStateEventByIDStmt, bulkSelectStateEventByIDSQL}, {&s.bulkSelectStateEventByIDStmt, bulkSelectStateEventByIDSQL},
{&s.bulkSelectStateEventByIDExcludingRejectedStmt, bulkSelectStateEventByIDExcludingRejectedSQL}, {&s.bulkSelectStateEventByIDExcludingRejectedStmt, bulkSelectStateEventByIDExcludingRejectedSQL},
{&s.bulkSelectStateAtEventByIDStmt, bulkSelectStateAtEventByIDSQL}, {&s.bulkSelectStateAtEventByIDStmt, bulkSelectStateAtEventByIDSQL},
@ -203,6 +210,29 @@ func (s *eventStatements) SelectEvent(
return types.EventNID(eventNID), types.StateSnapshotNID(stateNID), err return types.EventNID(eventNID), types.StateSnapshotNID(stateNID), err
} }
func (s *eventStatements) BulkSelectSnapshotsFromEventIDs(
ctx context.Context, txn *sql.Tx, eventIDs []string,
) (map[types.StateSnapshotNID][]string, error) {
stmt := sqlutil.TxStmt(txn, s.bulkSelectSnapshotsForEventIDsStmt)
rows, err := stmt.QueryContext(ctx, pq.Array(eventIDs))
if err != nil {
return nil, err
}
var eventID string
var stateNID types.StateSnapshotNID
result := make(map[types.StateSnapshotNID][]string)
for rows.Next() {
if err := rows.Scan(&eventID, &stateNID); err != nil {
return nil, err
}
result[stateNID] = append(result[stateNID], eventID)
}
return result, rows.Err()
}
// bulkSelectStateEventByID lookups a list of state events by event ID. // bulkSelectStateEventByID lookups a list of state events by event ID.
// If not excluding rejected events, and any of the requested events are missing from // If not excluding rejected events, and any of the requested events are missing from
// the database it returns a types.MissingEventError. If excluding rejected events, // the database it returns a types.MissingEventError. If excluding rejected events,

View file

@ -44,6 +44,7 @@ type Events interface {
referenceSHA256 []byte, authEventNIDs []types.EventNID, depth int64, isRejected bool, referenceSHA256 []byte, authEventNIDs []types.EventNID, depth int64, isRejected bool,
) (types.EventNID, types.StateSnapshotNID, error) ) (types.EventNID, types.StateSnapshotNID, error)
SelectEvent(ctx context.Context, txn *sql.Tx, eventID string) (types.EventNID, types.StateSnapshotNID, error) SelectEvent(ctx context.Context, txn *sql.Tx, eventID string) (types.EventNID, types.StateSnapshotNID, error)
BulkSelectSnapshotsFromEventIDs(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[types.StateSnapshotNID][]string, error)
// bulkSelectStateEventByID lookups a list of state events by event ID. // bulkSelectStateEventByID lookups a list of state events by event ID.
// If any of the requested events are missing from the database it returns a types.MissingEventError // If any of the requested events are missing from the database it returns a types.MissingEventError
BulkSelectStateEventByID(ctx context.Context, txn *sql.Tx, eventIDs []string, excludeRejected bool) ([]types.StateEntry, error) BulkSelectStateEventByID(ctx context.Context, txn *sql.Tx, eventIDs []string, excludeRejected bool) ([]types.StateEntry, error)