Move stats to the userapi

This commit is contained in:
Till Faelligen 2022-03-03 16:46:01 +01:00
parent 845363253a
commit b07c31da22
18 changed files with 212 additions and 782 deletions

View file

@ -30,8 +30,7 @@ func SyncAPI(base *basepkg.BaseDendrite, cfg *config.Dendrite) {
base.ProcessContext,
base.PublicClientAPIMux, userAPI, rsAPI,
base.KeyServerHTTPClient(),
federation, cfg,
false,
federation, &cfg.SyncAPI,
)
base.SetupAndServeHTTP(

View file

@ -71,7 +71,6 @@ func (m *Monolith) AddAllPublicRoutes(process *process.ProcessContext, csMux, ss
mediaapi.AddPublicRoutes(mediaMux, &m.Config.MediaAPI, &m.Config.ClientAPI.RateLimiting, m.UserAPI, m.Client)
syncapi.AddPublicRoutes(
process, csMux, m.UserAPI, m.RoomserverAPI,
m.KeyAPI, m.FedClient, m.Config,
true,
m.KeyAPI, m.FedClient, &m.Config.SyncAPI,
)
}

View file

@ -141,12 +141,4 @@ type Database interface {
SelectContextEvent(ctx context.Context, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error)
SelectContextBeforeEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error)
SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error)
DailyE2EEMessages(ctx context.Context, prevID int64) (result int64, err error)
DailySentE2EEMessages(ctx context.Context, prevID int64) (result int64, err error)
DailyMessages(ctx context.Context, prevID int64) (result int64, err error)
DailySentMessages(ctx context.Context, prevID int64) (result int64, err error)
DailyActiveE2EERooms(ctx context.Context, prevID int64) (result int64, err error)
DailyActiveRooms(ctx context.Context, prevID int64) (result int64, err error)
TotalRooms(ctx context.Context) (result int64, err error)
}

View file

@ -1,117 +0,0 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package postgres
import (
"context"
"database/sql"
"fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/gomatrixserverlib"
)
const countEventTypesSQL = "" +
"SELECT COUNT(*) FROM syncapi_output_room_events" +
" WHERE type = $1 AND id > $2 AND sender like $3"
const countActiveRoomsSQL = "" +
"SELECT COUNT(DISTINCT room_id) FROM syncapi_output_room_events" +
" WHERE type = $1 AND id > $2"
const countTotalRoomsSQL = "" +
"SELECT COUNT(DISTINCT room_id) FROM syncapi_output_room_events"
type statsStatements struct {
serverName gomatrixserverlib.ServerName
countTypesStmt *sql.Stmt
countActiveRoomsStmt *sql.Stmt
countTotalRoomsStmt *sql.Stmt
}
func PrepareStats(db *sql.DB, serverName gomatrixserverlib.ServerName) (tables.Stats, error) {
s := &statsStatements{
serverName: serverName,
}
return s, sqlutil.StatementList{
{&s.countTypesStmt, countEventTypesSQL},
{&s.countActiveRoomsStmt, countActiveRoomsSQL},
{&s.countTotalRoomsStmt, countTotalRoomsSQL},
}.Prepare(db)
}
func (s *statsStatements) DailyE2EEMessages(ctx context.Context, txn *sql.Tx, prevID int64) (result int64, err error) {
stmt := sqlutil.TxStmt(txn, s.countTypesStmt)
err = stmt.QueryRowContext(ctx,
"m.room.encrypted",
prevID, "%",
).Scan(&result)
return
}
func (s *statsStatements) DailySentE2EEMessages(ctx context.Context, txn *sql.Tx, prevID int64) (result int64, err error) {
stmt := sqlutil.TxStmt(txn, s.countTypesStmt)
err = stmt.QueryRowContext(ctx,
"m.room.encrypted",
prevID,
fmt.Sprintf("%%:%s", s.serverName),
).Scan(&result)
return
}
func (s *statsStatements) DailyMessages(ctx context.Context, txn *sql.Tx, prevID int64) (result int64, err error) {
stmt := sqlutil.TxStmt(txn, s.countTypesStmt)
err = stmt.QueryRowContext(ctx,
"m.room.message",
prevID,
"%",
).Scan(&result)
return
}
func (s *statsStatements) DailySentMessages(ctx context.Context, txn *sql.Tx, prevID int64) (result int64, err error) {
stmt := sqlutil.TxStmt(txn, s.countTypesStmt)
err = stmt.QueryRowContext(ctx,
"m.room.message",
prevID,
fmt.Sprintf("%%:%s", s.serverName),
).Scan(&result)
return
}
func (s *statsStatements) DailyActiveE2EERooms(ctx context.Context, txn *sql.Tx, prevID int64) (result int64, err error) {
stmt := sqlutil.TxStmt(txn, s.countActiveRoomsStmt)
err = stmt.QueryRowContext(ctx,
"m.room.encrypted",
prevID,
).Scan(&result)
return
}
func (s *statsStatements) DailyActiveRooms(ctx context.Context, txn *sql.Tx, prevID int64) (result int64, err error) {
stmt := sqlutil.TxStmt(txn, s.countActiveRoomsStmt)
err = stmt.QueryRowContext(ctx,
"m.room.message",
prevID,
).Scan(&result)
return
}
func (s *statsStatements) TotalRooms(ctx context.Context, txn *sql.Tx) (result int64, err error) {
stmt := sqlutil.TxStmt(txn, s.countTotalRoomsStmt)
err = stmt.QueryRowContext(ctx).Scan(&result)
return
}

