mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-06 22:43:10 -06:00
Return the requested portions of current state in the query API
This commit is contained in:
parent
1d18da1189
commit
0e84ca1cfe
|
|
@ -16,9 +16,26 @@ type RoomserverQueryAPIDatabase interface {
|
|||
// Returns 0 if the room doesn't exists.
|
||||
// Returns an error if there was a problem talking to the database.
|
||||
RoomNID(roomID string) (types.RoomNID, error)
|
||||
// Lookup event references for the latest events in the room.
|
||||
// Lookup event references for the latest events in the room and the current state snapshot.
|
||||
// Returns an error if there was a problem talking to the database.
|
||||
LatestEventIDs(roomNID types.RoomNID) ([]gomatrixserverlib.EventReference, error)
|
||||
LatestEventIDs(roomNID types.RoomNID) ([]gomatrixserverlib.EventReference, types.StateSnapshotNID, error)
|
||||
// Lookup the numeric IDs for a list of string event types.
|
||||
// Returns a map from string event type to numeric ID for the event type.
|
||||
EventTypeNIDs(eventTypes []string) (map[string]types.EventTypeNID, error)
|
||||
// Lookup the numeric IDs for a list of string event state keys.
|
||||
// Returns a map from string state key to numeric ID for the state key.
|
||||
EventStateKeyNIDs(eventStateKeys []string) (map[string]types.EventStateKeyNID, error)
|
||||
// Lookup the numeric state block IDs for each numeric state snapshot ID
|
||||
// The returned slice is sorted by numeric state snapshot ID.
|
||||
StateBlockNIDs(stateNIDs []types.StateSnapshotNID) ([]types.StateBlockNIDList, error)
|
||||
// Lookup the state data for the state key tuples for each numeric state block ID
|
||||
// The returned slice is sorted by numeric state block ID.
|
||||
StateEntriesForTuples(stateBlockNIDs []types.StateBlockNID, stateKeyTuples []types.StateKeyTuple) (
|
||||
[]types.StateEntryList, error,
|
||||
)
|
||||
// Lookup the Events for a list of numeric event IDs.
|
||||
// Returns a sorted list of events.
|
||||
Events(eventNIDs []types.EventNID) ([]types.Event, error)
|
||||
}
|
||||
|
||||
// RoomserverQueryAPI is an implementation of RoomserverQueryAPI
|
||||
|
|
@ -40,9 +57,38 @@ func (r *RoomserverQueryAPI) QueryLatestEventsAndState(
|
|||
return nil
|
||||
}
|
||||
response.RoomExists = true
|
||||
response.LatestEvents, err = r.DB.LatestEventIDs(roomNID)
|
||||
// TODO: look up the current state.
|
||||
return err
|
||||
var currentStateSnapshotNID types.StateSnapshotNID
|
||||
response.LatestEvents, currentStateSnapshotNID, err = r.DB.LatestEventIDs(roomNID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Lookup the currrent state for the requested tuples.
|
||||
stateTuples, err := stringTuplesToNumericTuples(r.DB, request.StateToFetch)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
stateEntries, err := loadStateAtSnapshotForTuples(r.DB, currentStateSnapshotNID, stateTuples)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
eventNIDs := make([]types.EventNID, len(stateEntries))
|
||||
for i := range stateEntries {
|
||||
eventNIDs[i] = stateEntries[i].EventNID
|
||||
}
|
||||
|
||||
stateEvents, err := r.DB.Events(eventNIDs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
response.StateEvents = make([]gomatrixserverlib.Event, len(stateEvents))
|
||||
for i := range stateEvents {
|
||||
response.StateEvents[i] = stateEvents[i].Event
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetupHTTP adds the RoomserverQueryAPI handlers to the http.ServeMux.
|
||||
|
|
|
|||
130
src/github.com/matrix-org/dendrite/roomserver/query/state.go
Normal file
130
src/github.com/matrix-org/dendrite/roomserver/query/state.go
Normal file
|
|
@ -0,0 +1,130 @@
|
|||
package query
|
||||
|
||||
import (
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
"sort"
|
||||
)
|
||||
|
||||
func stringTuplesToNumericTuples(db RoomserverQueryAPIDatabase, stringTuples []api.StateKeyTuple) ([]types.StateKeyTuple, error) {
|
||||
eventTypes := make([]string, len(stringTuples))
|
||||
stateKeys := make([]string, len(stringTuples))
|
||||
for i := range stringTuples {
|
||||
eventTypes[i] = stringTuples[i].EventType
|
||||
stateKeys[i] = stringTuples[i].EventStateKey
|
||||
}
|
||||
sort.Strings(eventTypes)
|
||||
eventTypes = eventTypes[:unique(sort.StringSlice(eventTypes))]
|
||||
eventTypeMap, err := db.EventTypeNIDs(eventTypes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sort.Strings(stateKeys)
|
||||
stateKeys = stateKeys[:unique(sort.StringSlice(stateKeys))]
|
||||
stateKeyMap, err := db.EventStateKeyNIDs(stateKeys)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var result []types.StateKeyTuple
|
||||
for _, stringTuple := range stringTuples {
|
||||
var numericTuple types.StateKeyTuple
|
||||
var ok1, ok2 bool
|
||||
numericTuple.EventTypeNID, ok1 = eventTypeMap[stringTuple.EventType]
|
||||
numericTuple.EventStateKeyNID, ok2 = stateKeyMap[stringTuple.EventStateKey]
|
||||
if ok1 && ok2 {
|
||||
result = append(result, numericTuple)
|
||||
}
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// loadStateAtSnapshot loads the state of a list of event type and state key pairs in a room at a snapshot.
|
||||
// This is typically the state before an event or the current state of a room.
|
||||
// Returns a sorted list of state entries or an error if there was a problem talking to the database.
|
||||
func loadStateAtSnapshotForTuples(db RoomserverQueryAPIDatabase, stateNID types.StateSnapshotNID, stateKeyTuples []types.StateKeyTuple) ([]types.StateEntry, error) {
|
||||
stateBlockNIDLists, err := db.StateBlockNIDs([]types.StateSnapshotNID{stateNID})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
stateBlockNIDList := stateBlockNIDLists[0]
|
||||
|
||||
stateEntryLists, err := db.StateEntriesForTuples(stateBlockNIDList.StateBlockNIDs, stateKeyTuples)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
stateEntriesMap := stateEntryListMap(stateEntryLists)
|
||||
|
||||
// Combined all the state entries for this snapshot.
|
||||
// The order of state block NIDs in the list tells us the order to combine them in.
|
||||
var fullState []types.StateEntry
|
||||
for _, stateBlockNID := range stateBlockNIDList.StateBlockNIDs {
|
||||
entries, ok := stateEntriesMap.lookup(stateBlockNID)
|
||||
if !ok {
|
||||
// If the block is missing from the map it means that none of its entries matched a requested tuple.
|
||||
continue
|
||||
}
|
||||
fullState = append(fullState, entries...)
|
||||
}
|
||||
|
||||
// Stable sort so that the most recent entry for each state key stays
|
||||
// remains later in the list than the older entries for the same state key.
|
||||
sort.Stable(stateEntryByStateKeySorter(fullState))
|
||||
// Unique returns the last entry and hence the most recent entry for each state key.
|
||||
fullState = fullState[:unique(stateEntryByStateKeySorter(fullState))]
|
||||
return fullState, nil
|
||||
}
|
||||
|
||||
type stateEntryListMap []types.StateEntryList
|
||||
|
||||
func (m stateEntryListMap) lookup(stateBlockNID types.StateBlockNID) (stateEntries []types.StateEntry, ok bool) {
|
||||
list := []types.StateEntryList(m)
|
||||
i := sort.Search(len(list), func(i int) bool {
|
||||
return list[i].StateBlockNID >= stateBlockNID
|
||||
})
|
||||
if i < len(list) && list[i].StateBlockNID == stateBlockNID {
|
||||
ok = true
|
||||
stateEntries = list[i].StateEntries
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
type stateEntryByStateKeySorter []types.StateEntry
|
||||
|
||||
func (s stateEntryByStateKeySorter) Len() int { return len(s) }
|
||||
func (s stateEntryByStateKeySorter) Less(i, j int) bool {
|
||||
return s[i].StateKeyTuple.LessThan(s[j].StateKeyTuple)
|
||||
}
|
||||
func (s stateEntryByStateKeySorter) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
|
||||
|
||||
// Remove duplicate items from a sorted list.
|
||||
// Takes the same interface as sort.Sort
|
||||
// Returns the length of the data without duplicates
|
||||
// Uses the last occurance of a duplicate.
|
||||
// O(n).
|
||||
func unique(data sort.Interface) int {
|
||||
if data.Len() == 0 {
|
||||
return 0
|
||||
}
|
||||
length := data.Len()
|
||||
// j is the next index to output an element to.
|
||||
j := 0
|
||||
for i := 1; i < length; i++ {
|
||||
// If the previous element is less than this element then they are
|
||||
// not equal. Otherwise they must be equal because the list is sorted.
|
||||
// If they are equal then we move onto the next element.
|
||||
if data.Less(i-1, i) {
|
||||
// "Write" the previous element to the output position by swaping
|
||||
// the elements.
|
||||
// Note that if the list has no duplicates then i-1 == j so the
|
||||
// swap does nothing. (This assumes that data.Swap(a,b) nops if a==b)
|
||||
data.Swap(i-1, j)
|
||||
// Advance to the next output position in the list.
|
||||
j++
|
||||
}
|
||||
}
|
||||
// Output the last element.
|
||||
data.Swap(length-1, j)
|
||||
return j + 1
|
||||
}
|
||||
|
|
@ -365,7 +365,12 @@ func main() {
|
|||
testRoomserver(input, want, func(q api.RoomserverQueryAPI) {
|
||||
var response api.QueryLatestEventsAndStateResponse
|
||||
if err := q.QueryLatestEventsAndState(
|
||||
&api.QueryLatestEventsAndStateRequest{RoomID: "!HCXfdvrfksxuYnIFiJ:matrix.org"},
|
||||
&api.QueryLatestEventsAndStateRequest{
|
||||
RoomID: "!HCXfdvrfksxuYnIFiJ:matrix.org",
|
||||
StateToFetch: []api.StateKeyTuple{
|
||||
{"m.room.member", "@richvdh:matrix.org"},
|
||||
},
|
||||
},
|
||||
&response,
|
||||
); err != nil {
|
||||
panic(err)
|
||||
|
|
@ -376,6 +381,9 @@ func main() {
|
|||
if len(response.LatestEvents) != 1 || response.LatestEvents[0].EventID != "$1463671339126270PnVwC:matrix.org" {
|
||||
panic(fmt.Errorf(`Wanted "$1463671339126270PnVwC:matrix.org" to be the latest event got %#v`, response.LatestEvents))
|
||||
}
|
||||
if len(response.StateEvents) != 1 || response.StateEvents[0].EventID() != "$1463671339126270PnVwC:matrix.org" {
|
||||
panic(fmt.Errorf(`Wanted "$1463671339126270PnVwC:matrix.org" to be the state event got %#v`, response.StateEvents))
|
||||
}
|
||||
})
|
||||
|
||||
fmt.Println("==PASSED==", os.Args[0])
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ package storage
|
|||
|
||||
import (
|
||||
"database/sql"
|
||||
"github.com/lib/pq"
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
)
|
||||
|
||||
|
|
@ -66,9 +67,16 @@ const insertEventTypeNIDSQL = "" +
|
|||
const selectEventTypeNIDSQL = "" +
|
||||
"SELECT event_type_nid FROM event_types WHERE event_type = $1"
|
||||
|
||||
// Bulk lookup from string event type to numeric ID for that event type.
|
||||
// Takes an array of strings as the query parameter.
|
||||
const bulkSelectEventTypeNIDSQL = "" +
|
||||
"SELECT event_type, event_type_nid FROM event_types" +
|
||||
" WHERE event_type = ANY($1)"
|
||||
|
||||
type eventTypeStatements struct {
|
||||
insertEventTypeNIDStmt *sql.Stmt
|
||||
selectEventTypeNIDStmt *sql.Stmt
|
||||
insertEventTypeNIDStmt *sql.Stmt
|
||||
selectEventTypeNIDStmt *sql.Stmt
|
||||
bulkSelectEventTypeNIDStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func (s *eventTypeStatements) prepare(db *sql.DB) (err error) {
|
||||
|
|
@ -80,6 +88,7 @@ func (s *eventTypeStatements) prepare(db *sql.DB) (err error) {
|
|||
return statementList{
|
||||
{&s.insertEventTypeNIDStmt, insertEventTypeNIDSQL},
|
||||
{&s.selectEventTypeNIDStmt, selectEventTypeNIDSQL},
|
||||
{&s.bulkSelectEventTypeNIDStmt, bulkSelectEventTypeNIDSQL},
|
||||
}.prepare(db)
|
||||
}
|
||||
|
||||
|
|
@ -94,3 +103,22 @@ func (s *eventTypeStatements) selectEventTypeNID(eventType string) (types.EventT
|
|||
err := s.selectEventTypeNIDStmt.QueryRow(eventType).Scan(&eventTypeNID)
|
||||
return types.EventTypeNID(eventTypeNID), err
|
||||
}
|
||||
|
||||
func (s *eventTypeStatements) bulkSelectEventTypeNID(eventTypes []string) (map[string]types.EventTypeNID, error) {
|
||||
rows, err := s.bulkSelectEventTypeNIDStmt.Query(pq.StringArray(eventTypes))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
result := make(map[string]types.EventTypeNID, len(eventTypes))
|
||||
for rows.Next() {
|
||||
var eventType string
|
||||
var eventTypeNID int64
|
||||
if err := rows.Scan(&eventType, &eventTypeNID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result[eventType] = types.EventTypeNID(eventTypeNID)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -35,7 +35,7 @@ const selectRoomNIDSQL = "" +
|
|||
"SELECT room_nid FROM rooms WHERE room_id = $1"
|
||||
|
||||
const selectLatestEventNIDsSQL = "" +
|
||||
"SELECT latest_event_nids FROM rooms WHERE room_nid = $1"
|
||||
"SELECT latest_event_nids, state_snapshot_nid FROM rooms WHERE room_nid = $1"
|
||||
|
||||
const selectLatestEventNIDsForUpdateSQL = "" +
|
||||
"SELECT latest_event_nids, last_event_sent_nid, state_snapshot_nid FROM rooms WHERE room_nid = $1 FOR UPDATE"
|
||||
|
|
@ -77,17 +77,18 @@ func (s *roomStatements) selectRoomNID(roomID string) (types.RoomNID, error) {
|
|||
return types.RoomNID(roomNID), err
|
||||
}
|
||||
|
||||
func (s *roomStatements) selectLatestEventNIDs(roomNID types.RoomNID) ([]types.EventNID, error) {
|
||||
func (s *roomStatements) selectLatestEventNIDs(roomNID types.RoomNID) ([]types.EventNID, types.StateSnapshotNID, error) {
|
||||
var nids pq.Int64Array
|
||||
err := s.selectLatestEventNIDsStmt.QueryRow(int64(roomNID)).Scan(&nids)
|
||||
var stateSnapshotNID int64
|
||||
err := s.selectLatestEventNIDsStmt.QueryRow(int64(roomNID)).Scan(&nids, &stateSnapshotNID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, 0, err
|
||||
}
|
||||
eventNIDs := make([]types.EventNID, len(nids))
|
||||
for i := range nids {
|
||||
eventNIDs[i] = types.EventNID(nids[i])
|
||||
}
|
||||
return eventNIDs, nil
|
||||
return eventNIDs, types.StateSnapshotNID(stateSnapshotNID), nil
|
||||
}
|
||||
|
||||
func (s *roomStatements) selectLatestEventsNIDsForUpdate(txn *sql.Tx, roomNID types.RoomNID) (
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import (
|
|||
"fmt"
|
||||
"github.com/lib/pq"
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
"sort"
|
||||
)
|
||||
|
||||
const stateDataSchema = `
|
||||
|
|
@ -35,21 +36,35 @@ const insertStateDataSQL = "" +
|
|||
const selectNextStateBlockNIDSQL = "" +
|
||||
"SELECT nextval('state_block_nid_seq')"
|
||||
|
||||
// Bulk state lookup by numeric event ID.
|
||||
// Bulk state lookup by numeric state block ID.
|
||||
// Sort by the state_block_nid, event_type_nid, event_state_key_nid
|
||||
// This means that all the entries for a given state_block_nid will appear
|
||||
// together in the list and those entries will sorted by event_type_nid
|
||||
// and event_state_key_nid. This property makes it easier to merge two
|
||||
// state data blocks together.
|
||||
const bulkSelectStateDataEntriesSQL = "" +
|
||||
const bulkSelectStateBlockEntriesSQL = "" +
|
||||
"SELECT state_block_nid, event_type_nid, event_state_key_nid, event_nid" +
|
||||
" FROM state_block WHERE state_block_nid = ANY($1)" +
|
||||
" ORDER BY state_block_nid, event_type_nid, event_state_key_nid"
|
||||
|
||||
// Bulk state lookup by numeric state block ID.
|
||||
// Filters the rows in each block to the requested types and state keys.
|
||||
// We would like to restrict to particular type state key pairs but we are
|
||||
// restricted by the query language to pull the cross product of a list
|
||||
// of types and a list state_keys. So we have to filter the result in the
|
||||
// application to restrict it to the list of event types and state keys we
|
||||
// actually wanted.
|
||||
const bulkSelectFilteredStateBlockEntriesSQL = "" +
|
||||
"SELECT state_block_nid, event_type_nid, event_state_key_nid, event_nid" +
|
||||
" FROM state_block WHERE state_block_nid = ANY($1)" +
|
||||
" AND event_type_nid = ANY($2) AND event_state_key_nid = ANY($3)" +
|
||||
" ORDER BY state_block_nid, event_type_nid, event_state_key_nid"
|
||||
|
||||
type stateBlockStatements struct {
|
||||
insertStateDataStmt *sql.Stmt
|
||||
selectNextStateBlockNIDStmt *sql.Stmt
|
||||
bulkSelectStateDataEntriesStmt *sql.Stmt
|
||||
insertStateDataStmt *sql.Stmt
|
||||
selectNextStateBlockNIDStmt *sql.Stmt
|
||||
bulkSelectStateBlockEntriesStmt *sql.Stmt
|
||||
bulkSelectFilteredStateBlockEntriesStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func (s *stateBlockStatements) prepare(db *sql.DB) (err error) {
|
||||
|
|
@ -61,7 +76,8 @@ func (s *stateBlockStatements) prepare(db *sql.DB) (err error) {
|
|||
return statementList{
|
||||
{&s.insertStateDataStmt, insertStateDataSQL},
|
||||
{&s.selectNextStateBlockNIDStmt, selectNextStateBlockNIDSQL},
|
||||
{&s.bulkSelectStateDataEntriesStmt, bulkSelectStateDataEntriesSQL},
|
||||
{&s.bulkSelectStateBlockEntriesStmt, bulkSelectStateBlockEntriesSQL},
|
||||
{&s.bulkSelectFilteredStateBlockEntriesStmt, bulkSelectFilteredStateBlockEntriesSQL},
|
||||
}.prepare(db)
|
||||
}
|
||||
|
||||
|
|
@ -86,12 +102,12 @@ func (s *stateBlockStatements) selectNextStateBlockNID() (types.StateBlockNID, e
|
|||
return types.StateBlockNID(stateBlockNID), err
|
||||
}
|
||||
|
||||
func (s *stateBlockStatements) bulkSelectStateDataEntries(stateBlockNIDs []types.StateBlockNID) ([]types.StateEntryList, error) {
|
||||
func (s *stateBlockStatements) bulkSelectStateBlockEntries(stateBlockNIDs []types.StateBlockNID) ([]types.StateEntryList, error) {
|
||||
nids := make([]int64, len(stateBlockNIDs))
|
||||
for i := range stateBlockNIDs {
|
||||
nids[i] = int64(stateBlockNIDs[i])
|
||||
}
|
||||
rows, err := s.bulkSelectStateDataEntriesStmt.Query(pq.Int64Array(nids))
|
||||
rows, err := s.bulkSelectStateBlockEntriesStmt.Query(pq.Int64Array(nids))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -131,3 +147,133 @@ func (s *stateBlockStatements) bulkSelectStateDataEntries(stateBlockNIDs []types
|
|||
}
|
||||
return results, nil
|
||||
}
|
||||
|
||||
func (s *stateBlockStatements) bulkSelectFilteredStateBlockEntries(
|
||||
stateBlockNIDs []types.StateBlockNID, stateKeyTuples []types.StateKeyTuple,
|
||||
) ([]types.StateEntryList, error) {
|
||||
tuples := stateKeyTupleSorter(stateKeyTuples)
|
||||
sort.Sort(tuples)
|
||||
|
||||
eventTypeNIDArray, eventStateKeyNIDArray := tuples.typesAndStateKeysAsArrays()
|
||||
rows, err := s.bulkSelectFilteredStateBlockEntriesStmt.Query(
|
||||
stateBlockNIDsAsArray(stateBlockNIDs), eventTypeNIDArray, eventStateKeyNIDArray,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
results := make([]types.StateEntryList, len(stateBlockNIDs))
|
||||
// current is a pointer to the StateEntryList to append the state entries to.
|
||||
var current *types.StateEntryList
|
||||
i := 0
|
||||
for rows.Next() {
|
||||
var (
|
||||
stateBlockNID int64
|
||||
eventTypeNID int64
|
||||
eventStateKeyNID int64
|
||||
eventNID int64
|
||||
entry types.StateEntry
|
||||
)
|
||||
if err := rows.Scan(
|
||||
&stateBlockNID, &eventTypeNID, &eventStateKeyNID, &eventNID,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
entry.EventTypeNID = types.EventTypeNID(eventTypeNID)
|
||||
entry.EventStateKeyNID = types.EventStateKeyNID(eventStateKeyNID)
|
||||
entry.EventNID = types.EventNID(eventNID)
|
||||
|
||||
if !tuples.contains(entry.StateKeyTuple) {
|
||||
// The select will return the cross product of types and state keys.
|
||||
// So we need to check if type of the entry is in the list.
|
||||
continue
|
||||
}
|
||||
|
||||
if current == nil || types.StateBlockNID(stateBlockNID) != current.StateBlockNID {
|
||||
// The state entry row is for a different state data block to the current one.
|
||||
// So we start appending to the next entry in the list.
|
||||
current = &results[i]
|
||||
current.StateBlockNID = types.StateBlockNID(stateBlockNID)
|
||||
i++
|
||||
}
|
||||
current.StateEntries = append(current.StateEntries, entry)
|
||||
}
|
||||
// Because we have filtered the list it's possible that some of the blocks were completely removed
|
||||
// from the result.
|
||||
return results[:i], nil
|
||||
}
|
||||
|
||||
func stateBlockNIDsAsArray(stateBlockNIDs []types.StateBlockNID) pq.Int64Array {
|
||||
nids := make([]int64, len(stateBlockNIDs))
|
||||
for i := range stateBlockNIDs {
|
||||
nids[i] = int64(stateBlockNIDs[i])
|
||||
}
|
||||
return pq.Int64Array(nids)
|
||||
}
|
||||
|
||||
type stateKeyTupleSorter []types.StateKeyTuple
|
||||
|
||||
func (s stateKeyTupleSorter) Len() int { return len(s) }
|
||||
func (s stateKeyTupleSorter) Less(i, j int) bool { return s[i].LessThan(s[j]) }
|
||||
func (s stateKeyTupleSorter) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
|
||||
|
||||
// Check whether a tuple is in the list. Assumes that the list is sorted.
|
||||
func (s stateKeyTupleSorter) contains(value types.StateKeyTuple) bool {
|
||||
i := sort.Search(len(s), func(i int) bool { return !s[i].LessThan(value) })
|
||||
return i < len(s) && s[i] == value
|
||||
}
|
||||
|
||||
// List the unique eventTypeNIDs and eventStateKeyNIDs.
|
||||
// Assumes that the list is sorted.
|
||||
func (s stateKeyTupleSorter) typesAndStateKeysAsArrays() (eventTypeNIDs pq.Int64Array, eventStateKeyNIDs pq.Int64Array) {
|
||||
eventTypeNIDs = make(pq.Int64Array, len(s))
|
||||
eventStateKeyNIDs = make(pq.Int64Array, len(s))
|
||||
for i := range s {
|
||||
eventTypeNIDs[i] = int64(s[i].EventTypeNID)
|
||||
eventStateKeyNIDs[i] = int64(s[i].EventStateKeyNID)
|
||||
}
|
||||
// The event types are already sorted because the tuples were sorted.
|
||||
eventTypeNIDs = eventTypeNIDs[:unique(int64Sorter(eventTypeNIDs))]
|
||||
// The event state keys need to be sorted.
|
||||
sort.Sort(int64Sorter(eventStateKeyNIDs))
|
||||
eventStateKeyNIDs = eventStateKeyNIDs[:unique(int64Sorter(eventStateKeyNIDs))]
|
||||
return
|
||||
}
|
||||
|
||||
type int64Sorter []int64
|
||||
|
||||
func (s int64Sorter) Len() int { return len(s) }
|
||||
func (s int64Sorter) Less(i, j int) bool { return s[i] < s[j] }
|
||||
func (s int64Sorter) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
|
||||
|
||||
// Remove duplicate items from a sorted list.
|
||||
// Takes the same interface as sort.Sort
|
||||
// Returns the length of the data without duplicates
|
||||
// Uses the last occurance of a duplicate.
|
||||
// O(n).
|
||||
func unique(data sort.Interface) int {
|
||||
if data.Len() == 0 {
|
||||
return 0
|
||||
}
|
||||
length := data.Len()
|
||||
// j is the next index to output an element to.
|
||||
j := 0
|
||||
for i := 1; i < length; i++ {
|
||||
// If the previous element is less than this element then they are
|
||||
// not equal. Otherwise they must be equal because the list is sorted.
|
||||
// If they are equal then we move onto the next element.
|
||||
if data.Less(i-1, i) {
|
||||
// "Write" the previous element to the output position by swaping
|
||||
// the elements.
|
||||
// Note that if the list has no duplicates then i-1 == j so the
|
||||
// swap does nothing. (This assumes that data.Swap(a,b) nops if a==b)
|
||||
data.Swap(i-1, j)
|
||||
// Advance to the next output position in the list.
|
||||
j++
|
||||
}
|
||||
}
|
||||
// Output the last element.
|
||||
data.Swap(length-1, j)
|
||||
return j + 1
|
||||
}
|
||||
|
|
|
|||
|
|
@ -145,7 +145,12 @@ func (d *Database) StateEntriesForEventIDs(eventIDs []string) ([]types.StateEntr
|
|||
return d.statements.bulkSelectStateEventByID(eventIDs)
|
||||
}
|
||||
|
||||
// EventStateKeyNIDs implements input.EventDatabase
|
||||
// EventTypeNIDs implements query.RoomserverQueryAPIDatabase
|
||||
func (d *Database) EventTypeNIDs(eventTypes []string) (map[string]types.EventTypeNID, error) {
|
||||
return d.statements.bulkSelectEventTypeNID(eventTypes)
|
||||
}
|
||||
|
||||
// EventStateKeyNIDs implements input.EventDatabase and query.RoomserverQueryAPIDatabase
|
||||
func (d *Database) EventStateKeyNIDs(eventStateKeys []string) (map[string]types.EventStateKeyNID, error) {
|
||||
return d.statements.bulkSelectEventStateKeyNID(eventStateKeys)
|
||||
}
|
||||
|
|
@ -195,14 +200,14 @@ func (d *Database) StateAtEventIDs(eventIDs []string) ([]types.StateAtEvent, err
|
|||
return d.statements.bulkSelectStateAtEventByID(eventIDs)
|
||||
}
|
||||
|
||||
// StateBlockNIDs implements input.EventDatabase
|
||||
// StateBlockNIDs implements input.EventDatabase and query.RoomserverQueryAPIDatabase
|
||||
func (d *Database) StateBlockNIDs(stateNIDs []types.StateSnapshotNID) ([]types.StateBlockNIDList, error) {
|
||||
return d.statements.bulkSelectStateBlockNIDs(stateNIDs)
|
||||
}
|
||||
|
||||
// StateEntries implements input.EventDatabase
|
||||
func (d *Database) StateEntries(stateBlockNIDs []types.StateBlockNID) ([]types.StateEntryList, error) {
|
||||
return d.statements.bulkSelectStateDataEntries(stateBlockNIDs)
|
||||
return d.statements.bulkSelectStateBlockEntries(stateBlockNIDs)
|
||||
}
|
||||
|
||||
// EventIDs implements input.RoomEventDatabase
|
||||
|
|
@ -324,10 +329,21 @@ func (d *Database) RoomNID(roomID string) (types.RoomNID, error) {
|
|||
}
|
||||
|
||||
// LatestEventIDs implements query.RoomserverQueryAPIDB
|
||||
func (d *Database) LatestEventIDs(roomNID types.RoomNID) ([]gomatrixserverlib.EventReference, error) {
|
||||
eventNIDs, err := d.statements.selectLatestEventNIDs(roomNID)
|
||||
func (d *Database) LatestEventIDs(roomNID types.RoomNID) ([]gomatrixserverlib.EventReference, types.StateSnapshotNID, error) {
|
||||
eventNIDs, currentStateSnapshotNID, err := d.statements.selectLatestEventNIDs(roomNID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, 0, err
|
||||
}
|
||||
return d.statements.bulkSelectEventReference(eventNIDs)
|
||||
references, err := d.statements.bulkSelectEventReference(eventNIDs)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
return references, currentStateSnapshotNID, nil
|
||||
}
|
||||
|
||||
// StateEntriesForTuples implements query.RoomserverQueryAPIDB
|
||||
func (d *Database) StateEntriesForTuples(
|
||||
stateBlockNIDs []types.StateBlockNID, stateKeyTuples []types.StateKeyTuple,
|
||||
) ([]types.StateEntryList, error) {
|
||||
return d.statements.bulkSelectFilteredStateBlockEntries(stateBlockNIDs, stateKeyTuples)
|
||||
}
|
||||
|
|
|
|||
2
vendor/manifest
vendored
2
vendor/manifest
vendored
|
|
@ -92,7 +92,7 @@
|
|||
{
|
||||
"importpath": "github.com/matrix-org/gomatrixserverlib",
|
||||
"repository": "https://github.com/matrix-org/gomatrixserverlib",
|
||||
"revision": "48ee56a33d195dc412dd919a0e81af70c9aaf4a3",
|
||||
"revision": "1745e159425b4e060ade220aaa89abc94ed013b3",
|
||||
"branch": "master"
|
||||
},
|
||||
{
|
||||
|
|
|
|||
|
|
@ -326,6 +326,21 @@ func (e Event) Depth() int64 {
|
|||
return e.fields.Depth
|
||||
}
|
||||
|
||||
// UnmarshalJSON implements json.Unmarshaller assuming the Event is from an untrusted source.
|
||||
// This will cause more checks than might be necessary but is probably better to be safe than sorry.
|
||||
func (e *Event) UnmarshalJSON(data []byte) (err error) {
|
||||
*e, err = NewEventFromUntrustedJSON(data)
|
||||
return
|
||||
}
|
||||
|
||||
// MarshalJSON implements json.Marshaller
|
||||
func (e Event) MarshalJSON() ([]byte, error) {
|
||||
if e.eventJSON == nil {
|
||||
return nil, fmt.Errorf("gomatrixserverlib: cannot serialise uninitialised Event")
|
||||
}
|
||||
return e.eventJSON, nil
|
||||
}
|
||||
|
||||
// UnmarshalJSON implements json.Unmarshaller
|
||||
func (er *EventReference) UnmarshalJSON(data []byte) error {
|
||||
var tuple []rawJSON
|
||||
|
|
|
|||
Loading…
Reference in a new issue