Collect message stats

This commit is contained in:
Till Faelligen 2022-09-29 10:44:07 +02:00
parent 3f9e38e80a
commit f48d0fb569
No known key found for this signature in database
GPG key ID: 3DF82D8AB9211D4E
8 changed files with 250 additions and 29 deletions

View file

@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"strings"
"sync"
"time"
"github.com/matrix-org/gomatrixserverlib"
@ -23,19 +24,23 @@ import (
"github.com/matrix-org/dendrite/userapi/producers"
"github.com/matrix-org/dendrite/userapi/storage"
"github.com/matrix-org/dendrite/userapi/storage/tables"
userAPITypes "github.com/matrix-org/dendrite/userapi/types"
"github.com/matrix-org/dendrite/userapi/util"
)
type OutputRoomEventConsumer struct {
ctx context.Context
cfg *config.UserAPI
rsAPI rsapi.UserRoomserverAPI
jetstream nats.JetStreamContext
durable string
db storage.Database
topic string
pgClient pushgateway.Client
syncProducer *producers.SyncAPI
ctx context.Context
cfg *config.UserAPI
rsAPI rsapi.UserRoomserverAPI
jetstream nats.JetStreamContext
durable string
db storage.Database
topic string
pgClient pushgateway.Client
syncProducer *producers.SyncAPI
msgCounts map[gomatrixserverlib.ServerName]userAPITypes.MessageStats
msgCountsLock sync.Mutex
serverName gomatrixserverlib.ServerName
}
func NewOutputRoomEventConsumer(
@ -48,15 +53,18 @@ func NewOutputRoomEventConsumer(
syncProducer *producers.SyncAPI,
) *OutputRoomEventConsumer {
return &OutputRoomEventConsumer{
ctx: process.Context(),
cfg: cfg,
jetstream: js,
db: store,
durable: cfg.Matrix.JetStream.Durable("UserAPIRoomServerConsumer"),
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent),
pgClient: pgClient,
rsAPI: rsAPI,
syncProducer: syncProducer,
ctx: process.Context(),
cfg: cfg,
jetstream: js,
db: store,
durable: cfg.Matrix.JetStream.Durable("UserAPIRoomServerConsumer"),
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent),
pgClient: pgClient,
rsAPI: rsAPI,
syncProducer: syncProducer,
msgCounts: map[gomatrixserverlib.ServerName]userAPITypes.MessageStats{},
msgCountsLock: sync.Mutex{},
serverName: cfg.Matrix.ServerName,
}
}
@ -87,6 +95,31 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms
return true
}
_, sender, _ := gomatrixserverlib.SplitID('@', event.Sender())
switch event.Type() {
case "m.room.message":
s.msgCountsLock.Lock()
msgCount := s.msgCounts[s.serverName]
msgCount.Messages++
if sender == s.serverName {
msgCount.SentMessages++
}
s.msgCounts[s.serverName] = msgCount
s.msgCountsLock.Unlock()
case "m.room.encrypted":
s.msgCountsLock.Lock()
msgCount := s.msgCounts[s.serverName]
msgCount.MessagesE2EE++
if sender == s.serverName {
msgCount.SentMessagesE2EE++
}
s.msgCounts[s.serverName] = msgCount
s.msgCountsLock.Unlock()
}
s.storeMessageStats(ctx)
log.WithFields(log.Fields{
"event_id": event.EventID(),
"event_type": event.Type(),
@ -106,6 +139,33 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms
return true
}
func (s *OutputRoomEventConsumer) storeMessageStats(ctx context.Context) {
s.msgCountsLock.Lock()
defer s.msgCountsLock.Unlock()
var sumStats int64 = 0
for _, stats := range s.msgCounts {
sumStats += stats.Messages + stats.SentMessages + stats.MessagesE2EE + stats.SentMessagesE2EE
}
// Nothing to do
if sumStats == 0 {
return
}
for serverName, stats := range s.msgCounts {
err := s.db.UpsertDailyMessages(ctx, serverName, stats)
if err != nil {
log.WithError(err).Errorf("failed to upsert daily messages")
}
// Clear stats if we successfully stored it
if err == nil {
stats.MessagesE2EE = 0
stats.SentMessages = 0
stats.MessagesE2EE = 0
stats.SentMessagesE2EE = 0
s.msgCounts[serverName] = stats
}
}
}
func (s *OutputRoomEventConsumer) processMessage(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, streamPos uint64) error {
members, roomSize, err := s.localRoomMembers(ctx, event.RoomID())
if err != nil {

View file

@ -19,6 +19,8 @@ import (
"encoding/json"
"errors"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/internal/pushrules"
"github.com/matrix-org/dendrite/userapi/api"
@ -144,6 +146,8 @@ type Database interface {
type Statistics interface {
UserStatistics(ctx context.Context) (*types.UserStatistics, *types.DatabaseEngine, error)
DailyMessages(ctx context.Context, serverName gomatrixserverlib.ServerName) (types.MessageStats, error)
UpsertDailyMessages(ctx context.Context, serverName gomatrixserverlib.ServerName, stats types.MessageStats) error
}
// Err3PIDInUse is the error returned when trying to save an association involving

View file

@ -20,13 +20,14 @@ import (
"time"
"github.com/lib/pq"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/storage/tables"
"github.com/matrix-org/dendrite/userapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
)
const userDailyVisitsSchema = `
@ -43,6 +44,32 @@ CREATE INDEX IF NOT EXISTS userapi_daily_visits_timestamp_idx ON userapi_daily_v
CREATE INDEX IF NOT EXISTS userapi_daily_visits_localpart_timestamp_idx ON userapi_daily_visits(localpart, timestamp);
`
const messagesDailySchema = `
CREATE TABLE IF NOT EXISTS userapi_daily_messages (
timestamp BIGINT NOT NULL,
server_name TEXT NOT NULL,
daily_messages BIGINT NOT NULL,
daily_sent_messages BIGINT NOT NULL,
daily_e2ee_messages BIGINT NOT NULL,
daily_sent_e2ee_messages BIGINT NOT NULL,
CONSTRAINT daily_messages_unique UNIQUE (timestamp, server_name)
);
`
const upsertDailyMessagesSQL = `
INSERT INTO userapi_daily_messages AS u (timestamp, server_name, daily_messages, daily_sent_messages, daily_e2ee_messages, daily_sent_e2ee_messages)
VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT ON CONSTRAINT daily_messages_unique
DO UPDATE SET
daily_messages=u.daily_messages+excluded.daily_messages, daily_sent_messages=u.daily_sent_messages+excluded.daily_sent_messages,
daily_e2ee_messages=u.daily_e2ee_messages+excluded.daily_e2ee_messages, daily_sent_e2ee_messages=u.daily_sent_e2ee_messages+excluded.daily_sent_e2ee_messages
`
const selectDailyMessagesSQL = `
SELECT daily_messages, daily_sent_messages, daily_e2ee_messages, daily_sent_e2ee_messages
FROM userapi_daily_messages
WHERE server_name = $1 AND timestamp = $2;
`
const countUsersLastSeenAfterSQL = "" +
"SELECT COUNT(*) FROM (" +
" SELECT localpart FROM device_devices WHERE last_seen_ts > $1 " +
@ -170,6 +197,8 @@ type statsStatements struct {
countUserByAccountTypeStmt *sql.Stmt
countRegisteredUserByTypeStmt *sql.Stmt
dbEngineVersionStmt *sql.Stmt
upsertMessagesStmt *sql.Stmt
selectDailyMessagesStmt *sql.Stmt
}
func NewPostgresStatsTable(db *sql.DB, serverName gomatrixserverlib.ServerName) (tables.StatsTable, error) {
@ -182,6 +211,10 @@ func NewPostgresStatsTable(db *sql.DB, serverName gomatrixserverlib.ServerName)
if err != nil {
return nil, err
}
_, err = db.Exec(messagesDailySchema)
if err != nil {
return nil, err
}
go s.startTimers()
return s, sqlutil.StatementList{
{&s.countUsersLastSeenAfterStmt, countUsersLastSeenAfterSQL},
@ -191,6 +224,8 @@ func NewPostgresStatsTable(db *sql.DB, serverName gomatrixserverlib.ServerName)
{&s.countUserByAccountTypeStmt, countUserByAccountTypeSQL},
{&s.countRegisteredUserByTypeStmt, countRegisteredUserByTypeStmt},
{&s.dbEngineVersionStmt, queryDBEngineVersion},
{&s.upsertMessagesStmt, upsertDailyMessagesSQL},
{&s.selectDailyMessagesStmt, selectDailyMessagesSQL},
}.Prepare(db)
}
@ -435,3 +470,33 @@ func (s *statsStatements) UpdateUserDailyVisits(
}
return err
}
func (s *statsStatements) UpsertDailyMessages(
ctx context.Context, txn *sql.Tx,
serverName gomatrixserverlib.ServerName, stats types.MessageStats,
) error {
stmt := sqlutil.TxStmt(txn, s.upsertMessagesStmt)
timestamp := time.Now().Truncate(time.Hour * 24)
_, err := stmt.ExecContext(ctx,
gomatrixserverlib.AsTimestamp(timestamp),
serverName,
stats.Messages, stats.SentMessages, stats.MessagesE2EE, stats.SentMessagesE2EE,
)
return err
}
func (s *statsStatements) DailyMessages(
ctx context.Context, txn *sql.Tx,
serverName gomatrixserverlib.ServerName,
) (types.MessageStats, error) {
stmt := sqlutil.TxStmt(txn, s.selectDailyMessagesStmt)
timestamp := time.Now().Truncate(time.Hour * 24)
res := types.MessageStats{}
err := stmt.QueryRowContext(ctx, serverName, gomatrixserverlib.AsTimestamp(timestamp)).
Scan(&res.Messages, &res.SentMessages, &res.MessagesE2EE, &res.SentMessagesE2EE)
if err != nil && err != sql.ErrNoRows {
return res, err
}
return res, nil
}

View file

@ -29,13 +29,12 @@ import (
"github.com/matrix-org/gomatrixserverlib"
"golang.org/x/crypto/bcrypt"
"github.com/matrix-org/dendrite/userapi/types"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/internal/pushrules"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/storage/tables"
"github.com/matrix-org/dendrite/userapi/types"
)
// Database represents an account database
@ -801,3 +800,15 @@ func (d *Database) RemovePushers(
func (d *Database) UserStatistics(ctx context.Context) (*types.UserStatistics, *types.DatabaseEngine, error) {
return d.Stats.UserStatistics(ctx, nil)
}
func (d *Database) UpsertDailyMessages(ctx context.Context, serverName gomatrixserverlib.ServerName, stats types.MessageStats) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
return d.Stats.UpsertDailyMessages(ctx, txn, serverName, stats)
})
}
func (d *Database) DailyMessages(
ctx context.Context, serverName gomatrixserverlib.ServerName,
) (types.MessageStats, error) {
return d.Stats.DailyMessages(ctx, nil, serverName)
}

View file

@ -44,6 +44,32 @@ CREATE INDEX IF NOT EXISTS userapi_daily_visits_timestamp_idx ON userapi_daily_v
CREATE INDEX IF NOT EXISTS userapi_daily_visits_localpart_timestamp_idx ON userapi_daily_visits(localpart, timestamp);
`
const messagesDailySchema = `
CREATE TABLE IF NOT EXISTS userapi_daily_messages (
timestamp BIGINT NOT NULL,
server_name TEXT NOT NULL,
daily_messages BIGINT NOT NULL,
daily_sent_messages BIGINT NOT NULL,
daily_e2ee_messages BIGINT NOT NULL,
daily_sent_e2ee_messages BIGINT NOT NULL,
CONSTRAINT daily_messages_unique UNIQUE (timestamp, server_name)
);
`
const upsertDailyMessagesSQL = `
INSERT INTO userapi_daily_messages (timestamp, server_name, daily_messages, daily_sent_messages, daily_e2ee_messages, daily_sent_e2ee_messages)
VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (timestamp, server_name)
DO UPDATE SET
daily_messages=daily_messages+excluded.daily_messages, daily_sent_messages=daily_sent_messages+excluded.daily_sent_messages,
daily_e2ee_messages=daily_e2ee_messages+excluded.daily_e2ee_messages, daily_sent_e2ee_messages=daily_sent_e2ee_messages+excluded.daily_sent_e2ee_messages
`
const selectDailyMessagesSQL = `
SELECT daily_messages, daily_sent_messages, daily_e2ee_messages, daily_sent_e2ee_messages
FROM userapi_daily_messages
WHERE server_name = $1 AND timestamp = $2;
`
const countUsersLastSeenAfterSQL = "" +
"SELECT COUNT(*) FROM (" +
" SELECT localpart FROM device_devices WHERE last_seen_ts > $1 " +
@ -176,6 +202,8 @@ type statsStatements struct {
countUserByAccountTypeStmt *sql.Stmt
countRegisteredUserByTypeStmt *sql.Stmt
dbEngineVersionStmt *sql.Stmt
upsertMessagesStmt *sql.Stmt
selectDailyMessagesStmt *sql.Stmt
}
func NewSQLiteStatsTable(db *sql.DB, serverName gomatrixserverlib.ServerName) (tables.StatsTable, error) {
@ -189,6 +217,10 @@ func NewSQLiteStatsTable(db *sql.DB, serverName gomatrixserverlib.ServerName) (t
if err != nil {
return nil, err
}
_, err = db.Exec(messagesDailySchema)
if err != nil {
return nil, err
}
go s.startTimers()
return s, sqlutil.StatementList{
{&s.countUsersLastSeenAfterStmt, countUsersLastSeenAfterSQL},
@ -198,6 +230,8 @@ func NewSQLiteStatsTable(db *sql.DB, serverName gomatrixserverlib.ServerName) (t
{&s.countUserByAccountTypeStmt, countUserByAccountTypeSQL},
{&s.countRegisteredUserByTypeStmt, countRegisteredUserByTypeSQL},
{&s.dbEngineVersionStmt, queryDBEngineVersion},
{&s.upsertMessagesStmt, upsertDailyMessagesSQL},
{&s.selectDailyMessagesStmt, selectDailyMessagesSQL},
}.Prepare(db)
}
@ -451,3 +485,33 @@ func (s *statsStatements) UpdateUserDailyVisits(
}
return err
}
func (s *statsStatements) UpsertDailyMessages(
ctx context.Context, txn *sql.Tx,
serverName gomatrixserverlib.ServerName, stats types.MessageStats,
) error {
stmt := sqlutil.TxStmt(txn, s.upsertMessagesStmt)
timestamp := time.Now().Truncate(time.Hour * 24)
_, err := stmt.ExecContext(ctx,
gomatrixserverlib.AsTimestamp(timestamp),
serverName,
stats.Messages, stats.SentMessages, stats.MessagesE2EE, stats.SentMessagesE2EE,
)
return err
}
func (s *statsStatements) DailyMessages(
ctx context.Context, txn *sql.Tx,
serverName gomatrixserverlib.ServerName,
) (types.MessageStats, error) {
stmt := sqlutil.TxStmt(txn, s.selectDailyMessagesStmt)
timestamp := time.Now().Truncate(time.Hour * 24)
res := types.MessageStats{}
err := stmt.QueryRowContext(ctx, serverName, gomatrixserverlib.AsTimestamp(timestamp)).
Scan(&res.Messages, &res.SentMessages, &res.MessagesE2EE, &res.SentMessagesE2EE)
if err != nil && err != sql.ErrNoRows {
return res, err
}
return res, nil
}

View file

@ -20,6 +20,8 @@ import (
"encoding/json"
"time"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/types"
@ -115,7 +117,9 @@ type NotificationTable interface {
type StatsTable interface {
UserStatistics(ctx context.Context, txn *sql.Tx) (*types.UserStatistics, *types.DatabaseEngine, error)
DailyMessages(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) (types.MessageStats, error)
UpdateUserDailyVisits(ctx context.Context, txn *sql.Tx, startTime, lastUpdate time.Time) error
UpsertDailyMessages(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, stats types.MessageStats) error
}
type NotificationFilter uint32

View file

@ -28,3 +28,10 @@ type DatabaseEngine struct {
Engine string
Version string
}
type MessageStats struct {
Messages int64
SentMessages int64
MessagesE2EE int64
SentMessagesE2EE int64
}

View file

@ -24,11 +24,12 @@ import (
"syscall"
"time"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/userapi/storage"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
)
type phoneHomeStats struct {
@ -109,12 +110,17 @@ func (p *phoneHomeStats) collect() {
}
// message and room stats
// TODO: Find a solution to actually set these values
// TODO: Find a solution to actually set this value
p.stats["total_room_count"] = 0
p.stats["daily_messages"] = 0
p.stats["daily_sent_messages"] = 0
p.stats["daily_e2ee_messages"] = 0
p.stats["daily_sent_e2ee_messages"] = 0
messageStats, err := p.db.DailyMessages(ctx, p.serverName)
if err != nil {
logrus.WithError(err).Warn("unable to query message stats, using default values")
}
p.stats["daily_messages"] = messageStats.Messages
p.stats["daily_sent_messages"] = messageStats.SentMessages
p.stats["daily_e2ee_messages"] = messageStats.MessagesE2EE
p.stats["daily_sent_e2ee_messages"] = messageStats.SentMessagesE2EE
// user stats and DB engine
userStats, db, err := p.db.UserStatistics(ctx)