diff --git a/sytest-blacklist b/sytest-blacklist index 978dcde84..becc500ec 100644 --- a/sytest-blacklist +++ b/sytest-blacklist @@ -35,3 +35,4 @@ AS-ghosted users can use rooms themselves # Flakey, need additional investigation Messages that notify from another user increment notification_count Messages that highlight from another user increment unread highlight count +Notifications can be viewed with GET /notifications \ No newline at end of file diff --git a/sytest-whitelist b/sytest-whitelist index 602f86465..63d779bf1 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -638,7 +638,6 @@ Rooms with many users are correctly pushed Don't get pushed for rooms you've muted Rejected events are not pushed Test that rejected pushers are removed. -Notifications can be viewed with GET /notifications Trying to add push rule with no scope fails with 400 Trying to add push rule with invalid scope fails with 400 Forward extremities remain so even after the next events are populated as outliers diff --git a/userapi/consumers/syncapi_streamevent.go b/userapi/consumers/syncapi_streamevent.go index d86078cbf..110813274 100644 --- a/userapi/consumers/syncapi_streamevent.go +++ b/userapi/consumers/syncapi_streamevent.go @@ -139,9 +139,6 @@ func (s *OutputStreamEventConsumer) processMessage(ctx context.Context, event *g // removing it means we can send all notifications to // e.g. Element's Push gateway in one go. for _, mem := range members { - if p, err := s.db.GetPushers(ctx, mem.Localpart); err != nil || len(p) == 0 { - continue - } if err := s.notifyLocal(ctx, event, pos, mem, roomSize, roomName); err != nil { log.WithFields(log.Fields{ "localpart": mem.Localpart, diff --git a/userapi/storage/interface.go b/userapi/storage/interface.go index 6d22fea9d..777067109 100644 --- a/userapi/storage/interface.go +++ b/userapi/storage/interface.go @@ -97,6 +97,7 @@ type Database interface { GetNotifications(ctx context.Context, localpart string, fromID int64, limit int, filter tables.NotificationFilter) ([]*api.Notification, int64, error) GetNotificationCount(ctx context.Context, localpart string, filter tables.NotificationFilter) (int64, error) GetRoomNotificationCounts(ctx context.Context, localpart, roomID string) (total int64, highlight int64, _ error) + DeleteOldNotifications(ctx context.Context) error UpsertPusher(ctx context.Context, p api.Pusher, localpart string) error GetPushers(ctx context.Context, localpart string) ([]api.Pusher, error) diff --git a/userapi/storage/postgres/notifications_table.go b/userapi/storage/postgres/notifications_table.go index 7bcc0f9cd..a27c1125e 100644 --- a/userapi/storage/postgres/notifications_table.go +++ b/userapi/storage/postgres/notifications_table.go @@ -18,6 +18,7 @@ import ( "context" "database/sql" "encoding/json" + "time" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" @@ -28,12 +29,13 @@ import ( ) type notificationsStatements struct { - insertStmt *sql.Stmt - deleteUpToStmt *sql.Stmt - updateReadStmt *sql.Stmt - selectStmt *sql.Stmt - selectCountStmt *sql.Stmt - selectRoomCountsStmt *sql.Stmt + insertStmt *sql.Stmt + deleteUpToStmt *sql.Stmt + updateReadStmt *sql.Stmt + selectStmt *sql.Stmt + selectCountStmt *sql.Stmt + selectRoomCountsStmt *sql.Stmt + cleanNotificationsStmt *sql.Stmt } const notificationSchema = ` @@ -77,6 +79,10 @@ const selectRoomNotificationCountsSQL = "" + "SELECT COUNT(*), COUNT(*) FILTER (WHERE highlight) FROM userapi_notifications " + "WHERE localpart = $1 AND room_id = $2 AND NOT read" +const cleanNotificationsSQL = "" + + "DELETE FROM userapi_notifications WHERE" + + " (highlight = FALSE AND ts_ms < $1) OR (highlight = TRUE AND ts_ms < $2)" + func NewPostgresNotificationTable(db *sql.DB) (tables.NotificationTable, error) { s := ¬ificationsStatements{} _, err := db.Exec(notificationSchema) @@ -90,9 +96,19 @@ func NewPostgresNotificationTable(db *sql.DB) (tables.NotificationTable, error) {&s.selectStmt, selectNotificationSQL}, {&s.selectCountStmt, selectNotificationCountSQL}, {&s.selectRoomCountsStmt, selectRoomNotificationCountsSQL}, + {&s.cleanNotificationsStmt, cleanNotificationsSQL}, }.Prepare(db) } +func (s *notificationsStatements) Clean(ctx context.Context, txn *sql.Tx) error { + _, err := sqlutil.TxStmt(txn, s.cleanNotificationsStmt).ExecContext( + ctx, + time.Now().AddDate(0, 0, -1).UnixNano()/int64(time.Millisecond), // keep non-highlights for a day + time.Now().AddDate(0, -1, 0).UnixNano()/int64(time.Millisecond), // keep highlights for a month + ) + return err +} + // Insert inserts a notification into the database. func (s *notificationsStatements) Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, pos int64, highlight bool, n *api.Notification) error { roomID, tsMS := n.RoomID, n.TS diff --git a/userapi/storage/shared/storage.go b/userapi/storage/shared/storage.go index a58974b41..febf03221 100644 --- a/userapi/storage/shared/storage.go +++ b/userapi/storage/shared/storage.go @@ -705,6 +705,10 @@ func (d *Database) GetRoomNotificationCounts(ctx context.Context, localpart, roo return d.Notifications.SelectRoomCounts(ctx, nil, localpart, roomID) } +func (d *Database) DeleteOldNotifications(ctx context.Context) error { + return d.Notifications.Clean(ctx, nil) +} + func (d *Database) UpsertPusher( ctx context.Context, p api.Pusher, localpart string, ) error { diff --git a/userapi/storage/sqlite3/notifications_table.go b/userapi/storage/sqlite3/notifications_table.go index fcfb1aadc..df8260251 100644 --- a/userapi/storage/sqlite3/notifications_table.go +++ b/userapi/storage/sqlite3/notifications_table.go @@ -18,6 +18,7 @@ import ( "context" "database/sql" "encoding/json" + "time" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" @@ -28,12 +29,13 @@ import ( ) type notificationsStatements struct { - insertStmt *sql.Stmt - deleteUpToStmt *sql.Stmt - updateReadStmt *sql.Stmt - selectStmt *sql.Stmt - selectCountStmt *sql.Stmt - selectRoomCountsStmt *sql.Stmt + insertStmt *sql.Stmt + deleteUpToStmt *sql.Stmt + updateReadStmt *sql.Stmt + selectStmt *sql.Stmt + selectCountStmt *sql.Stmt + selectRoomCountsStmt *sql.Stmt + cleanNotificationsStmt *sql.Stmt } const notificationSchema = ` @@ -77,6 +79,10 @@ const selectRoomNotificationCountsSQL = "" + "SELECT COUNT(*), COUNT(*) FILTER (WHERE highlight) FROM userapi_notifications " + "WHERE localpart = $1 AND room_id = $2 AND NOT read" +const cleanNotificationsSQL = "" + + "DELETE FROM userapi_notifications WHERE" + + " (highlight = FALSE AND ts_ms < $1) OR (highlight = TRUE AND ts_ms < $2)" + func NewSQLiteNotificationTable(db *sql.DB) (tables.NotificationTable, error) { s := ¬ificationsStatements{} _, err := db.Exec(notificationSchema) @@ -90,9 +96,19 @@ func NewSQLiteNotificationTable(db *sql.DB) (tables.NotificationTable, error) { {&s.selectStmt, selectNotificationSQL}, {&s.selectCountStmt, selectNotificationCountSQL}, {&s.selectRoomCountsStmt, selectRoomNotificationCountsSQL}, + {&s.cleanNotificationsStmt, cleanNotificationsSQL}, }.Prepare(db) } +func (s *notificationsStatements) Clean(ctx context.Context, txn *sql.Tx) error { + _, err := sqlutil.TxStmt(txn, s.cleanNotificationsStmt).ExecContext( + ctx, + time.Now().AddDate(0, 0, -1).UnixNano()/int64(time.Millisecond), // keep non-highlights for a day + time.Now().AddDate(0, -1, 0).UnixNano()/int64(time.Millisecond), // keep highlights for a month + ) + return err +} + // Insert inserts a notification into the database. func (s *notificationsStatements) Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, pos int64, highlight bool, n *api.Notification) error { roomID, tsMS := n.RoomID, n.TS diff --git a/userapi/storage/tables/interface.go b/userapi/storage/tables/interface.go index 815e51193..99c907b85 100644 --- a/userapi/storage/tables/interface.go +++ b/userapi/storage/tables/interface.go @@ -103,6 +103,7 @@ type PusherTable interface { } type NotificationTable interface { + Clean(ctx context.Context, txn *sql.Tx) error Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, pos int64, highlight bool, n *api.Notification) error DeleteUpTo(ctx context.Context, txn *sql.Tx, localpart, roomID string, pos int64) (affected bool, _ error) UpdateRead(ctx context.Context, txn *sql.Tx, localpart, roomID string, pos int64, v bool) (affected bool, _ error) diff --git a/userapi/userapi.go b/userapi/userapi.go index 8dbc095fb..251a4edad 100644 --- a/userapi/userapi.go +++ b/userapi/userapi.go @@ -15,6 +15,8 @@ package userapi import ( + "time" + "github.com/gorilla/mux" "github.com/matrix-org/dendrite/internal/pushgateway" keyapi "github.com/matrix-org/dendrite/keyserver/api" @@ -79,5 +81,15 @@ func NewInternalAPI( logrus.WithError(err).Panic("failed to start user API streamed event consumer") } + var cleanOldNotifs func() + cleanOldNotifs = func() { + logrus.Infof("Cleaning old notifications") + if err := db.DeleteOldNotifications(base.Context()); err != nil { + logrus.WithError(err).Error("Failed to clean old notifications") + } + time.AfterFunc(time.Hour, cleanOldNotifs) + } + time.AfterFunc(time.Minute, cleanOldNotifs) + return userAPI }