diff --git a/userapi/consumers/syncapi_streamevent.go b/userapi/consumers/syncapi_streamevent.go index d86078cbf..110813274 100644 --- a/userapi/consumers/syncapi_streamevent.go +++ b/userapi/consumers/syncapi_streamevent.go @@ -139,9 +139,6 @@ func (s *OutputStreamEventConsumer) processMessage(ctx context.Context, event *g // removing it means we can send all notifications to // e.g. Element's Push gateway in one go. for _, mem := range members { - if p, err := s.db.GetPushers(ctx, mem.Localpart); err != nil || len(p) == 0 { - continue - } if err := s.notifyLocal(ctx, event, pos, mem, roomSize, roomName); err != nil { log.WithFields(log.Fields{ "localpart": mem.Localpart, diff --git a/userapi/storage/interface.go b/userapi/storage/interface.go index 6d22fea9d..777067109 100644 --- a/userapi/storage/interface.go +++ b/userapi/storage/interface.go @@ -97,6 +97,7 @@ type Database interface { GetNotifications(ctx context.Context, localpart string, fromID int64, limit int, filter tables.NotificationFilter) ([]*api.Notification, int64, error) GetNotificationCount(ctx context.Context, localpart string, filter tables.NotificationFilter) (int64, error) GetRoomNotificationCounts(ctx context.Context, localpart, roomID string) (total int64, highlight int64, _ error) + DeleteOldNotifications(ctx context.Context) error UpsertPusher(ctx context.Context, p api.Pusher, localpart string) error GetPushers(ctx context.Context, localpart string) ([]api.Pusher, error) diff --git a/userapi/storage/postgres/notifications_table.go b/userapi/storage/postgres/notifications_table.go index 7bcc0f9cd..459bcc59d 100644 --- a/userapi/storage/postgres/notifications_table.go +++ b/userapi/storage/postgres/notifications_table.go @@ -18,6 +18,7 @@ import ( "context" "database/sql" "encoding/json" + "time" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" @@ -28,12 +29,13 @@ import ( ) type notificationsStatements struct { - insertStmt *sql.Stmt - deleteUpToStmt *sql.Stmt - updateReadStmt *sql.Stmt - selectStmt *sql.Stmt - selectCountStmt *sql.Stmt - selectRoomCountsStmt *sql.Stmt + insertStmt *sql.Stmt + deleteUpToStmt *sql.Stmt + updateReadStmt *sql.Stmt + selectStmt *sql.Stmt + selectCountStmt *sql.Stmt + selectRoomCountsStmt *sql.Stmt + cleanNotificationsStmt *sql.Stmt } const notificationSchema = ` @@ -77,6 +79,10 @@ const selectRoomNotificationCountsSQL = "" + "SELECT COUNT(*), COUNT(*) FILTER (WHERE highlight) FROM userapi_notifications " + "WHERE localpart = $1 AND room_id = $2 AND NOT read" +const cleanNotificationsSQL = "" + + "DELETE FROM userapi_notifications WHERE" + + " (highlight = FALSE AND ts_ms < $1) OR (highlight = TRUE AND ts_ms < $2)" + func NewPostgresNotificationTable(db *sql.DB) (tables.NotificationTable, error) { s := ¬ificationsStatements{} _, err := db.Exec(notificationSchema) @@ -90,9 +96,19 @@ func NewPostgresNotificationTable(db *sql.DB) (tables.NotificationTable, error) {&s.selectStmt, selectNotificationSQL}, {&s.selectCountStmt, selectNotificationCountSQL}, {&s.selectRoomCountsStmt, selectRoomNotificationCountsSQL}, + {&s.cleanNotificationsStmt, cleanNotificationsSQL}, }.Prepare(db) } +func (s *notificationsStatements) Clean(ctx context.Context, txn *sql.Tx) error { + _, err := sqlutil.TxStmt(txn, s.cleanNotificationsStmt).ExecContext( + ctx, + time.Now().AddDate(0, 0, -1).UnixMilli(), // keep non-highlights for a day + time.Now().AddDate(0, -1, 0).UnixMilli(), // keep highlights for a month + ) + return err +} + // Insert inserts a notification into the database. func (s *notificationsStatements) Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, pos int64, highlight bool, n *api.Notification) error { roomID, tsMS := n.RoomID, n.TS diff --git a/userapi/storage/shared/storage.go b/userapi/storage/shared/storage.go index a58974b41..febf03221 100644 --- a/userapi/storage/shared/storage.go +++ b/userapi/storage/shared/storage.go @@ -705,6 +705,10 @@ func (d *Database) GetRoomNotificationCounts(ctx context.Context, localpart, roo return d.Notifications.SelectRoomCounts(ctx, nil, localpart, roomID) } +func (d *Database) DeleteOldNotifications(ctx context.Context) error { + return d.Notifications.Clean(ctx, nil) +} + func (d *Database) UpsertPusher( ctx context.Context, p api.Pusher, localpart string, ) error { diff --git a/userapi/storage/sqlite3/notifications_table.go b/userapi/storage/sqlite3/notifications_table.go index fcfb1aadc..bf3fd2329 100644 --- a/userapi/storage/sqlite3/notifications_table.go +++ b/userapi/storage/sqlite3/notifications_table.go @@ -18,6 +18,7 @@ import ( "context" "database/sql" "encoding/json" + "time" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" @@ -28,12 +29,13 @@ import ( ) type notificationsStatements struct { - insertStmt *sql.Stmt - deleteUpToStmt *sql.Stmt - updateReadStmt *sql.Stmt - selectStmt *sql.Stmt - selectCountStmt *sql.Stmt - selectRoomCountsStmt *sql.Stmt + insertStmt *sql.Stmt + deleteUpToStmt *sql.Stmt + updateReadStmt *sql.Stmt + selectStmt *sql.Stmt + selectCountStmt *sql.Stmt + selectRoomCountsStmt *sql.Stmt + cleanNotificationsStmt *sql.Stmt } const notificationSchema = ` @@ -77,6 +79,10 @@ const selectRoomNotificationCountsSQL = "" + "SELECT COUNT(*), COUNT(*) FILTER (WHERE highlight) FROM userapi_notifications " + "WHERE localpart = $1 AND room_id = $2 AND NOT read" +const cleanNotificationsSQL = "" + + "DELETE FROM userapi_notifications WHERE" + + " (highlight = FALSE AND ts_ms < $1) OR (highlight = TRUE AND ts_ms < $2)" + func NewSQLiteNotificationTable(db *sql.DB) (tables.NotificationTable, error) { s := ¬ificationsStatements{} _, err := db.Exec(notificationSchema) @@ -90,9 +96,19 @@ func NewSQLiteNotificationTable(db *sql.DB) (tables.NotificationTable, error) { {&s.selectStmt, selectNotificationSQL}, {&s.selectCountStmt, selectNotificationCountSQL}, {&s.selectRoomCountsStmt, selectRoomNotificationCountsSQL}, + {&s.cleanNotificationsStmt, cleanNotificationsSQL}, }.Prepare(db) } +func (s *notificationsStatements) Clean(ctx context.Context, txn *sql.Tx) error { + _, err := sqlutil.TxStmt(txn, s.cleanNotificationsStmt).ExecContext( + ctx, + time.Now().AddDate(0, 0, -1).UnixMilli(), // keep non-highlights for a day + time.Now().AddDate(0, -1, 0).UnixMilli(), // keep highlights for a month + ) + return err +} + // Insert inserts a notification into the database. func (s *notificationsStatements) Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, pos int64, highlight bool, n *api.Notification) error { roomID, tsMS := n.RoomID, n.TS diff --git a/userapi/storage/tables/interface.go b/userapi/storage/tables/interface.go index 815e51193..99c907b85 100644 --- a/userapi/storage/tables/interface.go +++ b/userapi/storage/tables/interface.go @@ -103,6 +103,7 @@ type PusherTable interface { } type NotificationTable interface { + Clean(ctx context.Context, txn *sql.Tx) error Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, pos int64, highlight bool, n *api.Notification) error DeleteUpTo(ctx context.Context, txn *sql.Tx, localpart, roomID string, pos int64) (affected bool, _ error) UpdateRead(ctx context.Context, txn *sql.Tx, localpart, roomID string, pos int64, v bool) (affected bool, _ error) diff --git a/userapi/userapi.go b/userapi/userapi.go index 2382e9512..46cbd6b0d 100644 --- a/userapi/userapi.go +++ b/userapi/userapi.go @@ -15,6 +15,7 @@ package userapi import ( + "context" "time" "github.com/gorilla/mux" @@ -86,5 +87,15 @@ func NewInternalAPI( logrus.WithError(err).Panic("failed to start user API streamed event consumer") } + var cleanOldNotifs func() + cleanOldNotifs = func() { + logrus.Infof("Cleaning old notifications") + if err := db.DeleteOldNotifications(context.Background()); err != nil { + logrus.WithError(err).Error("Failed to clean old notifications") + } + time.AfterFunc(time.Hour, cleanOldNotifs) + } + time.AfterFunc(time.Minute, cleanOldNotifs) + return userAPI }