Add user_daily_visits table

This commit is contained in:
Till Faelligen 2022-03-02 11:55:45 +01:00
parent 3f0ed455b0
commit d8611efbd7
3 changed files with 106 additions and 30 deletions

View file

@ -22,6 +22,7 @@ import (
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/userapi/storage/tables" "github.com/matrix-org/dendrite/userapi/storage/tables"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
) )
const userDailyVisitsSchema = ` const userDailyVisitsSchema = `
@ -52,7 +53,7 @@ SELECT platform, COUNT(*) FROM (
(SELECT (SELECT
localpart, last_seen_ts, localpart, last_seen_ts,
CASE CASE
WHEN user_agent LIKE '%%Android%%' OR display_name LIKE '%%android%%' THEN 'android' WHEN user_agent LIKE '%%Android%%' THEN 'android'
WHEN user_agent LIKE '%%iOS%%' THEN 'ios' WHEN user_agent LIKE '%%iOS%%' THEN 'ios'
WHEN user_agent LIKE '%%Electron%%' THEN 'electron' WHEN user_agent LIKE '%%Electron%%' THEN 'electron'
WHEN user_agent LIKE '%%Mozilla%%' THEN 'web' WHEN user_agent LIKE '%%Mozilla%%' THEN 'web'
@ -72,26 +73,68 @@ SELECT platform, COUNT(*) FROM (
` `
const countR30UsersV2SQL = ` const countR30UsersV2SQL = `
SELECT
client_type,
count(client_type)
FROM
(
SELECT
user_id,
CASE
WHEN
LOWER(user_agent) LIKE '%%riot%%' OR
LOWER(user_agent) LIKE '%%element%%'
THEN CASE
WHEN LOWER(user_agent) LIKE '%%electron%%' THEN 'electron'
WHEN LOWER(user_agent) LIKE '%%android%%' THEN 'android'
WHEN LOWER(user_agent) LIKE '%%ios%%' THEN 'ios'
ELSE 'unknown'
END
WHEN LOWER(user_agent) LIKE '%%mozilla%%' OR LOWER(user_agent) LIKE '%%gecko%%' THEN 'web'
ELSE 'unknown'
END as client_type
FROM user_daily_visits
WHERE timestamp > $1 AND timestamp < $2
GROUP BY user_id, client_type
HAVING max(timestamp) - min(timestamp) > $3
) AS temp
GROUP BY client_type
` `
const insertUserDailyVists = ` // account_type 1 = users; 3 = admins
INSERT INTO user_daily_visits(localpart, device_id, timestamp, user_agent) VALUES ($1, $2, $3, $4) const updateUserDailyVists = `
ON CONFLICT ON CONSTRAINT localpart_device_timestamp_idx DO NOTHING INSERT INTO user_daily_visits(localpart, device_id, timestamp, user_agent)
SELECT u.localpart, u.device_id, $1, MAX(u.user_agent)
FROM device_devices AS u
LEFT JOIN (
SELECT localpart, device_id, timestamp FROM user_daily_visits
WHERE timestamp = $1
) udv
ON u.localpart = udv.localpart AND u.device_id = udv.device_id
INNER JOIN device_devices d ON d.localpart = u.localpart
INNER JOIN account_accounts a ON a.localpart = u.localpart
WHERE $2 <= d.last_seen_ts AND d.last_seen_ts < $3
AND a.account_type in (1, 3)
GROUP BY u.localpart, u.device_id
ON CONFLICT (localpart, device_id, timestamp) DO NOTHING
;
` `
type statsStatements struct { type statsStatements struct {
serverName gomatrixserverlib.ServerName serverName gomatrixserverlib.ServerName
lastUpdate time.Time
countUsersLastSeenAfterStmt *sql.Stmt countUsersLastSeenAfterStmt *sql.Stmt
countR30UsersStmt *sql.Stmt countR30UsersStmt *sql.Stmt
countR30UsersV2Stmt *sql.Stmt countR30UsersV2Stmt *sql.Stmt
insertUserDailyVisits *sql.Stmt updateUserDailyVisits *sql.Stmt
} }
func NewPostgresStatsTable(db *sql.DB, serverName gomatrixserverlib.ServerName) (tables.StatsTable, error) { func NewPostgresStatsTable(db *sql.DB, serverName gomatrixserverlib.ServerName) (tables.StatsTable, error) {
s := &statsStatements{ s := &statsStatements{
serverName: serverName, serverName: serverName,
lastUpdate: time.Now(),
} }
_, err := db.Exec(userDailyVisitsSchema) _, err := db.Exec(userDailyVisitsSchema)
if err != nil { if err != nil {
return nil, err return nil, err
@ -100,10 +143,31 @@ func NewPostgresStatsTable(db *sql.DB, serverName gomatrixserverlib.ServerName)
{&s.countUsersLastSeenAfterStmt, countUsersLastSeenAfterSQL}, {&s.countUsersLastSeenAfterStmt, countUsersLastSeenAfterSQL},
{&s.countR30UsersStmt, countR30UsersSQL}, {&s.countR30UsersStmt, countR30UsersSQL},
{&s.countR30UsersV2Stmt, countR30UsersV2SQL}, {&s.countR30UsersV2Stmt, countR30UsersV2SQL},
{&s.insertUserDailyVisits, insertUserDailyVists}, {&s.updateUserDailyVisits, updateUserDailyVists},
}.Prepare(db) }.Prepare(db)
} }
func (s *statsStatements) startTimers() {
// initial run
time.AfterFunc(time.Minute*5, func() {
logrus.Infof("Executing UpdateUserDailyVisits")
if err := s.UpdateUserDailyVisits(context.Background(), nil); err != nil {
logrus.WithError(err).Error("failed to update daily user visits")
}
})
// every x hours
ticker := time.NewTicker(time.Hour * 3)
for {
select {
case <-ticker.C:
logrus.Infof("Executing UpdateUserDailyVisits")
if err := s.UpdateUserDailyVisits(context.Background(), nil); err != nil {
logrus.WithError(err).Error("failed to update daily user visits")
}
}
}
}
func (s *statsStatements) DailyUsers(ctx context.Context, txn *sql.Tx) (result int64, err error) { func (s *statsStatements) DailyUsers(ctx context.Context, txn *sql.Tx) (result int64, err error) {
stmt := sqlutil.TxStmt(txn, s.countUsersLastSeenAfterStmt) stmt := sqlutil.TxStmt(txn, s.countUsersLastSeenAfterStmt)
lastSeenAfter := time.Now().AddDate(0, 0, -1) lastSeenAfter := time.Now().AddDate(0, 0, -1)
@ -154,7 +218,7 @@ func (s *statsStatements) R30Users(ctx context.Context, txn *sql.Tx) (map[string
result[platform] = count result[platform] = count
} }
return map[string]int64{}, rows.Err() return result, rows.Err()
} }
/* R30UsersV2 counts the number of 30 day retained users, defined as users that: /* R30UsersV2 counts the number of 30 day retained users, defined as users that:
@ -163,13 +227,14 @@ func (s *statsStatements) R30Users(ctx context.Context, txn *sql.Tx) (map[string
*/ */
func (s *statsStatements) R30UsersV2(ctx context.Context, txn *sql.Tx) (map[string]int64, error) { func (s *statsStatements) R30UsersV2(ctx context.Context, txn *sql.Tx) (map[string]int64, error) {
stmt := sqlutil.TxStmt(txn, s.countR30UsersV2Stmt) stmt := sqlutil.TxStmt(txn, s.countR30UsersV2Stmt)
lastSeenAfter := time.Now().AddDate(0, 0, -30) sixtyDaysAgo := time.Now().AddDate(0, 0, -60)
diff := time.Hour * 24 * 30 thirtyDaysAgo := time.Now().AddDate(0, 0, -30)
diff.Milliseconds() tomorrow := time.Now().Add(time.Hour * 24)
rows, err := stmt.QueryContext(ctx, rows, err := stmt.QueryContext(ctx,
gomatrixserverlib.AsTimestamp(lastSeenAfter), gomatrixserverlib.AsTimestamp(sixtyDaysAgo),
diff.Milliseconds(), gomatrixserverlib.AsTimestamp(tomorrow),
gomatrixserverlib.AsTimestamp(thirtyDaysAgo),
) )
if err != nil { if err != nil {
return nil, err return nil, err
@ -177,7 +242,13 @@ func (s *statsStatements) R30UsersV2(ctx context.Context, txn *sql.Tx) (map[stri
var platform string var platform string
var count int64 var count int64
var result = make(map[string]int64) var result = map[string]int64{
"ios": 0,
"android": 0,
"web": 0,
"electron": 0,
"all": 0,
}
for rows.Next() { for rows.Next() {
if err := rows.Scan(&platform, &count); err != nil { if err := rows.Scan(&platform, &count); err != nil {
return nil, err return nil, err
@ -189,11 +260,26 @@ func (s *statsStatements) R30UsersV2(ctx context.Context, txn *sql.Tx) (map[stri
result[platform] = count result[platform] = count
} }
return map[string]int64{}, rows.Err() return result, rows.Err()
} }
func (s *statsStatements) InsertUserDailyVisits(ctx context.Context, txn *sql.Tx, localpart, deviceID string, timestamp int64, userAgent string) error { func (s *statsStatements) UpdateUserDailyVisits(ctx context.Context, txn *sql.Tx) error {
stmt := sqlutil.TxStmt(txn, s.insertUserDailyVisits) stmt := sqlutil.TxStmt(txn, s.updateUserDailyVisits)
_, err := stmt.ExecContext(ctx, localpart, deviceID, timestamp, userAgent) _ = stmt
todayStart := time.Now().Truncate(time.Hour * 24)
lastUpdate := s.lastUpdate
// edge case
if todayStart.After(s.lastUpdate) {
todayStart = todayStart.AddDate(0, 0, -1)
}
_, err := stmt.ExecContext(ctx,
gomatrixserverlib.AsTimestamp(todayStart),
gomatrixserverlib.AsTimestamp(lastUpdate),
gomatrixserverlib.AsTimestamp(time.Now()),
)
if err == nil {
s.lastUpdate = time.Now()
}
return err return err
} }

View file

@ -623,16 +623,8 @@ func (d *Database) RemoveAllDevices(
// UpdateDeviceLastSeen updates a last seen timestamp and the ip address. // UpdateDeviceLastSeen updates a last seen timestamp and the ip address.
func (d *Database) UpdateDeviceLastSeen(ctx context.Context, localpart, deviceID, ipAddr, userAgent string) error { func (d *Database) UpdateDeviceLastSeen(ctx context.Context, localpart, deviceID, ipAddr, userAgent string) error {
err := d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
return d.Devices.UpdateDeviceLastSeen(ctx, txn, localpart, deviceID, ipAddr, userAgent)
})
if err != nil {
return err
}
// calculate start of the day
timestamp := time.Now().UTC().Truncate(time.Hour*24).UnixNano() / 1000000
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
return d.Stats.InsertUserDailyVisits(ctx, txn, localpart, deviceID, timestamp, userAgent) return d.Devices.UpdateDeviceLastSeen(ctx, txn, localpart, deviceID, ipAddr, userAgent)
}) })
} }

View file

@ -94,6 +94,4 @@ type ThreePIDTable interface {
DeleteThreePID(ctx context.Context, txn *sql.Tx, threepid string, medium string) (err error) DeleteThreePID(ctx context.Context, txn *sql.Tx, threepid string, medium string) (err error)
} }
type StatsTable interface { type StatsTable interface{}
InsertUserDailyVisits(ctx context.Context, txn *sql.Tx, localpart, deviceID string, timestamp int64, userAgent string, ) error
}