Move queries to topology table
This commit is contained in:
parent
bb9e775a1b
commit
4f0d41be9c
|
@ -164,13 +164,6 @@ const selectContextAfterEventSQL = "" +
|
||||||
" AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
|
" AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
|
||||||
" ORDER BY id ASC LIMIT $3"
|
" ORDER BY id ASC LIMIT $3"
|
||||||
|
|
||||||
const selectTopologicalEventSQL = "" +
|
|
||||||
"SELECT se.headered_event_json, st.topological_position, st.stream_position " +
|
|
||||||
" FROM syncapi_output_room_events_topology st " +
|
|
||||||
" JOIN syncapi_output_room_events se ON se.event_id = st.event_id " +
|
|
||||||
" WHERE st.room_id = $1 AND st.topological_position < $2 AND se.type = $3 " +
|
|
||||||
" ORDER BY st.topological_position DESC LIMIT 1"
|
|
||||||
|
|
||||||
type outputRoomEventsStatements struct {
|
type outputRoomEventsStatements struct {
|
||||||
insertEventStmt *sql.Stmt
|
insertEventStmt *sql.Stmt
|
||||||
selectEventsStmt *sql.Stmt
|
selectEventsStmt *sql.Stmt
|
||||||
|
@ -185,7 +178,6 @@ type outputRoomEventsStatements struct {
|
||||||
selectContextEventStmt *sql.Stmt
|
selectContextEventStmt *sql.Stmt
|
||||||
selectContextBeforeEventStmt *sql.Stmt
|
selectContextBeforeEventStmt *sql.Stmt
|
||||||
selectContextAfterEventStmt *sql.Stmt
|
selectContextAfterEventStmt *sql.Stmt
|
||||||
selectTopologicalEventStmt *sql.Stmt
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
|
func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
|
||||||
|
@ -208,7 +200,6 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
|
||||||
{&s.selectContextEventStmt, selectContextEventSQL},
|
{&s.selectContextEventStmt, selectContextEventSQL},
|
||||||
{&s.selectContextBeforeEventStmt, selectContextBeforeEventSQL},
|
{&s.selectContextBeforeEventStmt, selectContextBeforeEventSQL},
|
||||||
{&s.selectContextAfterEventStmt, selectContextAfterEventSQL},
|
{&s.selectContextAfterEventStmt, selectContextAfterEventSQL},
|
||||||
{&s.selectTopologicalEventStmt, selectTopologicalEventSQL},
|
|
||||||
}.Prepare(db)
|
}.Prepare(db)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -589,32 +580,6 @@ func (s *outputRoomEventsStatements) SelectContextAfterEvent(
|
||||||
return lastID, evts, rows.Err()
|
return lastID, evts, rows.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
// SelectTopologicalEvent selects an event before and including the given position by eventType and roomID. Returns the found event and the topology token.
|
|
||||||
// If not event was found, returns nil and sql.ErrNoRows.
|
|
||||||
func (s *outputRoomEventsStatements) SelectTopologicalEvent(
|
|
||||||
ctx context.Context, txn *sql.Tx, topologicalPosition int, eventType, roomID string,
|
|
||||||
) (*gomatrixserverlib.HeaderedEvent, types.TopologyToken, error) {
|
|
||||||
var (
|
|
||||||
eventBytes []byte
|
|
||||||
token types.TopologyToken
|
|
||||||
)
|
|
||||||
|
|
||||||
err := sqlutil.TxStmtContext(ctx, txn, s.selectTopologicalEventStmt).
|
|
||||||
QueryRowContext(ctx, roomID, topologicalPosition, eventType).
|
|
||||||
Scan(&eventBytes, &token.Depth, &token.PDUPosition)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return nil, types.TopologyToken{}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
var res *gomatrixserverlib.HeaderedEvent
|
|
||||||
if err = json.Unmarshal(eventBytes, &res); err != nil {
|
|
||||||
return nil, types.TopologyToken{}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return res, token, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
|
func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
|
||||||
var result []types.StreamEvent
|
var result []types.StreamEvent
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
|
|
|
@ -17,6 +17,7 @@ package postgres
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"encoding/json"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
@ -65,8 +66,8 @@ const selectPositionInTopologySQL = "" +
|
||||||
"SELECT topological_position, stream_position FROM syncapi_output_room_events_topology" +
|
"SELECT topological_position, stream_position FROM syncapi_output_room_events_topology" +
|
||||||
" WHERE event_id = $1"
|
" WHERE event_id = $1"
|
||||||
|
|
||||||
// Select the max topological position for the room, then sort by stream position and take the highest,
|
// Select the max topological position for the room, then sort by stream position and take the highest,
|
||||||
// returning both topological and stream positions.
|
// returning both topological and stream positions.
|
||||||
const selectMaxPositionInTopologySQL = "" +
|
const selectMaxPositionInTopologySQL = "" +
|
||||||
"SELECT topological_position, stream_position FROM syncapi_output_room_events_topology" +
|
"SELECT topological_position, stream_position FROM syncapi_output_room_events_topology" +
|
||||||
" WHERE topological_position=(" +
|
" WHERE topological_position=(" +
|
||||||
|
@ -79,6 +80,13 @@ const selectStreamToTopologicalPositionAscSQL = "" +
|
||||||
const selectStreamToTopologicalPositionDescSQL = "" +
|
const selectStreamToTopologicalPositionDescSQL = "" +
|
||||||
"SELECT topological_position FROM syncapi_output_room_events_topology WHERE room_id = $1 AND stream_position <= $2 ORDER BY topological_position DESC LIMIT 1;"
|
"SELECT topological_position FROM syncapi_output_room_events_topology WHERE room_id = $1 AND stream_position <= $2 ORDER BY topological_position DESC LIMIT 1;"
|
||||||
|
|
||||||
|
const selectTopologicalEventSQL = "" +
|
||||||
|
"SELECT se.headered_event_json, st.topological_position, st.stream_position " +
|
||||||
|
" FROM syncapi_output_room_events_topology st " +
|
||||||
|
" JOIN syncapi_output_room_events se ON se.event_id = st.event_id " +
|
||||||
|
" WHERE st.room_id = $1 AND st.topological_position <= $2 AND se.type = $3 " +
|
||||||
|
" ORDER BY st.topological_position DESC LIMIT 1"
|
||||||
|
|
||||||
type outputRoomEventsTopologyStatements struct {
|
type outputRoomEventsTopologyStatements struct {
|
||||||
insertEventInTopologyStmt *sql.Stmt
|
insertEventInTopologyStmt *sql.Stmt
|
||||||
selectEventIDsInRangeASCStmt *sql.Stmt
|
selectEventIDsInRangeASCStmt *sql.Stmt
|
||||||
|
@ -87,6 +95,7 @@ type outputRoomEventsTopologyStatements struct {
|
||||||
selectMaxPositionInTopologyStmt *sql.Stmt
|
selectMaxPositionInTopologyStmt *sql.Stmt
|
||||||
selectStreamToTopologicalPositionAscStmt *sql.Stmt
|
selectStreamToTopologicalPositionAscStmt *sql.Stmt
|
||||||
selectStreamToTopologicalPositionDescStmt *sql.Stmt
|
selectStreamToTopologicalPositionDescStmt *sql.Stmt
|
||||||
|
selectTopologicalEventStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) {
|
func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) {
|
||||||
|
@ -95,28 +104,16 @@ func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if s.insertEventInTopologyStmt, err = db.Prepare(insertEventInTopologySQL); err != nil {
|
return s, sqlutil.StatementList{
|
||||||
return nil, err
|
{&s.insertEventInTopologyStmt, insertEventInTopologySQL},
|
||||||
}
|
{&s.selectEventIDsInRangeASCStmt, selectEventIDsInRangeASCSQL},
|
||||||
if s.selectEventIDsInRangeASCStmt, err = db.Prepare(selectEventIDsInRangeASCSQL); err != nil {
|
{&s.selectEventIDsInRangeDESCStmt, selectEventIDsInRangeDESCSQL},
|
||||||
return nil, err
|
{&s.selectPositionInTopologyStmt, selectPositionInTopologySQL},
|
||||||
}
|
{&s.selectMaxPositionInTopologyStmt, selectMaxPositionInTopologySQL},
|
||||||
if s.selectEventIDsInRangeDESCStmt, err = db.Prepare(selectEventIDsInRangeDESCSQL); err != nil {
|
{&s.selectStreamToTopologicalPositionAscStmt, selectStreamToTopologicalPositionAscSQL},
|
||||||
return nil, err
|
{&s.selectStreamToTopologicalPositionDescStmt, selectStreamToTopologicalPositionDescSQL},
|
||||||
}
|
{&s.selectTopologicalEventStmt, selectTopologicalEventSQL},
|
||||||
if s.selectPositionInTopologyStmt, err = db.Prepare(selectPositionInTopologySQL); err != nil {
|
}.Prepare(db)
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if s.selectMaxPositionInTopologyStmt, err = db.Prepare(selectMaxPositionInTopologySQL); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if s.selectStreamToTopologicalPositionAscStmt, err = db.Prepare(selectStreamToTopologicalPositionAscSQL); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if s.selectStreamToTopologicalPositionDescStmt, err = db.Prepare(selectStreamToTopologicalPositionDescSQL); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return s, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// InsertEventInTopology inserts the given event in the room's topology, based
|
// InsertEventInTopology inserts the given event in the room's topology, based
|
||||||
|
@ -196,3 +193,29 @@ func (s *outputRoomEventsTopologyStatements) SelectMaxPositionInTopology(
|
||||||
err = s.selectMaxPositionInTopologyStmt.QueryRowContext(ctx, roomID).Scan(&pos, &spos)
|
err = s.selectMaxPositionInTopologyStmt.QueryRowContext(ctx, roomID).Scan(&pos, &spos)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SelectTopologicalEvent selects an event before and including the given position by eventType and roomID. Returns the found event and the topology token.
|
||||||
|
// If not event was found, returns nil and sql.ErrNoRows.
|
||||||
|
func (s *outputRoomEventsTopologyStatements) SelectTopologicalEvent(
|
||||||
|
ctx context.Context, txn *sql.Tx, topologicalPosition int, eventType, roomID string,
|
||||||
|
) (*gomatrixserverlib.HeaderedEvent, types.TopologyToken, error) {
|
||||||
|
var (
|
||||||
|
eventBytes []byte
|
||||||
|
token types.TopologyToken
|
||||||
|
)
|
||||||
|
|
||||||
|
err := sqlutil.TxStmtContext(ctx, txn, s.selectTopologicalEventStmt).
|
||||||
|
QueryRowContext(ctx, roomID, topologicalPosition, eventType).
|
||||||
|
Scan(&eventBytes, &token.Depth, &token.PDUPosition)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, types.TopologyToken{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var res *gomatrixserverlib.HeaderedEvent
|
||||||
|
if err = json.Unmarshal(eventBytes, &res); err != nil {
|
||||||
|
return nil, types.TopologyToken{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, token, nil
|
||||||
|
}
|
||||||
|
|
|
@ -46,10 +46,6 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
topology, err := NewPostgresTopologyTable(d.db)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
events, err := NewPostgresEventsTable(d.db)
|
events, err := NewPostgresEventsTable(d.db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -66,6 +62,10 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
topology, err := NewPostgresTopologyTable(d.db)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
backwardExtremities, err := NewPostgresBackwardsExtremitiesTable(d.db)
|
backwardExtremities, err := NewPostgresBackwardsExtremitiesTable(d.db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
|
@ -1069,7 +1069,7 @@ func (d *Database) MaxStreamPositionForPresence(ctx context.Context) (types.Stre
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Database) SelectTopologicalEvent(ctx context.Context, topologicalPosition int, eventType, roomID string) (*gomatrixserverlib.HeaderedEvent, types.TopologyToken, error) {
|
func (d *Database) SelectTopologicalEvent(ctx context.Context, topologicalPosition int, eventType, roomID string) (*gomatrixserverlib.HeaderedEvent, types.TopologyToken, error) {
|
||||||
return d.OutputEvents.SelectTopologicalEvent(ctx, nil, topologicalPosition, eventType, roomID)
|
return d.Topology.SelectTopologicalEvent(ctx, nil, topologicalPosition, eventType, roomID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Database) SelectMembershipForUser(ctx context.Context, roomID, userID string, pos int64) (membership string, topologicalPos int, err error) {
|
func (d *Database) SelectMembershipForUser(ctx context.Context, roomID, userID string, pos int64) (membership string, topologicalPos int, err error) {
|
||||||
|
|
|
@ -114,13 +114,6 @@ const selectContextAfterEventSQL = "" +
|
||||||
|
|
||||||
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
|
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
|
||||||
|
|
||||||
const selectTopologicalEventSQL = "" +
|
|
||||||
"SELECT headered_event_json, topological_position, stream_position " +
|
|
||||||
" FROM syncapi_output_room_events_topology " +
|
|
||||||
" JOIN syncapi_output_room_events ON syncapi_output_room_events.event_id = syncapi_output_room_events_topology.event_id " +
|
|
||||||
" WHERE syncapi_output_room_events_topology.room_id = $1 AND topological_position < $2 AND type = $3 " +
|
|
||||||
" ORDER BY topological_position DESC LIMIT 1"
|
|
||||||
|
|
||||||
type outputRoomEventsStatements struct {
|
type outputRoomEventsStatements struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
streamIDStatements *StreamIDStatements
|
streamIDStatements *StreamIDStatements
|
||||||
|
@ -131,7 +124,6 @@ type outputRoomEventsStatements struct {
|
||||||
selectContextEventStmt *sql.Stmt
|
selectContextEventStmt *sql.Stmt
|
||||||
selectContextBeforeEventStmt *sql.Stmt
|
selectContextBeforeEventStmt *sql.Stmt
|
||||||
selectContextAfterEventStmt *sql.Stmt
|
selectContextAfterEventStmt *sql.Stmt
|
||||||
selectTopologicalEventStmt *sql.Stmt
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteEventsTable(db *sql.DB, streamID *StreamIDStatements) (tables.Events, error) {
|
func NewSqliteEventsTable(db *sql.DB, streamID *StreamIDStatements) (tables.Events, error) {
|
||||||
|
@ -151,7 +143,6 @@ func NewSqliteEventsTable(db *sql.DB, streamID *StreamIDStatements) (tables.Even
|
||||||
{&s.selectContextEventStmt, selectContextEventSQL},
|
{&s.selectContextEventStmt, selectContextEventSQL},
|
||||||
{&s.selectContextBeforeEventStmt, selectContextBeforeEventSQL},
|
{&s.selectContextBeforeEventStmt, selectContextBeforeEventSQL},
|
||||||
{&s.selectContextAfterEventStmt, selectContextAfterEventSQL},
|
{&s.selectContextAfterEventStmt, selectContextAfterEventSQL},
|
||||||
{&s.selectTopologicalEventStmt, selectTopologicalEventSQL},
|
|
||||||
}.Prepare(db)
|
}.Prepare(db)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -609,31 +600,6 @@ func (s *outputRoomEventsStatements) SelectContextAfterEvent(
|
||||||
return lastID, evts, rows.Err()
|
return lastID, evts, rows.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
// SelectTopologicalEvent selects an event before and including the given position by eventType and roomID. Returns the found event and the topology token.
|
|
||||||
// If not event was found, returns nil and sql.ErrNoRows.
|
|
||||||
func (s *outputRoomEventsStatements) SelectTopologicalEvent(
|
|
||||||
ctx context.Context, txn *sql.Tx, topologicalPosition int, eventType, roomID string,
|
|
||||||
) (*gomatrixserverlib.HeaderedEvent, types.TopologyToken, error) {
|
|
||||||
var (
|
|
||||||
eventBytes []byte
|
|
||||||
token types.TopologyToken
|
|
||||||
)
|
|
||||||
err := sqlutil.TxStmtContext(ctx, txn, s.selectTopologicalEventStmt).
|
|
||||||
QueryRowContext(ctx, roomID, topologicalPosition, eventType).
|
|
||||||
Scan(&eventBytes, &token.Depth, &token.PDUPosition)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return nil, types.TopologyToken{}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
var res *gomatrixserverlib.HeaderedEvent
|
|
||||||
if err = json.Unmarshal(eventBytes, &res); err != nil {
|
|
||||||
return nil, types.TopologyToken{}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return res, token, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func unmarshalStateIDs(addIDsJSON, delIDsJSON string) (addIDs []string, delIDs []string, err error) {
|
func unmarshalStateIDs(addIDsJSON, delIDsJSON string) (addIDs []string, delIDs []string, err error) {
|
||||||
if len(addIDsJSON) > 0 {
|
if len(addIDsJSON) > 0 {
|
||||||
if err = json.Unmarshal([]byte(addIDsJSON), &addIDs); err != nil {
|
if err = json.Unmarshal([]byte(addIDsJSON), &addIDs); err != nil {
|
||||||
|
|
|
@ -17,6 +17,7 @@ package sqlite3
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"encoding/json"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
||||||
|
@ -71,6 +72,13 @@ const selectStreamToTopologicalPositionAscSQL = "" +
|
||||||
const selectStreamToTopologicalPositionDescSQL = "" +
|
const selectStreamToTopologicalPositionDescSQL = "" +
|
||||||
"SELECT topological_position FROM syncapi_output_room_events_topology WHERE room_id = $1 AND stream_position <= $2 ORDER BY topological_position DESC LIMIT 1;"
|
"SELECT topological_position FROM syncapi_output_room_events_topology WHERE room_id = $1 AND stream_position <= $2 ORDER BY topological_position DESC LIMIT 1;"
|
||||||
|
|
||||||
|
const selectTopologicalEventSQL = "" +
|
||||||
|
"SELECT headered_event_json, topological_position, stream_position " +
|
||||||
|
" FROM syncapi_output_room_events_topology " +
|
||||||
|
" JOIN syncapi_output_room_events ON syncapi_output_room_events.event_id = syncapi_output_room_events_topology.event_id " +
|
||||||
|
" WHERE syncapi_output_room_events_topology.room_id = $1 AND topological_position <= $2 AND type = $3 " +
|
||||||
|
" ORDER BY topological_position DESC LIMIT 1"
|
||||||
|
|
||||||
type outputRoomEventsTopologyStatements struct {
|
type outputRoomEventsTopologyStatements struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
insertEventInTopologyStmt *sql.Stmt
|
insertEventInTopologyStmt *sql.Stmt
|
||||||
|
@ -80,6 +88,7 @@ type outputRoomEventsTopologyStatements struct {
|
||||||
selectMaxPositionInTopologyStmt *sql.Stmt
|
selectMaxPositionInTopologyStmt *sql.Stmt
|
||||||
selectStreamToTopologicalPositionAscStmt *sql.Stmt
|
selectStreamToTopologicalPositionAscStmt *sql.Stmt
|
||||||
selectStreamToTopologicalPositionDescStmt *sql.Stmt
|
selectStreamToTopologicalPositionDescStmt *sql.Stmt
|
||||||
|
selectTopologicalEventStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteTopologyTable(db *sql.DB) (tables.Topology, error) {
|
func NewSqliteTopologyTable(db *sql.DB) (tables.Topology, error) {
|
||||||
|
@ -90,28 +99,16 @@ func NewSqliteTopologyTable(db *sql.DB) (tables.Topology, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if s.insertEventInTopologyStmt, err = db.Prepare(insertEventInTopologySQL); err != nil {
|
return s, sqlutil.StatementList{
|
||||||
return nil, err
|
{&s.insertEventInTopologyStmt, insertEventInTopologySQL},
|
||||||
}
|
{&s.selectEventIDsInRangeASCStmt, selectEventIDsInRangeASCSQL},
|
||||||
if s.selectEventIDsInRangeASCStmt, err = db.Prepare(selectEventIDsInRangeASCSQL); err != nil {
|
{&s.selectEventIDsInRangeDESCStmt, selectEventIDsInRangeDESCSQL},
|
||||||
return nil, err
|
{&s.selectPositionInTopologyStmt, selectPositionInTopologySQL},
|
||||||
}
|
{&s.selectMaxPositionInTopologyStmt, selectMaxPositionInTopologySQL},
|
||||||
if s.selectEventIDsInRangeDESCStmt, err = db.Prepare(selectEventIDsInRangeDESCSQL); err != nil {
|
{&s.selectStreamToTopologicalPositionAscStmt, selectStreamToTopologicalPositionAscSQL},
|
||||||
return nil, err
|
{&s.selectStreamToTopologicalPositionDescStmt, selectStreamToTopologicalPositionDescSQL},
|
||||||
}
|
{&s.selectTopologicalEventStmt, selectTopologicalEventSQL},
|
||||||
if s.selectPositionInTopologyStmt, err = db.Prepare(selectPositionInTopologySQL); err != nil {
|
}.Prepare(db)
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if s.selectMaxPositionInTopologyStmt, err = db.Prepare(selectMaxPositionInTopologySQL); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if s.selectStreamToTopologicalPositionAscStmt, err = db.Prepare(selectStreamToTopologicalPositionAscSQL); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if s.selectStreamToTopologicalPositionDescStmt, err = db.Prepare(selectStreamToTopologicalPositionDescSQL); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return s, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// insertEventInTopology inserts the given event in the room's topology, based
|
// insertEventInTopology inserts the given event in the room's topology, based
|
||||||
|
@ -190,3 +187,28 @@ func (s *outputRoomEventsTopologyStatements) SelectMaxPositionInTopology(
|
||||||
err = stmt.QueryRowContext(ctx, roomID).Scan(&pos, &spos)
|
err = stmt.QueryRowContext(ctx, roomID).Scan(&pos, &spos)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SelectTopologicalEvent selects an event before and including the given position by eventType and roomID. Returns the found event and the topology token.
|
||||||
|
// If not event was found, returns nil and sql.ErrNoRows.
|
||||||
|
func (s *outputRoomEventsTopologyStatements) SelectTopologicalEvent(
|
||||||
|
ctx context.Context, txn *sql.Tx, topologicalPosition int, eventType, roomID string,
|
||||||
|
) (*gomatrixserverlib.HeaderedEvent, types.TopologyToken, error) {
|
||||||
|
var (
|
||||||
|
eventBytes []byte
|
||||||
|
token types.TopologyToken
|
||||||
|
)
|
||||||
|
err := sqlutil.TxStmtContext(ctx, txn, s.selectTopologicalEventStmt).
|
||||||
|
QueryRowContext(ctx, roomID, topologicalPosition, eventType).
|
||||||
|
Scan(&eventBytes, &token.Depth, &token.PDUPosition)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, types.TopologyToken{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var res *gomatrixserverlib.HeaderedEvent
|
||||||
|
if err = json.Unmarshal(eventBytes, &res); err != nil {
|
||||||
|
return nil, types.TopologyToken{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, token, nil
|
||||||
|
}
|
||||||
|
|
|
@ -56,10 +56,6 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
topology, err := NewSqliteTopologyTable(d.db)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
events, err := NewSqliteEventsTable(d.db, &d.streamID)
|
events, err := NewSqliteEventsTable(d.db, &d.streamID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -76,6 +72,10 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
topology, err := NewSqliteTopologyTable(d.db)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
bwExtrem, err := NewSqliteBackwardsExtremitiesTable(d.db)
|
bwExtrem, err := NewSqliteBackwardsExtremitiesTable(d.db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -67,7 +67,6 @@ type Events interface {
|
||||||
SelectContextEvent(ctx context.Context, txn *sql.Tx, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error)
|
SelectContextEvent(ctx context.Context, txn *sql.Tx, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error)
|
||||||
SelectContextBeforeEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error)
|
SelectContextBeforeEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error)
|
||||||
SelectContextAfterEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error)
|
SelectContextAfterEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error)
|
||||||
SelectTopologicalEvent(ctx context.Context, txn *sql.Tx, topologicalPosition int, eventType, roomID string) (*gomatrixserverlib.HeaderedEvent, types.TopologyToken, error)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Topology keeps track of the depths and stream positions for all events.
|
// Topology keeps track of the depths and stream positions for all events.
|
||||||
|
@ -87,6 +86,7 @@ type Topology interface {
|
||||||
SelectMaxPositionInTopology(ctx context.Context, txn *sql.Tx, roomID string) (depth types.StreamPosition, spos types.StreamPosition, err error)
|
SelectMaxPositionInTopology(ctx context.Context, txn *sql.Tx, roomID string) (depth types.StreamPosition, spos types.StreamPosition, err error)
|
||||||
// SelectStreamToTopologicalPosition converts a stream position to a topological position by finding the nearest topological position in the room.
|
// SelectStreamToTopologicalPosition converts a stream position to a topological position by finding the nearest topological position in the room.
|
||||||
SelectStreamToTopologicalPosition(ctx context.Context, txn *sql.Tx, roomID string, streamPos types.StreamPosition, forward bool) (topoPos types.StreamPosition, err error)
|
SelectStreamToTopologicalPosition(ctx context.Context, txn *sql.Tx, roomID string, streamPos types.StreamPosition, forward bool) (topoPos types.StreamPosition, err error)
|
||||||
|
SelectTopologicalEvent(ctx context.Context, txn *sql.Tx, topologicalPosition int, eventType, roomID string) (*gomatrixserverlib.HeaderedEvent, types.TopologyToken, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type CurrentRoomState interface {
|
type CurrentRoomState interface {
|
||||||
|
|
|
@ -29,20 +29,12 @@ func newOutputRoomEventsTable(t *testing.T, dbType test.DBType) (tables.Events,
|
||||||
var tab tables.Events
|
var tab tables.Events
|
||||||
switch dbType {
|
switch dbType {
|
||||||
case test.DBTypePostgres:
|
case test.DBTypePostgres:
|
||||||
_, err = postgres.NewPostgresTopologyTable(db) // needed, since there is a join on it
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("unable to create table: %s", err)
|
|
||||||
}
|
|
||||||
tab, err = postgres.NewPostgresEventsTable(db)
|
tab, err = postgres.NewPostgresEventsTable(db)
|
||||||
case test.DBTypeSQLite:
|
case test.DBTypeSQLite:
|
||||||
var stream sqlite3.StreamIDStatements
|
var stream sqlite3.StreamIDStatements
|
||||||
if err = stream.Prepare(db); err != nil {
|
if err = stream.Prepare(db); err != nil {
|
||||||
t.Fatalf("failed to prepare stream stmts: %s", err)
|
t.Fatalf("failed to prepare stream stmts: %s", err)
|
||||||
}
|
}
|
||||||
_, err = sqlite3.NewSqliteTopologyTable(db) // needed, since there is a join on it
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("unable to create table: %s", err)
|
|
||||||
}
|
|
||||||
tab, err = sqlite3.NewSqliteEventsTable(db, &stream)
|
tab, err = sqlite3.NewSqliteEventsTable(db, &stream)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -28,8 +28,20 @@ func newTopologyTable(t *testing.T, dbType test.DBType) (tables.Topology, *sql.D
|
||||||
var tab tables.Topology
|
var tab tables.Topology
|
||||||
switch dbType {
|
switch dbType {
|
||||||
case test.DBTypePostgres:
|
case test.DBTypePostgres:
|
||||||
|
_, err = postgres.NewPostgresEventsTable(db) // needed for SQL join
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to prepare events table")
|
||||||
|
}
|
||||||
tab, err = postgres.NewPostgresTopologyTable(db)
|
tab, err = postgres.NewPostgresTopologyTable(db)
|
||||||
case test.DBTypeSQLite:
|
case test.DBTypeSQLite:
|
||||||
|
var stream sqlite3.StreamIDStatements
|
||||||
|
if err = stream.Prepare(db); err != nil {
|
||||||
|
t.Fatalf("failed to prepare stream stmts: %s", err)
|
||||||
|
}
|
||||||
|
_, err = sqlite3.NewSqliteEventsTable(db, &stream) // needed for SQL join
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to prepare events table")
|
||||||
|
}
|
||||||
tab, err = sqlite3.NewSqliteTopologyTable(db)
|
tab, err = sqlite3.NewSqliteTopologyTable(db)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
Loading…
Reference in a new issue