mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-07 06:03:09 -06:00
Update SQL queries to include history_visibility
This commit is contained in:
parent
989825b8fa
commit
f9c7013061
|
|
@ -67,7 +67,9 @@ type Database interface {
|
||||||
// when generating the sync stream position for this event. Returns the sync stream position for the inserted event.
|
// when generating the sync stream position for this event. Returns the sync stream position for the inserted event.
|
||||||
// Returns an error if there was a problem inserting this event.
|
// Returns an error if there was a problem inserting this event.
|
||||||
WriteEvent(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent, addStateEvents []*gomatrixserverlib.HeaderedEvent,
|
WriteEvent(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent, addStateEvents []*gomatrixserverlib.HeaderedEvent,
|
||||||
addStateEventIDs []string, removeStateEventIDs []string, transactionID *api.TransactionID, excludeFromSync bool) (types.StreamPosition, error)
|
addStateEventIDs []string, removeStateEventIDs []string, transactionID *api.TransactionID, excludeFromSync bool,
|
||||||
|
historyVisibility gomatrixserverlib.HistoryVisibility,
|
||||||
|
) (types.StreamPosition, error)
|
||||||
// PurgeRoomState completely purges room state from the sync API. This is done when
|
// PurgeRoomState completely purges room state from the sync API. This is done when
|
||||||
// receiving an output event that completely resets the state.
|
// receiving an output event that completely resets the state.
|
||||||
PurgeRoomState(ctx context.Context, roomID string) error
|
PurgeRoomState(ctx context.Context, roomID string) error
|
||||||
|
|
|
||||||
|
|
@ -104,7 +104,7 @@ const selectEventsWithEventIDsSQL = "" +
|
||||||
// the rowsToStreamEvents expects there to be exactly six columns. We need to
|
// the rowsToStreamEvents expects there to be exactly six columns. We need to
|
||||||
// figure out if these really need to be in the DB, and if so, we need a
|
// figure out if these really need to be in the DB, and if so, we need a
|
||||||
// better permanent fix for this. - neilalexander, 2 Jan 2020
|
// better permanent fix for this. - neilalexander, 2 Jan 2020
|
||||||
"SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id" +
|
"SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id, 3 as history_visibility" +
|
||||||
" FROM syncapi_current_room_state WHERE event_id = ANY($1)"
|
" FROM syncapi_current_room_state WHERE event_id = ANY($1)"
|
||||||
|
|
||||||
type currentRoomStateStatements struct {
|
type currentRoomStateStatements struct {
|
||||||
|
|
|
||||||
|
|
@ -68,8 +68,8 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events (
|
||||||
-- that relates to the moment these were retrieved rather than the moment these
|
-- that relates to the moment these were retrieved rather than the moment these
|
||||||
-- were emitted.
|
-- were emitted.
|
||||||
exclude_from_sync BOOL DEFAULT FALSE,
|
exclude_from_sync BOOL DEFAULT FALSE,
|
||||||
-- The history visibility before this event (0 - world_readable; 1 - shared; 2 - invited; 3 - joined)
|
-- The history visibility before this event (1 - world_readable; 2 - shared; 3 - invited; 4 - joined)
|
||||||
history_visibility SMALLINT NOT NULL DEFAULT 3
|
history_visibility SMALLINT NOT NULL DEFAULT 2
|
||||||
);
|
);
|
||||||
|
|
||||||
CREATE INDEX IF NOT EXISTS syncapi_output_room_events_type_idx ON syncapi_output_room_events (type);
|
CREATE INDEX IF NOT EXISTS syncapi_output_room_events_type_idx ON syncapi_output_room_events (type);
|
||||||
|
|
@ -80,16 +80,16 @@ CREATE INDEX IF NOT EXISTS syncapi_output_room_events_exclude_from_sync_idx ON s
|
||||||
|
|
||||||
const insertEventSQL = "" +
|
const insertEventSQL = "" +
|
||||||
"INSERT INTO syncapi_output_room_events (" +
|
"INSERT INTO syncapi_output_room_events (" +
|
||||||
"room_id, event_id, headered_event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync" +
|
"room_id, event_id, headered_event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync, history_visibility" +
|
||||||
") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) " +
|
") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) " +
|
||||||
"ON CONFLICT ON CONSTRAINT syncapi_event_id_idx DO UPDATE SET exclude_from_sync = (excluded.exclude_from_sync AND $11) " +
|
"ON CONFLICT ON CONSTRAINT syncapi_event_id_idx DO UPDATE SET exclude_from_sync = (excluded.exclude_from_sync AND $11) " +
|
||||||
"RETURNING id"
|
"RETURNING id"
|
||||||
|
|
||||||
const selectEventsSQL = "" +
|
const selectEventsSQL = "" +
|
||||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events WHERE event_id = ANY($1)"
|
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events WHERE event_id = ANY($1)"
|
||||||
|
|
||||||
const selectEventsWithFilterSQL = "" +
|
const selectEventsWithFilterSQL = "" +
|
||||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events WHERE event_id = ANY($1)" +
|
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events WHERE event_id = ANY($1)" +
|
||||||
" AND ( $2::text[] IS NULL OR sender = ANY($2) )" +
|
" AND ( $2::text[] IS NULL OR sender = ANY($2) )" +
|
||||||
" AND ( $3::text[] IS NULL OR NOT(sender = ANY($3)) )" +
|
" AND ( $3::text[] IS NULL OR NOT(sender = ANY($3)) )" +
|
||||||
" AND ( $4::text[] IS NULL OR type LIKE ANY($4) )" +
|
" AND ( $4::text[] IS NULL OR type LIKE ANY($4) )" +
|
||||||
|
|
@ -98,7 +98,7 @@ const selectEventsWithFilterSQL = "" +
|
||||||
" LIMIT $7"
|
" LIMIT $7"
|
||||||
|
|
||||||
const selectRecentEventsSQL = "" +
|
const selectRecentEventsSQL = "" +
|
||||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
|
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" +
|
||||||
" WHERE room_id = $1 AND id > $2 AND id <= $3" +
|
" WHERE room_id = $1 AND id > $2 AND id <= $3" +
|
||||||
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
|
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
|
||||||
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
|
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
|
||||||
|
|
@ -107,7 +107,7 @@ const selectRecentEventsSQL = "" +
|
||||||
" ORDER BY id DESC LIMIT $8"
|
" ORDER BY id DESC LIMIT $8"
|
||||||
|
|
||||||
const selectRecentEventsForSyncSQL = "" +
|
const selectRecentEventsForSyncSQL = "" +
|
||||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
|
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" +
|
||||||
" WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE" +
|
" WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE" +
|
||||||
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
|
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
|
||||||
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
|
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
|
||||||
|
|
@ -116,7 +116,7 @@ const selectRecentEventsForSyncSQL = "" +
|
||||||
" ORDER BY id DESC LIMIT $8"
|
" ORDER BY id DESC LIMIT $8"
|
||||||
|
|
||||||
const selectEarlyEventsSQL = "" +
|
const selectEarlyEventsSQL = "" +
|
||||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
|
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" +
|
||||||
" WHERE room_id = $1 AND id > $2 AND id <= $3" +
|
" WHERE room_id = $1 AND id > $2 AND id <= $3" +
|
||||||
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
|
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
|
||||||
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
|
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
|
||||||
|
|
@ -132,7 +132,7 @@ const updateEventJSONSQL = "" +
|
||||||
|
|
||||||
// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
|
// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
|
||||||
const selectStateInRangeSQL = "" +
|
const selectStateInRangeSQL = "" +
|
||||||
"SELECT event_id, id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" +
|
"SELECT event_id, id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids, history_visibility" +
|
||||||
" FROM syncapi_output_room_events" +
|
" FROM syncapi_output_room_events" +
|
||||||
" WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" +
|
" WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" +
|
||||||
" AND room_id = ANY($3)" +
|
" AND room_id = ANY($3)" +
|
||||||
|
|
@ -148,10 +148,10 @@ const deleteEventsForRoomSQL = "" +
|
||||||
"DELETE FROM syncapi_output_room_events WHERE room_id = $1"
|
"DELETE FROM syncapi_output_room_events WHERE room_id = $1"
|
||||||
|
|
||||||
const selectContextEventSQL = "" +
|
const selectContextEventSQL = "" +
|
||||||
"SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND event_id = $2"
|
"SELECT id, headered_event_json, history_visibility FROM syncapi_output_room_events WHERE room_id = $1 AND event_id = $2"
|
||||||
|
|
||||||
const selectContextBeforeEventSQL = "" +
|
const selectContextBeforeEventSQL = "" +
|
||||||
"SELECT headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id < $2" +
|
"SELECT headered_event_json, history_visibility FROM syncapi_output_room_events WHERE room_id = $1 AND id < $2" +
|
||||||
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
|
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
|
||||||
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
|
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
|
||||||
" AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
|
" AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
|
||||||
|
|
@ -159,7 +159,7 @@ const selectContextBeforeEventSQL = "" +
|
||||||
" ORDER BY id DESC LIMIT $3"
|
" ORDER BY id DESC LIMIT $3"
|
||||||
|
|
||||||
const selectContextAfterEventSQL = "" +
|
const selectContextAfterEventSQL = "" +
|
||||||
"SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2" +
|
"SELECT id, headered_event_json, history_visibility FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2" +
|
||||||
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
|
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
|
||||||
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
|
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
|
||||||
" AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
|
" AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
|
||||||
|
|
@ -248,14 +248,15 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
|
||||||
|
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var (
|
var (
|
||||||
eventID string
|
eventID string
|
||||||
streamPos types.StreamPosition
|
streamPos types.StreamPosition
|
||||||
eventBytes []byte
|
eventBytes []byte
|
||||||
excludeFromSync bool
|
excludeFromSync bool
|
||||||
addIDs pq.StringArray
|
addIDs pq.StringArray
|
||||||
delIDs pq.StringArray
|
delIDs pq.StringArray
|
||||||
|
historyVisibility uint8
|
||||||
)
|
)
|
||||||
if err := rows.Scan(&eventID, &streamPos, &eventBytes, &excludeFromSync, &addIDs, &delIDs); err != nil {
|
if err := rows.Scan(&eventID, &streamPos, &eventBytes, &excludeFromSync, &addIDs, &delIDs, &historyVisibility); err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
// Sanity check for deleted state and whine if we see it. We don't need to do anything
|
// Sanity check for deleted state and whine if we see it. We don't need to do anything
|
||||||
|
|
@ -285,6 +286,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
|
||||||
needSet[id] = true
|
needSet[id] = true
|
||||||
}
|
}
|
||||||
stateNeeded[ev.RoomID()] = needSet
|
stateNeeded[ev.RoomID()] = needSet
|
||||||
|
ev.Visibility = gomatrixserverlib.HistoryVisibilityFromInt(historyVisibility)
|
||||||
|
|
||||||
eventIDToEvent[eventID] = types.StreamEvent{
|
eventIDToEvent[eventID] = types.StreamEvent{
|
||||||
HeaderedEvent: &ev,
|
HeaderedEvent: &ev,
|
||||||
|
|
@ -316,7 +318,7 @@ func (s *outputRoomEventsStatements) SelectMaxEventID(
|
||||||
func (s *outputRoomEventsStatements) InsertEvent(
|
func (s *outputRoomEventsStatements) InsertEvent(
|
||||||
ctx context.Context, txn *sql.Tx,
|
ctx context.Context, txn *sql.Tx,
|
||||||
event *gomatrixserverlib.HeaderedEvent, addState, removeState []string,
|
event *gomatrixserverlib.HeaderedEvent, addState, removeState []string,
|
||||||
transactionID *api.TransactionID, excludeFromSync bool,
|
transactionID *api.TransactionID, excludeFromSync bool, historyVisibility uint8,
|
||||||
) (streamPos types.StreamPosition, err error) {
|
) (streamPos types.StreamPosition, err error) {
|
||||||
var txnID *string
|
var txnID *string
|
||||||
var sessionID *int64
|
var sessionID *int64
|
||||||
|
|
@ -353,6 +355,7 @@ func (s *outputRoomEventsStatements) InsertEvent(
|
||||||
sessionID,
|
sessionID,
|
||||||
txnID,
|
txnID,
|
||||||
excludeFromSync,
|
excludeFromSync,
|
||||||
|
historyVisibility,
|
||||||
).Scan(&streamPos)
|
).Scan(&streamPos)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
@ -506,13 +509,15 @@ func (s *outputRoomEventsStatements) SelectContextEvent(ctx context.Context, txn
|
||||||
row := sqlutil.TxStmt(txn, s.selectContextEventStmt).QueryRowContext(ctx, roomID, eventID)
|
row := sqlutil.TxStmt(txn, s.selectContextEventStmt).QueryRowContext(ctx, roomID, eventID)
|
||||||
|
|
||||||
var eventAsString string
|
var eventAsString string
|
||||||
if err = row.Scan(&id, &eventAsString); err != nil {
|
var historyVisibility uint8
|
||||||
|
if err = row.Scan(&id, &eventAsString, &historyVisibility); err != nil {
|
||||||
return 0, evt, err
|
return 0, evt, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = json.Unmarshal([]byte(eventAsString), &evt); err != nil {
|
if err = json.Unmarshal([]byte(eventAsString), &evt); err != nil {
|
||||||
return 0, evt, err
|
return 0, evt, err
|
||||||
}
|
}
|
||||||
|
evt.Visibility = gomatrixserverlib.HistoryVisibilityFromInt(historyVisibility)
|
||||||
return id, evt, nil
|
return id, evt, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -534,15 +539,17 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
|
||||||
|
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var (
|
var (
|
||||||
eventBytes []byte
|
eventBytes []byte
|
||||||
evt *gomatrixserverlib.HeaderedEvent
|
evt *gomatrixserverlib.HeaderedEvent
|
||||||
|
historyVisibility uint8
|
||||||
)
|
)
|
||||||
if err = rows.Scan(&eventBytes); err != nil {
|
if err = rows.Scan(&eventBytes, &historyVisibility); err != nil {
|
||||||
return evts, err
|
return evts, err
|
||||||
}
|
}
|
||||||
if err = json.Unmarshal(eventBytes, &evt); err != nil {
|
if err = json.Unmarshal(eventBytes, &evt); err != nil {
|
||||||
return evts, err
|
return evts, err
|
||||||
}
|
}
|
||||||
|
evt.Visibility = gomatrixserverlib.HistoryVisibilityFromInt(historyVisibility)
|
||||||
evts = append(evts, evt)
|
evts = append(evts, evt)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -567,15 +574,17 @@ func (s *outputRoomEventsStatements) SelectContextAfterEvent(
|
||||||
|
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var (
|
var (
|
||||||
eventBytes []byte
|
eventBytes []byte
|
||||||
evt *gomatrixserverlib.HeaderedEvent
|
evt *gomatrixserverlib.HeaderedEvent
|
||||||
|
historyVisibility uint8
|
||||||
)
|
)
|
||||||
if err = rows.Scan(&lastID, &eventBytes); err != nil {
|
if err = rows.Scan(&lastID, &eventBytes, &historyVisibility); err != nil {
|
||||||
return 0, evts, err
|
return 0, evts, err
|
||||||
}
|
}
|
||||||
if err = json.Unmarshal(eventBytes, &evt); err != nil {
|
if err = json.Unmarshal(eventBytes, &evt); err != nil {
|
||||||
return 0, evts, err
|
return 0, evts, err
|
||||||
}
|
}
|
||||||
|
evt.Visibility = gomatrixserverlib.HistoryVisibilityFromInt(historyVisibility)
|
||||||
evts = append(evts, evt)
|
evts = append(evts, evt)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -586,15 +595,16 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
|
||||||
var result []types.StreamEvent
|
var result []types.StreamEvent
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var (
|
var (
|
||||||
eventID string
|
eventID string
|
||||||
streamPos types.StreamPosition
|
streamPos types.StreamPosition
|
||||||
eventBytes []byte
|
eventBytes []byte
|
||||||
excludeFromSync bool
|
excludeFromSync bool
|
||||||
sessionID *int64
|
sessionID *int64
|
||||||
txnID *string
|
txnID *string
|
||||||
transactionID *api.TransactionID
|
transactionID *api.TransactionID
|
||||||
|
historyVisibility uint8
|
||||||
)
|
)
|
||||||
if err := rows.Scan(&eventID, &streamPos, &eventBytes, &sessionID, &excludeFromSync, &txnID); err != nil {
|
if err := rows.Scan(&eventID, &streamPos, &eventBytes, &sessionID, &excludeFromSync, &txnID, &historyVisibility); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
// TODO: Handle redacted events
|
// TODO: Handle redacted events
|
||||||
|
|
@ -609,7 +619,7 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
|
||||||
TransactionID: *txnID,
|
TransactionID: *txnID,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
ev.Visibility = gomatrixserverlib.HistoryVisibilityFromInt(historyVisibility)
|
||||||
result = append(result, types.StreamEvent{
|
result = append(result, types.StreamEvent{
|
||||||
HeaderedEvent: &ev,
|
HeaderedEvent: &ev,
|
||||||
StreamPosition: streamPos,
|
StreamPosition: streamPos,
|
||||||
|
|
|
||||||
|
|
@ -25,6 +25,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas"
|
"github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage/shared"
|
"github.com/matrix-org/dendrite/syncapi/storage/shared"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
// SyncServerDatasource represents a sync server datasource which manages
|
// SyncServerDatasource represents a sync server datasource which manages
|
||||||
|
|
@ -42,9 +43,8 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions)
|
||||||
if d.db, d.writer, err = base.DatabaseConnection(dbProperties, sqlutil.NewDummyWriter()); err != nil {
|
if d.db, d.writer, err = base.DatabaseConnection(dbProperties, sqlutil.NewDummyWriter()); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
accountData, err := NewPostgresAccountDataTable(d.db)
|
if _, err = d.db.Exec(outputRoomEventsSchema); err != nil {
|
||||||
if err != nil {
|
logrus.Fatalf("unable to create table: %s", err)
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
events, err := NewPostgresEventsTable(d.db)
|
events, err := NewPostgresEventsTable(d.db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -105,6 +105,10 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions)
|
||||||
if err = m.RunDeltas(d.db, dbProperties); err != nil {
|
if err = m.RunDeltas(d.db, dbProperties); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
accountData, err := NewPostgresAccountDataTable(d.db)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
d.Database = shared.Database{
|
d.Database = shared.Database{
|
||||||
DB: d.db,
|
DB: d.db,
|
||||||
Writer: d.writer,
|
Writer: d.writer,
|
||||||
|
|
|
||||||
|
|
@ -364,11 +364,12 @@ func (d *Database) WriteEvent(
|
||||||
addStateEvents []*gomatrixserverlib.HeaderedEvent,
|
addStateEvents []*gomatrixserverlib.HeaderedEvent,
|
||||||
addStateEventIDs, removeStateEventIDs []string,
|
addStateEventIDs, removeStateEventIDs []string,
|
||||||
transactionID *api.TransactionID, excludeFromSync bool,
|
transactionID *api.TransactionID, excludeFromSync bool,
|
||||||
|
historyVisibility gomatrixserverlib.HistoryVisibility,
|
||||||
) (pduPosition types.StreamPosition, returnErr error) {
|
) (pduPosition types.StreamPosition, returnErr error) {
|
||||||
returnErr = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
returnErr = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||||
var err error
|
var err error
|
||||||
pos, err := d.OutputEvents.InsertEvent(
|
pos, err := d.OutputEvents.InsertEvent(
|
||||||
ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID, excludeFromSync,
|
ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID, excludeFromSync, historyVisibility.NumericValue(),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("d.OutputEvents.InsertEvent: %w", err)
|
return fmt.Errorf("d.OutputEvents.InsertEvent: %w", err)
|
||||||
|
|
|
||||||
|
|
@ -88,7 +88,7 @@ const selectEventsWithEventIDsSQL = "" +
|
||||||
// the rowsToStreamEvents expects there to be exactly six columns. We need to
|
// the rowsToStreamEvents expects there to be exactly six columns. We need to
|
||||||
// figure out if these really need to be in the DB, and if so, we need a
|
// figure out if these really need to be in the DB, and if so, we need a
|
||||||
// better permanent fix for this. - neilalexander, 2 Jan 2020
|
// better permanent fix for this. - neilalexander, 2 Jan 2020
|
||||||
"SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id" +
|
"SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id, 3 AS history_visibility" +
|
||||||
" FROM syncapi_current_room_state WHERE event_id IN ($1)"
|
" FROM syncapi_current_room_state WHERE event_id IN ($1)"
|
||||||
|
|
||||||
type currentRoomStateStatements struct {
|
type currentRoomStateStatements struct {
|
||||||
|
|
|
||||||
|
|
@ -48,7 +48,7 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events (
|
||||||
session_id BIGINT,
|
session_id BIGINT,
|
||||||
transaction_id TEXT,
|
transaction_id TEXT,
|
||||||
exclude_from_sync BOOL NOT NULL DEFAULT FALSE,
|
exclude_from_sync BOOL NOT NULL DEFAULT FALSE,
|
||||||
history_visibility SMALLINT NOT NULL DEFAULT 3 -- The history visibility before this event (0 - world_readable; 1 - shared; 2 - invited; 3 - joined)
|
history_visibility SMALLINT NOT NULL DEFAULT 2 -- The history visibility before this event (1 - world_readable; 2 - shared; 3 - invited; 4 - joined)
|
||||||
);
|
);
|
||||||
|
|
||||||
CREATE INDEX IF NOT EXISTS syncapi_output_room_events_type_idx ON syncapi_output_room_events (type);
|
CREATE INDEX IF NOT EXISTS syncapi_output_room_events_type_idx ON syncapi_output_room_events (type);
|
||||||
|
|
@ -59,27 +59,27 @@ CREATE INDEX IF NOT EXISTS syncapi_output_room_events_exclude_from_sync_idx ON s
|
||||||
|
|
||||||
const insertEventSQL = "" +
|
const insertEventSQL = "" +
|
||||||
"INSERT INTO syncapi_output_room_events (" +
|
"INSERT INTO syncapi_output_room_events (" +
|
||||||
"id, room_id, event_id, headered_event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync" +
|
"id, room_id, event_id, headered_event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync, history_visibility" +
|
||||||
") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) " +
|
") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) " +
|
||||||
"ON CONFLICT (event_id) DO UPDATE SET exclude_from_sync = (excluded.exclude_from_sync AND $13)"
|
"ON CONFLICT (event_id) DO UPDATE SET exclude_from_sync = (excluded.exclude_from_sync AND $14)"
|
||||||
|
|
||||||
const selectEventsSQL = "" +
|
const selectEventsSQL = "" +
|
||||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events WHERE event_id IN ($1)"
|
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events WHERE event_id IN ($1)"
|
||||||
|
|
||||||
const selectRecentEventsSQL = "" +
|
const selectRecentEventsSQL = "" +
|
||||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
|
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" +
|
||||||
" WHERE room_id = $1 AND id > $2 AND id <= $3"
|
" WHERE room_id = $1 AND id > $2 AND id <= $3"
|
||||||
|
|
||||||
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
|
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
|
||||||
|
|
||||||
const selectRecentEventsForSyncSQL = "" +
|
const selectRecentEventsForSyncSQL = "" +
|
||||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
|
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" +
|
||||||
" WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE"
|
" WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE"
|
||||||
|
|
||||||
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
|
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
|
||||||
|
|
||||||
const selectEarlyEventsSQL = "" +
|
const selectEarlyEventsSQL = "" +
|
||||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
|
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" +
|
||||||
" WHERE room_id = $1 AND id > $2 AND id <= $3"
|
" WHERE room_id = $1 AND id > $2 AND id <= $3"
|
||||||
|
|
||||||
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
|
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
|
||||||
|
|
@ -91,7 +91,7 @@ const updateEventJSONSQL = "" +
|
||||||
"UPDATE syncapi_output_room_events SET headered_event_json=$1 WHERE event_id=$2"
|
"UPDATE syncapi_output_room_events SET headered_event_json=$1 WHERE event_id=$2"
|
||||||
|
|
||||||
const selectStateInRangeSQL = "" +
|
const selectStateInRangeSQL = "" +
|
||||||
"SELECT event_id, id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" +
|
"SELECT event_id, id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids, history_visibility" +
|
||||||
" FROM syncapi_output_room_events" +
|
" FROM syncapi_output_room_events" +
|
||||||
" WHERE (id > $1 AND id <= $2)" +
|
" WHERE (id > $1 AND id <= $2)" +
|
||||||
" AND room_id IN ($3)" +
|
" AND room_id IN ($3)" +
|
||||||
|
|
@ -103,15 +103,15 @@ const deleteEventsForRoomSQL = "" +
|
||||||
"DELETE FROM syncapi_output_room_events WHERE room_id = $1"
|
"DELETE FROM syncapi_output_room_events WHERE room_id = $1"
|
||||||
|
|
||||||
const selectContextEventSQL = "" +
|
const selectContextEventSQL = "" +
|
||||||
"SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND event_id = $2"
|
"SELECT id, headered_event_json, history_visibility FROM syncapi_output_room_events WHERE room_id = $1 AND event_id = $2"
|
||||||
|
|
||||||
const selectContextBeforeEventSQL = "" +
|
const selectContextBeforeEventSQL = "" +
|
||||||
"SELECT headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id < $2"
|
"SELECT headered_event_json, history_visibility FROM syncapi_output_room_events WHERE room_id = $1 AND id < $2"
|
||||||
|
|
||||||
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
|
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
|
||||||
|
|
||||||
const selectContextAfterEventSQL = "" +
|
const selectContextAfterEventSQL = "" +
|
||||||
"SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2"
|
"SELECT id, headered_event_json, history_visibility FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2"
|
||||||
|
|
||||||
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
|
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
|
||||||
|
|
||||||
|
|
@ -197,14 +197,15 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
|
||||||
|
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var (
|
var (
|
||||||
eventID string
|
eventID string
|
||||||
streamPos types.StreamPosition
|
streamPos types.StreamPosition
|
||||||
eventBytes []byte
|
eventBytes []byte
|
||||||
excludeFromSync bool
|
excludeFromSync bool
|
||||||
addIDsJSON string
|
addIDsJSON string
|
||||||
delIDsJSON string
|
delIDsJSON string
|
||||||
|
historyVisibility uint8
|
||||||
)
|
)
|
||||||
if err := rows.Scan(&eventID, &streamPos, &eventBytes, &excludeFromSync, &addIDsJSON, &delIDsJSON); err != nil {
|
if err := rows.Scan(&eventID, &streamPos, &eventBytes, &excludeFromSync, &addIDsJSON, &delIDsJSON, &historyVisibility); err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -240,6 +241,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
|
||||||
needSet[id] = true
|
needSet[id] = true
|
||||||
}
|
}
|
||||||
stateNeeded[ev.RoomID()] = needSet
|
stateNeeded[ev.RoomID()] = needSet
|
||||||
|
ev.Visibility = gomatrixserverlib.HistoryVisibilityFromInt(historyVisibility)
|
||||||
|
|
||||||
eventIDToEvent[eventID] = types.StreamEvent{
|
eventIDToEvent[eventID] = types.StreamEvent{
|
||||||
HeaderedEvent: &ev,
|
HeaderedEvent: &ev,
|
||||||
|
|
@ -271,7 +273,7 @@ func (s *outputRoomEventsStatements) SelectMaxEventID(
|
||||||
func (s *outputRoomEventsStatements) InsertEvent(
|
func (s *outputRoomEventsStatements) InsertEvent(
|
||||||
ctx context.Context, txn *sql.Tx,
|
ctx context.Context, txn *sql.Tx,
|
||||||
event *gomatrixserverlib.HeaderedEvent, addState, removeState []string,
|
event *gomatrixserverlib.HeaderedEvent, addState, removeState []string,
|
||||||
transactionID *api.TransactionID, excludeFromSync bool,
|
transactionID *api.TransactionID, excludeFromSync bool, historyVisibility uint8,
|
||||||
) (types.StreamPosition, error) {
|
) (types.StreamPosition, error) {
|
||||||
var txnID *string
|
var txnID *string
|
||||||
var sessionID *int64
|
var sessionID *int64
|
||||||
|
|
@ -327,6 +329,7 @@ func (s *outputRoomEventsStatements) InsertEvent(
|
||||||
sessionID,
|
sessionID,
|
||||||
txnID,
|
txnID,
|
||||||
excludeFromSync,
|
excludeFromSync,
|
||||||
|
historyVisibility,
|
||||||
excludeFromSync,
|
excludeFromSync,
|
||||||
)
|
)
|
||||||
return streamPos, err
|
return streamPos, err
|
||||||
|
|
@ -482,15 +485,16 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
|
||||||
var result []types.StreamEvent
|
var result []types.StreamEvent
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var (
|
var (
|
||||||
eventID string
|
eventID string
|
||||||
streamPos types.StreamPosition
|
streamPos types.StreamPosition
|
||||||
eventBytes []byte
|
eventBytes []byte
|
||||||
excludeFromSync bool
|
excludeFromSync bool
|
||||||
sessionID *int64
|
sessionID *int64
|
||||||
txnID *string
|
txnID *string
|
||||||
transactionID *api.TransactionID
|
transactionID *api.TransactionID
|
||||||
|
historyVisibility uint8
|
||||||
)
|
)
|
||||||
if err := rows.Scan(&eventID, &streamPos, &eventBytes, &sessionID, &excludeFromSync, &txnID); err != nil {
|
if err := rows.Scan(&eventID, &streamPos, &eventBytes, &sessionID, &excludeFromSync, &txnID, &historyVisibility); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
// TODO: Handle redacted events
|
// TODO: Handle redacted events
|
||||||
|
|
@ -506,6 +510,8 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ev.Visibility = gomatrixserverlib.HistoryVisibilityFromInt(historyVisibility)
|
||||||
|
|
||||||
result = append(result, types.StreamEvent{
|
result = append(result, types.StreamEvent{
|
||||||
HeaderedEvent: &ev,
|
HeaderedEvent: &ev,
|
||||||
StreamPosition: streamPos,
|
StreamPosition: streamPos,
|
||||||
|
|
@ -520,13 +526,15 @@ func (s *outputRoomEventsStatements) SelectContextEvent(
|
||||||
) (id int, evt gomatrixserverlib.HeaderedEvent, err error) {
|
) (id int, evt gomatrixserverlib.HeaderedEvent, err error) {
|
||||||
row := sqlutil.TxStmt(txn, s.selectContextEventStmt).QueryRowContext(ctx, roomID, eventID)
|
row := sqlutil.TxStmt(txn, s.selectContextEventStmt).QueryRowContext(ctx, roomID, eventID)
|
||||||
var eventAsString string
|
var eventAsString string
|
||||||
if err = row.Scan(&id, &eventAsString); err != nil {
|
var historyVisibility uint8
|
||||||
|
if err = row.Scan(&id, &eventAsString, &historyVisibility); err != nil {
|
||||||
return 0, evt, err
|
return 0, evt, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = json.Unmarshal([]byte(eventAsString), &evt); err != nil {
|
if err = json.Unmarshal([]byte(eventAsString), &evt); err != nil {
|
||||||
return 0, evt, err
|
return 0, evt, err
|
||||||
}
|
}
|
||||||
|
evt.Visibility = gomatrixserverlib.HistoryVisibilityFromInt(historyVisibility)
|
||||||
return id, evt, nil
|
return id, evt, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -551,15 +559,17 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
|
||||||
|
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var (
|
var (
|
||||||
eventBytes []byte
|
eventBytes []byte
|
||||||
evt *gomatrixserverlib.HeaderedEvent
|
evt *gomatrixserverlib.HeaderedEvent
|
||||||
|
historyVisibility uint8
|
||||||
)
|
)
|
||||||
if err = rows.Scan(&eventBytes); err != nil {
|
if err = rows.Scan(&eventBytes, &historyVisibility); err != nil {
|
||||||
return evts, err
|
return evts, err
|
||||||
}
|
}
|
||||||
if err = json.Unmarshal(eventBytes, &evt); err != nil {
|
if err = json.Unmarshal(eventBytes, &evt); err != nil {
|
||||||
return evts, err
|
return evts, err
|
||||||
}
|
}
|
||||||
|
evt.Visibility = gomatrixserverlib.HistoryVisibilityFromInt(historyVisibility)
|
||||||
evts = append(evts, evt)
|
evts = append(evts, evt)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -587,15 +597,17 @@ func (s *outputRoomEventsStatements) SelectContextAfterEvent(
|
||||||
|
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var (
|
var (
|
||||||
eventBytes []byte
|
eventBytes []byte
|
||||||
evt *gomatrixserverlib.HeaderedEvent
|
evt *gomatrixserverlib.HeaderedEvent
|
||||||
|
historyVisibility uint8
|
||||||
)
|
)
|
||||||
if err = rows.Scan(&lastID, &eventBytes); err != nil {
|
if err = rows.Scan(&lastID, &eventBytes, &historyVisibility); err != nil {
|
||||||
return 0, evts, err
|
return 0, evts, err
|
||||||
}
|
}
|
||||||
if err = json.Unmarshal(eventBytes, &evt); err != nil {
|
if err = json.Unmarshal(eventBytes, &evt); err != nil {
|
||||||
return 0, evts, err
|
return 0, evts, err
|
||||||
}
|
}
|
||||||
|
evt.Visibility = gomatrixserverlib.HistoryVisibilityFromInt(historyVisibility)
|
||||||
evts = append(evts, evt)
|
evts = append(evts, evt)
|
||||||
}
|
}
|
||||||
return lastID, evts, rows.Err()
|
return lastID, evts, rows.Err()
|
||||||
|
|
|
||||||
|
|
@ -23,6 +23,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage/shared"
|
"github.com/matrix-org/dendrite/syncapi/storage/shared"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage/sqlite3/deltas"
|
"github.com/matrix-org/dendrite/syncapi/storage/sqlite3/deltas"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
// SyncServerDatasource represents a sync server datasource which manages
|
// SyncServerDatasource represents a sync server datasource which manages
|
||||||
|
|
@ -52,11 +53,10 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
|
||||||
if err = d.streamID.Prepare(d.db); err != nil {
|
if err = d.streamID.Prepare(d.db); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
accountData, err := NewSqliteAccountDataTable(d.db, &d.streamID)
|
if _, err = d.db.Exec(outputRoomEventsSchema); err != nil {
|
||||||
if err != nil {
|
logrus.Fatalf("unable to create table: %s", err)
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
events, err := NewSqliteEventsTable(d.db, &d.streamID)
|
accountData, err := NewSqliteAccountDataTable(d.db, &d.streamID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -115,6 +115,10 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
|
||||||
if err = m.RunDeltas(d.db, dbProperties); err != nil {
|
if err = m.RunDeltas(d.db, dbProperties); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
events, err := NewSqliteEventsTable(d.db, &d.streamID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
d.Database = shared.Database{
|
d.Database = shared.Database{
|
||||||
DB: d.db,
|
DB: d.db,
|
||||||
Writer: d.writer,
|
Writer: d.writer,
|
||||||
|
|
|
||||||
|
|
@ -35,7 +35,7 @@ func MustWriteEvents(t *testing.T, db storage.Database, events []*gomatrixserver
|
||||||
addStateEvents = append(addStateEvents, ev)
|
addStateEvents = append(addStateEvents, ev)
|
||||||
addStateEventIDs = append(addStateEventIDs, ev.EventID())
|
addStateEventIDs = append(addStateEventIDs, ev.EventID())
|
||||||
}
|
}
|
||||||
pos, err := db.WriteEvent(ctx, ev, addStateEvents, addStateEventIDs, removeStateEventIDs, nil, false)
|
pos, err := db.WriteEvent(ctx, ev, addStateEvents, addStateEventIDs, removeStateEventIDs, nil, false, gomatrixserverlib.HistoryVisibilityShared)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("WriteEvent failed: %s", err)
|
t.Fatalf("WriteEvent failed: %s", err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -52,7 +52,14 @@ type Peeks interface {
|
||||||
type Events interface {
|
type Events interface {
|
||||||
SelectStateInRange(ctx context.Context, txn *sql.Tx, r types.Range, stateFilter *gomatrixserverlib.StateFilter, roomIDs []string) (map[string]map[string]bool, map[string]types.StreamEvent, error)
|
SelectStateInRange(ctx context.Context, txn *sql.Tx, r types.Range, stateFilter *gomatrixserverlib.StateFilter, roomIDs []string) (map[string]map[string]bool, map[string]types.StreamEvent, error)
|
||||||
SelectMaxEventID(ctx context.Context, txn *sql.Tx) (id int64, err error)
|
SelectMaxEventID(ctx context.Context, txn *sql.Tx) (id int64, err error)
|
||||||
InsertEvent(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, addState, removeState []string, transactionID *api.TransactionID, excludeFromSync bool) (streamPos types.StreamPosition, err error)
|
InsertEvent(
|
||||||
|
ctx context.Context, txn *sql.Tx,
|
||||||
|
event *gomatrixserverlib.HeaderedEvent,
|
||||||
|
addState, removeState []string,
|
||||||
|
transactionID *api.TransactionID,
|
||||||
|
excludeFromSync bool,
|
||||||
|
historyVisibility uint8,
|
||||||
|
) (streamPos types.StreamPosition, err error)
|
||||||
// SelectRecentEvents returns events between the two stream positions: exclusive of low and inclusive of high.
|
// SelectRecentEvents returns events between the two stream positions: exclusive of low and inclusive of high.
|
||||||
// If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude from sync.
|
// If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude from sync.
|
||||||
// Returns up to `limit` events. Returns `limited=true` if there are more events in this range but we hit the `limit`.
|
// Returns up to `limit` events. Returns `limited=true` if there are more events in this range but we hit the `limit`.
|
||||||
|
|
|
||||||
|
|
@ -53,7 +53,7 @@ func TestOutputRoomEventsTable(t *testing.T) {
|
||||||
events := room.Events()
|
events := room.Events()
|
||||||
err := sqlutil.WithTransaction(db, func(txn *sql.Tx) error {
|
err := sqlutil.WithTransaction(db, func(txn *sql.Tx) error {
|
||||||
for _, ev := range events {
|
for _, ev := range events {
|
||||||
_, err := tab.InsertEvent(ctx, txn, ev, nil, nil, nil, false)
|
_, err := tab.InsertEvent(ctx, txn, ev, nil, nil, nil, false, 2)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to InsertEvent: %s", err)
|
return fmt.Errorf("failed to InsertEvent: %s", err)
|
||||||
}
|
}
|
||||||
|
|
@ -79,7 +79,7 @@ func TestOutputRoomEventsTable(t *testing.T) {
|
||||||
"body": "test.txt",
|
"body": "test.txt",
|
||||||
"url": "mxc://test.txt",
|
"url": "mxc://test.txt",
|
||||||
})
|
})
|
||||||
if _, err = tab.InsertEvent(ctx, txn, urlEv, nil, nil, nil, false); err != nil {
|
if _, err = tab.InsertEvent(ctx, txn, urlEv, nil, nil, nil, false, 2); err != nil {
|
||||||
return fmt.Errorf("failed to InsertEvent: %s", err)
|
return fmt.Errorf("failed to InsertEvent: %s", err)
|
||||||
}
|
}
|
||||||
wantEventID := []string{urlEv.EventID()}
|
wantEventID := []string{urlEv.EventID()}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue