From 8bb864256089aba4984d510d2838988196841ba9 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 10 Jan 2020 18:28:07 +0000 Subject: [PATCH] Separate out INSERT/SELECT statements in place of RETURNING in SQLite --- roomserver/input/events.go | 8 +++++++ roomserver/state/state.go | 7 ++++++ .../storage/sqlite3/event_state_keys_table.go | 23 ++++++++++++------- .../storage/sqlite3/event_types_table.go | 16 +++++++++---- roomserver/storage/sqlite3/events_table.go | 14 ++++++++--- roomserver/storage/sqlite3/list.go | 18 +++++++++++++++ roomserver/storage/sqlite3/rooms_table.go | 15 +++++++++--- .../storage/sqlite3/state_snapshot_table.go | 11 +++++++-- roomserver/storage/sqlite3/storage.go | 3 +++ roomserver/storage/storage.go | 11 ++------- 10 files changed, 97 insertions(+), 29 deletions(-) create mode 100644 roomserver/storage/sqlite3/list.go diff --git a/roomserver/input/events.go b/roomserver/input/events.go index b30c39928..8038aee94 100644 --- a/roomserver/input/events.go +++ b/roomserver/input/events.go @@ -94,6 +94,7 @@ func processRoomEvent( // Check that the event passes authentication checks and work out the numeric IDs for the auth events. authEventNIDs, err := checkAuthEvents(ctx, db, event, input.AuthEventIDs) if err != nil { + fmt.Println("failed checkAuthEvents:", err) return } @@ -104,6 +105,7 @@ func processRoomEvent( ) // On error OR event with the transaction already processed/processesing if err != nil || eventID != "" { + fmt.Println("failed GetTransactionEventID:", err) return } } @@ -111,6 +113,7 @@ func processRoomEvent( // Store the event roomNID, stateAtEvent, err := db.StoreEvent(ctx, event, input.TransactionID, authEventNIDs) if err != nil { + fmt.Println("failed StoreEvent:", err) return } @@ -118,6 +121,7 @@ func processRoomEvent( // For outliers we can stop after we've stored the event itself as it // doesn't have any associated state to store and we don't need to // notify anyone about it. + fmt.Println("kind is outlier") return event.EventID(), nil } @@ -126,6 +130,7 @@ func processRoomEvent( // Lets calculate one. err = calculateAndSetState(ctx, db, input, roomNID, &stateAtEvent, event) if err != nil { + fmt.Println("failed to calculate and set state") return } } @@ -155,15 +160,18 @@ func calculateAndSetState( // Check that those state events are in the database and store the state. var entries []types.StateEntry if entries, err = db.StateEntriesForEventIDs(ctx, input.StateEventIDs); err != nil { + fmt.Println("failed StateEntriesForEventIDs") return err } if stateAtEvent.BeforeStateSnapshotNID, err = db.AddState(ctx, roomNID, nil, entries); err != nil { + fmt.Println("failed AddState") return err } } else { // We haven't been told what the state at the event is so we need to calculate it from the prev_events if stateAtEvent.BeforeStateSnapshotNID, err = state.CalculateAndStoreStateBeforeEvent(ctx, db, event, roomNID); err != nil { + fmt.Println("Failed CalculateAndStoreStateBeforeEvent") return err } } diff --git a/roomserver/state/state.go b/roomserver/state/state.go index 2a0b7f574..905946ca3 100644 --- a/roomserver/state/state.go +++ b/roomserver/state/state.go @@ -558,6 +558,7 @@ func CalculateAndStoreStateBeforeEvent( prevStates, err := db.StateAtEventIDs(ctx, prevEventIDs) if err != nil { + fmt.Println("Failed stateAtEventIDs", err) return 0, err } @@ -579,6 +580,7 @@ func CalculateAndStoreStateAfterEvents( // 2) There weren't any prev_events for this event so the state is // empty. metrics.algorithm = "empty_state" + fmt.Println("there were't any prev_events!") return metrics.stop(db.AddState(ctx, roomNID, nil, nil)) } @@ -590,6 +592,7 @@ func CalculateAndStoreStateAfterEvents( // as the previous events. // This should be the common case. metrics.algorithm = "no_change" + fmt.Println("none of the previous events were state events") return metrics.stop(prevState.BeforeStateSnapshotNID, nil) } // The previous event was a state event so we need to store a copy @@ -599,6 +602,7 @@ func CalculateAndStoreStateAfterEvents( ) if err != nil { metrics.algorithm = "_load_state_blocks" + fmt.Println("failed StateBlockNIDs", err) return metrics.stop(0, err) } stateBlockNIDs := stateBlockNIDLists[0].StateBlockNIDs @@ -639,6 +643,7 @@ func calculateAndStoreStateAfterManyEvents( calculateStateAfterManyEvents(ctx, db, prevStates) metrics.algorithm = algorithm if err != nil { + fmt.Println("failed calculateStateAfterManyEvents", err) return metrics.stop(0, err) } @@ -658,6 +663,7 @@ func calculateStateAfterManyEvents( combined, err = LoadCombinedStateAfterEvents(ctx, db, prevStates) if err != nil { algorithm = "_load_combined_state" + fmt.Println("failed LoadCombinedStateAfterEvents") return } @@ -688,6 +694,7 @@ func calculateStateAfterManyEvents( resolved, err = resolveConflicts(ctx, db, notConflicted, conflicts) if err != nil { algorithm = "_resolve_conflicts" + fmt.Println("failed resolveConflicts", err) return } algorithm = "full_state_with_conflicts" diff --git a/roomserver/storage/sqlite3/event_state_keys_table.go b/roomserver/storage/sqlite3/event_state_keys_table.go index 1f199a0af..0d4756b9c 100644 --- a/roomserver/storage/sqlite3/event_state_keys_table.go +++ b/roomserver/storage/sqlite3/event_state_keys_table.go @@ -38,6 +38,9 @@ const eventStateKeysSchema = ` const insertEventStateKeyNIDSQL = ` INSERT INTO roomserver_event_state_keys (event_state_key) VALUES ($1) ON CONFLICT DO NOTHING; +` + +const insertEventStateKeyNIDResultSQL = ` SELECT event_state_key_nid FROM roomserver_event_state_keys WHERE rowid = last_insert_rowid(); ` @@ -62,10 +65,11 @@ const bulkSelectEventStateKeySQL = ` ` type eventStateKeyStatements struct { - insertEventStateKeyNIDStmt *sql.Stmt - selectEventStateKeyNIDStmt *sql.Stmt - bulkSelectEventStateKeyNIDStmt *sql.Stmt - bulkSelectEventStateKeyStmt *sql.Stmt + insertEventStateKeyNIDStmt *sql.Stmt + insertEventStateKeyNIDResultStmt *sql.Stmt + selectEventStateKeyNIDStmt *sql.Stmt + bulkSelectEventStateKeyNIDStmt *sql.Stmt + bulkSelectEventStateKeyStmt *sql.Stmt } func (s *eventStateKeyStatements) prepare(db *sql.DB) (err error) { @@ -75,6 +79,7 @@ func (s *eventStateKeyStatements) prepare(db *sql.DB) (err error) { } return statementList{ {&s.insertEventStateKeyNIDStmt, insertEventStateKeyNIDSQL}, + {&s.insertEventStateKeyNIDResultStmt, insertEventStateKeyNIDResultSQL}, {&s.selectEventStateKeyNIDStmt, selectEventStateKeyNIDSQL}, {&s.bulkSelectEventStateKeyNIDStmt, bulkSelectEventStateKeyNIDSQL}, {&s.bulkSelectEventStateKeyStmt, bulkSelectEventStateKeySQL}, @@ -85,8 +90,12 @@ func (s *eventStateKeyStatements) insertEventStateKeyNID( ctx context.Context, txn *sql.Tx, eventStateKey string, ) (types.EventStateKeyNID, error) { var eventStateKeyNID int64 - stmt := common.TxStmt(txn, s.insertEventStateKeyNIDStmt) - err := stmt.QueryRowContext(ctx, eventStateKey).Scan(&eventStateKeyNID) + var err error + insertStmt := common.TxStmt(txn, s.insertEventStateKeyNIDStmt) + selectStmt := common.TxStmt(txn, s.insertEventStateKeyNIDResultStmt) + if _, err = insertStmt.ExecContext(ctx, eventStateKey); err == nil { + err = selectStmt.QueryRowContext(ctx).Scan(&eventStateKeyNID) + } return types.EventStateKeyNID(eventStateKeyNID), err } @@ -109,7 +118,6 @@ func (s *eventStateKeyStatements) bulkSelectEventStateKeyNID( return nil, err } defer rows.Close() // nolint: errcheck - result := make(map[string]types.EventStateKeyNID, len(eventStateKeys)) for rows.Next() { var stateKey string @@ -134,7 +142,6 @@ func (s *eventStateKeyStatements) bulkSelectEventStateKey( return nil, err } defer rows.Close() // nolint: errcheck - result := make(map[types.EventStateKeyNID]string, len(eventStateKeyNIDs)) for rows.Next() { var stateKey string diff --git a/roomserver/storage/sqlite3/event_types_table.go b/roomserver/storage/sqlite3/event_types_table.go index da91a115b..6b9e35611 100644 --- a/roomserver/storage/sqlite3/event_types_table.go +++ b/roomserver/storage/sqlite3/event_types_table.go @@ -54,6 +54,9 @@ const eventTypesSchema = ` const insertEventTypeNIDSQL = ` INSERT INTO roomserver_event_types (event_type) VALUES ($1) ON CONFLICT DO NOTHING; +` + +const insertEventTypeNIDResultSQL = ` SELECT event_type_nid FROM roomserver_event_types WHERE rowid = last_insert_rowid(); ` @@ -70,9 +73,10 @@ const bulkSelectEventTypeNIDSQL = ` ` type eventTypeStatements struct { - insertEventTypeNIDStmt *sql.Stmt - selectEventTypeNIDStmt *sql.Stmt - bulkSelectEventTypeNIDStmt *sql.Stmt + insertEventTypeNIDStmt *sql.Stmt + insertEventTypeNIDResultStmt *sql.Stmt + selectEventTypeNIDStmt *sql.Stmt + bulkSelectEventTypeNIDStmt *sql.Stmt } func (s *eventTypeStatements) prepare(db *sql.DB) (err error) { @@ -83,6 +87,7 @@ func (s *eventTypeStatements) prepare(db *sql.DB) (err error) { return statementList{ {&s.insertEventTypeNIDStmt, insertEventTypeNIDSQL}, + {&s.insertEventTypeNIDResultStmt, insertEventTypeNIDResultSQL}, {&s.selectEventTypeNIDStmt, selectEventTypeNIDSQL}, {&s.bulkSelectEventTypeNIDStmt, bulkSelectEventTypeNIDSQL}, }.prepare(db) @@ -92,7 +97,10 @@ func (s *eventTypeStatements) insertEventTypeNID( ctx context.Context, eventType string, ) (types.EventTypeNID, error) { var eventTypeNID int64 - err := s.insertEventTypeNIDStmt.QueryRowContext(ctx, eventType).Scan(&eventTypeNID) + var err error + if _, err = s.insertEventTypeNIDStmt.ExecContext(ctx, eventType); err == nil { + err = s.insertEventTypeNIDResultStmt.QueryRowContext(ctx).Scan(&eventTypeNID) + } return types.EventTypeNID(eventTypeNID), err } diff --git a/roomserver/storage/sqlite3/events_table.go b/roomserver/storage/sqlite3/events_table.go index 990c7b558..c908a4589 100644 --- a/roomserver/storage/sqlite3/events_table.go +++ b/roomserver/storage/sqlite3/events_table.go @@ -37,7 +37,7 @@ const eventsSchema = ` depth INTEGER NOT NULL, event_id TEXT NOT NULL UNIQUE, reference_sha256 BLOB NOT NULL, - auth_event_nids TEXT NOT NULL + auth_event_nids TEXT NOT NULL DEFAULT '{}' ); ` @@ -45,6 +45,9 @@ const insertEventSQL = ` INSERT INTO roomserver_events (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256, auth_event_nids, depth) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT DO NOTHING; +` + +const insertEventResultSQL = ` SELECT event_nid, state_snapshot_nid FROM roomserver_events WHERE rowid = last_insert_rowid(); ` @@ -94,6 +97,7 @@ const selectMaxEventDepthSQL = "" + type eventStatements struct { insertEventStmt *sql.Stmt + insertEventResultStmt *sql.Stmt selectEventStmt *sql.Stmt bulkSelectStateEventByIDStmt *sql.Stmt bulkSelectStateAtEventByIDStmt *sql.Stmt @@ -116,6 +120,7 @@ func (s *eventStatements) prepare(db *sql.DB) (err error) { return statementList{ {&s.insertEventStmt, insertEventSQL}, + {&s.insertEventResultStmt, insertEventResultSQL}, {&s.selectEventStmt, selectEventSQL}, {&s.bulkSelectStateEventByIDStmt, bulkSelectStateEventByIDSQL}, {&s.bulkSelectStateAtEventByIDStmt, bulkSelectStateAtEventByIDSQL}, @@ -143,10 +148,13 @@ func (s *eventStatements) insertEvent( ) (types.EventNID, types.StateSnapshotNID, error) { var eventNID int64 var stateNID int64 - err := s.insertEventStmt.QueryRowContext( + var err error + if _, err = s.insertEventStmt.ExecContext( ctx, int64(roomNID), int64(eventTypeNID), int64(eventStateKeyNID), eventID, referenceSHA256, eventNIDsAsArray(authEventNIDs), depth, - ).Scan(&eventNID, &stateNID) + ); err == nil { + err = s.insertEventResultStmt.QueryRowContext(ctx).Scan(&eventNID, &stateNID) + } return types.EventNID(eventNID), types.StateSnapshotNID(stateNID), err } diff --git a/roomserver/storage/sqlite3/list.go b/roomserver/storage/sqlite3/list.go new file mode 100644 index 000000000..4fe4e334b --- /dev/null +++ b/roomserver/storage/sqlite3/list.go @@ -0,0 +1,18 @@ +package sqlite3 + +import ( + "strconv" + "strings" + + "github.com/lib/pq" +) + +type SqliteList string + +func sqliteIn(a pq.Int64Array) string { + var b []string + for _, n := range a { + b = append(b, strconv.FormatInt(n, 10)) + } + return strings.Join(b, ",") +} diff --git a/roomserver/storage/sqlite3/rooms_table.go b/roomserver/storage/sqlite3/rooms_table.go index 5d1fe6810..63ccdcdd0 100644 --- a/roomserver/storage/sqlite3/rooms_table.go +++ b/roomserver/storage/sqlite3/rooms_table.go @@ -28,7 +28,7 @@ const roomsSchema = ` CREATE TABLE IF NOT EXISTS roomserver_rooms ( room_nid INTEGER PRIMARY KEY AUTOINCREMENT, room_id TEXT NOT NULL UNIQUE, - latest_event_nids TEXT NOT NULL DEFAULT '', + latest_event_nids TEXT NOT NULL DEFAULT '{}', last_event_sent_nid INTEGER NOT NULL DEFAULT 0, state_snapshot_nid INTEGER NOT NULL DEFAULT 0 ); @@ -38,6 +38,9 @@ const roomsSchema = ` const insertRoomNIDSQL = ` INSERT INTO roomserver_rooms (room_id) VALUES ($1) ON CONFLICT DO NOTHING; +` + +const insertRoomNIDResultSQL = ` SELECT room_nid FROM roomserver_rooms WHERE rowid = last_insert_rowid(); ` @@ -56,6 +59,7 @@ const updateLatestEventNIDsSQL = "" + type roomStatements struct { insertRoomNIDStmt *sql.Stmt + insertRoomNIDResultStmt *sql.Stmt selectRoomNIDStmt *sql.Stmt selectLatestEventNIDsStmt *sql.Stmt selectLatestEventNIDsForUpdateStmt *sql.Stmt @@ -69,6 +73,7 @@ func (s *roomStatements) prepare(db *sql.DB) (err error) { } return statementList{ {&s.insertRoomNIDStmt, insertRoomNIDSQL}, + {&s.insertRoomNIDResultStmt, insertRoomNIDResultSQL}, {&s.selectRoomNIDStmt, selectRoomNIDSQL}, {&s.selectLatestEventNIDsStmt, selectLatestEventNIDsSQL}, {&s.selectLatestEventNIDsForUpdateStmt, selectLatestEventNIDsForUpdateSQL}, @@ -80,8 +85,12 @@ func (s *roomStatements) insertRoomNID( ctx context.Context, txn *sql.Tx, roomID string, ) (types.RoomNID, error) { var roomNID int64 - stmt := common.TxStmt(txn, s.insertRoomNIDStmt) - err := stmt.QueryRowContext(ctx, roomID).Scan(&roomNID) + var err error + insertStmt := common.TxStmt(txn, s.insertRoomNIDStmt) + resultStmt := common.TxStmt(txn, s.insertRoomNIDResultStmt) + if _, err = insertStmt.ExecContext(ctx, roomID); err == nil { + err = resultStmt.QueryRowContext(ctx).Scan(&roomNID) + } return types.RoomNID(roomNID), err } diff --git a/roomserver/storage/sqlite3/state_snapshot_table.go b/roomserver/storage/sqlite3/state_snapshot_table.go index c69d772d0..a6467d115 100644 --- a/roomserver/storage/sqlite3/state_snapshot_table.go +++ b/roomserver/storage/sqlite3/state_snapshot_table.go @@ -28,13 +28,16 @@ const stateSnapshotSchema = ` CREATE TABLE IF NOT EXISTS roomserver_state_snapshots ( state_snapshot_nid INTEGER PRIMARY KEY AUTOINCREMENT, room_nid INTEGER NOT NULL, - state_block_nids TEXT NOT NULL + state_block_nids TEXT NOT NULL DEFAULT '{}' ); ` const insertStateSQL = ` INSERT INTO roomserver_state_snapshots (room_nid, state_block_nids) VALUES ($1, $2); +` + +const insertStateResultSQL = ` SELECT state_snapshot_nid FROM roomserver_state_snapshots WHERE rowid = last_insert_rowid(); ` @@ -48,6 +51,7 @@ const bulkSelectStateBlockNIDsSQL = "" + type stateSnapshotStatements struct { insertStateStmt *sql.Stmt + insertStateResultStmt *sql.Stmt bulkSelectStateBlockNIDsStmt *sql.Stmt } @@ -59,6 +63,7 @@ func (s *stateSnapshotStatements) prepare(db *sql.DB) (err error) { return statementList{ {&s.insertStateStmt, insertStateSQL}, + {&s.insertStateResultStmt, insertStateResultSQL}, {&s.bulkSelectStateBlockNIDsStmt, bulkSelectStateBlockNIDsSQL}, }.prepare(db) } @@ -70,7 +75,9 @@ func (s *stateSnapshotStatements) insertState( for i := range stateBlockNIDs { nids[i] = int64(stateBlockNIDs[i]) } - err = s.insertStateStmt.QueryRowContext(ctx, int64(roomNID), pq.Int64Array(nids)).Scan(&stateNID) + if _, err = s.insertStateStmt.ExecContext(ctx, int64(roomNID), pq.Int64Array(nids)); err == nil { + err = s.insertStateResultStmt.QueryRowContext(ctx).Scan(&stateNID) + } return } diff --git a/roomserver/storage/sqlite3/storage.go b/roomserver/storage/sqlite3/storage.go index 5bff7e94f..9cc33c8fb 100644 --- a/roomserver/storage/sqlite3/storage.go +++ b/roomserver/storage/sqlite3/storage.go @@ -18,6 +18,7 @@ package sqlite3 import ( "context" "database/sql" + "fmt" "net/url" "github.com/matrix-org/dendrite/roomserver/api" @@ -244,9 +245,11 @@ func (d *Database) AddState( if len(state) > 0 { stateBlockNID, err := d.statements.selectNextStateBlockNID(ctx) if err != nil { + fmt.Println("d.statements.selectNextStateBlockNID", err) return 0, err } if err = d.statements.bulkInsertStateData(ctx, stateBlockNID, state); err != nil { + fmt.Println("d.statements.bulkInsertStateData", err) return 0, err } stateBlockNIDs = append(stateBlockNIDs[:len(stateBlockNIDs):len(stateBlockNIDs)], stateBlockNID) diff --git a/roomserver/storage/storage.go b/roomserver/storage/storage.go index 9c0c832d0..1516e2ad7 100644 --- a/roomserver/storage/storage.go +++ b/roomserver/storage/storage.go @@ -19,6 +19,7 @@ import ( "net/url" "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/roomserver/state" "github.com/matrix-org/dendrite/roomserver/storage/postgres" "github.com/matrix-org/dendrite/roomserver/storage/sqlite3" "github.com/matrix-org/dendrite/roomserver/types" @@ -26,19 +27,12 @@ import ( ) type Database interface { + state.RoomStateDatabase StoreEvent(ctx context.Context, event gomatrixserverlib.Event, txnAndSessionID *api.TransactionID, authEventNIDs []types.EventNID) (types.RoomNID, types.StateAtEvent, error) StateEntriesForEventIDs(ctx context.Context, eventIDs []string) ([]types.StateEntry, error) - EventTypeNIDs(ctx context.Context, eventTypes []string) (map[string]types.EventTypeNID, error) - EventStateKeyNIDs(ctx context.Context, eventStateKeys []string) (map[string]types.EventStateKeyNID, error) EventStateKeys(ctx context.Context, eventStateKeyNIDs []types.EventStateKeyNID) (map[types.EventStateKeyNID]string, error) EventNIDs(ctx context.Context, eventIDs []string) (map[string]types.EventNID, error) - Events(ctx context.Context, eventNIDs []types.EventNID) ([]types.Event, error) - AddState(ctx context.Context, roomNID types.RoomNID, stateBlockNIDs []types.StateBlockNID, state []types.StateEntry) (types.StateSnapshotNID, error) SetState(ctx context.Context, eventNID types.EventNID, stateNID types.StateSnapshotNID) error - StateAtEventIDs(ctx context.Context, eventIDs []string) ([]types.StateAtEvent, error) - StateBlockNIDs(ctx context.Context, stateNIDs []types.StateSnapshotNID) ([]types.StateBlockNIDList, error) - StateEntries(ctx context.Context, stateBlockNIDs []types.StateBlockNID) ([]types.StateEntryList, error) - SnapshotNIDFromEventID(ctx context.Context, eventID string) (types.StateSnapshotNID, error) EventIDs(ctx context.Context, eventNIDs []types.EventNID) (map[types.EventNID]string, error) GetLatestEventsForUpdate(ctx context.Context, roomNID types.RoomNID) (types.RoomRecentEventsUpdater, error) GetTransactionEventID(ctx context.Context, transactionID string, sessionID int64, userID string) (string, error) @@ -50,7 +44,6 @@ type Database interface { GetAliasesForRoomID(ctx context.Context, roomID string) ([]string, error) GetCreatorIDForAlias(ctx context.Context, alias string) (string, error) RemoveRoomAlias(ctx context.Context, alias string) error - StateEntriesForTuples(ctx context.Context, stateBlockNIDs []types.StateBlockNID, stateKeyTuples []types.StateKeyTuple) ([]types.StateEntryList, error) MembershipUpdater(ctx context.Context, roomID, targetUserID string) (types.MembershipUpdater, error) GetMembership(ctx context.Context, roomNID types.RoomNID, requestSenderUserID string) (membershipEventNID types.EventNID, stillInRoom bool, err error) GetMembershipEventNIDsForRoom(ctx context.Context, roomNID types.RoomNID, joinOnly bool) ([]types.EventNID, error)