diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 0fea88da6..91ed8eb72 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -161,4 +161,5 @@ type Presence interface { GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error) PresenceAfter(ctx context.Context, after types.StreamPosition) (map[string]*types.PresenceInternal, error) MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error) + RecentPresence(ctx context.Context) (map[string]*types.PresenceInternal, error) } diff --git a/syncapi/storage/postgres/presence_table.go b/syncapi/storage/postgres/presence_table.go index 49336c4eb..1efe88a4f 100644 --- a/syncapi/storage/postgres/presence_table.go +++ b/syncapi/storage/postgres/presence_table.go @@ -17,6 +17,7 @@ package postgres import ( "context" "database/sql" + "time" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" @@ -74,12 +75,18 @@ const selectPresenceAfter = "" + " FROM syncapi_presence" + " WHERE id > $1" +const selectRecentPresence = "" + + " SELECT id, user_id, presence, status_msg, last_active_ts" + + " FROM syncapi_presence" + + " WHERE last_active_ts >= $1" + type presenceStatements struct { upsertPresenceStmt *sql.Stmt upsertPresenceFromSyncStmt *sql.Stmt selectPresenceForUsersStmt *sql.Stmt selectMaxPresenceStmt *sql.Stmt selectPresenceAfterStmt *sql.Stmt + selectRecentPresenceStmt *sql.Stmt } func NewPostgresPresenceTable(db *sql.DB) (*presenceStatements, error) { @@ -94,6 +101,7 @@ func NewPostgresPresenceTable(db *sql.DB) (*presenceStatements, error) { {&s.selectPresenceForUsersStmt, selectPresenceForUserSQL}, {&s.selectMaxPresenceStmt, selectMaxPresenceSQL}, {&s.selectPresenceAfterStmt, selectPresenceAfter}, + {&s.selectRecentPresenceStmt, selectRecentPresence}, }.Prepare(db) } @@ -160,3 +168,28 @@ func (p *presenceStatements) GetPresenceAfter( } return presences, rows.Err() } + +// GetRecentPresence gets updates from the last five minutes. +func (p *presenceStatements) GetRecentPresence( + ctx context.Context, txn *sql.Tx, +) (presences map[string]*types.PresenceInternal, err error) { + presences = make(map[string]*types.PresenceInternal) + stmt := sqlutil.TxStmt(txn, p.selectPresenceAfterStmt) + + sinceTS := gomatrixserverlib.AsTimestamp(time.Now().Add(time.Minute * -5)) + + rows, err := stmt.QueryContext(ctx, sinceTS) + if err != nil { + return nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "GetPresenceAfter: failed to close rows") + for rows.Next() { + qryRes := &types.PresenceInternal{} + if err := rows.Scan(&qryRes.StreamPos, &qryRes.UserID, &qryRes.Presence, &qryRes.ClientFields.StatusMsg, &qryRes.LastActiveTS); err != nil { + return nil, err + } + qryRes.ClientFields.Presence = qryRes.Presence.String() + presences[qryRes.UserID] = qryRes + } + return presences, rows.Err() +} diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 3c431db48..2d16faeec 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -1060,6 +1060,10 @@ func (s *Database) PresenceAfter(ctx context.Context, after types.StreamPosition return s.Presence.GetPresenceAfter(ctx, nil, after) } +func (s *Database) RecentPresence(ctx context.Context) (map[string]*types.PresenceInternal, error) { + return s.Presence.GetRecentPresence(ctx, nil) +} + func (s *Database) MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error) { return s.Presence.GetMaxPresenceID(ctx, nil) } diff --git a/syncapi/storage/sqlite3/presence_table.go b/syncapi/storage/sqlite3/presence_table.go index 00b16458d..36eabe175 100644 --- a/syncapi/storage/sqlite3/presence_table.go +++ b/syncapi/storage/sqlite3/presence_table.go @@ -17,6 +17,7 @@ package sqlite3 import ( "context" "database/sql" + "time" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" @@ -73,6 +74,11 @@ const selectPresenceAfter = "" + " FROM syncapi_presence" + " WHERE id > $1" +const selectRecentPresence = "" + + " SELECT id, user_id, presence, status_msg, last_active_ts" + + " FROM syncapi_presence" + + " WHERE last_active_ts >= $1" + type presenceStatements struct { db *sql.DB streamIDStatements *StreamIDStatements @@ -81,6 +87,7 @@ type presenceStatements struct { selectPresenceForUsersStmt *sql.Stmt selectMaxPresenceStmt *sql.Stmt selectPresenceAfterStmt *sql.Stmt + selectRecentPresenceStmt *sql.Stmt } func NewSqlitePresenceTable(db *sql.DB, streamID *StreamIDStatements) (*presenceStatements, error) { @@ -98,6 +105,7 @@ func NewSqlitePresenceTable(db *sql.DB, streamID *StreamIDStatements) (*presence {&s.selectPresenceForUsersStmt, selectPresenceForUserSQL}, {&s.selectMaxPresenceStmt, selectMaxPresenceSQL}, {&s.selectPresenceAfterStmt, selectPresenceAfter}, + {&s.selectRecentPresenceStmt, selectRecentPresence}, }.Prepare(db) } @@ -175,3 +183,28 @@ func (p *presenceStatements) GetPresenceAfter( } return presences, rows.Err() } + +// GetRecentPresence gets updates from the last five minutes. +func (p *presenceStatements) GetRecentPresence( + ctx context.Context, txn *sql.Tx, +) (presences map[string]*types.PresenceInternal, err error) { + presences = make(map[string]*types.PresenceInternal) + stmt := sqlutil.TxStmt(txn, p.selectPresenceAfterStmt) + + sinceTS := gomatrixserverlib.AsTimestamp(time.Now().Add(time.Minute * -5)) + + rows, err := stmt.QueryContext(ctx, sinceTS) + if err != nil { + return nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "GetPresenceAfter: failed to close rows") + for rows.Next() { + qryRes := &types.PresenceInternal{} + if err := rows.Scan(&qryRes.StreamPos, &qryRes.UserID, &qryRes.Presence, &qryRes.ClientFields.StatusMsg, &qryRes.LastActiveTS); err != nil { + return nil, err + } + qryRes.ClientFields.Presence = qryRes.Presence.String() + presences[qryRes.UserID] = qryRes + } + return presences, rows.Err() +} diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index ac713dd5c..abc6f325b 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -189,4 +189,5 @@ type Presence interface { GetPresenceForUser(ctx context.Context, txn *sql.Tx, userID string) (presence *types.PresenceInternal, err error) GetMaxPresenceID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) GetPresenceAfter(ctx context.Context, txn *sql.Tx, after types.StreamPosition) (presences map[string]*types.PresenceInternal, err error) + GetRecentPresence(ctx context.Context, txn *sql.Tx) (presences map[string]*types.PresenceInternal, err error) } diff --git a/syncapi/streams/stream_presence.go b/syncapi/streams/stream_presence.go index 9a6c5c130..23d27b3fd 100644 --- a/syncapi/streams/stream_presence.go +++ b/syncapi/streams/stream_presence.go @@ -46,7 +46,44 @@ func (p *PresenceStreamProvider) CompleteSync( ctx context.Context, req *types.SyncRequest, ) types.StreamPosition { - return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx)) + presences, err := p.DB.RecentPresence(ctx) + if err != nil { + req.Log.WithError(err).Error("p.DB.RecentPresence failed") + return 0 + } + + for i := range presences { + presence := presences[i] + // Ignore users we don't share a room with + if req.Device.UserID != presence.UserID && !p.notifier.IsSharedUser(req.Device.UserID, presence.UserID) { + continue + } + cacheKey := req.Device.UserID + req.Device.ID + presence.UserID + + if _, known := types.PresenceFromString(presence.ClientFields.Presence); known { + presence.ClientFields.LastActiveAgo = presence.LastActiveAgo() + if presence.ClientFields.Presence == "online" { + currentlyActive := presence.CurrentlyActive() + presence.ClientFields.CurrentlyActive = ¤tlyActive + } + } else { + presence.ClientFields.Presence = "offline" + } + + content, err := json.Marshal(presence.ClientFields) + if err != nil { + return 0 + } + + req.Response.Presence.Events = append(req.Response.Presence.Events, gomatrixserverlib.ClientEvent{ + Content: content, + Sender: presence.UserID, + Type: gomatrixserverlib.MPresence, + }) + p.cache.Store(cacheKey, presence) + } + + return p.LatestPosition(ctx) } func (p *PresenceStreamProvider) IncrementalSync(