mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-21 04:53:14 -06:00
Merge branch 'main' of github.com:matrix-org/dendrite into s7evink/helm
This commit is contained in:
commit
80890e3097
|
|
@ -231,9 +231,9 @@ GEM
|
||||||
jekyll-seo-tag (~> 2.1)
|
jekyll-seo-tag (~> 2.1)
|
||||||
minitest (5.15.0)
|
minitest (5.15.0)
|
||||||
multipart-post (2.1.1)
|
multipart-post (2.1.1)
|
||||||
nokogiri (1.13.9-arm64-darwin)
|
nokogiri (1.13.10-arm64-darwin)
|
||||||
racc (~> 1.4)
|
racc (~> 1.4)
|
||||||
nokogiri (1.13.9-x86_64-linux)
|
nokogiri (1.13.10-x86_64-linux)
|
||||||
racc (~> 1.4)
|
racc (~> 1.4)
|
||||||
octokit (4.22.0)
|
octokit (4.22.0)
|
||||||
faraday (>= 0.9)
|
faraday (>= 0.9)
|
||||||
|
|
@ -241,7 +241,7 @@ GEM
|
||||||
pathutil (0.16.2)
|
pathutil (0.16.2)
|
||||||
forwardable-extended (~> 2.6)
|
forwardable-extended (~> 2.6)
|
||||||
public_suffix (4.0.7)
|
public_suffix (4.0.7)
|
||||||
racc (1.6.0)
|
racc (1.6.1)
|
||||||
rb-fsevent (0.11.1)
|
rb-fsevent (0.11.1)
|
||||||
rb-inotify (0.10.1)
|
rb-inotify (0.10.1)
|
||||||
ffi (~> 1.0)
|
ffi (~> 1.0)
|
||||||
|
|
|
||||||
|
|
@ -33,6 +33,11 @@ import (
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// logrus is using a global variable when we're using `logrus.AddHook`
|
||||||
|
// this unfortunately results in us adding the same hook multiple times.
|
||||||
|
// This map ensures we only ever add one level hook.
|
||||||
|
var stdLevelLogAdded = make(map[logrus.Level]bool)
|
||||||
|
|
||||||
type utcFormatter struct {
|
type utcFormatter struct {
|
||||||
logrus.Formatter
|
logrus.Formatter
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -22,16 +22,16 @@ import (
|
||||||
"log/syslog"
|
"log/syslog"
|
||||||
|
|
||||||
"github.com/MFAshby/stdemuxerhook"
|
"github.com/MFAshby/stdemuxerhook"
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
lSyslog "github.com/sirupsen/logrus/hooks/syslog"
|
lSyslog "github.com/sirupsen/logrus/hooks/syslog"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
)
|
)
|
||||||
|
|
||||||
// SetupHookLogging configures the logging hooks defined in the configuration.
|
// SetupHookLogging configures the logging hooks defined in the configuration.
|
||||||
// If something fails here it means that the logging was improperly configured,
|
// If something fails here it means that the logging was improperly configured,
|
||||||
// so we just exit with the error
|
// so we just exit with the error
|
||||||
func SetupHookLogging(hooks []config.LogrusHook, componentName string) {
|
func SetupHookLogging(hooks []config.LogrusHook, componentName string) {
|
||||||
stdLogAdded := false
|
|
||||||
for _, hook := range hooks {
|
for _, hook := range hooks {
|
||||||
// Check we received a proper logging level
|
// Check we received a proper logging level
|
||||||
level, err := logrus.ParseLevel(hook.Level)
|
level, err := logrus.ParseLevel(hook.Level)
|
||||||
|
|
@ -54,14 +54,11 @@ func SetupHookLogging(hooks []config.LogrusHook, componentName string) {
|
||||||
setupSyslogHook(hook, level, componentName)
|
setupSyslogHook(hook, level, componentName)
|
||||||
case "std":
|
case "std":
|
||||||
setupStdLogHook(level)
|
setupStdLogHook(level)
|
||||||
stdLogAdded = true
|
|
||||||
default:
|
default:
|
||||||
logrus.Fatalf("Unrecognised logging hook type: %s", hook.Type)
|
logrus.Fatalf("Unrecognised logging hook type: %s", hook.Type)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !stdLogAdded {
|
setupStdLogHook(logrus.InfoLevel)
|
||||||
setupStdLogHook(logrus.InfoLevel)
|
|
||||||
}
|
|
||||||
// Hooks are now configured for stdout/err, so throw away the default logger output
|
// Hooks are now configured for stdout/err, so throw away the default logger output
|
||||||
logrus.SetOutput(io.Discard)
|
logrus.SetOutput(io.Discard)
|
||||||
}
|
}
|
||||||
|
|
@ -88,7 +85,11 @@ func checkSyslogHookParams(params map[string]interface{}) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func setupStdLogHook(level logrus.Level) {
|
func setupStdLogHook(level logrus.Level) {
|
||||||
|
if stdLevelLogAdded[level] {
|
||||||
|
return
|
||||||
|
}
|
||||||
logrus.AddHook(&logLevelHook{level, stdemuxerhook.New(logrus.StandardLogger())})
|
logrus.AddHook(&logLevelHook{level, stdemuxerhook.New(logrus.StandardLogger())})
|
||||||
|
stdLevelLogAdded[level] = true
|
||||||
}
|
}
|
||||||
|
|
||||||
func setupSyslogHook(hook config.LogrusHook, level logrus.Level, componentName string) {
|
func setupSyslogHook(hook config.LogrusHook, level logrus.Level, componentName string) {
|
||||||
|
|
|
||||||
|
|
@ -78,7 +78,7 @@ func (s *PresenceConsumer) Start() error {
|
||||||
// Normal NATS subscription, used by Request/Reply
|
// Normal NATS subscription, used by Request/Reply
|
||||||
_, err := s.nats.Subscribe(s.requestTopic, func(msg *nats.Msg) {
|
_, err := s.nats.Subscribe(s.requestTopic, func(msg *nats.Msg) {
|
||||||
userID := msg.Header.Get(jetstream.UserID)
|
userID := msg.Header.Get(jetstream.UserID)
|
||||||
presence, err := s.db.GetPresence(context.Background(), userID)
|
presences, err := s.db.GetPresences(context.Background(), []string{userID})
|
||||||
m := &nats.Msg{
|
m := &nats.Msg{
|
||||||
Header: nats.Header{},
|
Header: nats.Header{},
|
||||||
}
|
}
|
||||||
|
|
@ -89,10 +89,12 @@ func (s *PresenceConsumer) Start() error {
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if presence == nil {
|
|
||||||
presence = &types.PresenceInternal{
|
presence := &types.PresenceInternal{
|
||||||
UserID: userID,
|
UserID: userID,
|
||||||
}
|
}
|
||||||
|
if len(presences) > 0 {
|
||||||
|
presence = presences[0]
|
||||||
}
|
}
|
||||||
|
|
||||||
deviceRes := api.QueryDevicesResponse{}
|
deviceRes := api.QueryDevicesResponse{}
|
||||||
|
|
|
||||||
|
|
@ -106,7 +106,7 @@ type DatabaseTransaction interface {
|
||||||
SelectMembershipForUser(ctx context.Context, roomID, userID string, pos int64) (membership string, topologicalPos int, err error)
|
SelectMembershipForUser(ctx context.Context, roomID, userID string, pos int64) (membership string, topologicalPos int, err error)
|
||||||
// getUserUnreadNotificationCountsForRooms returns the unread notifications for the given rooms
|
// getUserUnreadNotificationCountsForRooms returns the unread notifications for the given rooms
|
||||||
GetUserUnreadNotificationCountsForRooms(ctx context.Context, userID string, roomIDs map[string]string) (map[string]*eventutil.NotificationData, error)
|
GetUserUnreadNotificationCountsForRooms(ctx context.Context, userID string, roomIDs map[string]string) (map[string]*eventutil.NotificationData, error)
|
||||||
GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error)
|
GetPresences(ctx context.Context, userID []string) ([]*types.PresenceInternal, error)
|
||||||
PresenceAfter(ctx context.Context, after types.StreamPosition, filter gomatrixserverlib.EventFilter) (map[string]*types.PresenceInternal, error)
|
PresenceAfter(ctx context.Context, after types.StreamPosition, filter gomatrixserverlib.EventFilter) (map[string]*types.PresenceInternal, error)
|
||||||
RelationsFor(ctx context.Context, roomID, eventID, relType, eventType string, from, to types.StreamPosition, backwards bool, limit int) (events []types.StreamEvent, prevBatch, nextBatch string, err error)
|
RelationsFor(ctx context.Context, roomID, eventID, relType, eventType string, from, to types.StreamPosition, backwards bool, limit int) (events []types.StreamEvent, prevBatch, nextBatch string, err error)
|
||||||
}
|
}
|
||||||
|
|
@ -186,7 +186,7 @@ type Database interface {
|
||||||
}
|
}
|
||||||
|
|
||||||
type Presence interface {
|
type Presence interface {
|
||||||
GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error)
|
GetPresences(ctx context.Context, userIDs []string) ([]*types.PresenceInternal, error)
|
||||||
UpdatePresence(ctx context.Context, userID string, presence types.Presence, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error)
|
UpdatePresence(ctx context.Context, userID string, presence types.Presence, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -19,10 +19,12 @@ import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/lib/pq"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const presenceSchema = `
|
const presenceSchema = `
|
||||||
|
|
@ -63,9 +65,9 @@ const upsertPresenceFromSyncSQL = "" +
|
||||||
" RETURNING id"
|
" RETURNING id"
|
||||||
|
|
||||||
const selectPresenceForUserSQL = "" +
|
const selectPresenceForUserSQL = "" +
|
||||||
"SELECT presence, status_msg, last_active_ts" +
|
"SELECT user_id, presence, status_msg, last_active_ts" +
|
||||||
" FROM syncapi_presence" +
|
" FROM syncapi_presence" +
|
||||||
" WHERE user_id = $1 LIMIT 1"
|
" WHERE user_id = ANY($1)"
|
||||||
|
|
||||||
const selectMaxPresenceSQL = "" +
|
const selectMaxPresenceSQL = "" +
|
||||||
"SELECT COALESCE(MAX(id), 0) FROM syncapi_presence"
|
"SELECT COALESCE(MAX(id), 0) FROM syncapi_presence"
|
||||||
|
|
@ -119,20 +121,28 @@ func (p *presenceStatements) UpsertPresence(
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetPresenceForUser returns the current presence of a user.
|
// GetPresenceForUsers returns the current presence for a list of users.
|
||||||
func (p *presenceStatements) GetPresenceForUser(
|
// If the user doesn't have a presence status yet, it is omitted from the response.
|
||||||
|
func (p *presenceStatements) GetPresenceForUsers(
|
||||||
ctx context.Context, txn *sql.Tx,
|
ctx context.Context, txn *sql.Tx,
|
||||||
userID string,
|
userIDs []string,
|
||||||
) (*types.PresenceInternal, error) {
|
) ([]*types.PresenceInternal, error) {
|
||||||
result := &types.PresenceInternal{
|
result := make([]*types.PresenceInternal, 0, len(userIDs))
|
||||||
UserID: userID,
|
|
||||||
}
|
|
||||||
stmt := sqlutil.TxStmt(txn, p.selectPresenceForUsersStmt)
|
stmt := sqlutil.TxStmt(txn, p.selectPresenceForUsersStmt)
|
||||||
err := stmt.QueryRowContext(ctx, userID).Scan(&result.Presence, &result.ClientFields.StatusMsg, &result.LastActiveTS)
|
rows, err := stmt.QueryContext(ctx, pq.Array(userIDs))
|
||||||
if err == sql.ErrNoRows {
|
if err != nil {
|
||||||
return nil, nil
|
return nil, err
|
||||||
|
}
|
||||||
|
defer internal.CloseAndLogIfError(ctx, rows, "GetPresenceForUsers: rows.close() failed")
|
||||||
|
|
||||||
|
for rows.Next() {
|
||||||
|
presence := &types.PresenceInternal{}
|
||||||
|
if err = rows.Scan(&presence.UserID, &presence.Presence, &presence.ClientFields.StatusMsg, &presence.LastActiveTS); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
presence.ClientFields.Presence = presence.Presence.String()
|
||||||
|
result = append(result, presence)
|
||||||
}
|
}
|
||||||
result.ClientFields.Presence = result.Presence.String()
|
|
||||||
return result, err
|
return result, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -564,8 +564,8 @@ func (d *Database) UpdatePresence(ctx context.Context, userID string, presence t
|
||||||
return pos, err
|
return pos, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Database) GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error) {
|
func (d *Database) GetPresences(ctx context.Context, userIDs []string) ([]*types.PresenceInternal, error) {
|
||||||
return d.Presence.GetPresenceForUser(ctx, nil, userID)
|
return d.Presence.GetPresenceForUsers(ctx, nil, userIDs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Database) SelectMembershipForUser(ctx context.Context, roomID, userID string, pos int64) (membership string, topologicalPos int, err error) {
|
func (d *Database) SelectMembershipForUser(ctx context.Context, roomID, userID string, pos int64) (membership string, topologicalPos int, err error) {
|
||||||
|
|
|
||||||
|
|
@ -596,8 +596,8 @@ func (d *DatabaseTransaction) GetUserUnreadNotificationCountsForRooms(ctx contex
|
||||||
return d.NotificationData.SelectUserUnreadCountsForRooms(ctx, d.txn, userID, roomIDs)
|
return d.NotificationData.SelectUserUnreadCountsForRooms(ctx, d.txn, userID, roomIDs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *DatabaseTransaction) GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error) {
|
func (d *DatabaseTransaction) GetPresences(ctx context.Context, userIDs []string) ([]*types.PresenceInternal, error) {
|
||||||
return d.Presence.GetPresenceForUser(ctx, d.txn, userID)
|
return d.Presence.GetPresenceForUsers(ctx, d.txn, userIDs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *DatabaseTransaction) PresenceAfter(ctx context.Context, after types.StreamPosition, filter gomatrixserverlib.EventFilter) (map[string]*types.PresenceInternal, error) {
|
func (d *DatabaseTransaction) PresenceAfter(ctx context.Context, after types.StreamPosition, filter gomatrixserverlib.EventFilter) (map[string]*types.PresenceInternal, error) {
|
||||||
|
|
|
||||||
|
|
@ -17,12 +17,14 @@ package sqlite3
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const presenceSchema = `
|
const presenceSchema = `
|
||||||
|
|
@ -62,9 +64,9 @@ const upsertPresenceFromSyncSQL = "" +
|
||||||
" RETURNING id"
|
" RETURNING id"
|
||||||
|
|
||||||
const selectPresenceForUserSQL = "" +
|
const selectPresenceForUserSQL = "" +
|
||||||
"SELECT presence, status_msg, last_active_ts" +
|
"SELECT user_id, presence, status_msg, last_active_ts" +
|
||||||
" FROM syncapi_presence" +
|
" FROM syncapi_presence" +
|
||||||
" WHERE user_id = $1 LIMIT 1"
|
" WHERE user_id IN ($1)"
|
||||||
|
|
||||||
const selectMaxPresenceSQL = "" +
|
const selectMaxPresenceSQL = "" +
|
||||||
"SELECT COALESCE(MAX(id), 0) FROM syncapi_presence"
|
"SELECT COALESCE(MAX(id), 0) FROM syncapi_presence"
|
||||||
|
|
@ -134,20 +136,38 @@ func (p *presenceStatements) UpsertPresence(
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetPresenceForUser returns the current presence of a user.
|
// GetPresenceForUsers returns the current presence for a list of users.
|
||||||
func (p *presenceStatements) GetPresenceForUser(
|
// If the user doesn't have a presence status yet, it is omitted from the response.
|
||||||
|
func (p *presenceStatements) GetPresenceForUsers(
|
||||||
ctx context.Context, txn *sql.Tx,
|
ctx context.Context, txn *sql.Tx,
|
||||||
userID string,
|
userIDs []string,
|
||||||
) (*types.PresenceInternal, error) {
|
) ([]*types.PresenceInternal, error) {
|
||||||
result := &types.PresenceInternal{
|
qry := strings.Replace(selectPresenceForUserSQL, "($1)", sqlutil.QueryVariadic(len(userIDs)), 1)
|
||||||
UserID: userID,
|
prepStmt, err := p.db.Prepare(qry)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
stmt := sqlutil.TxStmt(txn, p.selectPresenceForUsersStmt)
|
defer internal.CloseAndLogIfError(ctx, prepStmt, "GetPresenceForUsers: stmt.close() failed")
|
||||||
err := stmt.QueryRowContext(ctx, userID).Scan(&result.Presence, &result.ClientFields.StatusMsg, &result.LastActiveTS)
|
|
||||||
if err == sql.ErrNoRows {
|
params := make([]interface{}, len(userIDs))
|
||||||
return nil, nil
|
for i := range userIDs {
|
||||||
|
params[i] = userIDs[i]
|
||||||
|
}
|
||||||
|
|
||||||
|
rows, err := sqlutil.TxStmt(txn, prepStmt).QueryContext(ctx, params...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer internal.CloseAndLogIfError(ctx, rows, "GetPresenceForUsers: rows.close() failed")
|
||||||
|
result := make([]*types.PresenceInternal, 0, len(userIDs))
|
||||||
|
for rows.Next() {
|
||||||
|
presence := &types.PresenceInternal{}
|
||||||
|
if err = rows.Scan(&presence.UserID, &presence.Presence, &presence.ClientFields.StatusMsg, &presence.LastActiveTS); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
presence.ClientFields.Presence = presence.Presence.String()
|
||||||
|
result = append(result, presence)
|
||||||
}
|
}
|
||||||
result.ClientFields.Presence = result.Presence.String()
|
|
||||||
return result, err
|
return result, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -207,7 +207,7 @@ type Ignores interface {
|
||||||
|
|
||||||
type Presence interface {
|
type Presence interface {
|
||||||
UpsertPresence(ctx context.Context, txn *sql.Tx, userID string, statusMsg *string, presence types.Presence, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (pos types.StreamPosition, err error)
|
UpsertPresence(ctx context.Context, txn *sql.Tx, userID string, statusMsg *string, presence types.Presence, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (pos types.StreamPosition, err error)
|
||||||
GetPresenceForUser(ctx context.Context, txn *sql.Tx, userID string) (presence *types.PresenceInternal, err error)
|
GetPresenceForUsers(ctx context.Context, txn *sql.Tx, userIDs []string) (presence []*types.PresenceInternal, err error)
|
||||||
GetMaxPresenceID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error)
|
GetMaxPresenceID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error)
|
||||||
GetPresenceAfter(ctx context.Context, txn *sql.Tx, after types.StreamPosition, filter gomatrixserverlib.EventFilter) (presences map[string]*types.PresenceInternal, err error)
|
GetPresenceAfter(ctx context.Context, txn *sql.Tx, after types.StreamPosition, filter gomatrixserverlib.EventFilter) (presences map[string]*types.PresenceInternal, err error)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
136
syncapi/storage/tables/presence_table_test.go
Normal file
136
syncapi/storage/tables/presence_table_test.go
Normal file
|
|
@ -0,0 +1,136 @@
|
||||||
|
package tables_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"reflect"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
|
"github.com/matrix-org/dendrite/syncapi/storage/postgres"
|
||||||
|
"github.com/matrix-org/dendrite/syncapi/storage/sqlite3"
|
||||||
|
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
||||||
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
|
"github.com/matrix-org/dendrite/test"
|
||||||
|
)
|
||||||
|
|
||||||
|
func mustPresenceTable(t *testing.T, dbType test.DBType) (tables.Presence, func()) {
|
||||||
|
t.Helper()
|
||||||
|
connStr, close := test.PrepareDBConnectionString(t, dbType)
|
||||||
|
db, err := sqlutil.Open(&config.DatabaseOptions{
|
||||||
|
ConnectionString: config.DataSource(connStr),
|
||||||
|
}, sqlutil.NewExclusiveWriter())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to open db: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var tab tables.Presence
|
||||||
|
switch dbType {
|
||||||
|
case test.DBTypePostgres:
|
||||||
|
tab, err = postgres.NewPostgresPresenceTable(db)
|
||||||
|
case test.DBTypeSQLite:
|
||||||
|
var stream sqlite3.StreamIDStatements
|
||||||
|
if err = stream.Prepare(db); err != nil {
|
||||||
|
t.Fatalf("failed to prepare stream stmts: %s", err)
|
||||||
|
}
|
||||||
|
tab, err = sqlite3.NewSqlitePresenceTable(db, &stream)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to make new table: %s", err)
|
||||||
|
}
|
||||||
|
return tab, close
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPresence(t *testing.T) {
|
||||||
|
alice := test.NewUser(t)
|
||||||
|
bob := test.NewUser(t)
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
statusMsg := "Hello World!"
|
||||||
|
timestamp := gomatrixserverlib.AsTimestamp(time.Now())
|
||||||
|
|
||||||
|
var txn *sql.Tx
|
||||||
|
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
|
||||||
|
tab, closeDB := mustPresenceTable(t, dbType)
|
||||||
|
defer closeDB()
|
||||||
|
|
||||||
|
// Insert some presences
|
||||||
|
pos, err := tab.UpsertPresence(ctx, txn, alice.ID, &statusMsg, types.PresenceOnline, timestamp, false)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
wantPos := types.StreamPosition(1)
|
||||||
|
if pos != wantPos {
|
||||||
|
t.Errorf("expected pos to be %d, got %d", wantPos, pos)
|
||||||
|
}
|
||||||
|
pos, err = tab.UpsertPresence(ctx, txn, bob.ID, &statusMsg, types.PresenceOnline, timestamp, false)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
wantPos = 2
|
||||||
|
if pos != wantPos {
|
||||||
|
t.Errorf("expected pos to be %d, got %d", wantPos, pos)
|
||||||
|
}
|
||||||
|
|
||||||
|
// verify the expected max presence ID
|
||||||
|
maxPos, err := tab.GetMaxPresenceID(ctx, txn)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
if maxPos != wantPos {
|
||||||
|
t.Errorf("expected max pos to be %d, got %d", wantPos, maxPos)
|
||||||
|
}
|
||||||
|
|
||||||
|
// This should increment the position
|
||||||
|
pos, err = tab.UpsertPresence(ctx, txn, bob.ID, &statusMsg, types.PresenceOnline, timestamp, true)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
wantPos = pos
|
||||||
|
if wantPos <= maxPos {
|
||||||
|
t.Errorf("expected pos to be %d incremented, got %d", wantPos, pos)
|
||||||
|
}
|
||||||
|
|
||||||
|
// This should return only Bobs status
|
||||||
|
presences, err := tab.GetPresenceAfter(ctx, txn, maxPos, gomatrixserverlib.EventFilter{Limit: 10})
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if c := len(presences); c > 1 {
|
||||||
|
t.Errorf("expected only one presence, got %d", c)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate the response
|
||||||
|
wantPresence := &types.PresenceInternal{
|
||||||
|
UserID: bob.ID,
|
||||||
|
Presence: types.PresenceOnline,
|
||||||
|
StreamPos: wantPos,
|
||||||
|
LastActiveTS: timestamp,
|
||||||
|
ClientFields: types.PresenceClientResponse{
|
||||||
|
LastActiveAgo: 0,
|
||||||
|
Presence: types.PresenceOnline.String(),
|
||||||
|
StatusMsg: &statusMsg,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(wantPresence, presences[bob.ID]) {
|
||||||
|
t.Errorf("unexpected presence result:\n%+v, want\n%+v", presences[bob.ID], wantPresence)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try getting presences for existing and non-existing users
|
||||||
|
getUsers := []string{alice.ID, bob.ID, "@doesntexist:test"}
|
||||||
|
presencesForUsers, err := tab.GetPresenceForUsers(ctx, nil, getUsers)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(presencesForUsers) >= len(getUsers) {
|
||||||
|
t.Errorf("expected less presences, but they are the same/more as requested: %d >= %d", len(presencesForUsers), len(getUsers))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -17,6 +17,7 @@ package streams
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
|
@ -70,39 +71,25 @@ func (p *PresenceStreamProvider) IncrementalSync(
|
||||||
return from
|
return from
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(presences) == 0 {
|
getPresenceForUsers, err := p.getNeededUsersFromRequest(ctx, req, presences)
|
||||||
|
if err != nil {
|
||||||
|
req.Log.WithError(err).Error("getNeededUsersFromRequest failed")
|
||||||
|
return from
|
||||||
|
}
|
||||||
|
|
||||||
|
// Got no presence between range and no presence to get from the database
|
||||||
|
if len(getPresenceForUsers) == 0 && len(presences) == 0 {
|
||||||
return to
|
return to
|
||||||
}
|
}
|
||||||
|
|
||||||
// add newly joined rooms user presences
|
dbPresences, err := snapshot.GetPresences(ctx, getPresenceForUsers)
|
||||||
newlyJoined := joinedRooms(req.Response, req.Device.UserID)
|
if err != nil {
|
||||||
if len(newlyJoined) > 0 {
|
req.Log.WithError(err).Error("unable to query presence for user")
|
||||||
// TODO: Check if this is working better than before.
|
_ = snapshot.Rollback()
|
||||||
if err = p.notifier.LoadRooms(ctx, p.DB, newlyJoined); err != nil {
|
return from
|
||||||
req.Log.WithError(err).Error("unable to refresh notifier lists")
|
}
|
||||||
return from
|
for _, presence := range dbPresences {
|
||||||
}
|
presences[presence.UserID] = presence
|
||||||
NewlyJoinedLoop:
|
|
||||||
for _, roomID := range newlyJoined {
|
|
||||||
roomUsers := p.notifier.JoinedUsers(roomID)
|
|
||||||
for i := range roomUsers {
|
|
||||||
// we already got a presence from this user
|
|
||||||
if _, ok := presences[roomUsers[i]]; ok {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// Bear in mind that this might return nil, but at least populating
|
|
||||||
// a nil means that there's a map entry so we won't repeat this call.
|
|
||||||
presences[roomUsers[i]], err = snapshot.GetPresence(ctx, roomUsers[i])
|
|
||||||
if err != nil {
|
|
||||||
req.Log.WithError(err).Error("unable to query presence for user")
|
|
||||||
_ = snapshot.Rollback()
|
|
||||||
return from
|
|
||||||
}
|
|
||||||
if len(presences) > req.Filter.Presence.Limit {
|
|
||||||
break NewlyJoinedLoop
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
lastPos := from
|
lastPos := from
|
||||||
|
|
@ -164,6 +151,39 @@ func (p *PresenceStreamProvider) IncrementalSync(
|
||||||
return lastPos
|
return lastPos
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *PresenceStreamProvider) getNeededUsersFromRequest(ctx context.Context, req *types.SyncRequest, presences map[string]*types.PresenceInternal) ([]string, error) {
|
||||||
|
getPresenceForUsers := []string{}
|
||||||
|
// Add presence for users which newly joined a room
|
||||||
|
for userID := range req.MembershipChanges {
|
||||||
|
if _, ok := presences[userID]; ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
getPresenceForUsers = append(getPresenceForUsers, userID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// add newly joined rooms user presences
|
||||||
|
newlyJoined := joinedRooms(req.Response, req.Device.UserID)
|
||||||
|
if len(newlyJoined) == 0 {
|
||||||
|
return getPresenceForUsers, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Check if this is working better than before.
|
||||||
|
if err := p.notifier.LoadRooms(ctx, p.DB, newlyJoined); err != nil {
|
||||||
|
return getPresenceForUsers, fmt.Errorf("unable to refresh notifier lists: %w", err)
|
||||||
|
}
|
||||||
|
for _, roomID := range newlyJoined {
|
||||||
|
roomUsers := p.notifier.JoinedUsers(roomID)
|
||||||
|
for i := range roomUsers {
|
||||||
|
// we already got a presence from this user
|
||||||
|
if _, ok := presences[roomUsers[i]]; ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
getPresenceForUsers = append(getPresenceForUsers, roomUsers[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return getPresenceForUsers, nil
|
||||||
|
}
|
||||||
|
|
||||||
func joinedRooms(res *types.Response, userID string) []string {
|
func joinedRooms(res *types.Response, userID string) []string {
|
||||||
var roomIDs []string
|
var roomIDs []string
|
||||||
for roomID, join := range res.Rooms.Join {
|
for roomID, join := range res.Rooms.Join {
|
||||||
|
|
|
||||||
|
|
@ -145,12 +145,12 @@ func (rp *RequestPool) updatePresence(db storage.Presence, presence string, user
|
||||||
}
|
}
|
||||||
|
|
||||||
// ensure we also send the current status_msg to federated servers and not nil
|
// ensure we also send the current status_msg to federated servers and not nil
|
||||||
dbPresence, err := db.GetPresence(context.Background(), userID)
|
dbPresence, err := db.GetPresences(context.Background(), []string{userID})
|
||||||
if err != nil && err != sql.ErrNoRows {
|
if err != nil && err != sql.ErrNoRows {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if dbPresence != nil {
|
if len(dbPresence) > 0 && dbPresence[0] != nil {
|
||||||
newPresence.ClientFields = dbPresence.ClientFields
|
newPresence.ClientFields = dbPresence[0].ClientFields
|
||||||
}
|
}
|
||||||
newPresence.ClientFields.Presence = presenceID.String()
|
newPresence.ClientFields.Presence = presenceID.String()
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -29,8 +29,8 @@ func (d dummyDB) UpdatePresence(ctx context.Context, userID string, presence typ
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d dummyDB) GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error) {
|
func (d dummyDB) GetPresences(ctx context.Context, userID []string) ([]*types.PresenceInternal, error) {
|
||||||
return &types.PresenceInternal{}, nil
|
return []*types.PresenceInternal{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d dummyDB) PresenceAfter(ctx context.Context, after types.StreamPosition, filter gomatrixserverlib.EventFilter) (map[string]*types.PresenceInternal, error) {
|
func (d dummyDB) PresenceAfter(ctx context.Context, after types.StreamPosition, filter gomatrixserverlib.EventFilter) (map[string]*types.PresenceInternal, error) {
|
||||||
|
|
|
||||||
|
|
@ -108,7 +108,7 @@ func Base(cfg *config.Dendrite) (*base.BaseDendrite, nats.JetStreamContext, *nat
|
||||||
cfg.Global.JetStream.InMemory = true
|
cfg.Global.JetStream.InMemory = true
|
||||||
cfg.SyncAPI.Fulltext.InMemory = true
|
cfg.SyncAPI.Fulltext.InMemory = true
|
||||||
cfg.FederationAPI.KeyPerspectives = nil
|
cfg.FederationAPI.KeyPerspectives = nil
|
||||||
base := base.NewBaseDendrite(cfg, "Tests")
|
base := base.NewBaseDendrite(cfg, "Tests", base.DisableMetrics)
|
||||||
js, jc := base.NATS.Prepare(base.ProcessContext, &cfg.Global.JetStream)
|
js, jc := base.NATS.Prepare(base.ProcessContext, &cfg.Global.JetStream)
|
||||||
return base, js, jc
|
return base, js, jc
|
||||||
}
|
}
|
||||||
|
|
|
||||||
119
userapi/util/notify_test.go
Normal file
119
userapi/util/notify_test.go
Normal file
|
|
@ -0,0 +1,119 @@
|
||||||
|
package util_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
"github.com/matrix-org/util"
|
||||||
|
"golang.org/x/crypto/bcrypt"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/internal/pushgateway"
|
||||||
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
|
"github.com/matrix-org/dendrite/test"
|
||||||
|
"github.com/matrix-org/dendrite/test/testrig"
|
||||||
|
"github.com/matrix-org/dendrite/userapi/api"
|
||||||
|
"github.com/matrix-org/dendrite/userapi/storage"
|
||||||
|
userUtil "github.com/matrix-org/dendrite/userapi/util"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestNotifyUserCountsAsync(t *testing.T) {
|
||||||
|
alice := test.NewUser(t)
|
||||||
|
aliceLocalpart, serverName, err := gomatrixserverlib.SplitID('@', alice.ID)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
// Create a test room, just used to provide events
|
||||||
|
room := test.NewRoom(t, alice)
|
||||||
|
dummyEvent := room.Events()[len(room.Events())-1]
|
||||||
|
|
||||||
|
appID := util.RandomString(8)
|
||||||
|
pushKey := util.RandomString(8)
|
||||||
|
|
||||||
|
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
|
||||||
|
receivedRequest := make(chan bool, 1)
|
||||||
|
// create a test server which responds to our /notify call
|
||||||
|
srv := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
var data pushgateway.NotifyRequest
|
||||||
|
if err := json.NewDecoder(r.Body).Decode(&data); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
notification := data.Notification
|
||||||
|
// Validate the request
|
||||||
|
if notification.Counts == nil {
|
||||||
|
t.Fatal("no unread notification counts in request")
|
||||||
|
}
|
||||||
|
if unread := notification.Counts.Unread; unread != 1 {
|
||||||
|
t.Errorf("expected one unread notification, got %d", unread)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(notification.Devices) == 0 {
|
||||||
|
t.Fatal("expected devices in request")
|
||||||
|
}
|
||||||
|
|
||||||
|
// We only created one push device, so access it directly
|
||||||
|
device := notification.Devices[0]
|
||||||
|
if device.AppID != appID {
|
||||||
|
t.Errorf("unexpected app_id: %s, want %s", device.AppID, appID)
|
||||||
|
}
|
||||||
|
if device.PushKey != pushKey {
|
||||||
|
t.Errorf("unexpected push_key: %s, want %s", device.PushKey, pushKey)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return empty result, otherwise the call is handled as failed
|
||||||
|
if _, err := w.Write([]byte("{}")); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
close(receivedRequest)
|
||||||
|
}))
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
// Create DB and Dendrite base
|
||||||
|
connStr, close := test.PrepareDBConnectionString(t, dbType)
|
||||||
|
defer close()
|
||||||
|
base, _, _ := testrig.Base(nil)
|
||||||
|
defer base.Close()
|
||||||
|
db, err := storage.NewUserAPIDatabase(base, &config.DatabaseOptions{
|
||||||
|
ConnectionString: config.DataSource(connStr),
|
||||||
|
}, "test", bcrypt.MinCost, 0, 0, "")
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Prepare pusher with our test server URL
|
||||||
|
if err := db.UpsertPusher(ctx, api.Pusher{
|
||||||
|
Kind: api.HTTPKind,
|
||||||
|
AppID: appID,
|
||||||
|
PushKey: pushKey,
|
||||||
|
Data: map[string]interface{}{
|
||||||
|
"url": srv.URL,
|
||||||
|
},
|
||||||
|
}, aliceLocalpart, serverName); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Insert a dummy event
|
||||||
|
if err := db.InsertNotification(ctx, aliceLocalpart, serverName, dummyEvent.EventID(), 0, nil, &api.Notification{
|
||||||
|
Event: gomatrixserverlib.HeaderedToClientEvent(dummyEvent, gomatrixserverlib.FormatAll),
|
||||||
|
}); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Notify the user about a new notification
|
||||||
|
if err := userUtil.NotifyUserCountsAsync(ctx, pushgateway.NewHTTPClient(true), aliceLocalpart, serverName, db); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-time.After(time.Second * 5):
|
||||||
|
t.Error("timed out waiting for response")
|
||||||
|
case <-receivedRequest:
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -97,12 +97,10 @@ func (p *phoneHomeStats) collect() {
|
||||||
|
|
||||||
// configuration information
|
// configuration information
|
||||||
p.stats["federation_disabled"] = p.cfg.Global.DisableFederation
|
p.stats["federation_disabled"] = p.cfg.Global.DisableFederation
|
||||||
p.stats["nats_embedded"] = true
|
natsEmbedded := len(p.cfg.Global.JetStream.Addresses) == 0
|
||||||
p.stats["nats_in_memory"] = p.cfg.Global.JetStream.InMemory
|
p.stats["nats_embedded"] = natsEmbedded
|
||||||
if len(p.cfg.Global.JetStream.Addresses) > 0 {
|
p.stats["nats_in_memory"] = p.cfg.Global.JetStream.InMemory && natsEmbedded
|
||||||
p.stats["nats_embedded"] = false
|
|
||||||
p.stats["nats_in_memory"] = false // probably
|
|
||||||
}
|
|
||||||
if len(p.cfg.Logging) > 0 {
|
if len(p.cfg.Logging) > 0 {
|
||||||
p.stats["log_level"] = p.cfg.Logging[0].Level
|
p.stats["log_level"] = p.cfg.Logging[0].Level
|
||||||
} else {
|
} else {
|
||||||
|
|
|
||||||
84
userapi/util/phonehomestats_test.go
Normal file
84
userapi/util/phonehomestats_test.go
Normal file
|
|
@ -0,0 +1,84 @@
|
||||||
|
package util
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"golang.org/x/crypto/bcrypt"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/internal"
|
||||||
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
|
"github.com/matrix-org/dendrite/test"
|
||||||
|
"github.com/matrix-org/dendrite/test/testrig"
|
||||||
|
"github.com/matrix-org/dendrite/userapi/storage"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestCollect(t *testing.T) {
|
||||||
|
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
|
||||||
|
b, _, _ := testrig.Base(nil)
|
||||||
|
connStr, closeDB := test.PrepareDBConnectionString(t, dbType)
|
||||||
|
defer closeDB()
|
||||||
|
db, err := storage.NewUserAPIDatabase(b, &config.DatabaseOptions{
|
||||||
|
ConnectionString: config.DataSource(connStr),
|
||||||
|
}, "localhost", bcrypt.MinCost, 1000, 1000, "")
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
receivedRequest := make(chan struct{}, 1)
|
||||||
|
// create a test server which responds to our call
|
||||||
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
var data map[string]interface{}
|
||||||
|
if err := json.NewDecoder(r.Body).Decode(&data); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
defer r.Body.Close()
|
||||||
|
if _, err := w.Write([]byte("{}")); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// verify the received data matches our expectations
|
||||||
|
dbEngine, ok := data["database_engine"]
|
||||||
|
if !ok {
|
||||||
|
t.Errorf("missing database_engine in JSON request: %+v", data)
|
||||||
|
}
|
||||||
|
version, ok := data["version"]
|
||||||
|
if !ok {
|
||||||
|
t.Errorf("missing version in JSON request: %+v", data)
|
||||||
|
}
|
||||||
|
if version != internal.VersionString() {
|
||||||
|
t.Errorf("unexpected version: %q, expected %q", version, internal.VersionString())
|
||||||
|
}
|
||||||
|
switch {
|
||||||
|
case dbType == test.DBTypeSQLite && dbEngine != "SQLite":
|
||||||
|
t.Errorf("unexpected database_engine: %s", dbEngine)
|
||||||
|
case dbType == test.DBTypePostgres && dbEngine != "Postgres":
|
||||||
|
t.Errorf("unexpected database_engine: %s", dbEngine)
|
||||||
|
}
|
||||||
|
close(receivedRequest)
|
||||||
|
}))
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
b.Cfg.Global.ReportStats.Endpoint = srv.URL
|
||||||
|
stats := phoneHomeStats{
|
||||||
|
prevData: timestampToRUUsage{},
|
||||||
|
serverName: "localhost",
|
||||||
|
startTime: time.Now(),
|
||||||
|
cfg: b.Cfg,
|
||||||
|
db: db,
|
||||||
|
isMonolith: false,
|
||||||
|
client: &http.Client{Timeout: time.Second},
|
||||||
|
}
|
||||||
|
|
||||||
|
stats.collect()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-time.After(time.Second * 5):
|
||||||
|
t.Error("timed out waiting for response")
|
||||||
|
case <-receivedRequest:
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
Loading…
Reference in a new issue