View file

@ -92,10 +92,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserver
if err != nil {
return nil, err
}
stats, err := PrepareStats(d.db, d.serverName)
if err != nil {
return nil, err
}
m := sqlutil.NewMigrations()
deltas.LoadFixSequences(m)
deltas.LoadRemoveSendToDeviceSentColumn(m)
@ -116,7 +113,6 @@ func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserver
SendToDevice: sendToDevice,
Receipts: receipts,
Memberships: memberships,
Stats: stats,
}
return &d, nil
}

View file

@ -48,7 +48,6 @@ type Database struct {
Filter tables.Filter
Receipts tables.Receipts
Memberships tables.Memberships
Stats tables.Stats
}
func (d *Database) readOnlySnapshot(ctx context.Context) (*sql.Tx, error) {
@ -967,25 +966,3 @@ func (s *Database) SelectContextBeforeEvent(ctx context.Context, id int, roomID
func (s *Database) SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error) {
return s.OutputEvents.SelectContextAfterEvent(ctx, nil, id, roomID, filter)
}
func (s *Database) DailyE2EEMessages(ctx context.Context, prevID int64) (result int64, err error) {
return s.Stats.DailyE2EEMessages(ctx, nil, prevID)
}
func (s *Database) DailySentE2EEMessages(ctx context.Context, prevID int64) (result int64, err error) {
return s.Stats.DailySentE2EEMessages(ctx, nil, prevID)
}
func (s *Database) DailyMessages(ctx context.Context, prevID int64) (result int64, err error) {
return s.Stats.DailyMessages(ctx, nil, prevID)
}
func (s *Database) DailySentMessages(ctx context.Context, prevID int64) (result int64, err error) {
return s.Stats.DailySentMessages(ctx, nil, prevID)
}
func (s *Database) DailyActiveE2EERooms(ctx context.Context, prevID int64) (result int64, err error) {
return s.Stats.DailyActiveE2EERooms(ctx, nil, prevID)
}
func (s *Database) DailyActiveRooms(ctx context.Context, prevID int64) (result int64, err error) {
return s.Stats.DailyActiveRooms(ctx, nil, prevID)
}
func (s *Database) TotalRooms(ctx context.Context) (result int64, err error) {
return s.Stats.TotalRooms(ctx, nil)
}

View file

@ -1,117 +0,0 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sqlite3
import (
"context"
"database/sql"
"fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/gomatrixserverlib"
)
const countEventTypesSQL = "" +
"SELECT COUNT(*) FROM syncapi_output_room_events" +
" WHERE type = $1 AND id > $2 AND sender like $3"
const countActiveRoomsSQL = "" +
"SELECT COUNT(DISTINCT room_id) FROM syncapi_output_room_events" +
" WHERE type = $1 AND id > $2"
const countTotalRoomsSQL = "" +
"SELECT COUNT(DISTINCT room_id) FROM syncapi_output_room_events"
type statsStatements struct {
serverName gomatrixserverlib.ServerName
countTypesStmt *sql.Stmt
countActiveRoomsStmt *sql.Stmt
countTotalRoomsStmt *sql.Stmt
}
func PrepareStats(db *sql.DB, serverName gomatrixserverlib.ServerName) (tables.Stats, error) {
s := &statsStatements{
serverName: serverName,
}
return s, sqlutil.StatementList{
{&s.countTypesStmt, countEventTypesSQL},
{&s.countActiveRoomsStmt, countActiveRoomsSQL},
{&s.countTotalRoomsStmt, countTotalRoomsSQL},
}.Prepare(db)
}
func (s *statsStatements) DailyE2EEMessages(ctx context.Context, txn *sql.Tx, prevID int64) (result int64, err error) {
stmt := sqlutil.TxStmt(txn, s.countTypesStmt)
err = stmt.QueryRowContext(ctx,
"m.room.encrypted",
prevID, "%",
).Scan(&result)
return
}
func (s *statsStatements) DailySentE2EEMessages(ctx context.Context, txn *sql.Tx, prevID int64) (result int64, err error) {
stmt := sqlutil.TxStmt(txn, s.countTypesStmt)
err = stmt.QueryRowContext(ctx,
"m.room.encrypted",
prevID,
fmt.Sprintf("%%:%s", s.serverName),
).Scan(&result)
return
}
func (s *statsStatements) DailyMessages(ctx context.Context, txn *sql.Tx, prevID int64) (result int64, err error) {
stmt := sqlutil.TxStmt(txn, s.countTypesStmt)
err = stmt.QueryRowContext(ctx,
"m.room.message",
prevID,
"%",
).Scan(&result)
return
}
func (s *statsStatements) DailySentMessages(ctx context.Context, txn *sql.Tx, prevID int64) (result int64, err error) {
stmt := sqlutil.TxStmt(txn, s.countTypesStmt)
err = stmt.QueryRowContext(ctx,
"m.room.message",
prevID,
fmt.Sprintf("%%:%s", s.serverName),
).Scan(&result)
return
}
func (s *statsStatements) DailyActiveE2EERooms(ctx context.Context, txn *sql.Tx, prevID int64) (result int64, err error) {
stmt := sqlutil.TxStmt(txn, s.countActiveRoomsStmt)
err = stmt.QueryRowContext(ctx,
"m.room.encrypted",
prevID,
).Scan(&result)
return
}
func (s *statsStatements) DailyActiveRooms(ctx context.Context, txn *sql.Tx, prevID int64) (result int64, err error) {
stmt := sqlutil.TxStmt(txn, s.countActiveRoomsStmt)
err = stmt.QueryRowContext(ctx,
"m.room.message",
prevID,
).Scan(&result)
return
}
func (s *statsStatements) TotalRooms(ctx context.Context, txn *sql.Tx) (result int64, err error) {
stmt := sqlutil.TxStmt(txn, s.countTotalRoomsStmt)
err = stmt.QueryRowContext(ctx).Scan(&result)
return
}

View file

@ -103,10 +103,6 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
return err
}
stats, err := PrepareStats(d.db, d.serverName)
if err != nil {
return err
}
m := sqlutil.NewMigrations()
deltas.LoadFixSequences(m)
@ -128,7 +124,6 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
SendToDevice: sendToDevice,
Receipts: receipts,
Memberships: memberships,
Stats: stats,
}
return nil
}

