From b4bd0cc0f5440fe6b61de050c49ed723b26d7bb4 Mon Sep 17 00:00:00 2001 From: Kegsay Date: Tue, 8 Sep 2020 17:30:05 +0100 Subject: [PATCH 1/3] Track goids when running with tracing enabled (#1413) * Track goids when running with tracing enabled * Linting --- internal/sqlutil/trace.go | 32 ++++++++++++++++++++++++++++ internal/sqlutil/writer_exclusive.go | 6 ++++++ 2 files changed, 38 insertions(+) diff --git a/internal/sqlutil/trace.go b/internal/sqlutil/trace.go index fbd983bec..248dbe38b 100644 --- a/internal/sqlutil/trace.go +++ b/internal/sqlutil/trace.go @@ -22,7 +22,10 @@ import ( "io" "os" "regexp" + "runtime" + "strconv" "strings" + "sync" "time" "github.com/matrix-org/dendrite/internal/config" @@ -31,6 +34,7 @@ import ( ) var tracingEnabled = os.Getenv("DENDRITE_TRACE_SQL") == "1" +var goidToWriter sync.Map type traceInterceptor struct { sqlmw.NullInterceptor @@ -40,6 +44,8 @@ func (in *traceInterceptor) StmtQueryContext(ctx context.Context, stmt driver.St startedAt := time.Now() rows, err := stmt.QueryContext(ctx, args) + trackGoID(query) + logrus.WithField("duration", time.Since(startedAt)).WithField(logrus.ErrorKey, err).Debug("executed sql query ", query, " args: ", args) return rows, err @@ -49,6 +55,8 @@ func (in *traceInterceptor) StmtExecContext(ctx context.Context, stmt driver.Stm startedAt := time.Now() result, err := stmt.ExecContext(ctx, args) + trackGoID(query) + logrus.WithField("duration", time.Since(startedAt)).WithField(logrus.ErrorKey, err).Debug("executed sql query ", query, " args: ", args) return result, err @@ -75,6 +83,19 @@ func (in *traceInterceptor) RowsNext(c context.Context, rows driver.Rows, dest [ return err } +func trackGoID(query string) { + thisGoID := goid() + if _, ok := goidToWriter.Load(thisGoID); ok { + return // we're on a writer goroutine + } + + q := strings.TrimSpace(query) + if strings.HasPrefix(q, "SELECT") { + return // SELECTs can go on other goroutines + } + logrus.Warnf("unsafe goid: SQL executed not on an ExclusiveWriter: %s", q) +} + // Open opens a database specified by its database driver name and a driver-specific data source name, // usually consisting of at least a database name and connection information. Includes tracing driver // if DENDRITE_TRACE_SQL=1 @@ -119,3 +140,14 @@ func Open(dbProperties *config.DatabaseOptions) (*sql.DB, error) { func init() { registerDrivers() } + +func goid() int { + var buf [64]byte + n := runtime.Stack(buf[:], false) + idField := strings.Fields(strings.TrimPrefix(string(buf[:n]), "goroutine "))[0] + id, err := strconv.Atoi(idField) + if err != nil { + panic(fmt.Sprintf("cannot get goroutine id: %v", err)) + } + return id +} diff --git a/internal/sqlutil/writer_exclusive.go b/internal/sqlutil/writer_exclusive.go index 002bc32cf..91dd77e4d 100644 --- a/internal/sqlutil/writer_exclusive.go +++ b/internal/sqlutil/writer_exclusive.go @@ -60,6 +60,12 @@ func (w *ExclusiveWriter) run() { if !w.running.CAS(false, true) { return } + if tracingEnabled { + gid := goid() + goidToWriter.Store(gid, w) + defer goidToWriter.Delete(gid) + } + defer w.running.Store(false) for task := range w.todo { if task.db != nil && task.txn != nil { From a0f2a4510fd6f838e12e1654148a17b653574f8b Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 8 Sep 2020 17:47:54 +0100 Subject: [PATCH 2/3] Exclude deleted keys from selectBatchDeviceKeysSQL (#1412) --- keyserver/storage/postgres/device_keys_table.go | 2 +- keyserver/storage/sqlite3/device_keys_table.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/keyserver/storage/postgres/device_keys_table.go b/keyserver/storage/postgres/device_keys_table.go index 779d02c03..e5bec8f6f 100644 --- a/keyserver/storage/postgres/device_keys_table.go +++ b/keyserver/storage/postgres/device_keys_table.go @@ -53,7 +53,7 @@ const selectDeviceKeysSQL = "" + "SELECT key_json, stream_id, display_name FROM keyserver_device_keys WHERE user_id=$1 AND device_id=$2" const selectBatchDeviceKeysSQL = "" + - "SELECT device_id, key_json, stream_id, display_name FROM keyserver_device_keys WHERE user_id=$1" + "SELECT device_id, key_json, stream_id, display_name FROM keyserver_device_keys WHERE user_id=$1 AND key_json <> ''" const selectMaxStreamForUserSQL = "" + "SELECT MAX(stream_id) FROM keyserver_device_keys WHERE user_id=$1" diff --git a/keyserver/storage/sqlite3/device_keys_table.go b/keyserver/storage/sqlite3/device_keys_table.go index 195429f08..e7ff9976d 100644 --- a/keyserver/storage/sqlite3/device_keys_table.go +++ b/keyserver/storage/sqlite3/device_keys_table.go @@ -50,7 +50,7 @@ const selectDeviceKeysSQL = "" + "SELECT key_json, stream_id, display_name FROM keyserver_device_keys WHERE user_id=$1 AND device_id=$2" const selectBatchDeviceKeysSQL = "" + - "SELECT device_id, key_json, stream_id, display_name FROM keyserver_device_keys WHERE user_id=$1" + "SELECT device_id, key_json, stream_id, display_name FROM keyserver_device_keys WHERE user_id=$1 AND key_json <> ''" const selectMaxStreamForUserSQL = "" + "SELECT MAX(stream_id) FROM keyserver_device_keys WHERE user_id=$1" From 35564dd73c48b16b97cd1a972a9b9bc65ec6d7ef Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 8 Sep 2020 17:48:07 +0100 Subject: [PATCH 3/3] Process membership updates in writers (#1414) --- .../storage/shared/membership_updater.go | 148 ++++++++++-------- 1 file changed, 81 insertions(+), 67 deletions(-) diff --git a/roomserver/storage/shared/membership_updater.go b/roomserver/storage/shared/membership_updater.go index 329813bfc..834af6069 100644 --- a/roomserver/storage/shared/membership_updater.go +++ b/roomserver/storage/shared/membership_updater.go @@ -79,89 +79,103 @@ func (u *MembershipUpdater) IsLeave() bool { // SetToInvite implements types.MembershipUpdater func (u *MembershipUpdater) SetToInvite(event gomatrixserverlib.Event) (bool, error) { - senderUserNID, err := u.d.assignStateKeyNID(u.ctx, u.txn, event.Sender()) - if err != nil { - return false, fmt.Errorf("u.d.AssignStateKeyNID: %w", err) - } - inserted, err := u.d.InvitesTable.InsertInviteEvent( - u.ctx, u.txn, event.EventID(), u.roomNID, u.targetUserNID, senderUserNID, event.JSON(), - ) - if err != nil { - return false, fmt.Errorf("u.d.InvitesTable.InsertInviteEvent: %w", err) - } - if u.membership != tables.MembershipStateInvite { - if err = u.d.MembershipTable.UpdateMembership( - u.ctx, u.txn, u.roomNID, u.targetUserNID, senderUserNID, tables.MembershipStateInvite, 0, - ); err != nil { - return false, fmt.Errorf("u.d.MembershipTable.UpdateMembership: %w", err) + var inserted bool + err := u.d.Writer.Do(u.d.DB, u.txn, func(txn *sql.Tx) error { + senderUserNID, err := u.d.assignStateKeyNID(u.ctx, u.txn, event.Sender()) + if err != nil { + return fmt.Errorf("u.d.AssignStateKeyNID: %w", err) } - } - return inserted, nil + inserted, err = u.d.InvitesTable.InsertInviteEvent( + u.ctx, u.txn, event.EventID(), u.roomNID, u.targetUserNID, senderUserNID, event.JSON(), + ) + if err != nil { + return fmt.Errorf("u.d.InvitesTable.InsertInviteEvent: %w", err) + } + if u.membership != tables.MembershipStateInvite { + if err = u.d.MembershipTable.UpdateMembership( + u.ctx, u.txn, u.roomNID, u.targetUserNID, senderUserNID, tables.MembershipStateInvite, 0, + ); err != nil { + return fmt.Errorf("u.d.MembershipTable.UpdateMembership: %w", err) + } + } + return nil + }) + return inserted, err } // SetToJoin implements types.MembershipUpdater func (u *MembershipUpdater) SetToJoin(senderUserID string, eventID string, isUpdate bool) ([]string, error) { var inviteEventIDs []string - senderUserNID, err := u.d.assignStateKeyNID(u.ctx, u.txn, senderUserID) - if err != nil { - return nil, fmt.Errorf("u.d.AssignStateKeyNID: %w", err) - } - - // If this is a join event update, there is no invite to update - if !isUpdate { - inviteEventIDs, err = u.d.InvitesTable.UpdateInviteRetired( - u.ctx, u.txn, u.roomNID, u.targetUserNID, - ) + err := u.d.Writer.Do(u.d.DB, u.txn, func(txn *sql.Tx) error { + senderUserNID, err := u.d.assignStateKeyNID(u.ctx, u.txn, senderUserID) if err != nil { - return nil, fmt.Errorf("u.d.InvitesTables.UpdateInviteRetired: %w", err) + return fmt.Errorf("u.d.AssignStateKeyNID: %w", err) } - } - // Look up the NID of the new join event - nIDs, err := u.d.EventNIDs(u.ctx, []string{eventID}) - if err != nil { - return nil, fmt.Errorf("u.d.EventNIDs: %w", err) - } - - if u.membership != tables.MembershipStateJoin || isUpdate { - if err = u.d.MembershipTable.UpdateMembership( - u.ctx, u.txn, u.roomNID, u.targetUserNID, senderUserNID, - tables.MembershipStateJoin, nIDs[eventID], - ); err != nil { - return nil, fmt.Errorf("u.d.MembershipTable.UpdateMembership: %w", err) + // If this is a join event update, there is no invite to update + if !isUpdate { + inviteEventIDs, err = u.d.InvitesTable.UpdateInviteRetired( + u.ctx, u.txn, u.roomNID, u.targetUserNID, + ) + if err != nil { + return fmt.Errorf("u.d.InvitesTables.UpdateInviteRetired: %w", err) + } } - } - return inviteEventIDs, nil + // Look up the NID of the new join event + nIDs, err := u.d.EventNIDs(u.ctx, []string{eventID}) + if err != nil { + return fmt.Errorf("u.d.EventNIDs: %w", err) + } + + if u.membership != tables.MembershipStateJoin || isUpdate { + if err = u.d.MembershipTable.UpdateMembership( + u.ctx, u.txn, u.roomNID, u.targetUserNID, senderUserNID, + tables.MembershipStateJoin, nIDs[eventID], + ); err != nil { + return fmt.Errorf("u.d.MembershipTable.UpdateMembership: %w", err) + } + } + + return nil + }) + + return inviteEventIDs, err } // SetToLeave implements types.MembershipUpdater func (u *MembershipUpdater) SetToLeave(senderUserID string, eventID string) ([]string, error) { - senderUserNID, err := u.d.assignStateKeyNID(u.ctx, u.txn, senderUserID) - if err != nil { - return nil, fmt.Errorf("u.d.AssignStateKeyNID: %w", err) - } - inviteEventIDs, err := u.d.InvitesTable.UpdateInviteRetired( - u.ctx, u.txn, u.roomNID, u.targetUserNID, - ) - if err != nil { - return nil, fmt.Errorf("u.d.InvitesTable.updateInviteRetired: %w", err) - } + var inviteEventIDs []string - // Look up the NID of the new leave event - nIDs, err := u.d.EventNIDs(u.ctx, []string{eventID}) - if err != nil { - return nil, fmt.Errorf("u.d.EventNIDs: %w", err) - } - - if u.membership != tables.MembershipStateLeaveOrBan { - if err = u.d.MembershipTable.UpdateMembership( - u.ctx, u.txn, u.roomNID, u.targetUserNID, senderUserNID, - tables.MembershipStateLeaveOrBan, nIDs[eventID], - ); err != nil { - return nil, fmt.Errorf("u.d.MembershipTable.UpdateMembership: %w", err) + err := u.d.Writer.Do(u.d.DB, u.txn, func(txn *sql.Tx) error { + senderUserNID, err := u.d.assignStateKeyNID(u.ctx, u.txn, senderUserID) + if err != nil { + return fmt.Errorf("u.d.AssignStateKeyNID: %w", err) } - } - return inviteEventIDs, nil + inviteEventIDs, err = u.d.InvitesTable.UpdateInviteRetired( + u.ctx, u.txn, u.roomNID, u.targetUserNID, + ) + if err != nil { + return fmt.Errorf("u.d.InvitesTable.updateInviteRetired: %w", err) + } + + // Look up the NID of the new leave event + nIDs, err := u.d.EventNIDs(u.ctx, []string{eventID}) + if err != nil { + return fmt.Errorf("u.d.EventNIDs: %w", err) + } + + if u.membership != tables.MembershipStateLeaveOrBan { + if err = u.d.MembershipTable.UpdateMembership( + u.ctx, u.txn, u.roomNID, u.targetUserNID, senderUserNID, + tables.MembershipStateLeaveOrBan, nIDs[eventID], + ); err != nil { + return fmt.Errorf("u.d.MembershipTable.UpdateMembership: %w", err) + } + } + + return nil + }) + return inviteEventIDs, err }