Merge branch 'main' into neilalexander/v065

This commit is contained in:
Neil Alexander 2022-03-03 16:46:44 +00:00
commit 342e36f8aa
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
9 changed files with 61 additions and 21 deletions

View file

@ -35,3 +35,4 @@ AS-ghosted users can use rooms themselves
# Flakey, need additional investigation
Messages that notify from another user increment notification_count
Messages that highlight from another user increment unread highlight count
Notifications can be viewed with GET /notifications

View file

@ -638,7 +638,6 @@ Rooms with many users are correctly pushed
Don't get pushed for rooms you've muted
Rejected events are not pushed
Test that rejected pushers are removed.
Notifications can be viewed with GET /notifications
Trying to add push rule with no scope fails with 400
Trying to add push rule with invalid scope fails with 400
Forward extremities remain so even after the next events are populated as outliers

View file

@ -139,9 +139,6 @@ func (s *OutputStreamEventConsumer) processMessage(ctx context.Context, event *g
// removing it means we can send all notifications to
// e.g. Element's Push gateway in one go.
for _, mem := range members {
if p, err := s.db.GetPushers(ctx, mem.Localpart); err != nil || len(p) == 0 {
continue
}
if err := s.notifyLocal(ctx, event, pos, mem, roomSize, roomName); err != nil {
log.WithFields(log.Fields{
"localpart": mem.Localpart,

View file

@ -97,6 +97,7 @@ type Database interface {
GetNotifications(ctx context.Context, localpart string, fromID int64, limit int, filter tables.NotificationFilter) ([]*api.Notification, int64, error)
GetNotificationCount(ctx context.Context, localpart string, filter tables.NotificationFilter) (int64, error)
GetRoomNotificationCounts(ctx context.Context, localpart, roomID string) (total int64, highlight int64, _ error)
DeleteOldNotifications(ctx context.Context) error
UpsertPusher(ctx context.Context, p api.Pusher, localpart string) error
GetPushers(ctx context.Context, localpart string) ([]api.Pusher, error)

View file

@ -18,6 +18,7 @@ import (
"context"
"database/sql"
"encoding/json"
"time"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
@ -28,12 +29,13 @@ import (
)
type notificationsStatements struct {
insertStmt *sql.Stmt
deleteUpToStmt *sql.Stmt
updateReadStmt *sql.Stmt
selectStmt *sql.Stmt
selectCountStmt *sql.Stmt
selectRoomCountsStmt *sql.Stmt
insertStmt *sql.Stmt
deleteUpToStmt *sql.Stmt
updateReadStmt *sql.Stmt
selectStmt *sql.Stmt
selectCountStmt *sql.Stmt
selectRoomCountsStmt *sql.Stmt
cleanNotificationsStmt *sql.Stmt
}
const notificationSchema = `
@ -77,6 +79,10 @@ const selectRoomNotificationCountsSQL = "" +
"SELECT COUNT(*), COUNT(*) FILTER (WHERE highlight) FROM userapi_notifications " +
"WHERE localpart = $1 AND room_id = $2 AND NOT read"
const cleanNotificationsSQL = "" +
"DELETE FROM userapi_notifications WHERE" +
" (highlight = FALSE AND ts_ms < $1) OR (highlight = TRUE AND ts_ms < $2)"
func NewPostgresNotificationTable(db *sql.DB) (tables.NotificationTable, error) {
s := &notificationsStatements{}
_, err := db.Exec(notificationSchema)
@ -90,9 +96,19 @@ func NewPostgresNotificationTable(db *sql.DB) (tables.NotificationTable, error)
{&s.selectStmt, selectNotificationSQL},
{&s.selectCountStmt, selectNotificationCountSQL},
{&s.selectRoomCountsStmt, selectRoomNotificationCountsSQL},
{&s.cleanNotificationsStmt, cleanNotificationsSQL},
}.Prepare(db)
}
func (s *notificationsStatements) Clean(ctx context.Context, txn *sql.Tx) error {
_, err := sqlutil.TxStmt(txn, s.cleanNotificationsStmt).ExecContext(
ctx,
time.Now().AddDate(0, 0, -1).UnixNano()/int64(time.Millisecond), // keep non-highlights for a day
time.Now().AddDate(0, -1, 0).UnixNano()/int64(time.Millisecond), // keep highlights for a month
)
return err
}
// Insert inserts a notification into the database.
func (s *notificationsStatements) Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, pos int64, highlight bool, n *api.Notification) error {
roomID, tsMS := n.RoomID, n.TS

View file

@ -705,6 +705,10 @@ func (d *Database) GetRoomNotificationCounts(ctx context.Context, localpart, roo
return d.Notifications.SelectRoomCounts(ctx, nil, localpart, roomID)
}
func (d *Database) DeleteOldNotifications(ctx context.Context) error {
return d.Notifications.Clean(ctx, nil)
}
func (d *Database) UpsertPusher(
ctx context.Context, p api.Pusher, localpart string,
) error {

View file

@ -18,6 +18,7 @@ import (
"context"
"database/sql"
"encoding/json"
"time"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
@ -28,12 +29,13 @@ import (
)
type notificationsStatements struct {
insertStmt *sql.Stmt
deleteUpToStmt *sql.Stmt
updateReadStmt *sql.Stmt
selectStmt *sql.Stmt
selectCountStmt *sql.Stmt
selectRoomCountsStmt *sql.Stmt
insertStmt *sql.Stmt
deleteUpToStmt *sql.Stmt
updateReadStmt *sql.Stmt
selectStmt *sql.Stmt
selectCountStmt *sql.Stmt
selectRoomCountsStmt *sql.Stmt
cleanNotificationsStmt *sql.Stmt
}
const notificationSchema = `
@ -77,6 +79,10 @@ const selectRoomNotificationCountsSQL = "" +
"SELECT COUNT(*), COUNT(*) FILTER (WHERE highlight) FROM userapi_notifications " +
"WHERE localpart = $1 AND room_id = $2 AND NOT read"
const cleanNotificationsSQL = "" +
"DELETE FROM userapi_notifications WHERE" +
" (highlight = FALSE AND ts_ms < $1) OR (highlight = TRUE AND ts_ms < $2)"
func NewSQLiteNotificationTable(db *sql.DB) (tables.NotificationTable, error) {
s := &notificationsStatements{}
_, err := db.Exec(notificationSchema)
@ -90,9 +96,19 @@ func NewSQLiteNotificationTable(db *sql.DB) (tables.NotificationTable, error) {
{&s.selectStmt, selectNotificationSQL},
{&s.selectCountStmt, selectNotificationCountSQL},
{&s.selectRoomCountsStmt, selectRoomNotificationCountsSQL},
{&s.cleanNotificationsStmt, cleanNotificationsSQL},
}.Prepare(db)
}
func (s *notificationsStatements) Clean(ctx context.Context, txn *sql.Tx) error {
_, err := sqlutil.TxStmt(txn, s.cleanNotificationsStmt).ExecContext(
ctx,
time.Now().AddDate(0, 0, -1).UnixNano()/int64(time.Millisecond), // keep non-highlights for a day
time.Now().AddDate(0, -1, 0).UnixNano()/int64(time.Millisecond), // keep highlights for a month
)
return err
}
// Insert inserts a notification into the database.
func (s *notificationsStatements) Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, pos int64, highlight bool, n *api.Notification) error {
roomID, tsMS := n.RoomID, n.TS

View file

@ -103,6 +103,7 @@ type PusherTable interface {
}
type NotificationTable interface {
Clean(ctx context.Context, txn *sql.Tx) error
Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, pos int64, highlight bool, n *api.Notification) error
DeleteUpTo(ctx context.Context, txn *sql.Tx, localpart, roomID string, pos int64) (affected bool, _ error)
UpdateRead(ctx context.Context, txn *sql.Tx, localpart, roomID string, pos int64, v bool) (affected bool, _ error)

View file

@ -46,11 +46,6 @@ func NewInternalAPI(
appServices []config.ApplicationService, keyAPI keyapi.KeyInternalAPI,
rsAPI rsapi.RoomserverInternalAPI, pgClient pushgateway.Client,
) api.UserInternalAPI {
db, err := storage.NewDatabase(&cfg.AccountDatabase, cfg.Matrix.ServerName, cfg.BCryptCost, int64(api.DefaultLoginTokenLifetime*time.Millisecond), api.DefaultLoginTokenLifetime)
if err != nil {
logrus.WithError(err).Panicf("failed to connect to device db")
}
js := jetstream.Prepare(&cfg.Matrix.JetStream)
syncProducer := producers.NewSyncAPI(
@ -86,5 +81,15 @@ func NewInternalAPI(
logrus.WithError(err).Panic("failed to start user API streamed event consumer")
}
var cleanOldNotifs func()
cleanOldNotifs = func() {
logrus.Infof("Cleaning old notifications")
if err := db.DeleteOldNotifications(base.Context()); err != nil {
logrus.WithError(err).Error("Failed to clean old notifications")
}
time.AfterFunc(time.Hour, cleanOldNotifs)
}
time.AfterFunc(time.Minute, cleanOldNotifs)
return userAPI
}