View file

@ -171,13 +171,3 @@ type Memberships interface {
UpsertMembership(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, streamPos, topologicalPos types.StreamPosition) error
SelectMembership(ctx context.Context, txn *sql.Tx, roomID, userID, memberships []string) (eventID string, streamPos, topologyPos types.StreamPosition, err error)
}
type Stats interface {
DailyE2EEMessages(ctx context.Context, txn *sql.Tx, prevID int64) (result int64, err error)
DailySentE2EEMessages(ctx context.Context, txn *sql.Tx, prevID int64) (result int64, err error)
DailyMessages(ctx context.Context, txn *sql.Tx, prevID int64) (result int64, err error)
DailySentMessages(ctx context.Context, txn *sql.Tx, prevID int64) (result int64, err error)
DailyActiveE2EERooms(ctx context.Context, txn *sql.Tx, prevID int64) (result int64, err error)
DailyActiveRooms(ctx context.Context, txn *sql.Tx, prevID int64) (result int64, err error)
TotalRooms(ctx context.Context, txn *sql.Tx) (result int64, err error)
}

View file

@ -15,18 +15,9 @@
package syncapi
import (
"bytes"
"context"
"encoding/json"
"math"
"net/http"
"runtime"
"syscall"
"time"
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/eduserver/cache"
@ -55,11 +46,8 @@ func AddPublicRoutes(
rsAPI api.RoomserverInternalAPI,
keyAPI keyapi.KeyInternalAPI,
federation *gomatrixserverlib.FederationClient,
baseCfg *config.Dendrite,
isMonolith bool,
cfg *config.SyncAPI,
) {
cfg := &baseCfg.SyncAPI
startTime := time.Now()
js := jetstream.Prepare(&cfg.Matrix.JetStream)
syncDB, err := storage.NewSyncServerDatasource(&cfg.Database, cfg.Matrix.ServerName)
@ -121,227 +109,6 @@ func AddPublicRoutes(
logrus.WithError(err).Panicf("failed to start receipts consumer")
}
if baseCfg.Global.ReportStats {
go startPhoneHomeCollector(startTime, baseCfg, syncDB, userAPI, isMonolith)
}
routing.Setup(router, requestPool, syncDB, userAPI, federation, rsAPI, cfg)
}
type phoneHomeStats struct {
prevData timestampToRUUsage
stats map[string]interface{}
serverName gomatrixserverlib.ServerName
userAPI userapi.UserInternalAPI
startTime time.Time
cfg *config.Dendrite
db storage.Database
isMonolith bool
client *http.Client
}
type timestampToRUUsage struct {
timestamp int64
usage syscall.Rusage
}
func startPhoneHomeCollector(startTime time.Time, cfg *config.Dendrite, syncDB storage.Database, userAPI userapi.UserInternalAPI, isMonolith bool) {
p := phoneHomeStats{
startTime: startTime,
serverName: cfg.Global.ServerName,
cfg: cfg,
db: syncDB,
userAPI: userAPI,
isMonolith: isMonolith,
client: &http.Client{
Timeout: time.Second * 30,
},
}
// start initial run after 5min
time.AfterFunc(time.Second*1, func() {
p.collect()
})
// run every 3 hours
ticker := time.NewTicker(time.Second * 10)
for {
select {
case <-ticker.C:
p.collect()
}
}
}
func (p *phoneHomeStats) collect() {
p.stats = make(map[string]interface{})
// general information
p.stats["homeserver"] = p.serverName
p.stats["monolith"] = p.isMonolith
p.stats["version"] = internal.VersionString()
p.stats["timestamp"] = time.Now().Unix()
p.stats["go_version"] = runtime.Version()
p.stats["go_arch"] = runtime.GOARCH
p.stats["go_os"] = runtime.GOOS
p.stats["num_cpu"] = runtime.NumCPU()
p.stats["num_go_routine"] = runtime.NumGoroutine()
p.stats["uptime_seconds"] = math.Floor(time.Now().Sub(p.startTime).Seconds())
ctx, cancel := context.WithTimeout(context.TODO(), time.Minute*1)
defer cancel()
// cpu and memory usage information
err := getMemoryStats(p)
if err != nil {
logrus.WithError(err).Error("unable to get memory/cpu stats")
return
}
// configuration information
p.stats["federation_disabled"] = p.cfg.Global.DisableFederation
p.stats["nats_embedded"] = true
p.stats["nats_in_memory"] = p.cfg.Global.JetStream.InMemory
if len(p.cfg.Global.JetStream.Addresses) > 0 {
p.stats["nats_embedded"] = false
p.stats["nats_in_memory"] = false // probably
}
if len(p.cfg.Logging) > 0 {
p.stats["log_level"] = p.cfg.Logging[0].Level
} else {
p.stats["log_level"] = "info"
}
// database configuration
db, err := sqlutil.Open(&p.cfg.SyncAPI.Database)
if err != nil {
logrus.WithError(err).Error("unable to database")
return
}
defer db.Close()
dbVersion := "unknown"
dbEngine := "unknown"
switch {
case p.cfg.SyncAPI.Database.ConnectionString.IsSQLite():
dbEngine = "SQLite"
row := db.QueryRow("select sqlite_version();")
if err := row.Scan(&dbVersion); err != nil {
logrus.WithError(err).Error("unable to query version")
return
}
case p.cfg.SyncAPI.Database.ConnectionString.IsPostgres():
dbEngine = "Postgres"
row := db.QueryRow("SHOW server_version;")
if err := row.Scan(&dbVersion); err != nil {
logrus.WithError(err).Error("unable to query version")
return
}
}
p.stats["database_engine"] = dbEngine
p.stats["database_server_version"] = dbVersion
// message and room stats
rooms, err := p.db.TotalRooms(ctx)
if err != nil {
logrus.WithError(err).Error("unable to query TotalRooms")
}
p.stats["total_room_count"] = rooms
messages, err := p.db.DailyMessages(ctx, 0)
if err != nil {
logrus.WithError(err).Error("unable to query DailyMessages")
return
}
p.stats["daily_messages"] = messages
messages, err = p.db.DailySentMessages(ctx, 0)
if err != nil {
logrus.WithError(err).Error("unable to query DailySentMessages")
return
}
p.stats["daily_sent_messages"] = messages
messages, err = p.db.DailyE2EEMessages(ctx, 0)
if err != nil {
logrus.WithError(err).Error("unable to query DailyE2EEMessages")
return
}
p.stats["daily_e2ee_messages"] = messages
messages, err = p.db.DailySentE2EEMessages(ctx, 0)
if err != nil {
logrus.WithError(err).Error("unable to query DailySentE2EEMessages")
return
}
p.stats["daily_sent_e2ee_messages"] = messages
integerRes := &userapi.IntegerResponse{}
if err = p.userAPI.AllUsers(ctx, integerRes); err != nil {
logrus.WithError(err).Error("unable to query AllUsers")
return
}
p.stats["total_users"] = integerRes.Count
if err = p.userAPI.NonBridgedUsers(ctx, integerRes); err != nil {
logrus.WithError(err).Error("unable to query NonBridgedUsers")
return
}
p.stats["total_nonbridged_users"] = integerRes.Count
if err = p.userAPI.DailyUsers(ctx, integerRes); err != nil {
logrus.WithError(err).Error("unable to query DailyUsers")
return
}
p.stats["daily_active_users"] = integerRes.Count
if err = p.userAPI.MonthlyUsers(ctx, integerRes); err != nil {
logrus.WithError(err).Error("unable to query MonthlyUsers")
return
}
p.stats["monthly_active_users"] = integerRes.Count
mapRes := &userapi.MapResponse{}
if err = p.userAPI.RegisteredUserByType(ctx, mapRes); err != nil {
logrus.WithError(err).Error("unable to query RegisteredUserByType")
return
}
for t, c := range mapRes.Result {
p.stats["daily_user_type_"+t] = c
}
if err = p.userAPI.R30Users(ctx, mapRes); err != nil {
logrus.WithError(err).Error("unable to query R30Users")
return
}
for t, c := range mapRes.Result {
p.stats["r30_users_"+t] = c
}
if err = p.userAPI.R30UsersV2(ctx, mapRes); err != nil {
logrus.WithError(err).Error("unable to query R30UsersV2")
return
}
for t, c := range mapRes.Result {
p.stats["r30v2_users_"+t] = c
}
output := bytes.Buffer{}
if err = json.NewEncoder(&output).Encode(p.stats); err != nil {
logrus.WithError(err).Error("unable to encode stats")
return
}
logrus.Infof("Reporting stats to %s: %s", p.cfg.Global.ReportStatsEndpoint, output.String())
request, err := http.NewRequest("POST", p.cfg.Global.ReportStatsEndpoint, &output)
if err != nil {
logrus.WithError(err).Error("unable to create phone home stats request")
return
}
request.Header.Set("User-Agent", "Dendrite/"+internal.VersionString())
_, err = p.client.Do(request)
if err != nil {
logrus.WithError(err).Error("unable to send phone home stats")
return
}
}

View file

@ -45,24 +45,6 @@ type UserInternalAPI interface {
QueryDeviceInfos(ctx context.Context, req *QueryDeviceInfosRequest, res *QueryDeviceInfosResponse) error
QuerySearchProfiles(ctx context.Context, req *QuerySearchProfilesRequest, res *QuerySearchProfilesResponse) error
QueryOpenIDToken(ctx context.Context, req *QueryOpenIDTokenRequest, res *QueryOpenIDTokenResponse) error
AllUsers(ctx context.Context, res *IntegerResponse) error
NonBridgedUsers(ctx context.Context, res *IntegerResponse) error
RegisteredUserByType(ctx context.Context, res *MapResponse) error
DailyUsers(ctx context.Context, res *IntegerResponse) error
MonthlyUsers(ctx context.Context, res *IntegerResponse) error
R30Users(ctx context.Context, res *MapResponse) error
R30UsersV2(ctx context.Context, res *MapResponse) error
}
type EmptyRequest struct{}
type IntegerResponse struct {
Count int64
}
type MapResponse struct {
Result map[string]int64
}
type PerformKeyBackupRequest struct {

View file

@ -119,48 +119,6 @@ func (t *UserInternalAPITrace) QueryOpenIDToken(ctx context.Context, req *QueryO
return err
}
func (t *UserInternalAPITrace) AllUsers(ctx context.Context, res *IntegerResponse) error {
err := t.Impl.AllUsers(ctx, res)
util.GetLogger(ctx).Infof("QueryOpenIDToken res=%+v", js(res))
return err
}
func (t *UserInternalAPITrace) NonBridgedUsers(ctx context.Context, res *IntegerResponse) error {
err := t.Impl.NonBridgedUsers(ctx, res)
util.GetLogger(ctx).Infof("NonBridgedUsers res=%+v", js(res))
return err
}
func (t *UserInternalAPITrace) RegisteredUserByType(ctx context.Context, res *MapResponse) error {
err := t.Impl.RegisteredUserByType(ctx, res)
util.GetLogger(ctx).Infof("RegisteredUserByType res=%+v", js(res))
return err
}
func (t *UserInternalAPITrace) DailyUsers(ctx context.Context, res *IntegerResponse) error {
err := t.Impl.DailyUsers(ctx, res)
util.GetLogger(ctx).Infof("DailyUsers res=%+v", js(res))
return err
}
func (t *UserInternalAPITrace) MonthlyUsers(ctx context.Context, res *IntegerResponse) error {
err := t.Impl.MonthlyUsers(ctx, res)
util.GetLogger(ctx).Infof("MonthlyUsers res=%+v", js(res))
return err
}
func (t *UserInternalAPITrace) R30Users(ctx context.Context, res *MapResponse) error {
err := t.Impl.R30Users(ctx, res)
util.GetLogger(ctx).Infof("R30Users res=%+v", js(res))
return err
}
func (t *UserInternalAPITrace) R30UsersV2(ctx context.Context, res *MapResponse) error {
err := t.Impl.R30UsersV2(ctx, res)
util.GetLogger(ctx).Infof("R30UsersV2 res=%+v", js(res))
return err
}
func js(thing interface{}) string {
b, err := json.Marshal(thing)
if err != nil {

View file

@ -595,66 +595,3 @@ func (a *UserInternalAPI) QueryKeyBackup(ctx context.Context, req *api.QueryKeyB
}
res.Keys = result
}
func (a *UserInternalAPI) AllUsers(ctx context.Context, res *api.IntegerResponse) error {
count, err := a.DB.AllUsers(ctx)
if err != nil {
return err
}
res.Count = count
return nil
}
func (a *UserInternalAPI) NonBridgedUsers(ctx context.Context, res *api.IntegerResponse) error {
count, err := a.DB.NonBridgedUsers(ctx)
if err != nil {
return err
}
res.Count = count
return nil
}
func (a *UserInternalAPI) RegisteredUserByType(ctx context.Context, res *api.MapResponse) error {
data, err := a.DB.RegisteredUserByType(ctx)
if err != nil {
return err
}
res.Result = data
return nil
}
func (a *UserInternalAPI) DailyUsers(ctx context.Context, res *api.IntegerResponse) error {
count, err := a.DB.DailyUsers(ctx)
if err != nil {
return err
}
res.Count = count
return nil
}
func (a *UserInternalAPI) MonthlyUsers(ctx context.Context, res *api.IntegerResponse) error {
count, err := a.DB.MonthlyUsers(ctx)
if err != nil {
return err
}
res.Count = count
return nil
}
func (a *UserInternalAPI) R30Users(ctx context.Context, res *api.MapResponse) error {
data, err := a.DB.R30Users(ctx)
if err != nil {
return err
}
res.Result = data
return nil
}
func (a *UserInternalAPI) R30UsersV2(ctx context.Context, res *api.MapResponse) error {
data, err := a.DB.R30UsersV2(ctx)
if err != nil {
return err
}
res.Result = data
return nil
}

View file

@ -46,14 +46,6 @@ const (
QueryDeviceInfosPath = "/userapi/queryDeviceInfos"
QuerySearchProfilesPath = "/userapi/querySearchProfiles"
QueryOpenIDTokenPath = "/userapi/queryOpenIDToken"
StatsAllUsers = "/userapi/stats/allUsers"
StatsNonBridgedUsers = "/userapi/stats/nonBridgedUsers"
StatsRegisteredUserByType = "/userapi/stats/registeredUserByType"
StatsDailyUsers = "/userapi/stats/dailyUsers"
StatsMonthlyUsers = "/userapi/stats/monthlyUsers"
StatsR30Users = "/userapi/stats/r30Users"
StatsR30UsersV2 = "/userapi/stats/r30UsersV2"
)
// NewUserAPIClient creates a UserInternalAPI implemented by talking to a HTTP POST API.
@ -256,60 +248,4 @@ func (h *httpUserInternalAPI) QueryKeyBackup(ctx context.Context, req *api.Query
if err != nil {
res.Error = err.Error()
}
}
func (h *httpUserInternalAPI) AllUsers(ctx context.Context, res *api.IntegerResponse) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "AllUsers")
defer span.Finish()
apiURL := h.apiURL + StatsAllUsers
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, api.EmptyRequest{}, res)
}
func (h *httpUserInternalAPI) NonBridgedUsers(ctx context.Context, res *api.IntegerResponse) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "NonBridgedUsers")
defer span.Finish()
apiURL := h.apiURL + StatsNonBridgedUsers
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, api.EmptyRequest{}, res)
}
func (h *httpUserInternalAPI) RegisteredUserByType(ctx context.Context, res *api.MapResponse) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "RegisteredUserByType")
defer span.Finish()
apiURL := h.apiURL + StatsRegisteredUserByType
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, api.EmptyRequest{}, res)
}
func (h *httpUserInternalAPI) DailyUsers(ctx context.Context, res *api.IntegerResponse) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "DailyUsers")
defer span.Finish()
apiURL := h.apiURL + StatsDailyUsers
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, api.EmptyRequest{}, res)
}
func (h *httpUserInternalAPI) MonthlyUsers(ctx context.Context, res *api.IntegerResponse) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "MonthlyUsers")
defer span.Finish()
apiURL := h.apiURL + StatsMonthlyUsers
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, api.EmptyRequest{}, res)
}
func (h *httpUserInternalAPI) R30Users(ctx context.Context, res *api.MapResponse) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "R30Users")
defer span.Finish()
apiURL := h.apiURL + StatsR30Users
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, api.EmptyRequest{}, res)
}
func (h *httpUserInternalAPI) R30UsersV2(ctx context.Context, res *api.MapResponse) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "R30UsersV2")
defer span.Finish()
apiURL := h.apiURL + StatsR30UsersV2
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, api.EmptyRequest{}, res)
}

