Maybe placate the tests

This commit is contained in:
Neil Alexander 2022-04-26 14:33:45 +01:00
parent 86876781e7
commit 9d34f9738c
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
7 changed files with 55 additions and 69 deletions

View file

@ -161,5 +161,5 @@ type Presence interface {
GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error)
PresenceAfter(ctx context.Context, after types.StreamPosition) (map[string]*types.PresenceInternal, error)
MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error)
RecentPresence(ctx context.Context) (map[string]*types.PresenceInternal, error)
RecentPresence(ctx context.Context) (map[string]*types.PresenceInternal, types.StreamPosition, error)
}

View file

@ -172,7 +172,7 @@ func (p *presenceStatements) GetPresenceAfter(
// GetRecentPresence gets updates from the last five minutes.
func (p *presenceStatements) GetRecentPresence(
ctx context.Context, txn *sql.Tx,
) (presences map[string]*types.PresenceInternal, err error) {
) (presences map[string]*types.PresenceInternal, latest types.StreamPosition, err error) {
presences = make(map[string]*types.PresenceInternal)
stmt := sqlutil.TxStmt(txn, p.selectRecentPresenceStmt)
@ -180,16 +180,19 @@ func (p *presenceStatements) GetRecentPresence(
rows, err := stmt.QueryContext(ctx, sinceTS)
if err != nil {
return nil, err
return nil, 0, err
}
defer internal.CloseAndLogIfError(ctx, rows, "GetPresenceAfter: failed to close rows")
defer internal.CloseAndLogIfError(ctx, rows, "GetRecentPresence: failed to close rows")
for rows.Next() {
qryRes := &types.PresenceInternal{}
if err := rows.Scan(&qryRes.StreamPos, &qryRes.UserID, &qryRes.Presence, &qryRes.ClientFields.StatusMsg, &qryRes.LastActiveTS); err != nil {
return nil, err
return nil, 0, err
}
qryRes.ClientFields.Presence = qryRes.Presence.String()
presences[qryRes.UserID] = qryRes
if qryRes.StreamPos > latest {
latest = qryRes.StreamPos
}
}
return presences, rows.Err()
return presences, latest, rows.Err()
}

View file

@ -1060,7 +1060,7 @@ func (s *Database) PresenceAfter(ctx context.Context, after types.StreamPosition
return s.Presence.GetPresenceAfter(ctx, nil, after)
}
func (s *Database) RecentPresence(ctx context.Context) (map[string]*types.PresenceInternal, error) {
func (s *Database) RecentPresence(ctx context.Context) (map[string]*types.PresenceInternal, types.StreamPosition, error) {
return s.Presence.GetRecentPresence(ctx, nil)
}

View file

@ -187,7 +187,7 @@ func (p *presenceStatements) GetPresenceAfter(
// GetRecentPresence gets updates from the last five minutes.
func (p *presenceStatements) GetRecentPresence(
ctx context.Context, txn *sql.Tx,
) (presences map[string]*types.PresenceInternal, err error) {
) (presences map[string]*types.PresenceInternal, latest types.StreamPosition, err error) {
presences = make(map[string]*types.PresenceInternal)
stmt := sqlutil.TxStmt(txn, p.selectRecentPresenceStmt)
@ -195,16 +195,19 @@ func (p *presenceStatements) GetRecentPresence(
rows, err := stmt.QueryContext(ctx, sinceTS)
if err != nil {
return nil, err
return nil, 0, err
}
defer internal.CloseAndLogIfError(ctx, rows, "GetPresenceAfter: failed to close rows")
defer internal.CloseAndLogIfError(ctx, rows, "GetRecentPresence: failed to close rows")
for rows.Next() {
qryRes := &types.PresenceInternal{}
if err := rows.Scan(&qryRes.StreamPos, &qryRes.UserID, &qryRes.Presence, &qryRes.ClientFields.StatusMsg, &qryRes.LastActiveTS); err != nil {
return nil, err
return nil, 0, err
}
qryRes.ClientFields.Presence = qryRes.Presence.String()
presences[qryRes.UserID] = qryRes
if qryRes.StreamPos > latest {
latest = qryRes.StreamPos
}
}
return presences, rows.Err()
return presences, latest, rows.Err()
}

View file

@ -189,5 +189,5 @@ type Presence interface {
GetPresenceForUser(ctx context.Context, txn *sql.Tx, userID string) (presence *types.PresenceInternal, err error)
GetMaxPresenceID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error)
GetPresenceAfter(ctx context.Context, txn *sql.Tx, after types.StreamPosition) (presences map[string]*types.PresenceInternal, err error)
GetRecentPresence(ctx context.Context, txn *sql.Tx) (presences map[string]*types.PresenceInternal, err error)
GetRecentPresence(ctx context.Context, txn *sql.Tx) (presences map[string]*types.PresenceInternal, latest types.StreamPosition, err error)
}

View file

@ -18,6 +18,7 @@ import (
"context"
"database/sql"
"encoding/json"
"fmt"
"sync"
"github.com/matrix-org/dendrite/syncapi/notifier"
@ -46,44 +47,18 @@ func (p *PresenceStreamProvider) CompleteSync(
ctx context.Context,
req *types.SyncRequest,
) types.StreamPosition {
presences, err := p.DB.RecentPresence(ctx)
presences, latest, err := p.DB.RecentPresence(ctx)
if err != nil {
req.Log.WithError(err).Error("p.DB.RecentPresence failed")
return 0
}
for i := range presences {
presence := presences[i]
// Ignore users we don't share a room with
if req.Device.UserID != presence.UserID && !p.notifier.IsSharedUser(req.Device.UserID, presence.UserID) {
continue
}
cacheKey := req.Device.UserID + req.Device.ID + presence.UserID
if _, known := types.PresenceFromString(presence.ClientFields.Presence); known {
presence.ClientFields.LastActiveAgo = presence.LastActiveAgo()
if presence.ClientFields.Presence == "online" {
currentlyActive := presence.CurrentlyActive()
presence.ClientFields.CurrentlyActive = &currentlyActive
}
} else {
presence.ClientFields.Presence = "offline"
}
content, err := json.Marshal(presence.ClientFields)
if err != nil {
return 0
}
req.Response.Presence.Events = append(req.Response.Presence.Events, gomatrixserverlib.ClientEvent{
Content: content,
Sender: presence.UserID,
Type: gomatrixserverlib.MPresence,
})
p.cache.Store(cacheKey, presence)
if len(presences) == 0 {
return latest
}
return p.LatestPosition(ctx)
if err := p.populatePresence(ctx, req, presences, true); err != nil {
return 0
}
return latest
}
func (p *PresenceStreamProvider) IncrementalSync(
@ -96,19 +71,28 @@ func (p *PresenceStreamProvider) IncrementalSync(
req.Log.WithError(err).Error("p.DB.PresenceAfter failed")
return from
}
if len(presences) == 0 {
return to
}
if err := p.populatePresence(ctx, req, presences, false); err != nil {
return from
}
return to
}
func (p *PresenceStreamProvider) populatePresence(
ctx context.Context,
req *types.SyncRequest,
presences map[string]*types.PresenceInternal,
ignoreCache bool,
) error {
// add newly joined rooms user presences
newlyJoined := joinedRooms(req.Response, req.Device.UserID)
if len(newlyJoined) > 0 {
// TODO: This refreshes all lists and is quite expensive
// The notifier should update the lists itself
if err = p.notifier.Load(ctx, p.DB); err != nil {
req.Log.WithError(err).Error("unable to refresh notifier lists")
return from
if err := p.notifier.Load(ctx, p.DB); err != nil {
return err
}
for _, roomID := range newlyJoined {
roomUsers := p.notifier.JoinedUsers(roomID)
@ -117,19 +101,18 @@ func (p *PresenceStreamProvider) IncrementalSync(
if _, ok := presences[roomUsers[i]]; ok {
continue
}
var err error
presences[roomUsers[i]], err = p.DB.GetPresence(ctx, roomUsers[i])
if err != nil {
if err == sql.ErrNoRows {
continue
}
req.Log.WithError(err).Error("unable to query presence for user")
return from
return err
}
}
}
}
lastPos := to
for i := range presences {
presence := presences[i]
// Ignore users we don't share a room with
@ -137,15 +120,15 @@ func (p *PresenceStreamProvider) IncrementalSync(
continue
}
cacheKey := req.Device.UserID + req.Device.ID + presence.UserID
pres, ok := p.cache.Load(cacheKey)
if ok {
// skip already sent presence
prevPresence := pres.(*types.PresenceInternal)
currentlyActive := prevPresence.CurrentlyActive()
skip := prevPresence.Equals(presence) && currentlyActive && req.Device.UserID != presence.UserID
if skip {
req.Log.Debugf("Skipping presence, no change (%s)", presence.UserID)
continue
if !ignoreCache {
pres, ok := p.cache.Load(cacheKey)
if ok {
// skip already sent presence
prevPresence := pres.(*types.PresenceInternal)
currentlyActive := prevPresence.CurrentlyActive()
if prevPresence.Equals(presence) && currentlyActive && req.Device.UserID != presence.UserID {
continue
}
}
}
@ -161,7 +144,7 @@ func (p *PresenceStreamProvider) IncrementalSync(
content, err := json.Marshal(presence.ClientFields)
if err != nil {
return from
return fmt.Errorf("json.Unmarshal: %w", err)
}
req.Response.Presence.Events = append(req.Response.Presence.Events, gomatrixserverlib.ClientEvent{
@ -169,13 +152,10 @@ func (p *PresenceStreamProvider) IncrementalSync(
Sender: presence.UserID,
Type: gomatrixserverlib.MPresence,
})
if presence.StreamPos > lastPos {
lastPos = presence.StreamPos
}
p.cache.Store(cacheKey, presence)
}
return lastPos
return nil
}
func joinedRooms(res *types.Response, userID string) []string {

View file

@ -34,8 +34,8 @@ func (d dummyDB) PresenceAfter(ctx context.Context, after types.StreamPosition)
return map[string]*types.PresenceInternal{}, nil
}
func (d dummyDB) RecentPresence(ctx context.Context) (map[string]*types.PresenceInternal, error) {
return map[string]*types.PresenceInternal{}, nil
func (d dummyDB) RecentPresence(ctx context.Context) (map[string]*types.PresenceInternal, types.StreamPosition, error) {
return map[string]*types.PresenceInternal{}, 0, nil
}
func (d dummyDB) MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error) {