diff --git a/syncapi/storage/postgres/presence_table.go b/syncapi/storage/postgres/presence_table.go index 7194afea6..1ba44ad0d 100644 --- a/syncapi/storage/postgres/presence_table.go +++ b/syncapi/storage/postgres/presence_table.go @@ -76,12 +76,22 @@ const selectPresenceAfter = "" + " WHERE id > $1 AND last_active_ts >= $2" + " ORDER BY id ASC LIMIT $3" +const expirePresenceSQL = `UPDATE syncapi_presence SET + id = nextval('syncapi_presence_id'), + presence = 3 +WHERE + to_timestamp(last_active_ts / 1000) < NOW() - INTERVAL '5 minutes' +AND + presence != 3 +` + type presenceStatements struct { upsertPresenceStmt *sql.Stmt upsertPresenceFromSyncStmt *sql.Stmt selectPresenceForUsersStmt *sql.Stmt selectMaxPresenceStmt *sql.Stmt selectPresenceAfterStmt *sql.Stmt + expirePresenceStmt *sql.Stmt } func NewPostgresPresenceTable(db *sql.DB) (*presenceStatements, error) { @@ -96,6 +106,7 @@ func NewPostgresPresenceTable(db *sql.DB) (*presenceStatements, error) { {&s.selectPresenceForUsersStmt, selectPresenceForUserSQL}, {&s.selectMaxPresenceStmt, selectMaxPresenceSQL}, {&s.selectPresenceAfterStmt, selectPresenceAfter}, + {&s.expirePresenceStmt, expirePresenceSQL}, }.Prepare(db) } @@ -166,3 +177,10 @@ func (p *presenceStatements) GetPresenceAfter( } return presences, rows.Err() } + +func (p *presenceStatements) ExpirePresence( + ctx context.Context, +) error { + _, err := p.expirePresenceStmt.Exec() + return err +} diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index 9cfe7c070..bf47610a4 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -16,7 +16,9 @@ package postgres import ( + "context" "database/sql" + "time" // Import the postgres database driver. _ "github.com/lib/pq" @@ -25,6 +27,7 @@ import ( "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas" "github.com/matrix-org/dendrite/syncapi/storage/shared" + "github.com/sirupsen/logrus" ) // SyncServerDatasource represents a sync server datasource which manages @@ -122,5 +125,17 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) Ignores: ignores, Presence: presence, } + go func() { + ctx := context.Background() + for { + err := d.Database.Presence.ExpirePresence(ctx) + if err != nil { + logrus.WithError(err).Error("failed to expire presence") + } else { + logrus.Info("expired presence") + } + time.Sleep(time.Minute) + } + }() return &d, nil } diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index ec5edd355..401b69f53 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -1067,3 +1067,7 @@ func (s *Database) PresenceAfter(ctx context.Context, after types.StreamPosition func (s *Database) MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error) { return s.Presence.GetMaxPresenceID(ctx, nil) } + +func (s *Database) ExpirePresence(ctx context.Context) error { + return s.Presence.ExpirePresence(ctx) +} diff --git a/syncapi/storage/sqlite3/presence_table.go b/syncapi/storage/sqlite3/presence_table.go index b61a825df..55ab28784 100644 --- a/syncapi/storage/sqlite3/presence_table.go +++ b/syncapi/storage/sqlite3/presence_table.go @@ -180,3 +180,10 @@ func (p *presenceStatements) GetPresenceAfter( } return presences, rows.Err() } + +func (p *presenceStatements) ExpirePresence( + ctx context.Context, +) error { + // TODO implement + return nil +} diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index ccdebfdbd..37884f510 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -191,4 +191,5 @@ type Presence interface { GetPresenceForUser(ctx context.Context, txn *sql.Tx, userID string) (presence *types.PresenceInternal, err error) GetMaxPresenceID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) GetPresenceAfter(ctx context.Context, txn *sql.Tx, after types.StreamPosition, filter gomatrixserverlib.EventFilter) (presences map[string]*types.PresenceInternal, err error) + ExpirePresence(ctx context.Context) error }