From f48d0fb56966bf7201af585abe9b03d3049c335b Mon Sep 17 00:00:00 2001 From: Till Faelligen <2353100+S7evinK@users.noreply.github.com> Date: Thu, 29 Sep 2022 10:44:07 +0200 Subject: [PATCH] Collect message stats --- userapi/consumers/roomserver.go | 96 ++++++++++++++++++++----- userapi/storage/interface.go | 4 ++ userapi/storage/postgres/stats_table.go | 69 +++++++++++++++++- userapi/storage/shared/storage.go | 15 +++- userapi/storage/sqlite3/stats_table.go | 64 +++++++++++++++++ userapi/storage/tables/interface.go | 4 ++ userapi/types/statistics.go | 7 ++ userapi/util/phonehomestats.go | 20 ++++-- 8 files changed, 250 insertions(+), 29 deletions(-) diff --git a/userapi/consumers/roomserver.go b/userapi/consumers/roomserver.go index 952de98f7..ab16a5293 100644 --- a/userapi/consumers/roomserver.go +++ b/userapi/consumers/roomserver.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "strings" + "sync" "time" "github.com/matrix-org/gomatrixserverlib" @@ -23,19 +24,23 @@ import ( "github.com/matrix-org/dendrite/userapi/producers" "github.com/matrix-org/dendrite/userapi/storage" "github.com/matrix-org/dendrite/userapi/storage/tables" + userAPITypes "github.com/matrix-org/dendrite/userapi/types" "github.com/matrix-org/dendrite/userapi/util" ) type OutputRoomEventConsumer struct { - ctx context.Context - cfg *config.UserAPI - rsAPI rsapi.UserRoomserverAPI - jetstream nats.JetStreamContext - durable string - db storage.Database - topic string - pgClient pushgateway.Client - syncProducer *producers.SyncAPI + ctx context.Context + cfg *config.UserAPI + rsAPI rsapi.UserRoomserverAPI + jetstream nats.JetStreamContext + durable string + db storage.Database + topic string + pgClient pushgateway.Client + syncProducer *producers.SyncAPI + msgCounts map[gomatrixserverlib.ServerName]userAPITypes.MessageStats + msgCountsLock sync.Mutex + serverName gomatrixserverlib.ServerName } func NewOutputRoomEventConsumer( @@ -48,15 +53,18 @@ func NewOutputRoomEventConsumer( syncProducer *producers.SyncAPI, ) *OutputRoomEventConsumer { return &OutputRoomEventConsumer{ - ctx: process.Context(), - cfg: cfg, - jetstream: js, - db: store, - durable: cfg.Matrix.JetStream.Durable("UserAPIRoomServerConsumer"), - topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent), - pgClient: pgClient, - rsAPI: rsAPI, - syncProducer: syncProducer, + ctx: process.Context(), + cfg: cfg, + jetstream: js, + db: store, + durable: cfg.Matrix.JetStream.Durable("UserAPIRoomServerConsumer"), + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent), + pgClient: pgClient, + rsAPI: rsAPI, + syncProducer: syncProducer, + msgCounts: map[gomatrixserverlib.ServerName]userAPITypes.MessageStats{}, + msgCountsLock: sync.Mutex{}, + serverName: cfg.Matrix.ServerName, } } @@ -87,6 +95,31 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms return true } + _, sender, _ := gomatrixserverlib.SplitID('@', event.Sender()) + + switch event.Type() { + case "m.room.message": + s.msgCountsLock.Lock() + msgCount := s.msgCounts[s.serverName] + msgCount.Messages++ + if sender == s.serverName { + msgCount.SentMessages++ + } + s.msgCounts[s.serverName] = msgCount + s.msgCountsLock.Unlock() + case "m.room.encrypted": + s.msgCountsLock.Lock() + msgCount := s.msgCounts[s.serverName] + msgCount.MessagesE2EE++ + if sender == s.serverName { + msgCount.SentMessagesE2EE++ + } + s.msgCounts[s.serverName] = msgCount + s.msgCountsLock.Unlock() + } + + s.storeMessageStats(ctx) + log.WithFields(log.Fields{ "event_id": event.EventID(), "event_type": event.Type(), @@ -106,6 +139,33 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms return true } +func (s *OutputRoomEventConsumer) storeMessageStats(ctx context.Context) { + s.msgCountsLock.Lock() + defer s.msgCountsLock.Unlock() + var sumStats int64 = 0 + for _, stats := range s.msgCounts { + sumStats += stats.Messages + stats.SentMessages + stats.MessagesE2EE + stats.SentMessagesE2EE + } + // Nothing to do + if sumStats == 0 { + return + } + for serverName, stats := range s.msgCounts { + err := s.db.UpsertDailyMessages(ctx, serverName, stats) + if err != nil { + log.WithError(err).Errorf("failed to upsert daily messages") + } + // Clear stats if we successfully stored it + if err == nil { + stats.MessagesE2EE = 0 + stats.SentMessages = 0 + stats.MessagesE2EE = 0 + stats.SentMessagesE2EE = 0 + s.msgCounts[serverName] = stats + } + } +} + func (s *OutputRoomEventConsumer) processMessage(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, streamPos uint64) error { members, roomSize, err := s.localRoomMembers(ctx, event.RoomID()) if err != nil { diff --git a/userapi/storage/interface.go b/userapi/storage/interface.go index 02efe7afe..07b557e43 100644 --- a/userapi/storage/interface.go +++ b/userapi/storage/interface.go @@ -19,6 +19,8 @@ import ( "encoding/json" "errors" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/internal/pushrules" "github.com/matrix-org/dendrite/userapi/api" @@ -144,6 +146,8 @@ type Database interface { type Statistics interface { UserStatistics(ctx context.Context) (*types.UserStatistics, *types.DatabaseEngine, error) + DailyMessages(ctx context.Context, serverName gomatrixserverlib.ServerName) (types.MessageStats, error) + UpsertDailyMessages(ctx context.Context, serverName gomatrixserverlib.ServerName, stats types.MessageStats) error } // Err3PIDInUse is the error returned when trying to save an association involving diff --git a/userapi/storage/postgres/stats_table.go b/userapi/storage/postgres/stats_table.go index c0b317503..f1bb2cd8f 100644 --- a/userapi/storage/postgres/stats_table.go +++ b/userapi/storage/postgres/stats_table.go @@ -20,13 +20,14 @@ import ( "time" "github.com/lib/pq" + "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" + "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/storage/tables" "github.com/matrix-org/dendrite/userapi/types" - "github.com/matrix-org/gomatrixserverlib" - "github.com/sirupsen/logrus" ) const userDailyVisitsSchema = ` @@ -43,6 +44,32 @@ CREATE INDEX IF NOT EXISTS userapi_daily_visits_timestamp_idx ON userapi_daily_v CREATE INDEX IF NOT EXISTS userapi_daily_visits_localpart_timestamp_idx ON userapi_daily_visits(localpart, timestamp); ` +const messagesDailySchema = ` +CREATE TABLE IF NOT EXISTS userapi_daily_messages ( + timestamp BIGINT NOT NULL, + server_name TEXT NOT NULL, + daily_messages BIGINT NOT NULL, + daily_sent_messages BIGINT NOT NULL, + daily_e2ee_messages BIGINT NOT NULL, + daily_sent_e2ee_messages BIGINT NOT NULL, + CONSTRAINT daily_messages_unique UNIQUE (timestamp, server_name) +); +` + +const upsertDailyMessagesSQL = ` + INSERT INTO userapi_daily_messages AS u (timestamp, server_name, daily_messages, daily_sent_messages, daily_e2ee_messages, daily_sent_e2ee_messages) + VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT ON CONSTRAINT daily_messages_unique + DO UPDATE SET + daily_messages=u.daily_messages+excluded.daily_messages, daily_sent_messages=u.daily_sent_messages+excluded.daily_sent_messages, + daily_e2ee_messages=u.daily_e2ee_messages+excluded.daily_e2ee_messages, daily_sent_e2ee_messages=u.daily_sent_e2ee_messages+excluded.daily_sent_e2ee_messages +` + +const selectDailyMessagesSQL = ` + SELECT daily_messages, daily_sent_messages, daily_e2ee_messages, daily_sent_e2ee_messages + FROM userapi_daily_messages + WHERE server_name = $1 AND timestamp = $2; +` + const countUsersLastSeenAfterSQL = "" + "SELECT COUNT(*) FROM (" + " SELECT localpart FROM device_devices WHERE last_seen_ts > $1 " + @@ -170,6 +197,8 @@ type statsStatements struct { countUserByAccountTypeStmt *sql.Stmt countRegisteredUserByTypeStmt *sql.Stmt dbEngineVersionStmt *sql.Stmt + upsertMessagesStmt *sql.Stmt + selectDailyMessagesStmt *sql.Stmt } func NewPostgresStatsTable(db *sql.DB, serverName gomatrixserverlib.ServerName) (tables.StatsTable, error) { @@ -182,6 +211,10 @@ func NewPostgresStatsTable(db *sql.DB, serverName gomatrixserverlib.ServerName) if err != nil { return nil, err } + _, err = db.Exec(messagesDailySchema) + if err != nil { + return nil, err + } go s.startTimers() return s, sqlutil.StatementList{ {&s.countUsersLastSeenAfterStmt, countUsersLastSeenAfterSQL}, @@ -191,6 +224,8 @@ func NewPostgresStatsTable(db *sql.DB, serverName gomatrixserverlib.ServerName) {&s.countUserByAccountTypeStmt, countUserByAccountTypeSQL}, {&s.countRegisteredUserByTypeStmt, countRegisteredUserByTypeStmt}, {&s.dbEngineVersionStmt, queryDBEngineVersion}, + {&s.upsertMessagesStmt, upsertDailyMessagesSQL}, + {&s.selectDailyMessagesStmt, selectDailyMessagesSQL}, }.Prepare(db) } @@ -435,3 +470,33 @@ func (s *statsStatements) UpdateUserDailyVisits( } return err } + +func (s *statsStatements) UpsertDailyMessages( + ctx context.Context, txn *sql.Tx, + serverName gomatrixserverlib.ServerName, stats types.MessageStats, +) error { + stmt := sqlutil.TxStmt(txn, s.upsertMessagesStmt) + timestamp := time.Now().Truncate(time.Hour * 24) + _, err := stmt.ExecContext(ctx, + gomatrixserverlib.AsTimestamp(timestamp), + serverName, + stats.Messages, stats.SentMessages, stats.MessagesE2EE, stats.SentMessagesE2EE, + ) + return err +} + +func (s *statsStatements) DailyMessages( + ctx context.Context, txn *sql.Tx, + serverName gomatrixserverlib.ServerName, +) (types.MessageStats, error) { + stmt := sqlutil.TxStmt(txn, s.selectDailyMessagesStmt) + timestamp := time.Now().Truncate(time.Hour * 24) + + res := types.MessageStats{} + err := stmt.QueryRowContext(ctx, serverName, gomatrixserverlib.AsTimestamp(timestamp)). + Scan(&res.Messages, &res.SentMessages, &res.MessagesE2EE, &res.SentMessagesE2EE) + if err != nil && err != sql.ErrNoRows { + return res, err + } + return res, nil +} diff --git a/userapi/storage/shared/storage.go b/userapi/storage/shared/storage.go index 3ff299f1b..3da086f72 100644 --- a/userapi/storage/shared/storage.go +++ b/userapi/storage/shared/storage.go @@ -29,13 +29,12 @@ import ( "github.com/matrix-org/gomatrixserverlib" "golang.org/x/crypto/bcrypt" - "github.com/matrix-org/dendrite/userapi/types" - "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/internal/pushrules" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/storage/tables" + "github.com/matrix-org/dendrite/userapi/types" ) // Database represents an account database @@ -801,3 +800,15 @@ func (d *Database) RemovePushers( func (d *Database) UserStatistics(ctx context.Context) (*types.UserStatistics, *types.DatabaseEngine, error) { return d.Stats.UserStatistics(ctx, nil) } + +func (d *Database) UpsertDailyMessages(ctx context.Context, serverName gomatrixserverlib.ServerName, stats types.MessageStats) error { + return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + return d.Stats.UpsertDailyMessages(ctx, txn, serverName, stats) + }) +} + +func (d *Database) DailyMessages( + ctx context.Context, serverName gomatrixserverlib.ServerName, +) (types.MessageStats, error) { + return d.Stats.DailyMessages(ctx, nil, serverName) +} diff --git a/userapi/storage/sqlite3/stats_table.go b/userapi/storage/sqlite3/stats_table.go index 8aa1746c5..2322c7649 100644 --- a/userapi/storage/sqlite3/stats_table.go +++ b/userapi/storage/sqlite3/stats_table.go @@ -44,6 +44,32 @@ CREATE INDEX IF NOT EXISTS userapi_daily_visits_timestamp_idx ON userapi_daily_v CREATE INDEX IF NOT EXISTS userapi_daily_visits_localpart_timestamp_idx ON userapi_daily_visits(localpart, timestamp); ` +const messagesDailySchema = ` +CREATE TABLE IF NOT EXISTS userapi_daily_messages ( + timestamp BIGINT NOT NULL, + server_name TEXT NOT NULL, + daily_messages BIGINT NOT NULL, + daily_sent_messages BIGINT NOT NULL, + daily_e2ee_messages BIGINT NOT NULL, + daily_sent_e2ee_messages BIGINT NOT NULL, + CONSTRAINT daily_messages_unique UNIQUE (timestamp, server_name) +); +` + +const upsertDailyMessagesSQL = ` + INSERT INTO userapi_daily_messages (timestamp, server_name, daily_messages, daily_sent_messages, daily_e2ee_messages, daily_sent_e2ee_messages) + VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (timestamp, server_name) + DO UPDATE SET + daily_messages=daily_messages+excluded.daily_messages, daily_sent_messages=daily_sent_messages+excluded.daily_sent_messages, + daily_e2ee_messages=daily_e2ee_messages+excluded.daily_e2ee_messages, daily_sent_e2ee_messages=daily_sent_e2ee_messages+excluded.daily_sent_e2ee_messages +` + +const selectDailyMessagesSQL = ` + SELECT daily_messages, daily_sent_messages, daily_e2ee_messages, daily_sent_e2ee_messages + FROM userapi_daily_messages + WHERE server_name = $1 AND timestamp = $2; +` + const countUsersLastSeenAfterSQL = "" + "SELECT COUNT(*) FROM (" + " SELECT localpart FROM device_devices WHERE last_seen_ts > $1 " + @@ -176,6 +202,8 @@ type statsStatements struct { countUserByAccountTypeStmt *sql.Stmt countRegisteredUserByTypeStmt *sql.Stmt dbEngineVersionStmt *sql.Stmt + upsertMessagesStmt *sql.Stmt + selectDailyMessagesStmt *sql.Stmt } func NewSQLiteStatsTable(db *sql.DB, serverName gomatrixserverlib.ServerName) (tables.StatsTable, error) { @@ -189,6 +217,10 @@ func NewSQLiteStatsTable(db *sql.DB, serverName gomatrixserverlib.ServerName) (t if err != nil { return nil, err } + _, err = db.Exec(messagesDailySchema) + if err != nil { + return nil, err + } go s.startTimers() return s, sqlutil.StatementList{ {&s.countUsersLastSeenAfterStmt, countUsersLastSeenAfterSQL}, @@ -198,6 +230,8 @@ func NewSQLiteStatsTable(db *sql.DB, serverName gomatrixserverlib.ServerName) (t {&s.countUserByAccountTypeStmt, countUserByAccountTypeSQL}, {&s.countRegisteredUserByTypeStmt, countRegisteredUserByTypeSQL}, {&s.dbEngineVersionStmt, queryDBEngineVersion}, + {&s.upsertMessagesStmt, upsertDailyMessagesSQL}, + {&s.selectDailyMessagesStmt, selectDailyMessagesSQL}, }.Prepare(db) } @@ -451,3 +485,33 @@ func (s *statsStatements) UpdateUserDailyVisits( } return err } + +func (s *statsStatements) UpsertDailyMessages( + ctx context.Context, txn *sql.Tx, + serverName gomatrixserverlib.ServerName, stats types.MessageStats, +) error { + stmt := sqlutil.TxStmt(txn, s.upsertMessagesStmt) + timestamp := time.Now().Truncate(time.Hour * 24) + _, err := stmt.ExecContext(ctx, + gomatrixserverlib.AsTimestamp(timestamp), + serverName, + stats.Messages, stats.SentMessages, stats.MessagesE2EE, stats.SentMessagesE2EE, + ) + return err +} + +func (s *statsStatements) DailyMessages( + ctx context.Context, txn *sql.Tx, + serverName gomatrixserverlib.ServerName, +) (types.MessageStats, error) { + stmt := sqlutil.TxStmt(txn, s.selectDailyMessagesStmt) + timestamp := time.Now().Truncate(time.Hour * 24) + + res := types.MessageStats{} + err := stmt.QueryRowContext(ctx, serverName, gomatrixserverlib.AsTimestamp(timestamp)). + Scan(&res.Messages, &res.SentMessages, &res.MessagesE2EE, &res.SentMessagesE2EE) + if err != nil && err != sql.ErrNoRows { + return res, err + } + return res, nil +} diff --git a/userapi/storage/tables/interface.go b/userapi/storage/tables/interface.go index cc4287997..dca812fb5 100644 --- a/userapi/storage/tables/interface.go +++ b/userapi/storage/tables/interface.go @@ -20,6 +20,8 @@ import ( "encoding/json" "time" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/types" @@ -115,7 +117,9 @@ type NotificationTable interface { type StatsTable interface { UserStatistics(ctx context.Context, txn *sql.Tx) (*types.UserStatistics, *types.DatabaseEngine, error) + DailyMessages(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) (types.MessageStats, error) UpdateUserDailyVisits(ctx context.Context, txn *sql.Tx, startTime, lastUpdate time.Time) error + UpsertDailyMessages(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, stats types.MessageStats) error } type NotificationFilter uint32 diff --git a/userapi/types/statistics.go b/userapi/types/statistics.go index 09564f78f..b74e32add 100644 --- a/userapi/types/statistics.go +++ b/userapi/types/statistics.go @@ -28,3 +28,10 @@ type DatabaseEngine struct { Engine string Version string } + +type MessageStats struct { + Messages int64 + SentMessages int64 + MessagesE2EE int64 + SentMessagesE2EE int64 +} diff --git a/userapi/util/phonehomestats.go b/userapi/util/phonehomestats.go index b17f62060..4066a5bcd 100644 --- a/userapi/util/phonehomestats.go +++ b/userapi/util/phonehomestats.go @@ -24,11 +24,12 @@ import ( "syscall" "time" + "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" + "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/userapi/storage" - "github.com/matrix-org/gomatrixserverlib" - "github.com/sirupsen/logrus" ) type phoneHomeStats struct { @@ -109,12 +110,17 @@ func (p *phoneHomeStats) collect() { } // message and room stats - // TODO: Find a solution to actually set these values + // TODO: Find a solution to actually set this value p.stats["total_room_count"] = 0 - p.stats["daily_messages"] = 0 - p.stats["daily_sent_messages"] = 0 - p.stats["daily_e2ee_messages"] = 0 - p.stats["daily_sent_e2ee_messages"] = 0 + + messageStats, err := p.db.DailyMessages(ctx, p.serverName) + if err != nil { + logrus.WithError(err).Warn("unable to query message stats, using default values") + } + p.stats["daily_messages"] = messageStats.Messages + p.stats["daily_sent_messages"] = messageStats.SentMessages + p.stats["daily_e2ee_messages"] = messageStats.MessagesE2EE + p.stats["daily_sent_e2ee_messages"] = messageStats.SentMessagesE2EE // user stats and DB engine userStats, db, err := p.db.UserStatistics(ctx)