Separate out INSERT/SELECT statements in place of RETURNING in SQLite

This commit is contained in:
Neil Alexander 2020-01-10 18:28:07 +00:00
parent 3852f5c714
commit 8bb8642560
10 changed files with 97 additions and 29 deletions

View file

@ -94,6 +94,7 @@ func processRoomEvent(
// Check that the event passes authentication checks and work out the numeric IDs for the auth events.
authEventNIDs, err := checkAuthEvents(ctx, db, event, input.AuthEventIDs)
if err != nil {
fmt.Println("failed checkAuthEvents:", err)
return
}
@ -104,6 +105,7 @@ func processRoomEvent(
)
// On error OR event with the transaction already processed/processesing
if err != nil || eventID != "" {
fmt.Println("failed GetTransactionEventID:", err)
return
}
}
@ -111,6 +113,7 @@ func processRoomEvent(
// Store the event
roomNID, stateAtEvent, err := db.StoreEvent(ctx, event, input.TransactionID, authEventNIDs)
if err != nil {
fmt.Println("failed StoreEvent:", err)
return
}
@ -118,6 +121,7 @@ func processRoomEvent(
// For outliers we can stop after we've stored the event itself as it
// doesn't have any associated state to store and we don't need to
// notify anyone about it.
fmt.Println("kind is outlier")
return event.EventID(), nil
}
@ -126,6 +130,7 @@ func processRoomEvent(
// Lets calculate one.
err = calculateAndSetState(ctx, db, input, roomNID, &stateAtEvent, event)
if err != nil {
fmt.Println("failed to calculate and set state")
return
}
}
@ -155,15 +160,18 @@ func calculateAndSetState(
// Check that those state events are in the database and store the state.
var entries []types.StateEntry
if entries, err = db.StateEntriesForEventIDs(ctx, input.StateEventIDs); err != nil {
fmt.Println("failed StateEntriesForEventIDs")
return err
}
if stateAtEvent.BeforeStateSnapshotNID, err = db.AddState(ctx, roomNID, nil, entries); err != nil {
fmt.Println("failed AddState")
return err
}
} else {
// We haven't been told what the state at the event is so we need to calculate it from the prev_events
if stateAtEvent.BeforeStateSnapshotNID, err = state.CalculateAndStoreStateBeforeEvent(ctx, db, event, roomNID); err != nil {
fmt.Println("Failed CalculateAndStoreStateBeforeEvent")
return err
}
}

View file

@ -558,6 +558,7 @@ func CalculateAndStoreStateBeforeEvent(
prevStates, err := db.StateAtEventIDs(ctx, prevEventIDs)
if err != nil {
fmt.Println("Failed stateAtEventIDs", err)
return 0, err
}
@ -579,6 +580,7 @@ func CalculateAndStoreStateAfterEvents(
// 2) There weren't any prev_events for this event so the state is
// empty.
metrics.algorithm = "empty_state"
fmt.Println("there were't any prev_events!")
return metrics.stop(db.AddState(ctx, roomNID, nil, nil))
}
@ -590,6 +592,7 @@ func CalculateAndStoreStateAfterEvents(
// as the previous events.
// This should be the common case.
metrics.algorithm = "no_change"
fmt.Println("none of the previous events were state events")
return metrics.stop(prevState.BeforeStateSnapshotNID, nil)
}
// The previous event was a state event so we need to store a copy
@ -599,6 +602,7 @@ func CalculateAndStoreStateAfterEvents(
)
if err != nil {
metrics.algorithm = "_load_state_blocks"
fmt.Println("failed StateBlockNIDs", err)
return metrics.stop(0, err)
}
stateBlockNIDs := stateBlockNIDLists[0].StateBlockNIDs
@ -639,6 +643,7 @@ func calculateAndStoreStateAfterManyEvents(
calculateStateAfterManyEvents(ctx, db, prevStates)
metrics.algorithm = algorithm
if err != nil {
fmt.Println("failed calculateStateAfterManyEvents", err)
return metrics.stop(0, err)
}
@ -658,6 +663,7 @@ func calculateStateAfterManyEvents(
combined, err = LoadCombinedStateAfterEvents(ctx, db, prevStates)
if err != nil {
algorithm = "_load_combined_state"
fmt.Println("failed LoadCombinedStateAfterEvents")
return
}
@ -688,6 +694,7 @@ func calculateStateAfterManyEvents(
resolved, err = resolveConflicts(ctx, db, notConflicted, conflicts)
if err != nil {
algorithm = "_resolve_conflicts"
fmt.Println("failed resolveConflicts", err)
return
}
algorithm = "full_state_with_conflicts"

View file

@ -38,6 +38,9 @@ const eventStateKeysSchema = `
const insertEventStateKeyNIDSQL = `
INSERT INTO roomserver_event_state_keys (event_state_key) VALUES ($1)
ON CONFLICT DO NOTHING;
`
const insertEventStateKeyNIDResultSQL = `
SELECT event_state_key_nid FROM roomserver_event_state_keys
WHERE rowid = last_insert_rowid();
`
@ -63,6 +66,7 @@ const bulkSelectEventStateKeySQL = `
type eventStateKeyStatements struct {
insertEventStateKeyNIDStmt *sql.Stmt
insertEventStateKeyNIDResultStmt *sql.Stmt
selectEventStateKeyNIDStmt *sql.Stmt
bulkSelectEventStateKeyNIDStmt *sql.Stmt
bulkSelectEventStateKeyStmt *sql.Stmt
@ -75,6 +79,7 @@ func (s *eventStateKeyStatements) prepare(db *sql.DB) (err error) {
}
return statementList{
{&s.insertEventStateKeyNIDStmt, insertEventStateKeyNIDSQL},
{&s.insertEventStateKeyNIDResultStmt, insertEventStateKeyNIDResultSQL},
{&s.selectEventStateKeyNIDStmt, selectEventStateKeyNIDSQL},
{&s.bulkSelectEventStateKeyNIDStmt, bulkSelectEventStateKeyNIDSQL},
{&s.bulkSelectEventStateKeyStmt, bulkSelectEventStateKeySQL},
@ -85,8 +90,12 @@ func (s *eventStateKeyStatements) insertEventStateKeyNID(
ctx context.Context, txn *sql.Tx, eventStateKey string,
) (types.EventStateKeyNID, error) {
var eventStateKeyNID int64
stmt := common.TxStmt(txn, s.insertEventStateKeyNIDStmt)
err := stmt.QueryRowContext(ctx, eventStateKey).Scan(&eventStateKeyNID)
var err error
insertStmt := common.TxStmt(txn, s.insertEventStateKeyNIDStmt)
selectStmt := common.TxStmt(txn, s.insertEventStateKeyNIDResultStmt)
if _, err = insertStmt.ExecContext(ctx, eventStateKey); err == nil {
err = selectStmt.QueryRowContext(ctx).Scan(&eventStateKeyNID)
}
return types.EventStateKeyNID(eventStateKeyNID), err
}
@ -109,7 +118,6 @@ func (s *eventStateKeyStatements) bulkSelectEventStateKeyNID(
return nil, err
}
defer rows.Close() // nolint: errcheck
result := make(map[string]types.EventStateKeyNID, len(eventStateKeys))
for rows.Next() {
var stateKey string
@ -134,7 +142,6 @@ func (s *eventStateKeyStatements) bulkSelectEventStateKey(
return nil, err
}
defer rows.Close() // nolint: errcheck
result := make(map[types.EventStateKeyNID]string, len(eventStateKeyNIDs))
for rows.Next() {
var stateKey string

View file

@ -54,6 +54,9 @@ const eventTypesSchema = `
const insertEventTypeNIDSQL = `
INSERT INTO roomserver_event_types (event_type) VALUES ($1)
ON CONFLICT DO NOTHING;
`
const insertEventTypeNIDResultSQL = `
SELECT event_type_nid FROM roomserver_event_types
WHERE rowid = last_insert_rowid();
`
@ -71,6 +74,7 @@ const bulkSelectEventTypeNIDSQL = `
type eventTypeStatements struct {
insertEventTypeNIDStmt *sql.Stmt
insertEventTypeNIDResultStmt *sql.Stmt
selectEventTypeNIDStmt *sql.Stmt
bulkSelectEventTypeNIDStmt *sql.Stmt
}
@ -83,6 +87,7 @@ func (s *eventTypeStatements) prepare(db *sql.DB) (err error) {
return statementList{
{&s.insertEventTypeNIDStmt, insertEventTypeNIDSQL},
{&s.insertEventTypeNIDResultStmt, insertEventTypeNIDResultSQL},
{&s.selectEventTypeNIDStmt, selectEventTypeNIDSQL},
{&s.bulkSelectEventTypeNIDStmt, bulkSelectEventTypeNIDSQL},
}.prepare(db)
@ -92,7 +97,10 @@ func (s *eventTypeStatements) insertEventTypeNID(
ctx context.Context, eventType string,
) (types.EventTypeNID, error) {
var eventTypeNID int64
err := s.insertEventTypeNIDStmt.QueryRowContext(ctx, eventType).Scan(&eventTypeNID)
var err error
if _, err = s.insertEventTypeNIDStmt.ExecContext(ctx, eventType); err == nil {
err = s.insertEventTypeNIDResultStmt.QueryRowContext(ctx).Scan(&eventTypeNID)
}
return types.EventTypeNID(eventTypeNID), err
}

View file

@ -37,7 +37,7 @@ const eventsSchema = `
depth INTEGER NOT NULL,
event_id TEXT NOT NULL UNIQUE,
reference_sha256 BLOB NOT NULL,
auth_event_nids TEXT NOT NULL
auth_event_nids TEXT NOT NULL DEFAULT '{}'
);
`
@ -45,6 +45,9 @@ const insertEventSQL = `
INSERT INTO roomserver_events (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256, auth_event_nids, depth)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT DO NOTHING;
`
const insertEventResultSQL = `
SELECT event_nid, state_snapshot_nid FROM roomserver_events
WHERE rowid = last_insert_rowid();
`
@ -94,6 +97,7 @@ const selectMaxEventDepthSQL = "" +
type eventStatements struct {
insertEventStmt *sql.Stmt
insertEventResultStmt *sql.Stmt
selectEventStmt *sql.Stmt
bulkSelectStateEventByIDStmt *sql.Stmt
bulkSelectStateAtEventByIDStmt *sql.Stmt
@ -116,6 +120,7 @@ func (s *eventStatements) prepare(db *sql.DB) (err error) {
return statementList{
{&s.insertEventStmt, insertEventSQL},
{&s.insertEventResultStmt, insertEventResultSQL},
{&s.selectEventStmt, selectEventSQL},
{&s.bulkSelectStateEventByIDStmt, bulkSelectStateEventByIDSQL},
{&s.bulkSelectStateAtEventByIDStmt, bulkSelectStateAtEventByIDSQL},
@ -143,10 +148,13 @@ func (s *eventStatements) insertEvent(
) (types.EventNID, types.StateSnapshotNID, error) {
var eventNID int64
var stateNID int64
err := s.insertEventStmt.QueryRowContext(
var err error
if _, err = s.insertEventStmt.ExecContext(
ctx, int64(roomNID), int64(eventTypeNID), int64(eventStateKeyNID),
eventID, referenceSHA256, eventNIDsAsArray(authEventNIDs), depth,
).Scan(&eventNID, &stateNID)
); err == nil {
err = s.insertEventResultStmt.QueryRowContext(ctx).Scan(&eventNID, &stateNID)
}
return types.EventNID(eventNID), types.StateSnapshotNID(stateNID), err
}

View file

@ -0,0 +1,18 @@
package sqlite3
import (
"strconv"
"strings"
"github.com/lib/pq"
)
type SqliteList string
func sqliteIn(a pq.Int64Array) string {
var b []string
for _, n := range a {
b = append(b, strconv.FormatInt(n, 10))
}
return strings.Join(b, ",")
}

View file

@ -28,7 +28,7 @@ const roomsSchema = `
CREATE TABLE IF NOT EXISTS roomserver_rooms (
room_nid INTEGER PRIMARY KEY AUTOINCREMENT,
room_id TEXT NOT NULL UNIQUE,
latest_event_nids TEXT NOT NULL DEFAULT '',
latest_event_nids TEXT NOT NULL DEFAULT '{}',
last_event_sent_nid INTEGER NOT NULL DEFAULT 0,
state_snapshot_nid INTEGER NOT NULL DEFAULT 0
);
@ -38,6 +38,9 @@ const roomsSchema = `
const insertRoomNIDSQL = `
INSERT INTO roomserver_rooms (room_id) VALUES ($1)
ON CONFLICT DO NOTHING;
`
const insertRoomNIDResultSQL = `
SELECT room_nid FROM roomserver_rooms
WHERE rowid = last_insert_rowid();
`
@ -56,6 +59,7 @@ const updateLatestEventNIDsSQL = "" +
type roomStatements struct {
insertRoomNIDStmt *sql.Stmt
insertRoomNIDResultStmt *sql.Stmt
selectRoomNIDStmt *sql.Stmt
selectLatestEventNIDsStmt *sql.Stmt
selectLatestEventNIDsForUpdateStmt *sql.Stmt
@ -69,6 +73,7 @@ func (s *roomStatements) prepare(db *sql.DB) (err error) {
}
return statementList{
{&s.insertRoomNIDStmt, insertRoomNIDSQL},
{&s.insertRoomNIDResultStmt, insertRoomNIDResultSQL},
{&s.selectRoomNIDStmt, selectRoomNIDSQL},
{&s.selectLatestEventNIDsStmt, selectLatestEventNIDsSQL},
{&s.selectLatestEventNIDsForUpdateStmt, selectLatestEventNIDsForUpdateSQL},
@ -80,8 +85,12 @@ func (s *roomStatements) insertRoomNID(
ctx context.Context, txn *sql.Tx, roomID string,
) (types.RoomNID, error) {
var roomNID int64
stmt := common.TxStmt(txn, s.insertRoomNIDStmt)
err := stmt.QueryRowContext(ctx, roomID).Scan(&roomNID)
var err error
insertStmt := common.TxStmt(txn, s.insertRoomNIDStmt)
resultStmt := common.TxStmt(txn, s.insertRoomNIDResultStmt)
if _, err = insertStmt.ExecContext(ctx, roomID); err == nil {
err = resultStmt.QueryRowContext(ctx).Scan(&roomNID)
}
return types.RoomNID(roomNID), err
}

View file

@ -28,13 +28,16 @@ const stateSnapshotSchema = `
CREATE TABLE IF NOT EXISTS roomserver_state_snapshots (
state_snapshot_nid INTEGER PRIMARY KEY AUTOINCREMENT,
room_nid INTEGER NOT NULL,
state_block_nids TEXT NOT NULL
state_block_nids TEXT NOT NULL DEFAULT '{}'
);
`
const insertStateSQL = `
INSERT INTO roomserver_state_snapshots (room_nid, state_block_nids)
VALUES ($1, $2);
`
const insertStateResultSQL = `
SELECT state_snapshot_nid FROM roomserver_state_snapshots
WHERE rowid = last_insert_rowid();
`
@ -48,6 +51,7 @@ const bulkSelectStateBlockNIDsSQL = "" +
type stateSnapshotStatements struct {
insertStateStmt *sql.Stmt
insertStateResultStmt *sql.Stmt
bulkSelectStateBlockNIDsStmt *sql.Stmt
}
@ -59,6 +63,7 @@ func (s *stateSnapshotStatements) prepare(db *sql.DB) (err error) {
return statementList{
{&s.insertStateStmt, insertStateSQL},
{&s.insertStateResultStmt, insertStateResultSQL},
{&s.bulkSelectStateBlockNIDsStmt, bulkSelectStateBlockNIDsSQL},
}.prepare(db)
}
@ -70,7 +75,9 @@ func (s *stateSnapshotStatements) insertState(
for i := range stateBlockNIDs {
nids[i] = int64(stateBlockNIDs[i])
}
err = s.insertStateStmt.QueryRowContext(ctx, int64(roomNID), pq.Int64Array(nids)).Scan(&stateNID)
if _, err = s.insertStateStmt.ExecContext(ctx, int64(roomNID), pq.Int64Array(nids)); err == nil {
err = s.insertStateResultStmt.QueryRowContext(ctx).Scan(&stateNID)
}
return
}

View file

@ -18,6 +18,7 @@ package sqlite3
import (
"context"
"database/sql"
"fmt"
"net/url"
"github.com/matrix-org/dendrite/roomserver/api"
@ -244,9 +245,11 @@ func (d *Database) AddState(
if len(state) > 0 {
stateBlockNID, err := d.statements.selectNextStateBlockNID(ctx)
if err != nil {
fmt.Println("d.statements.selectNextStateBlockNID", err)
return 0, err
}
if err = d.statements.bulkInsertStateData(ctx, stateBlockNID, state); err != nil {
fmt.Println("d.statements.bulkInsertStateData", err)
return 0, err
}
stateBlockNIDs = append(stateBlockNIDs[:len(stateBlockNIDs):len(stateBlockNIDs)], stateBlockNID)

View file

@ -19,6 +19,7 @@ import (
"net/url"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/state"
"github.com/matrix-org/dendrite/roomserver/storage/postgres"
"github.com/matrix-org/dendrite/roomserver/storage/sqlite3"
"github.com/matrix-org/dendrite/roomserver/types"
@ -26,19 +27,12 @@ import (
)
type Database interface {
state.RoomStateDatabase
StoreEvent(ctx context.Context, event gomatrixserverlib.Event, txnAndSessionID *api.TransactionID, authEventNIDs []types.EventNID) (types.RoomNID, types.StateAtEvent, error)
StateEntriesForEventIDs(ctx context.Context, eventIDs []string) ([]types.StateEntry, error)
EventTypeNIDs(ctx context.Context, eventTypes []string) (map[string]types.EventTypeNID, error)
EventStateKeyNIDs(ctx context.Context, eventStateKeys []string) (map[string]types.EventStateKeyNID, error)
EventStateKeys(ctx context.Context, eventStateKeyNIDs []types.EventStateKeyNID) (map[types.EventStateKeyNID]string, error)
EventNIDs(ctx context.Context, eventIDs []string) (map[string]types.EventNID, error)
Events(ctx context.Context, eventNIDs []types.EventNID) ([]types.Event, error)
AddState(ctx context.Context, roomNID types.RoomNID, stateBlockNIDs []types.StateBlockNID, state []types.StateEntry) (types.StateSnapshotNID, error)
SetState(ctx context.Context, eventNID types.EventNID, stateNID types.StateSnapshotNID) error
StateAtEventIDs(ctx context.Context, eventIDs []string) ([]types.StateAtEvent, error)
StateBlockNIDs(ctx context.Context, stateNIDs []types.StateSnapshotNID) ([]types.StateBlockNIDList, error)
StateEntries(ctx context.Context, stateBlockNIDs []types.StateBlockNID) ([]types.StateEntryList, error)
SnapshotNIDFromEventID(ctx context.Context, eventID string) (types.StateSnapshotNID, error)
EventIDs(ctx context.Context, eventNIDs []types.EventNID) (map[types.EventNID]string, error)
GetLatestEventsForUpdate(ctx context.Context, roomNID types.RoomNID) (types.RoomRecentEventsUpdater, error)
GetTransactionEventID(ctx context.Context, transactionID string, sessionID int64, userID string) (string, error)
@ -50,7 +44,6 @@ type Database interface {
GetAliasesForRoomID(ctx context.Context, roomID string) ([]string, error)
GetCreatorIDForAlias(ctx context.Context, alias string) (string, error)
RemoveRoomAlias(ctx context.Context, alias string) error
StateEntriesForTuples(ctx context.Context, stateBlockNIDs []types.StateBlockNID, stateKeyTuples []types.StateKeyTuple) ([]types.StateEntryList, error)
MembershipUpdater(ctx context.Context, roomID, targetUserID string) (types.MembershipUpdater, error)
GetMembership(ctx context.Context, roomNID types.RoomNID, requestSenderUserID string) (membershipEventNID types.EventNID, stillInRoom bool, err error)
GetMembershipEventNIDsForRoom(ctx context.Context, roomNID types.RoomNID, joinOnly bool) ([]types.EventNID, error)