View file

@ -265,74 +265,4 @@ func AddRoutes(internalAPIMux *mux.Router, s api.UserInternalAPI) {
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(StatsAllUsers,
httputil.MakeInternalAPI("statsAllUsers", func(req *http.Request) util.JSONResponse {
response := api.IntegerResponse{}
err := s.AllUsers(req.Context(), &response)
if err != nil {
return util.JSONResponse{Code: http.StatusBadRequest, JSON: &response}
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(StatsNonBridgedUsers,
httputil.MakeInternalAPI("statsNonBridgedUsers", func(req *http.Request) util.JSONResponse {
response := api.IntegerResponse{}
err := s.NonBridgedUsers(req.Context(), &response)
if err != nil {
return util.JSONResponse{Code: http.StatusBadRequest, JSON: &response}
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(StatsDailyUsers,
httputil.MakeInternalAPI("statsDailyUsers", func(req *http.Request) util.JSONResponse {
response := api.IntegerResponse{}
err := s.DailyUsers(req.Context(), &response)
if err != nil {
return util.JSONResponse{Code: http.StatusBadRequest, JSON: &response}
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(StatsMonthlyUsers,
httputil.MakeInternalAPI("statsMonthlyUsers", func(req *http.Request) util.JSONResponse {
response := api.IntegerResponse{}
err := s.MonthlyUsers(req.Context(), &response)
if err != nil {
return util.JSONResponse{Code: http.StatusBadRequest, JSON: &response}
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(StatsRegisteredUserByType,
httputil.MakeInternalAPI("statsRegisteredUserByType", func(req *http.Request) util.JSONResponse {
response := api.MapResponse{}
err := s.RegisteredUserByType(req.Context(), &response)
if err != nil {
return util.JSONResponse{Code: http.StatusBadRequest, JSON: &response}
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(StatsR30Users,
httputil.MakeInternalAPI("statsR30Users", func(req *http.Request) util.JSONResponse {
response := api.MapResponse{}
err := s.R30Users(req.Context(), &response)
if err != nil {
return util.JSONResponse{Code: http.StatusBadRequest, JSON: &response}
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(StatsR30UsersV2,
httputil.MakeInternalAPI("statsR30UsersV2", func(req *http.Request) util.JSONResponse {
response := api.MapResponse{}
err := s.R30Users(req.Context(), &response)
if err != nil {
return util.JSONResponse{Code: http.StatusBadRequest, JSON: &response}
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
}

View file

@ -15,7 +15,7 @@
//go:build !wasm
// +build !wasm
package syncapi
package userapi
import (
"math"

View file

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package syncapi
package userapi
// stub, since WASM doesn't support syscall.Getrusage
func getMemoryStats(p *phoneHomeStats) error {

View file

@ -15,15 +15,25 @@
package userapi
import (
"bytes"
"context"
"encoding/json"
"math"
"net/http"
"runtime"
"syscall"
"time"
"github.com/gorilla/mux"
version "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/internal"
"github.com/matrix-org/dendrite/userapi/inthttp"
"github.com/matrix-org/dendrite/userapi/storage"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
)
@ -59,3 +69,199 @@ func newInternalAPI(
KeyAPI: keyAPI,
}
}
type phoneHomeStats struct {
prevData timestampToRUUsage
stats map[string]interface{}
serverName gomatrixserverlib.ServerName
startTime time.Time
cfg *config.Dendrite
db storage.Database
isMonolith bool
client *http.Client
}
type timestampToRUUsage struct {
timestamp int64
usage syscall.Rusage
}
func startPhoneHomeCollector(startTime time.Time, cfg *config.Dendrite, userDB storage.Database, isMonolith bool) {
p := phoneHomeStats{
startTime: startTime,
serverName: cfg.Global.ServerName,
cfg: cfg,
db: userDB,
isMonolith: isMonolith,
client: &http.Client{
Timeout: time.Second * 30,
},
}
// start initial run after 5min
time.AfterFunc(time.Second*1, func() {
p.collect()
})
// run every 3 hours
ticker := time.NewTicker(time.Second * 10)
for {
select {
case <-ticker.C:
p.collect()
}
}
}
func (p *phoneHomeStats) collect() {
p.stats = make(map[string]interface{})
// general information
p.stats["homeserver"] = p.serverName
p.stats["monolith"] = p.isMonolith
p.stats["version"] = version.VersionString()
p.stats["timestamp"] = time.Now().Unix()
p.stats["go_version"] = runtime.Version()
p.stats["go_arch"] = runtime.GOARCH
p.stats["go_os"] = runtime.GOOS
p.stats["num_cpu"] = runtime.NumCPU()
p.stats["num_go_routine"] = runtime.NumGoroutine()
p.stats["uptime_seconds"] = math.Floor(time.Now().Sub(p.startTime).Seconds())
ctx, cancel := context.WithTimeout(context.TODO(), time.Minute*1)
defer cancel()
// cpu and memory usage information
err := getMemoryStats(p)
if err != nil {
logrus.WithError(err).Error("unable to get memory/cpu stats")
return
}
// configuration information
p.stats["federation_disabled"] = p.cfg.Global.DisableFederation
p.stats["nats_embedded"] = true
p.stats["nats_in_memory"] = p.cfg.Global.JetStream.InMemory
if len(p.cfg.Global.JetStream.Addresses) > 0 {
p.stats["nats_embedded"] = false
p.stats["nats_in_memory"] = false // probably
}
if len(p.cfg.Logging) > 0 {
p.stats["log_level"] = p.cfg.Logging[0].Level
} else {
p.stats["log_level"] = "info"
}
// database configuration
db, err := sqlutil.Open(&p.cfg.SyncAPI.Database)
if err != nil {
logrus.WithError(err).Error("unable to database")
return
}
defer db.Close()
dbVersion := "unknown"
dbEngine := "unknown"
switch {
case p.cfg.SyncAPI.Database.ConnectionString.IsSQLite():
dbEngine = "SQLite"
row := db.QueryRow("select sqlite_version();")
if err := row.Scan(&dbVersion); err != nil {
logrus.WithError(err).Error("unable to query version")
return
}
case p.cfg.SyncAPI.Database.ConnectionString.IsPostgres():
dbEngine = "Postgres"
row := db.QueryRow("SHOW server_version;")
if err := row.Scan(&dbVersion); err != nil {
logrus.WithError(err).Error("unable to query version")
return
}
}
p.stats["database_engine"] = dbEngine
p.stats["database_server_version"] = dbVersion
// message and room stats
// TODO: Find a solution to actually set these values
p.stats["total_room_count"] = 0
p.stats["daily_messages"] = 0
p.stats["daily_sent_messages"] = 0
p.stats["daily_e2ee_messages"] = 0
p.stats["daily_sent_e2ee_messages"] = 0
count, err := p.db.AllUsers(ctx)
if err != nil {
logrus.WithError(err).Error("unable to query AllUsers")
return
}
p.stats["total_users"] = count
count, err = p.db.NonBridgedUsers(ctx)
if err != nil {
logrus.WithError(err).Error("unable to query NonBridgedUsers")
return
}
p.stats["total_nonbridged_users"] = count
count, err = p.db.DailyUsers(ctx)
if err != nil {
logrus.WithError(err).Error("unable to query DailyUsers")
return
}
p.stats["daily_active_users"] = count
count, err = p.db.MonthlyUsers(ctx)
if err != nil {
logrus.WithError(err).Error("unable to query MonthlyUsers")
return
}
p.stats["monthly_active_users"] = count
res, err := p.db.RegisteredUserByType(ctx)
if err != nil {
logrus.WithError(err).Error("unable to query RegisteredUserByType")
return
}
for t, c := range res {
p.stats["daily_user_type_"+t] = c
}
res, err = p.db.R30Users(ctx)
if err != nil {
logrus.WithError(err).Error("unable to query R30Users")
return
}
for t, c := range res {
p.stats["r30_users_"+t] = c
}
res, err = p.db.R30UsersV2(ctx)
if err != nil {
logrus.WithError(err).Error("unable to query R30UsersV2")
return
}
for t, c := range res {
p.stats["r30v2_users_"+t] = c
}
output := bytes.Buffer{}
if err = json.NewEncoder(&output).Encode(p.stats); err != nil {
logrus.WithError(err).Error("unable to encode stats")
return
}
logrus.Infof("Reporting stats to %s: %s", p.cfg.Global.ReportStatsEndpoint, output.String())
request, err := http.NewRequest("POST", p.cfg.Global.ReportStatsEndpoint, &output)
if err != nil {
logrus.WithError(err).Error("unable to create phone home stats request")
return
}
request.Header.Set("User-Agent", "Dendrite/"+version.VersionString())
_, err = p.client.Do(request)
if err != nil {
logrus.WithError(err).Error("unable to send phone home stats")
return
}
}