Rename table, also report active rooms

This commit is contained in:
Till Faelligen 2022-09-29 12:52:34 +02:00
parent 78398775fd
commit 264789ddb9
No known key found for this signature in database
GPG key ID: 3DF82D8AB9211D4E
8 changed files with 152 additions and 81 deletions

View file

@ -29,18 +29,20 @@ import (
)
type OutputRoomEventConsumer struct {
ctx context.Context
cfg *config.UserAPI
rsAPI rsapi.UserRoomserverAPI
jetstream nats.JetStreamContext
durable string
db storage.Database
topic string
pgClient pushgateway.Client
syncProducer *producers.SyncAPI
msgCounts map[gomatrixserverlib.ServerName]userAPITypes.MessageStats
msgCountsLock sync.Mutex
serverName gomatrixserverlib.ServerName
ctx context.Context
cfg *config.UserAPI
rsAPI rsapi.UserRoomserverAPI
jetstream nats.JetStreamContext
durable string
db storage.Database
topic string
pgClient pushgateway.Client
syncProducer *producers.SyncAPI
msgCounts map[gomatrixserverlib.ServerName]userAPITypes.MessageStats
roomCounts map[gomatrixserverlib.ServerName]map[string]bool // map from serverName to map from rommID to "isEncrypted"
lastUpdate time.Time
countsLock sync.Mutex
serverName gomatrixserverlib.ServerName
}
func NewOutputRoomEventConsumer(
@ -53,18 +55,20 @@ func NewOutputRoomEventConsumer(
syncProducer *producers.SyncAPI,
) *OutputRoomEventConsumer {
return &OutputRoomEventConsumer{
ctx: process.Context(),
cfg: cfg,
jetstream: js,
db: store,
durable: cfg.Matrix.JetStream.Durable("UserAPIRoomServerConsumer"),
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent),
pgClient: pgClient,
rsAPI: rsAPI,
syncProducer: syncProducer,
msgCounts: map[gomatrixserverlib.ServerName]userAPITypes.MessageStats{},
msgCountsLock: sync.Mutex{},
serverName: cfg.Matrix.ServerName,
ctx: process.Context(),
cfg: cfg,
jetstream: js,
db: store,
durable: cfg.Matrix.JetStream.Durable("UserAPIRoomServerConsumer"),
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent),
pgClient: pgClient,
rsAPI: rsAPI,
syncProducer: syncProducer,
msgCounts: map[gomatrixserverlib.ServerName]userAPITypes.MessageStats{},
roomCounts: map[gomatrixserverlib.ServerName]map[string]bool{},
lastUpdate: time.Now(),
countsLock: sync.Mutex{},
serverName: cfg.Matrix.ServerName,
}
}
@ -95,7 +99,7 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms
return true
}
s.storeMessageStats(ctx, event.Type(), event.Sender())
go s.storeMessageStats(ctx, event.Type(), event.Sender(), event.RoomID())
log.WithFields(log.Fields{
"event_id": event.EventID(),
@ -116,22 +120,33 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms
return true
}
func (s *OutputRoomEventConsumer) storeMessageStats(ctx context.Context, eventType, eventSender string) {
s.msgCountsLock.Lock()
defer s.msgCountsLock.Unlock()
func (s *OutputRoomEventConsumer) storeMessageStats(ctx context.Context, eventType, eventSender, roomID string) {
s.countsLock.Lock()
defer s.countsLock.Unlock()
// reset the roomCounts on a day change
if s.lastUpdate.Day() != time.Now().Day() {
s.roomCounts[s.serverName] = make(map[string]bool)
}
_, sender, err := gomatrixserverlib.SplitID('@', eventSender)
if err != nil {
return
}
msgCount := s.msgCounts[s.serverName]
roomCount := s.roomCounts[s.serverName]
if roomCount == nil {
roomCount = make(map[string]bool)
}
switch eventType {
case "m.room.message":
roomCount[roomID] = false
msgCount.Messages++
if sender == s.serverName {
msgCount.SentMessages++
}
case "m.room.encrypted":
roomCount[roomID] = true
msgCount.MessagesE2EE++
if sender == s.serverName {
msgCount.SentMessagesE2EE++
@ -140,8 +155,18 @@ func (s *OutputRoomEventConsumer) storeMessageStats(ctx context.Context, eventTy
return
}
s.msgCounts[s.serverName] = msgCount
s.roomCounts[s.serverName] = roomCount
for serverName, stats := range s.msgCounts {
err := s.db.UpsertDailyMessages(ctx, serverName, stats)
var normalRooms, encryptedRooms int64 = 0, 0
for _, isEncrypted := range s.roomCounts[s.serverName] {
if isEncrypted {
encryptedRooms++
} else {
normalRooms++
}
}
err := s.db.UpsertDailyRoomsMessages(ctx, serverName, stats, normalRooms, encryptedRooms)
if err != nil {
log.WithError(err).Errorf("failed to upsert daily messages")
}

View file

@ -5,6 +5,7 @@ import (
"reflect"
"sync"
"testing"
"time"
"github.com/matrix-org/gomatrixserverlib"
"github.com/stretchr/testify/assert"
@ -135,12 +136,15 @@ func TestMessageStats(t *testing.T) {
type args struct {
eventType string
eventSender string
roomID string
}
tests := []struct {
name string
args args
ourServer gomatrixserverlib.ServerName
wantStats userAPITypes.MessageStats
name string
args args
ourServer gomatrixserverlib.ServerName
lastUpdate time.Time
initRoomCounts map[gomatrixserverlib.ServerName]map[string]bool
wantStats userAPITypes.MessageStats
}{
{
name: "m.room.create does not count as a message",
@ -156,6 +160,7 @@ func TestMessageStats(t *testing.T) {
args: args{
eventType: "m.room.message",
eventSender: "@alice:localhost",
roomID: "normalRoom",
},
wantStats: userAPITypes.MessageStats{Messages: 1, SentMessages: 1},
},
@ -165,6 +170,7 @@ func TestMessageStats(t *testing.T) {
args: args{
eventType: "m.room.encrypted",
eventSender: "@alice:localhost",
roomID: "encryptedRoom",
},
wantStats: userAPITypes.MessageStats{Messages: 1, SentMessages: 1, MessagesE2EE: 1, SentMessagesE2EE: 1},
},
@ -175,6 +181,7 @@ func TestMessageStats(t *testing.T) {
args: args{
eventType: "m.room.message",
eventSender: "@alice:remote",
roomID: "normalRoom",
},
wantStats: userAPITypes.MessageStats{Messages: 2, SentMessages: 1, MessagesE2EE: 1, SentMessagesE2EE: 1},
},
@ -184,9 +191,24 @@ func TestMessageStats(t *testing.T) {
args: args{
eventType: "m.room.encrypted",
eventSender: "@alice:remote",
roomID: "encryptedRoom",
},
wantStats: userAPITypes.MessageStats{Messages: 2, SentMessages: 1, MessagesE2EE: 2, SentMessagesE2EE: 1},
},
{
name: "day change creates a new room map",
ourServer: "localhost",
lastUpdate: time.Now().Add(-time.Hour * 24),
initRoomCounts: map[gomatrixserverlib.ServerName]map[string]bool{
"localhost": {"encryptedRoom": true},
},
args: args{
eventType: "m.room.encrypted",
eventSender: "@alice:remote",
roomID: "someOtherRoom",
},
wantStats: userAPITypes.MessageStats{Messages: 2, SentMessages: 1, MessagesE2EE: 3, SentMessagesE2EE: 1},
},
}
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
@ -195,21 +217,35 @@ func TestMessageStats(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &OutputRoomEventConsumer{
db: db,
msgCounts: map[gomatrixserverlib.ServerName]userAPITypes.MessageStats{},
msgCountsLock: sync.Mutex{},
serverName: tt.ourServer,
if tt.lastUpdate.IsZero() {
tt.lastUpdate = time.Now()
}
s.storeMessageStats(context.Background(), tt.args.eventType, tt.args.eventSender)
gotStats, err := db.DailyMessages(context.Background(), tt.ourServer)
if tt.initRoomCounts == nil {
tt.initRoomCounts = map[gomatrixserverlib.ServerName]map[string]bool{}
}
s := &OutputRoomEventConsumer{
db: db,
msgCounts: map[gomatrixserverlib.ServerName]userAPITypes.MessageStats{},
roomCounts: tt.initRoomCounts,
countsLock: sync.Mutex{},
lastUpdate: tt.lastUpdate,
serverName: tt.ourServer,
}
s.storeMessageStats(context.Background(), tt.args.eventType, tt.args.eventSender, tt.args.roomID)
t.Logf("%+v", s.roomCounts)
gotStats, activeRooms, activeE2EERooms, err := db.DailyRoomsMessages(context.Background(), tt.ourServer)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if !reflect.DeepEqual(gotStats, tt.wantStats) {
t.Fatalf("expected %+v, got %+v", tt.wantStats, gotStats)
}
if tt.args.eventType == "m.room.encrypted" && activeE2EERooms != 1 {
t.Fatalf("expected room to be activeE2EE")
}
if tt.args.eventType == "m.room.message" && activeRooms != 1 {
t.Fatalf("expected room to be active")
}
})
}
})

View file

@ -146,8 +146,8 @@ type Database interface {
type Statistics interface {
UserStatistics(ctx context.Context) (*types.UserStatistics, *types.DatabaseEngine, error)
DailyMessages(ctx context.Context, serverName gomatrixserverlib.ServerName) (types.MessageStats, error)
UpsertDailyMessages(ctx context.Context, serverName gomatrixserverlib.ServerName, stats types.MessageStats) error
DailyRoomsMessages(ctx context.Context, serverName gomatrixserverlib.ServerName) (stats types.MessageStats, activeRooms, activeE2EERooms int64, err error)
UpsertDailyRoomsMessages(ctx context.Context, serverName gomatrixserverlib.ServerName, stats types.MessageStats, activeRooms, activeE2EERooms int64) error
}
// Err3PIDInUse is the error returned when trying to save an association involving

View file

@ -45,28 +45,31 @@ CREATE INDEX IF NOT EXISTS userapi_daily_visits_localpart_timestamp_idx ON usera
`
const messagesDailySchema = `
CREATE TABLE IF NOT EXISTS userapi_daily_messages (
CREATE TABLE IF NOT EXISTS userapi_daily_stats (
timestamp BIGINT NOT NULL,
server_name TEXT NOT NULL,
daily_messages BIGINT NOT NULL,
daily_sent_messages BIGINT NOT NULL,
daily_e2ee_messages BIGINT NOT NULL,
daily_sent_e2ee_messages BIGINT NOT NULL,
CONSTRAINT daily_messages_unique UNIQUE (timestamp, server_name)
daily_active_rooms BIGINT NOT NULL,
daily_active_e2ee_rooms BIGINT NOT NULL,
CONSTRAINT daily_stats_unique UNIQUE (timestamp, server_name)
);
`
const upsertDailyMessagesSQL = `
INSERT INTO userapi_daily_messages AS u (timestamp, server_name, daily_messages, daily_sent_messages, daily_e2ee_messages, daily_sent_e2ee_messages)
VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT ON CONSTRAINT daily_messages_unique
INSERT INTO userapi_daily_stats AS u (timestamp, server_name, daily_messages, daily_sent_messages, daily_e2ee_messages, daily_sent_e2ee_messages, daily_active_rooms, daily_active_e2ee_rooms)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT ON CONSTRAINT daily_stats_unique
DO UPDATE SET
daily_messages=u.daily_messages+excluded.daily_messages, daily_sent_messages=u.daily_sent_messages+excluded.daily_sent_messages,
daily_e2ee_messages=u.daily_e2ee_messages+excluded.daily_e2ee_messages, daily_sent_e2ee_messages=u.daily_sent_e2ee_messages+excluded.daily_sent_e2ee_messages
daily_e2ee_messages=u.daily_e2ee_messages+excluded.daily_e2ee_messages, daily_sent_e2ee_messages=u.daily_sent_e2ee_messages+excluded.daily_sent_e2ee_messages,
daily_active_rooms=$7, daily_active_e2ee_rooms=$8
`
const selectDailyMessagesSQL = `
SELECT daily_messages, daily_sent_messages, daily_e2ee_messages, daily_sent_e2ee_messages
FROM userapi_daily_messages
SELECT daily_messages, daily_sent_messages, daily_e2ee_messages, daily_sent_e2ee_messages, daily_active_rooms, daily_active_e2ee_rooms
FROM userapi_daily_stats
WHERE server_name = $1 AND timestamp = $2;
`
@ -471,9 +474,10 @@ func (s *statsStatements) UpdateUserDailyVisits(
return err
}
func (s *statsStatements) UpsertDailyMessages(
func (s *statsStatements) UpsertDailyStats(
ctx context.Context, txn *sql.Tx,
serverName gomatrixserverlib.ServerName, stats types.MessageStats,
activeRooms, activeE2EERooms int64,
) error {
stmt := sqlutil.TxStmt(txn, s.upsertMessagesStmt)
timestamp := time.Now().Truncate(time.Hour * 24)
@ -481,22 +485,22 @@ func (s *statsStatements) UpsertDailyMessages(
gomatrixserverlib.AsTimestamp(timestamp),
serverName,
stats.Messages, stats.SentMessages, stats.MessagesE2EE, stats.SentMessagesE2EE,
activeRooms, activeE2EERooms,
)
return err
}
func (s *statsStatements) DailyMessages(
func (s *statsStatements) DailyRoomsMessages(
ctx context.Context, txn *sql.Tx,
serverName gomatrixserverlib.ServerName,
) (types.MessageStats, error) {
) (msgStats types.MessageStats, activeRooms, activeE2EERooms int64, err error) {
stmt := sqlutil.TxStmt(txn, s.selectDailyMessagesStmt)
timestamp := time.Now().Truncate(time.Hour * 24)
res := types.MessageStats{}
err := stmt.QueryRowContext(ctx, serverName, gomatrixserverlib.AsTimestamp(timestamp)).
Scan(&res.Messages, &res.SentMessages, &res.MessagesE2EE, &res.SentMessagesE2EE)
err = stmt.QueryRowContext(ctx, serverName, gomatrixserverlib.AsTimestamp(timestamp)).
Scan(&msgStats.Messages, &msgStats.SentMessages, &msgStats.MessagesE2EE, &msgStats.SentMessagesE2EE, &activeRooms, &activeE2EERooms)
if err != nil && err != sql.ErrNoRows {
return res, err
return msgStats, 0, 0, err
}
return res, nil
return msgStats, activeRooms, activeE2EERooms, nil
}

View file

@ -801,14 +801,14 @@ func (d *Database) UserStatistics(ctx context.Context) (*types.UserStatistics, *
return d.Stats.UserStatistics(ctx, nil)
}
func (d *Database) UpsertDailyMessages(ctx context.Context, serverName gomatrixserverlib.ServerName, stats types.MessageStats) error {
func (d *Database) UpsertDailyRoomsMessages(ctx context.Context, serverName gomatrixserverlib.ServerName, stats types.MessageStats, activeRooms, activeE2EERooms int64) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
return d.Stats.UpsertDailyMessages(ctx, txn, serverName, stats)
return d.Stats.UpsertDailyStats(ctx, txn, serverName, stats, activeRooms, activeE2EERooms)
})
}
func (d *Database) DailyMessages(
func (d *Database) DailyRoomsMessages(
ctx context.Context, serverName gomatrixserverlib.ServerName,
) (types.MessageStats, error) {
return d.Stats.DailyMessages(ctx, nil, serverName)
) (stats types.MessageStats, activeRooms, activeE2EERooms int64, err error) {
return d.Stats.DailyRoomsMessages(ctx, nil, serverName)
}

View file

@ -45,28 +45,31 @@ CREATE INDEX IF NOT EXISTS userapi_daily_visits_localpart_timestamp_idx ON usera
`
const messagesDailySchema = `
CREATE TABLE IF NOT EXISTS userapi_daily_messages (
CREATE TABLE IF NOT EXISTS userapi_daily_stats (
timestamp BIGINT NOT NULL,
server_name TEXT NOT NULL,
daily_messages BIGINT NOT NULL,
daily_sent_messages BIGINT NOT NULL,
daily_e2ee_messages BIGINT NOT NULL,
daily_sent_e2ee_messages BIGINT NOT NULL,
CONSTRAINT daily_messages_unique UNIQUE (timestamp, server_name)
daily_active_rooms BIGINT NOT NULL,
daily_active_e2ee_rooms BIGINT NOT NULL,
CONSTRAINT daily_stats_unique UNIQUE (timestamp, server_name)
);
`
const upsertDailyMessagesSQL = `
INSERT INTO userapi_daily_messages (timestamp, server_name, daily_messages, daily_sent_messages, daily_e2ee_messages, daily_sent_e2ee_messages)
VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (timestamp, server_name)
INSERT INTO userapi_daily_stats (timestamp, server_name, daily_messages, daily_sent_messages, daily_e2ee_messages, daily_sent_e2ee_messages, daily_active_rooms, daily_active_e2ee_rooms)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (timestamp, server_name)
DO UPDATE SET
daily_messages=daily_messages+excluded.daily_messages, daily_sent_messages=daily_sent_messages+excluded.daily_sent_messages,
daily_e2ee_messages=daily_e2ee_messages+excluded.daily_e2ee_messages, daily_sent_e2ee_messages=daily_sent_e2ee_messages+excluded.daily_sent_e2ee_messages
daily_e2ee_messages=daily_e2ee_messages+excluded.daily_e2ee_messages, daily_sent_e2ee_messages=daily_sent_e2ee_messages+excluded.daily_sent_e2ee_messages,
daily_active_rooms=$7, daily_active_e2ee_rooms=$8
`
const selectDailyMessagesSQL = `
SELECT daily_messages, daily_sent_messages, daily_e2ee_messages, daily_sent_e2ee_messages
FROM userapi_daily_messages
SELECT daily_messages, daily_sent_messages, daily_e2ee_messages, daily_sent_e2ee_messages, daily_active_rooms, daily_active_e2ee_rooms
FROM userapi_daily_stats
WHERE server_name = $1 AND timestamp = $2;
`
@ -486,9 +489,10 @@ func (s *statsStatements) UpdateUserDailyVisits(
return err
}
func (s *statsStatements) UpsertDailyMessages(
func (s *statsStatements) UpsertDailyStats(
ctx context.Context, txn *sql.Tx,
serverName gomatrixserverlib.ServerName, stats types.MessageStats,
activeRooms, activeE2EERooms int64,
) error {
stmt := sqlutil.TxStmt(txn, s.upsertMessagesStmt)
timestamp := time.Now().Truncate(time.Hour * 24)
@ -496,22 +500,22 @@ func (s *statsStatements) UpsertDailyMessages(
gomatrixserverlib.AsTimestamp(timestamp),
serverName,
stats.Messages, stats.SentMessages, stats.MessagesE2EE, stats.SentMessagesE2EE,
activeRooms, activeE2EERooms,
)
return err
}
func (s *statsStatements) DailyMessages(
func (s *statsStatements) DailyRoomsMessages(
ctx context.Context, txn *sql.Tx,
serverName gomatrixserverlib.ServerName,
) (types.MessageStats, error) {
) (msgStats types.MessageStats, activeRooms, activeE2EERooms int64, err error) {
stmt := sqlutil.TxStmt(txn, s.selectDailyMessagesStmt)
timestamp := time.Now().Truncate(time.Hour * 24)
res := types.MessageStats{}
err := stmt.QueryRowContext(ctx, serverName, gomatrixserverlib.AsTimestamp(timestamp)).
Scan(&res.Messages, &res.SentMessages, &res.MessagesE2EE, &res.SentMessagesE2EE)
err = stmt.QueryRowContext(ctx, serverName, gomatrixserverlib.AsTimestamp(timestamp)).
Scan(&msgStats.Messages, &msgStats.SentMessages, &msgStats.MessagesE2EE, &msgStats.SentMessagesE2EE, &activeRooms, &activeE2EERooms)
if err != nil && err != sql.ErrNoRows {
return res, err
return msgStats, 0, 0, err
}
return res, nil
return msgStats, activeRooms, activeE2EERooms, nil
}

View file

@ -117,9 +117,9 @@ type NotificationTable interface {
type StatsTable interface {
UserStatistics(ctx context.Context, txn *sql.Tx) (*types.UserStatistics, *types.DatabaseEngine, error)
DailyMessages(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) (types.MessageStats, error)
DailyRoomsMessages(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) (msgStats types.MessageStats, activeRooms, activeE2EERooms int64, err error)
UpdateUserDailyVisits(ctx context.Context, txn *sql.Tx, startTime, lastUpdate time.Time) error
UpsertDailyMessages(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, stats types.MessageStats) error
UpsertDailyStats(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, stats types.MessageStats, activeRooms, activeE2EERooms int64) error
}
type NotificationFilter uint32

View file

@ -113,7 +113,7 @@ func (p *phoneHomeStats) collect() {
// TODO: Find a solution to actually set this value
p.stats["total_room_count"] = 0
messageStats, err := p.db.DailyMessages(ctx, p.serverName)
messageStats, activeRooms, activeE2EERooms, err := p.db.DailyRoomsMessages(ctx, p.serverName)
if err != nil {
logrus.WithError(err).Warn("unable to query message stats, using default values")
}
@ -121,6 +121,8 @@ func (p *phoneHomeStats) collect() {
p.stats["daily_sent_messages"] = messageStats.SentMessages
p.stats["daily_e2ee_messages"] = messageStats.MessagesE2EE
p.stats["daily_sent_e2ee_messages"] = messageStats.SentMessagesE2EE
p.stats["daily_active_rooms"] = activeRooms
p.stats["daily_active_e2ee_rooms"] = activeE2EERooms
// user stats and DB engine
userStats, db, err := p.db.UserStatistics(ctx)