s/StateDataNID/StateBlockNID/

This commit is contained in:
Mark Haines 2017-02-14 17:11:16 +00:00
parent 548ba1fd1f
commit d2952f485b
5 changed files with 93 additions and 100 deletions

View file

@ -26,12 +26,12 @@ type RoomEventDatabase interface {
StateAtEventIDs(eventIDs []string) ([]types.StateAtEvent, error)
// Lookup the numeric state data IDs for each numeric state snapshot ID
// The returned slice is sorted by numeric state snapshot ID.
StateDataNIDs(stateNIDs []types.StateSnapshotNID) ([]types.StateDataNIDList, error)
StateBlockNIDs(stateNIDs []types.StateSnapshotNID) ([]types.StateBlockNIDList, error)
// Lookup the state data for each numeric state data ID
// The returned slice is sorted by numeric state data ID.
StateEntries(stateDataNIDs []types.StateDataNID) ([]types.StateEntryList, error)
StateEntries(stateBlockNIDs []types.StateBlockNID) ([]types.StateEntryList, error)
// Store the room state at an event in the database
AddState(roomNID types.RoomNID, stateDataNIDs []types.StateDataNID, state []types.StateEntry) (types.StateSnapshotNID, error)
AddState(roomNID types.RoomNID, stateBlockNIDs []types.StateBlockNID, state []types.StateEntry) (types.StateSnapshotNID, error)
// Set the state at an event.
SetState(eventNID types.EventNID, stateNID types.StateSnapshotNID) error
}
@ -78,9 +78,7 @@ func processRoomEvent(db RoomEventDatabase, input api.InputRoomEvent) error {
}
} else {
// We haven't been told what the state at the event is so we need to calculate it from the prev_events
if stateAtEvent.BeforeStateSnapshotNID, err = calculateAndStoreState(
db, event, roomNID, input.StateEventIDs,
); err != nil {
if stateAtEvent.BeforeStateSnapshotNID, err = calculateAndStoreState(db, event, roomNID); err != nil {
return err
}
}

View file

@ -7,20 +7,12 @@ import (
"sort"
)
// calculateAndStoreState calculates a snapshot of the state of a room before an event.
// Stores the snapshot of the state in the database.
// Returns a numeric ID for that snapshot.
func calculateAndStoreState(
db RoomEventDatabase, event gomatrixserverlib.Event, roomNID types.RoomNID, stateEventIDs []string,
db RoomEventDatabase, event gomatrixserverlib.Event, roomNID types.RoomNID,
) (types.StateSnapshotNID, error) {
if stateEventIDs != nil {
// 1) We've been told what the state at the event is.
// Check that those state events are in the database and store the state.
entries, err := db.StateEntriesForEventIDs(stateEventIDs)
if err != nil {
return 0, err
}
return db.AddState(roomNID, nil, entries)
}
// Load the state at the prev events.
prevEventRefs := event.PrevEvents()
prevEventIDs := make([]string, len(prevEventRefs))
@ -50,16 +42,16 @@ func calculateAndStoreState(
}
// The previous event was a state event so we need to store a copy
// of the previous state updated with that event.
stateDataNIDLists, err := db.StateDataNIDs([]types.StateSnapshotNID{prevState.BeforeStateSnapshotNID})
stateBlockNIDLists, err := db.StateBlockNIDs([]types.StateSnapshotNID{prevState.BeforeStateSnapshotNID})
if err != nil {
return 0, err
}
stateDataNIDs := stateDataNIDLists[0].StateDataNIDs
if len(stateDataNIDs) < maxStateDataNIDs {
stateBlockNIDs := stateBlockNIDLists[0].StateBlockNIDs
if len(stateBlockNIDs) < maxStateBlockNIDs {
// 4) The number of state data blocks is small enough that we can just
// add the state event as a block of size one to the end of the blocks.
return db.AddState(
roomNID, stateDataNIDs, []types.StateEntry{prevState.StateEntry},
roomNID, stateBlockNIDs, []types.StateEntry{prevState.StateEntry},
)
}
// If there are too many deltas then we need to calculate the full state
@ -68,13 +60,16 @@ func calculateAndStoreState(
return calculateAndStoreStateMany(db, roomNID, prevStates)
}
// maxStateDataNIDs is the maximum number of state data blocks to use to encode a snapshot of room state.
// maxStateBlockNIDs is the maximum number of state data blocks to use to encode a snapshot of room state.
// Increasing this number means that we can encode more of the state changes as simple deltas which means that
// we need fewer entries in the state data table. However making this number bigger will increase the size of
// the rows in the state table itself and will require more index lookups when retrieving a snapshot.
// TODO: Tune this to get the right balance between size and lookup performance.
const maxStateDataNIDs = 64
const maxStateBlockNIDs = 64
// calculateAndStoreStateMany calculates the state of the room before an event
// using the states at each of the event's prev events.
// Stores the resulting state and returns a numeric ID for the snapshot.
func calculateAndStoreStateMany(db RoomEventDatabase, roomNID types.RoomNID, prevStates []types.StateAtEvent) (types.StateSnapshotNID, error) {
// Conflict resolution.
// First stage: load the state datablocks for the prev events.
@ -86,31 +81,31 @@ func calculateAndStoreStateMany(db RoomEventDatabase, roomNID types.RoomNID, pre
// Deduplicate the IDs before passing them to the database.
// There could be duplicates because the events could be state events where
// the snapshot of the room state before them was the same.
stateDataNIDLists, err := db.StateDataNIDs(uniqueStateSnapshotNIDs(stateNIDs))
stateBlockNIDLists, err := db.StateBlockNIDs(uniqueStateSnapshotNIDs(stateNIDs))
if err != nil {
return 0, err
}
var stateDataNIDs []types.StateDataNID
for _, list := range stateDataNIDLists {
stateDataNIDs = append(stateDataNIDs, list.StateDataNIDs...)
var stateBlockNIDs []types.StateBlockNID
for _, list := range stateBlockNIDLists {
stateBlockNIDs = append(stateBlockNIDs, list.StateBlockNIDs...)
}
// Fetch the state entries that will be combined to create the snapshots.
// Deduplicate the IDs before passing them to the database.
// There could be duplicates because a block of state entries could be reused by
// multiple snapshots.
stateEntryLists, err := db.StateEntries(uniqueStateDataNIDs(stateDataNIDs))
stateEntryLists, err := db.StateEntries(uniqueStateBlockNIDs(stateBlockNIDs))
if err != nil {
return 0, err
}
stateDataNIDsMap := stateDataNIDListMap(stateDataNIDLists)
stateBlockNIDsMap := stateBlockNIDListMap(stateBlockNIDLists)
stateEntriesMap := stateEntryListMap(stateEntryLists)
// Combine the entries from all the snapshots of state after each prev event into a single list.
var combined []types.StateEntry
for _, prevState := range prevStates {
// Grab the list of state data NIDs for this snapshot.
list, ok := stateDataNIDsMap.lookup(prevState.BeforeStateSnapshotNID)
list, ok := stateBlockNIDsMap.lookup(prevState.BeforeStateSnapshotNID)
if !ok {
// This should only get hit if the database is corrupt.
// It should be impossible for an event to reference a NID that doesn't exist
@ -120,8 +115,8 @@ func calculateAndStoreStateMany(db RoomEventDatabase, roomNID types.RoomNID, pre
// Combined all the state entries for this snapshot.
// The order of state data NIDs in the list tells us the order to combine them in.
var fullState []types.StateEntry
for _, stateDataNID := range list {
entries, ok := stateEntriesMap.lookup(stateDataNID)
for _, stateBlockNID := range list {
entries, ok := stateEntriesMap.lookup(stateBlockNID)
if !ok {
// This should only get hit if the database is corrupt.
// It should be impossible for an event to reference a NID that doesn't exist
@ -138,7 +133,7 @@ func calculateAndStoreStateMany(db RoomEventDatabase, roomNID types.RoomNID, pre
// Stable sort so that the most recent entry for each state key stays
// remains later in the list than the older entries for the same state key.
sort.Stable(stateEntryByStateKeySorter(fullState))
// Unique returns the last entry for each state key.
// Unique returns the last entry and hence the most recent entry for each state key.
fullState = fullState[:unique(stateEntryByStateKeySorter(fullState))]
// Add the full state for this StateSnapshotNID.
combined = append(combined, fullState...)
@ -201,28 +196,28 @@ func findDuplicateStateKeys(a []types.StateEntry) []types.StateEntry {
return result
}
type stateDataNIDListMap []types.StateDataNIDList
type stateBlockNIDListMap []types.StateBlockNIDList
func (m stateDataNIDListMap) lookup(stateNID types.StateSnapshotNID) (stateDataNIDs []types.StateDataNID, ok bool) {
list := []types.StateDataNIDList(m)
func (m stateBlockNIDListMap) lookup(stateNID types.StateSnapshotNID) (stateBlockNIDs []types.StateBlockNID, ok bool) {
list := []types.StateBlockNIDList(m)
i := sort.Search(len(list), func(i int) bool {
return list[i].StateSnapshotNID >= stateNID
})
if i < len(list) && list[i].StateSnapshotNID == stateNID {
ok = true
stateDataNIDs = list[i].StateDataNIDs
stateBlockNIDs = list[i].StateBlockNIDs
}
return
}
type stateEntryListMap []types.StateEntryList
func (m stateEntryListMap) lookup(stateDataNID types.StateDataNID) (stateEntries []types.StateEntry, ok bool) {
func (m stateEntryListMap) lookup(stateBlockNID types.StateBlockNID) (stateEntries []types.StateEntry, ok bool) {
list := []types.StateEntryList(m)
i := sort.Search(len(list), func(i int) bool {
return list[i].StateDataNID >= stateDataNID
return list[i].StateBlockNID >= stateBlockNID
})
if i < len(list) && list[i].StateDataNID == stateDataNID {
if i < len(list) && list[i].StateBlockNID == stateBlockNID {
ok = true
stateEntries = list[i].StateEntries
}
@ -254,15 +249,15 @@ func uniqueStateSnapshotNIDs(nids []types.StateSnapshotNID) []types.StateSnapsho
return nids[:unique(stateNIDSorter(nids))]
}
type stateDataNIDSorter []types.StateDataNID
type stateBlockNIDSorter []types.StateBlockNID
func (s stateDataNIDSorter) Len() int { return len(s) }
func (s stateDataNIDSorter) Less(i, j int) bool { return s[i] < s[j] }
func (s stateDataNIDSorter) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s stateBlockNIDSorter) Len() int { return len(s) }
func (s stateBlockNIDSorter) Less(i, j int) bool { return s[i] < s[j] }
func (s stateBlockNIDSorter) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func uniqueStateDataNIDs(nids []types.StateDataNID) []types.StateDataNID {
sort.Sort(stateDataNIDSorter(nids))
return nids[:unique(stateDataNIDSorter(nids))]
func uniqueStateBlockNIDs(nids []types.StateBlockNID) []types.StateBlockNID {
sort.Sort(stateBlockNIDSorter(nids))
return nids[:unique(stateBlockNIDSorter(nids))]
}
// Remove duplicate items from a sorted list.

View file

@ -24,9 +24,9 @@ type statements struct {
insertEventJSONStmt *sql.Stmt
bulkSelectEventJSONStmt *sql.Stmt
insertStateStmt *sql.Stmt
bulkSelectStateDataNIDsStmt *sql.Stmt
bulkSelectStateBlockNIDsStmt *sql.Stmt
insertStateDataStmt *sql.Stmt
selectNextStateDataNIDStmt *sql.Stmt
selectNextStateBlockNIDStmt *sql.Stmt
bulkSelectStateDataEntriesStmt *sql.Stmt
}
@ -605,7 +605,7 @@ const insertStateSQL = "" +
// Bulk state data NID lookup.
// Sorting by state_snapshot_nid means we can use binary search over the result
// to lookup the state data NIDs for a state snapshot NID.
const bulkSelectStateDataNIDsSQL = "" +
const bulkSelectStateBlockNIDsSQL = "" +
"SELECT state_snapshot_nid, state_data_nids FROM state_snapshots" +
" WHERE state_snapshot_nid = ANY($1) ORDER BY state_snapshot_nid ASC"
@ -617,42 +617,42 @@ func (s *statements) prepareState(db *sql.DB) (err error) {
if s.insertStateStmt, err = db.Prepare(insertStateSQL); err != nil {
return
}
if s.bulkSelectStateDataNIDsStmt, err = db.Prepare(bulkSelectStateDataNIDsSQL); err != nil {
if s.bulkSelectStateBlockNIDsStmt, err = db.Prepare(bulkSelectStateBlockNIDsSQL); err != nil {
return
}
return
}
func (s *statements) insertState(roomNID types.RoomNID, stateDataNIDs []types.StateDataNID) (stateNID types.StateSnapshotNID, err error) {
nids := make([]int64, len(stateDataNIDs))
for i := range stateDataNIDs {
nids[i] = int64(stateDataNIDs[i])
func (s *statements) insertState(roomNID types.RoomNID, stateBlockNIDs []types.StateBlockNID) (stateNID types.StateSnapshotNID, err error) {
nids := make([]int64, len(stateBlockNIDs))
for i := range stateBlockNIDs {
nids[i] = int64(stateBlockNIDs[i])
}
err = s.insertStateStmt.QueryRow(int64(roomNID), pq.Int64Array(nids)).Scan(&stateNID)
return
}
func (s *statements) bulkSelectStateDataNIDs(stateNIDs []types.StateSnapshotNID) ([]types.StateDataNIDList, error) {
func (s *statements) bulkSelectStateBlockNIDs(stateNIDs []types.StateSnapshotNID) ([]types.StateBlockNIDList, error) {
nids := make([]int64, len(stateNIDs))
for i := range stateNIDs {
nids[i] = int64(stateNIDs[i])
}
rows, err := s.bulkSelectStateDataNIDsStmt.Query(pq.Int64Array(nids))
rows, err := s.bulkSelectStateBlockNIDsStmt.Query(pq.Int64Array(nids))
if err != nil {
return nil, err
}
defer rows.Close()
results := make([]types.StateDataNIDList, len(stateNIDs))
results := make([]types.StateBlockNIDList, len(stateNIDs))
i := 0
for ; rows.Next(); i++ {
result := &results[i]
var stateDataNIDs pq.Int64Array
if err := rows.Scan(&result.StateSnapshotNID, &stateDataNIDs); err != nil {
var stateBlockNIDs pq.Int64Array
if err := rows.Scan(&result.StateSnapshotNID, &stateBlockNIDs); err != nil {
return nil, err
}
result.StateDataNIDs = make([]types.StateDataNID, len(stateDataNIDs))
for k := range stateDataNIDs {
result.StateDataNIDs[k] = types.StateDataNID(stateDataNIDs[k])
result.StateBlockNIDs = make([]types.StateBlockNID, len(stateBlockNIDs))
for k := range stateBlockNIDs {
result.StateBlockNIDs[k] = types.StateBlockNID(stateBlockNIDs[k])
}
}
if i != len(stateNIDs) {
@ -686,7 +686,7 @@ const insertStateDataSQL = "" +
"INSERT INTO state_data (state_data_nid, event_type_nid, event_state_key_nid, event_nid)" +
" VALUES ($1, $2, $3, $4)"
const selectNextStateDataNIDSQL = "" +
const selectNextStateBlockNIDSQL = "" +
"SELECT nextval('state_data_nid_seq')"
// Bulk state lookup by numeric event ID.
@ -708,7 +708,7 @@ func (s *statements) prepareStateData(db *sql.DB) (err error) {
if s.insertStateDataStmt, err = db.Prepare(insertStateDataSQL); err != nil {
return
}
if s.selectNextStateDataNIDStmt, err = db.Prepare(selectNextStateDataNIDSQL); err != nil {
if s.selectNextStateBlockNIDStmt, err = db.Prepare(selectNextStateBlockNIDSQL); err != nil {
return
}
@ -718,10 +718,10 @@ func (s *statements) prepareStateData(db *sql.DB) (err error) {
return
}
func (s *statements) bulkInsertStateData(stateDataNID types.StateDataNID, entries []types.StateEntry) error {
func (s *statements) bulkInsertStateData(stateBlockNID types.StateBlockNID, entries []types.StateEntry) error {
for _, entry := range entries {
_, err := s.insertStateDataStmt.Exec(
int64(stateDataNID),
int64(stateBlockNID),
int64(entry.EventTypeNID),
int64(entry.EventStateKeyNID),
int64(entry.EventNID),
@ -733,16 +733,16 @@ func (s *statements) bulkInsertStateData(stateDataNID types.StateDataNID, entrie
return nil
}
func (s *statements) selectNextStateDataNID() (types.StateDataNID, error) {
var stateDataNID int64
err := s.selectNextStateDataNIDStmt.QueryRow().Scan(&stateDataNID)
return types.StateDataNID(stateDataNID), err
func (s *statements) selectNextStateBlockNID() (types.StateBlockNID, error) {
var stateBlockNID int64
err := s.selectNextStateBlockNIDStmt.QueryRow().Scan(&stateBlockNID)
return types.StateBlockNID(stateBlockNID), err
}
func (s *statements) bulkSelectStateDataEntries(stateDataNIDs []types.StateDataNID) ([]types.StateEntryList, error) {
nids := make([]int64, len(stateDataNIDs))
for i := range stateDataNIDs {
nids[i] = int64(stateDataNIDs[i])
func (s *statements) bulkSelectStateDataEntries(stateBlockNIDs []types.StateBlockNID) ([]types.StateEntryList, error) {
nids := make([]int64, len(stateBlockNIDs))
for i := range stateBlockNIDs {
nids[i] = int64(stateBlockNIDs[i])
}
rows, err := s.bulkSelectStateDataEntriesStmt.Query(pq.Int64Array(nids))
if err != nil {
@ -750,37 +750,37 @@ func (s *statements) bulkSelectStateDataEntries(stateDataNIDs []types.StateDataN
}
defer rows.Close()
results := make([]types.StateEntryList, len(stateDataNIDs))
results := make([]types.StateEntryList, len(stateBlockNIDs))
// current is a pointer to the StateEntryList to append the state entries to.
var current *types.StateEntryList
i := 0
for rows.Next() {
var (
stateDataNID int64
stateBlockNID int64
eventTypeNID int64
eventStateKeyNID int64
eventNID int64
entry types.StateEntry
)
if err := rows.Scan(
&stateDataNID, &eventTypeNID, &eventStateKeyNID, &eventNID,
&stateBlockNID, &eventTypeNID, &eventStateKeyNID, &eventNID,
); err != nil {
return nil, err
}
entry.EventTypeNID = types.EventTypeNID(eventTypeNID)
entry.EventStateKeyNID = types.EventStateKeyNID(eventStateKeyNID)
entry.EventNID = types.EventNID(eventNID)
if current == nil || types.StateDataNID(stateDataNID) != current.StateDataNID {
if current == nil || types.StateBlockNID(stateBlockNID) != current.StateBlockNID {
// The state entry row is for a different state data block to the current one.
// So we start appending to the next entry in the list.
current = &results[i]
current.StateDataNID = types.StateDataNID(stateDataNID)
current.StateBlockNID = types.StateBlockNID(stateBlockNID)
i++
}
current.StateEntries = append(current.StateEntries, entry)
}
if i != len(stateDataNIDs) {
return nil, fmt.Errorf("storage: state data NIDs missing from the database (%d != %d)", i, len(stateDataNIDs))
if i != len(stateBlockNIDs) {
return nil, fmt.Errorf("storage: state data NIDs missing from the database (%d != %d)", i, len(stateBlockNIDs))
}
return results, nil
}

View file

@ -161,19 +161,19 @@ func (d *Database) Events(eventNIDs []types.EventNID) ([]types.Event, error) {
}
// AddState implements input.EventDatabase
func (d *Database) AddState(roomNID types.RoomNID, stateDataNIDs []types.StateDataNID, state []types.StateEntry) (types.StateSnapshotNID, error) {
func (d *Database) AddState(roomNID types.RoomNID, stateBlockNIDs []types.StateBlockNID, state []types.StateEntry) (types.StateSnapshotNID, error) {
if len(state) > 0 {
stateDataNID, err := d.statements.selectNextStateDataNID()
stateBlockNID, err := d.statements.selectNextStateBlockNID()
if err != nil {
return 0, err
}
if err = d.statements.bulkInsertStateData(stateDataNID, state); err != nil {
if err = d.statements.bulkInsertStateData(stateBlockNID, state); err != nil {
return 0, err
}
stateDataNIDs = append(stateDataNIDs[:len(stateDataNIDs):len(stateDataNIDs)], stateDataNID)
stateBlockNIDs = append(stateBlockNIDs[:len(stateBlockNIDs):len(stateBlockNIDs)], stateBlockNID)
}
return d.statements.insertState(roomNID, stateDataNIDs)
return d.statements.insertState(roomNID, stateBlockNIDs)
}
// SetState implements input.EventDatabase
@ -186,12 +186,12 @@ func (d *Database) StateAtEventIDs(eventIDs []string) ([]types.StateAtEvent, err
return d.statements.bulkSelectStateAtEventByID(eventIDs)
}
// StateDataNIDs implements input.EventDatabase
func (d *Database) StateDataNIDs(stateNIDs []types.StateSnapshotNID) ([]types.StateDataNIDList, error) {
return d.statements.bulkSelectStateDataNIDs(stateNIDs)
// StateBlockNIDs implements input.EventDatabase
func (d *Database) StateBlockNIDs(stateNIDs []types.StateSnapshotNID) ([]types.StateBlockNIDList, error) {
return d.statements.bulkSelectStateBlockNIDs(stateNIDs)
}
// StateEntries implements input.EventDatabase
func (d *Database) StateEntries(stateDataNIDs []types.StateDataNID) ([]types.StateEntryList, error) {
return d.statements.bulkSelectStateDataEntries(stateDataNIDs)
func (d *Database) StateEntries(stateBlockNIDs []types.StateBlockNID) ([]types.StateEntryList, error) {
return d.statements.bulkSelectStateDataEntries(stateBlockNIDs)
}

View file

@ -28,9 +28,9 @@ type RoomNID int64
// StateSnapshotNID is a numeric ID for the state at an event.
type StateSnapshotNID int64
// StateDataNID is a numeric ID for a block of state data.
// StateBlockNID is a numeric ID for a block of state data.
// These blocks of state data are combined to form the actual state.
type StateDataNID int64
type StateBlockNID int64
// A StateKeyTuple is a pair of a numeric event type and a numeric state key.
// It is used to lookup state entries.
@ -103,14 +103,14 @@ const (
EmptyStateKeyNID = 1
)
// StateDataNIDList is used to return the result of bulk StateDataNID lookups from the database.
type StateDataNIDList struct {
// StateBlockNIDList is used to return the result of bulk StateBlockNID lookups from the database.
type StateBlockNIDList struct {
StateSnapshotNID StateSnapshotNID
StateDataNIDs []StateDataNID
StateBlockNIDs []StateBlockNID
}
// StateEntryList is used to return the result of bulk state entry lookups from the database.
type StateEntryList struct {
StateDataNID StateDataNID
StateEntries []StateEntry
StateBlockNID StateBlockNID
StateEntries []StateEntry
}