Merge branch 'main' of github.com:matrix-org/dendrite into s7evink/phonehomestats

This commit is contained in:
Till Faelligen 2022-03-04 08:21:44 +01:00
commit 623df231ee
11 changed files with 69 additions and 22 deletions

View file

@ -286,7 +286,7 @@ func SetDisplayName(
return jsonerror.InternalServerError()
}
if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, events, cfg.Matrix.ServerName, cfg.Matrix.ServerName, nil, false); err != nil {
if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, events, cfg.Matrix.ServerName, cfg.Matrix.ServerName, nil, true); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
}

View file

@ -17,6 +17,7 @@ package routing
import (
"database/sql"
"encoding/json"
"fmt"
"net/http"
"strconv"
@ -102,6 +103,12 @@ func Context(
id, requestedEvent, err := syncDB.SelectContextEvent(ctx, roomID, eventID)
if err != nil {
if err == sql.ErrNoRows {
return util.JSONResponse{
Code: http.StatusNotFound,
JSON: jsonerror.NotFound(fmt.Sprintf("Event %s not found", eventID)),
}
}
logrus.WithError(err).WithField("eventID", eventID).Error("unable to find requested event")
return jsonerror.InternalServerError()
}

View file

@ -35,3 +35,4 @@ AS-ghosted users can use rooms themselves
# Flakey, need additional investigation
Messages that notify from another user increment notification_count
Messages that highlight from another user increment unread highlight count
Notifications can be viewed with GET /notifications

View file

@ -638,7 +638,6 @@ Rooms with many users are correctly pushed
Don't get pushed for rooms you've muted
Rejected events are not pushed
Test that rejected pushers are removed.
Notifications can be viewed with GET /notifications
Trying to add push rule with no scope fails with 400
Trying to add push rule with invalid scope fails with 400
Forward extremities remain so even after the next events are populated as outliers

View file

@ -139,9 +139,6 @@ func (s *OutputStreamEventConsumer) processMessage(ctx context.Context, event *g
// removing it means we can send all notifications to
// e.g. Element's Push gateway in one go.
for _, mem := range members {
if p, err := s.db.GetPushers(ctx, mem.Localpart); err != nil || len(p) == 0 {
continue
}
if err := s.notifyLocal(ctx, event, pos, mem, roomSize, roomName); err != nil {
log.WithFields(log.Fields{
"localpart": mem.Localpart,

View file

@ -97,6 +97,7 @@ type Database interface {
GetNotifications(ctx context.Context, localpart string, fromID int64, limit int, filter tables.NotificationFilter) ([]*api.Notification, int64, error)
GetNotificationCount(ctx context.Context, localpart string, filter tables.NotificationFilter) (int64, error)
GetRoomNotificationCounts(ctx context.Context, localpart, roomID string) (total int64, highlight int64, _ error)
DeleteOldNotifications(ctx context.Context) error
UpsertPusher(ctx context.Context, p api.Pusher, localpart string) error
GetPushers(ctx context.Context, localpart string) ([]api.Pusher, error)

View file

@ -18,6 +18,7 @@ import (
"context"
"database/sql"
"encoding/json"
"time"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
@ -28,12 +29,13 @@ import (
)
type notificationsStatements struct {
insertStmt *sql.Stmt
deleteUpToStmt *sql.Stmt
updateReadStmt *sql.Stmt
selectStmt *sql.Stmt
selectCountStmt *sql.Stmt
selectRoomCountsStmt *sql.Stmt
insertStmt *sql.Stmt
deleteUpToStmt *sql.Stmt
updateReadStmt *sql.Stmt
selectStmt *sql.Stmt
selectCountStmt *sql.Stmt
selectRoomCountsStmt *sql.Stmt
cleanNotificationsStmt *sql.Stmt
}
const notificationSchema = `
@ -77,6 +79,10 @@ const selectRoomNotificationCountsSQL = "" +
"SELECT COUNT(*), COUNT(*) FILTER (WHERE highlight) FROM userapi_notifications " +
"WHERE localpart = $1 AND room_id = $2 AND NOT read"
const cleanNotificationsSQL = "" +
"DELETE FROM userapi_notifications WHERE" +
" (highlight = FALSE AND ts_ms < $1) OR (highlight = TRUE AND ts_ms < $2)"
func NewPostgresNotificationTable(db *sql.DB) (tables.NotificationTable, error) {
s := &notificationsStatements{}
_, err := db.Exec(notificationSchema)
@ -90,9 +96,19 @@ func NewPostgresNotificationTable(db *sql.DB) (tables.NotificationTable, error)
{&s.selectStmt, selectNotificationSQL},
{&s.selectCountStmt, selectNotificationCountSQL},
{&s.selectRoomCountsStmt, selectRoomNotificationCountsSQL},
{&s.cleanNotificationsStmt, cleanNotificationsSQL},
}.Prepare(db)
}
func (s *notificationsStatements) Clean(ctx context.Context, txn *sql.Tx) error {
_, err := sqlutil.TxStmt(txn, s.cleanNotificationsStmt).ExecContext(
ctx,
time.Now().AddDate(0, 0, -1).UnixNano()/int64(time.Millisecond), // keep non-highlights for a day
time.Now().AddDate(0, -1, 0).UnixNano()/int64(time.Millisecond), // keep highlights for a month
)
return err
}
// Insert inserts a notification into the database.
func (s *notificationsStatements) Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, pos int64, highlight bool, n *api.Notification) error {
roomID, tsMS := n.RoomID, n.TS

View file

@ -706,6 +706,10 @@ func (d *Database) GetRoomNotificationCounts(ctx context.Context, localpart, roo
return d.Notifications.SelectRoomCounts(ctx, nil, localpart, roomID)
}
func (d *Database) DeleteOldNotifications(ctx context.Context) error {
return d.Notifications.Clean(ctx, nil)
}
func (d *Database) UpsertPusher(
ctx context.Context, p api.Pusher, localpart string,
) error {

View file

@ -18,6 +18,7 @@ import (
"context"
"database/sql"
"encoding/json"
"time"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
@ -28,12 +29,13 @@ import (
)
type notificationsStatements struct {
insertStmt *sql.Stmt
deleteUpToStmt *sql.Stmt
updateReadStmt *sql.Stmt
selectStmt *sql.Stmt
selectCountStmt *sql.Stmt
selectRoomCountsStmt *sql.Stmt
insertStmt *sql.Stmt
deleteUpToStmt *sql.Stmt
updateReadStmt *sql.Stmt
selectStmt *sql.Stmt
selectCountStmt *sql.Stmt
selectRoomCountsStmt *sql.Stmt
cleanNotificationsStmt *sql.Stmt
}
const notificationSchema = `
@ -77,6 +79,10 @@ const selectRoomNotificationCountsSQL = "" +
"SELECT COUNT(*), COUNT(*) FILTER (WHERE highlight) FROM userapi_notifications " +
"WHERE localpart = $1 AND room_id = $2 AND NOT read"
const cleanNotificationsSQL = "" +
"DELETE FROM userapi_notifications WHERE" +
" (highlight = FALSE AND ts_ms < $1) OR (highlight = TRUE AND ts_ms < $2)"
func NewSQLiteNotificationTable(db *sql.DB) (tables.NotificationTable, error) {
s := &notificationsStatements{}
_, err := db.Exec(notificationSchema)
@ -90,9 +96,19 @@ func NewSQLiteNotificationTable(db *sql.DB) (tables.NotificationTable, error) {
{&s.selectStmt, selectNotificationSQL},
{&s.selectCountStmt, selectNotificationCountSQL},
{&s.selectRoomCountsStmt, selectRoomNotificationCountsSQL},
{&s.cleanNotificationsStmt, cleanNotificationsSQL},
}.Prepare(db)
}
func (s *notificationsStatements) Clean(ctx context.Context, txn *sql.Tx) error {
_, err := sqlutil.TxStmt(txn, s.cleanNotificationsStmt).ExecContext(
ctx,
time.Now().AddDate(0, 0, -1).UnixNano()/int64(time.Millisecond), // keep non-highlights for a day
time.Now().AddDate(0, -1, 0).UnixNano()/int64(time.Millisecond), // keep highlights for a month
)
return err
}
// Insert inserts a notification into the database.
func (s *notificationsStatements) Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, pos int64, highlight bool, n *api.Notification) error {
roomID, tsMS := n.RoomID, n.TS

View file

@ -103,6 +103,7 @@ type PusherTable interface {
}
type NotificationTable interface {
Clean(ctx context.Context, txn *sql.Tx) error
Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, pos int64, highlight bool, n *api.Notification) error
DeleteUpTo(ctx context.Context, txn *sql.Tx, localpart, roomID string, pos int64) (affected bool, _ error)
UpdateRead(ctx context.Context, txn *sql.Tx, localpart, roomID string, pos int64, v bool) (affected bool, _ error)

View file

@ -56,11 +56,6 @@ func NewInternalAPI(
appServices []config.ApplicationService, keyAPI keyapi.KeyInternalAPI,
rsAPI rsapi.RoomserverInternalAPI, pgClient pushgateway.Client,
) api.UserInternalAPI {
db, err := storage.NewDatabase(&cfg.AccountDatabase, cfg.Matrix.ServerName, cfg.BCryptCost, int64(api.DefaultLoginTokenLifetime*time.Millisecond), api.DefaultLoginTokenLifetime)
if err != nil {
logrus.WithError(err).Panicf("failed to connect to device db")
}
js := jetstream.Prepare(&cfg.Matrix.JetStream)
syncProducer := producers.NewSyncAPI(
@ -96,6 +91,16 @@ func NewInternalAPI(
logrus.WithError(err).Panic("failed to start user API streamed event consumer")
}
var cleanOldNotifs func()
cleanOldNotifs = func() {
logrus.Infof("Cleaning old notifications")
if err := db.DeleteOldNotifications(base.Context()); err != nil {
logrus.WithError(err).Error("Failed to clean old notifications")
}
time.AfterFunc(time.Hour, cleanOldNotifs)
}
time.AfterFunc(time.Minute, cleanOldNotifs)
return userAPI
}