mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-04 20:53:09 -06:00
Only send recent presence updates (last 5 minutes) when doing a complete sync
This commit is contained in:
parent
e8be2b234f
commit
9953df0e8b
|
|
@ -161,4 +161,5 @@ type Presence interface {
|
||||||
GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error)
|
GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error)
|
||||||
PresenceAfter(ctx context.Context, after types.StreamPosition) (map[string]*types.PresenceInternal, error)
|
PresenceAfter(ctx context.Context, after types.StreamPosition) (map[string]*types.PresenceInternal, error)
|
||||||
MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error)
|
MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error)
|
||||||
|
RecentPresence(ctx context.Context) (map[string]*types.PresenceInternal, error)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -17,6 +17,7 @@ package postgres
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
|
@ -74,12 +75,18 @@ const selectPresenceAfter = "" +
|
||||||
" FROM syncapi_presence" +
|
" FROM syncapi_presence" +
|
||||||
" WHERE id > $1"
|
" WHERE id > $1"
|
||||||
|
|
||||||
|
const selectRecentPresence = "" +
|
||||||
|
" SELECT id, user_id, presence, status_msg, last_active_ts" +
|
||||||
|
" FROM syncapi_presence" +
|
||||||
|
" WHERE last_active_ts >= $1"
|
||||||
|
|
||||||
type presenceStatements struct {
|
type presenceStatements struct {
|
||||||
upsertPresenceStmt *sql.Stmt
|
upsertPresenceStmt *sql.Stmt
|
||||||
upsertPresenceFromSyncStmt *sql.Stmt
|
upsertPresenceFromSyncStmt *sql.Stmt
|
||||||
selectPresenceForUsersStmt *sql.Stmt
|
selectPresenceForUsersStmt *sql.Stmt
|
||||||
selectMaxPresenceStmt *sql.Stmt
|
selectMaxPresenceStmt *sql.Stmt
|
||||||
selectPresenceAfterStmt *sql.Stmt
|
selectPresenceAfterStmt *sql.Stmt
|
||||||
|
selectRecentPresenceStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPostgresPresenceTable(db *sql.DB) (*presenceStatements, error) {
|
func NewPostgresPresenceTable(db *sql.DB) (*presenceStatements, error) {
|
||||||
|
|
@ -94,6 +101,7 @@ func NewPostgresPresenceTable(db *sql.DB) (*presenceStatements, error) {
|
||||||
{&s.selectPresenceForUsersStmt, selectPresenceForUserSQL},
|
{&s.selectPresenceForUsersStmt, selectPresenceForUserSQL},
|
||||||
{&s.selectMaxPresenceStmt, selectMaxPresenceSQL},
|
{&s.selectMaxPresenceStmt, selectMaxPresenceSQL},
|
||||||
{&s.selectPresenceAfterStmt, selectPresenceAfter},
|
{&s.selectPresenceAfterStmt, selectPresenceAfter},
|
||||||
|
{&s.selectRecentPresenceStmt, selectRecentPresence},
|
||||||
}.Prepare(db)
|
}.Prepare(db)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -160,3 +168,28 @@ func (p *presenceStatements) GetPresenceAfter(
|
||||||
}
|
}
|
||||||
return presences, rows.Err()
|
return presences, rows.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetRecentPresence gets updates from the last five minutes.
|
||||||
|
func (p *presenceStatements) GetRecentPresence(
|
||||||
|
ctx context.Context, txn *sql.Tx,
|
||||||
|
) (presences map[string]*types.PresenceInternal, err error) {
|
||||||
|
presences = make(map[string]*types.PresenceInternal)
|
||||||
|
stmt := sqlutil.TxStmt(txn, p.selectPresenceAfterStmt)
|
||||||
|
|
||||||
|
sinceTS := gomatrixserverlib.AsTimestamp(time.Now().Add(time.Minute * -5))
|
||||||
|
|
||||||
|
rows, err := stmt.QueryContext(ctx, sinceTS)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer internal.CloseAndLogIfError(ctx, rows, "GetPresenceAfter: failed to close rows")
|
||||||
|
for rows.Next() {
|
||||||
|
qryRes := &types.PresenceInternal{}
|
||||||
|
if err := rows.Scan(&qryRes.StreamPos, &qryRes.UserID, &qryRes.Presence, &qryRes.ClientFields.StatusMsg, &qryRes.LastActiveTS); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
qryRes.ClientFields.Presence = qryRes.Presence.String()
|
||||||
|
presences[qryRes.UserID] = qryRes
|
||||||
|
}
|
||||||
|
return presences, rows.Err()
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -1060,6 +1060,10 @@ func (s *Database) PresenceAfter(ctx context.Context, after types.StreamPosition
|
||||||
return s.Presence.GetPresenceAfter(ctx, nil, after)
|
return s.Presence.GetPresenceAfter(ctx, nil, after)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Database) RecentPresence(ctx context.Context) (map[string]*types.PresenceInternal, error) {
|
||||||
|
return s.Presence.GetRecentPresence(ctx, nil)
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Database) MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error) {
|
func (s *Database) MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error) {
|
||||||
return s.Presence.GetMaxPresenceID(ctx, nil)
|
return s.Presence.GetMaxPresenceID(ctx, nil)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -17,6 +17,7 @@ package sqlite3
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
|
@ -73,6 +74,11 @@ const selectPresenceAfter = "" +
|
||||||
" FROM syncapi_presence" +
|
" FROM syncapi_presence" +
|
||||||
" WHERE id > $1"
|
" WHERE id > $1"
|
||||||
|
|
||||||
|
const selectRecentPresence = "" +
|
||||||
|
" SELECT id, user_id, presence, status_msg, last_active_ts" +
|
||||||
|
" FROM syncapi_presence" +
|
||||||
|
" WHERE last_active_ts >= $1"
|
||||||
|
|
||||||
type presenceStatements struct {
|
type presenceStatements struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
streamIDStatements *StreamIDStatements
|
streamIDStatements *StreamIDStatements
|
||||||
|
|
@ -81,6 +87,7 @@ type presenceStatements struct {
|
||||||
selectPresenceForUsersStmt *sql.Stmt
|
selectPresenceForUsersStmt *sql.Stmt
|
||||||
selectMaxPresenceStmt *sql.Stmt
|
selectMaxPresenceStmt *sql.Stmt
|
||||||
selectPresenceAfterStmt *sql.Stmt
|
selectPresenceAfterStmt *sql.Stmt
|
||||||
|
selectRecentPresenceStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqlitePresenceTable(db *sql.DB, streamID *StreamIDStatements) (*presenceStatements, error) {
|
func NewSqlitePresenceTable(db *sql.DB, streamID *StreamIDStatements) (*presenceStatements, error) {
|
||||||
|
|
@ -98,6 +105,7 @@ func NewSqlitePresenceTable(db *sql.DB, streamID *StreamIDStatements) (*presence
|
||||||
{&s.selectPresenceForUsersStmt, selectPresenceForUserSQL},
|
{&s.selectPresenceForUsersStmt, selectPresenceForUserSQL},
|
||||||
{&s.selectMaxPresenceStmt, selectMaxPresenceSQL},
|
{&s.selectMaxPresenceStmt, selectMaxPresenceSQL},
|
||||||
{&s.selectPresenceAfterStmt, selectPresenceAfter},
|
{&s.selectPresenceAfterStmt, selectPresenceAfter},
|
||||||
|
{&s.selectRecentPresenceStmt, selectRecentPresence},
|
||||||
}.Prepare(db)
|
}.Prepare(db)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -175,3 +183,28 @@ func (p *presenceStatements) GetPresenceAfter(
|
||||||
}
|
}
|
||||||
return presences, rows.Err()
|
return presences, rows.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetRecentPresence gets updates from the last five minutes.
|
||||||
|
func (p *presenceStatements) GetRecentPresence(
|
||||||
|
ctx context.Context, txn *sql.Tx,
|
||||||
|
) (presences map[string]*types.PresenceInternal, err error) {
|
||||||
|
presences = make(map[string]*types.PresenceInternal)
|
||||||
|
stmt := sqlutil.TxStmt(txn, p.selectPresenceAfterStmt)
|
||||||
|
|
||||||
|
sinceTS := gomatrixserverlib.AsTimestamp(time.Now().Add(time.Minute * -5))
|
||||||
|
|
||||||
|
rows, err := stmt.QueryContext(ctx, sinceTS)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer internal.CloseAndLogIfError(ctx, rows, "GetPresenceAfter: failed to close rows")
|
||||||
|
for rows.Next() {
|
||||||
|
qryRes := &types.PresenceInternal{}
|
||||||
|
if err := rows.Scan(&qryRes.StreamPos, &qryRes.UserID, &qryRes.Presence, &qryRes.ClientFields.StatusMsg, &qryRes.LastActiveTS); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
qryRes.ClientFields.Presence = qryRes.Presence.String()
|
||||||
|
presences[qryRes.UserID] = qryRes
|
||||||
|
}
|
||||||
|
return presences, rows.Err()
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -189,4 +189,5 @@ type Presence interface {
|
||||||
GetPresenceForUser(ctx context.Context, txn *sql.Tx, userID string) (presence *types.PresenceInternal, err error)
|
GetPresenceForUser(ctx context.Context, txn *sql.Tx, userID string) (presence *types.PresenceInternal, err error)
|
||||||
GetMaxPresenceID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error)
|
GetMaxPresenceID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error)
|
||||||
GetPresenceAfter(ctx context.Context, txn *sql.Tx, after types.StreamPosition) (presences map[string]*types.PresenceInternal, err error)
|
GetPresenceAfter(ctx context.Context, txn *sql.Tx, after types.StreamPosition) (presences map[string]*types.PresenceInternal, err error)
|
||||||
|
GetRecentPresence(ctx context.Context, txn *sql.Tx) (presences map[string]*types.PresenceInternal, err error)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -46,7 +46,44 @@ func (p *PresenceStreamProvider) CompleteSync(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
req *types.SyncRequest,
|
req *types.SyncRequest,
|
||||||
) types.StreamPosition {
|
) types.StreamPosition {
|
||||||
return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx))
|
presences, err := p.DB.RecentPresence(ctx)
|
||||||
|
if err != nil {
|
||||||
|
req.Log.WithError(err).Error("p.DB.RecentPresence failed")
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range presences {
|
||||||
|
presence := presences[i]
|
||||||
|
// Ignore users we don't share a room with
|
||||||
|
if req.Device.UserID != presence.UserID && !p.notifier.IsSharedUser(req.Device.UserID, presence.UserID) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
cacheKey := req.Device.UserID + req.Device.ID + presence.UserID
|
||||||
|
|
||||||
|
if _, known := types.PresenceFromString(presence.ClientFields.Presence); known {
|
||||||
|
presence.ClientFields.LastActiveAgo = presence.LastActiveAgo()
|
||||||
|
if presence.ClientFields.Presence == "online" {
|
||||||
|
currentlyActive := presence.CurrentlyActive()
|
||||||
|
presence.ClientFields.CurrentlyActive = ¤tlyActive
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
presence.ClientFields.Presence = "offline"
|
||||||
|
}
|
||||||
|
|
||||||
|
content, err := json.Marshal(presence.ClientFields)
|
||||||
|
if err != nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
req.Response.Presence.Events = append(req.Response.Presence.Events, gomatrixserverlib.ClientEvent{
|
||||||
|
Content: content,
|
||||||
|
Sender: presence.UserID,
|
||||||
|
Type: gomatrixserverlib.MPresence,
|
||||||
|
})
|
||||||
|
p.cache.Store(cacheKey, presence)
|
||||||
|
}
|
||||||
|
|
||||||
|
return p.LatestPosition(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *PresenceStreamProvider) IncrementalSync(
|
func (p *PresenceStreamProvider) IncrementalSync(
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue