From b772f9ee787bd53cb6e3d600035e5b1a22a6c7c7 Mon Sep 17 00:00:00 2001 From: PiotrKozimor <37144818+PiotrKozimor@users.noreply.github.com> Date: Mon, 14 Nov 2022 08:56:52 +0100 Subject: [PATCH] Allow for only one pusher for given pushkey and app ID - remove pushers duplicates (#50) --- .../deltas/2022110311000000_unique_pushers.go | 37 +++++++++++++++++++ userapi/storage/postgres/pusher_table.go | 9 ++--- userapi/storage/postgres/storage.go | 4 ++ 3 files changed, 44 insertions(+), 6 deletions(-) create mode 100644 userapi/storage/postgres/deltas/2022110311000000_unique_pushers.go diff --git a/userapi/storage/postgres/deltas/2022110311000000_unique_pushers.go b/userapi/storage/postgres/deltas/2022110311000000_unique_pushers.go new file mode 100644 index 000000000..6f862a025 --- /dev/null +++ b/userapi/storage/postgres/deltas/2022110311000000_unique_pushers.go @@ -0,0 +1,37 @@ +package deltas + +import ( + "context" + "database/sql" + "fmt" +) + +func UpUniquePusher(ctx context.Context, tx *sql.Tx) error { + rows := tx.QueryRowContext(ctx, "SELECT EXISTS (select * from pg_tables where tablename = 'userapi_pushers')") + tableExists := false + err := rows.Scan(&tableExists) + + if err != nil { + return fmt.Errorf("select table exists: %w", err) + } + if !tableExists { + return nil + } + _, err = tx.ExecContext(ctx, "DELETE FROM userapi_pushers p1 USING userapi_pushers p2 WHERE p1.pushkey_ts_ms < p2.pushkey_ts_ms AND p1.app_id = p2.app_id AND p1.pushkey = p2.pushkey") + if err != nil { + return fmt.Errorf("delete pusher duplicates: %w", err) + } + _, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS userapi_pusher_app_id_pushkey_localpart_idx") + if err != nil { + return fmt.Errorf("drop unique index: %w", err) + } + _, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS userapi_pusher_app_id_pushkey_idx") + if err != nil { + return fmt.Errorf("drop index: %w", err) + } + return nil +} + +func DownUniquePusher(ctx context.Context, tx *sql.Tx) error { + return nil +} diff --git a/userapi/storage/postgres/pusher_table.go b/userapi/storage/postgres/pusher_table.go index 6fb714fba..42618cf34 100644 --- a/userapi/storage/postgres/pusher_table.go +++ b/userapi/storage/postgres/pusher_table.go @@ -45,20 +45,17 @@ CREATE TABLE IF NOT EXISTS userapi_pushers ( data TEXT NOT NULL ); --- For faster deleting by app_id, pushkey pair. -CREATE INDEX IF NOT EXISTS userapi_pusher_app_id_pushkey_idx ON userapi_pushers(app_id, pushkey); - -- For faster retrieving by localpart. CREATE INDEX IF NOT EXISTS userapi_pusher_localpart_idx ON userapi_pushers(localpart); --- Pushkey must be unique for a given user and app. -CREATE UNIQUE INDEX IF NOT EXISTS userapi_pusher_app_id_pushkey_localpart_idx ON userapi_pushers(app_id, pushkey, localpart); +-- Pushkey must be unique for a given app. +CREATE UNIQUE INDEX IF NOT EXISTS userapi_pusher_app_id_pushkey_idx ON userapi_pushers(app_id, pushkey); ` const insertPusherSQL = "" + "INSERT INTO userapi_pushers (localpart, session_id, pushkey, pushkey_ts_ms, kind, app_id, app_display_name, device_display_name, profile_tag, lang, data)" + "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)" + - "ON CONFLICT (app_id, pushkey, localpart) DO UPDATE SET session_id = $2, pushkey_ts_ms = $4, kind = $5, app_display_name = $7, device_display_name = $8, profile_tag = $9, lang = $10, data = $11" + "ON CONFLICT (app_id, pushkey) DO UPDATE SET localpart = $1, session_id = $2, pushkey_ts_ms = $4, kind = $5, app_display_name = $7, device_display_name = $8, profile_tag = $9, lang = $10, data = $11" const selectPushersSQL = "" + "SELECT session_id, pushkey, pushkey_ts_ms, kind, app_id, app_display_name, device_display_name, profile_tag, lang, data FROM userapi_pushers WHERE localpart = $1" diff --git a/userapi/storage/postgres/storage.go b/userapi/storage/postgres/storage.go index c059e3e60..de97e60b1 100644 --- a/userapi/storage/postgres/storage.go +++ b/userapi/storage/postgres/storage.go @@ -42,6 +42,10 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, Version: "userapi: rename tables", Up: deltas.UpRenameTables, Down: deltas.DownRenameTables, + }, sqlutil.Migration{ + Version: "userapi: unique pushers", + Up: deltas.UpUniquePusher, + Down: deltas.DownUniquePusher, }) if err = m.Up(base.Context()); err != nil { return nil, err