From 9d34f9738c7fcf6a17ac256e21d5f82abb6fda52 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 26 Apr 2022 14:33:45 +0100 Subject: [PATCH] Maybe placate the tests --- syncapi/storage/interface.go | 2 +- syncapi/storage/postgres/presence_table.go | 13 ++-- syncapi/storage/shared/syncserver.go | 2 +- syncapi/storage/sqlite3/presence_table.go | 13 ++-- syncapi/storage/tables/interface.go | 2 +- syncapi/streams/stream_presence.go | 88 +++++++++------------- syncapi/sync/requestpool_test.go | 4 +- 7 files changed, 55 insertions(+), 69 deletions(-) diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 91ed8eb72..0a6711e5d 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -161,5 +161,5 @@ type Presence interface { GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error) PresenceAfter(ctx context.Context, after types.StreamPosition) (map[string]*types.PresenceInternal, error) MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error) - RecentPresence(ctx context.Context) (map[string]*types.PresenceInternal, error) + RecentPresence(ctx context.Context) (map[string]*types.PresenceInternal, types.StreamPosition, error) } diff --git a/syncapi/storage/postgres/presence_table.go b/syncapi/storage/postgres/presence_table.go index 5f66fb4b1..34d3e9025 100644 --- a/syncapi/storage/postgres/presence_table.go +++ b/syncapi/storage/postgres/presence_table.go @@ -172,7 +172,7 @@ func (p *presenceStatements) GetPresenceAfter( // GetRecentPresence gets updates from the last five minutes. func (p *presenceStatements) GetRecentPresence( ctx context.Context, txn *sql.Tx, -) (presences map[string]*types.PresenceInternal, err error) { +) (presences map[string]*types.PresenceInternal, latest types.StreamPosition, err error) { presences = make(map[string]*types.PresenceInternal) stmt := sqlutil.TxStmt(txn, p.selectRecentPresenceStmt) @@ -180,16 +180,19 @@ func (p *presenceStatements) GetRecentPresence( rows, err := stmt.QueryContext(ctx, sinceTS) if err != nil { - return nil, err + return nil, 0, err } - defer internal.CloseAndLogIfError(ctx, rows, "GetPresenceAfter: failed to close rows") + defer internal.CloseAndLogIfError(ctx, rows, "GetRecentPresence: failed to close rows") for rows.Next() { qryRes := &types.PresenceInternal{} if err := rows.Scan(&qryRes.StreamPos, &qryRes.UserID, &qryRes.Presence, &qryRes.ClientFields.StatusMsg, &qryRes.LastActiveTS); err != nil { - return nil, err + return nil, 0, err } qryRes.ClientFields.Presence = qryRes.Presence.String() presences[qryRes.UserID] = qryRes + if qryRes.StreamPos > latest { + latest = qryRes.StreamPos + } } - return presences, rows.Err() + return presences, latest, rows.Err() } diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 2d16faeec..97d8236b5 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -1060,7 +1060,7 @@ func (s *Database) PresenceAfter(ctx context.Context, after types.StreamPosition return s.Presence.GetPresenceAfter(ctx, nil, after) } -func (s *Database) RecentPresence(ctx context.Context) (map[string]*types.PresenceInternal, error) { +func (s *Database) RecentPresence(ctx context.Context) (map[string]*types.PresenceInternal, types.StreamPosition, error) { return s.Presence.GetRecentPresence(ctx, nil) } diff --git a/syncapi/storage/sqlite3/presence_table.go b/syncapi/storage/sqlite3/presence_table.go index 848c38fe9..2f6a70b2f 100644 --- a/syncapi/storage/sqlite3/presence_table.go +++ b/syncapi/storage/sqlite3/presence_table.go @@ -187,7 +187,7 @@ func (p *presenceStatements) GetPresenceAfter( // GetRecentPresence gets updates from the last five minutes. func (p *presenceStatements) GetRecentPresence( ctx context.Context, txn *sql.Tx, -) (presences map[string]*types.PresenceInternal, err error) { +) (presences map[string]*types.PresenceInternal, latest types.StreamPosition, err error) { presences = make(map[string]*types.PresenceInternal) stmt := sqlutil.TxStmt(txn, p.selectRecentPresenceStmt) @@ -195,16 +195,19 @@ func (p *presenceStatements) GetRecentPresence( rows, err := stmt.QueryContext(ctx, sinceTS) if err != nil { - return nil, err + return nil, 0, err } - defer internal.CloseAndLogIfError(ctx, rows, "GetPresenceAfter: failed to close rows") + defer internal.CloseAndLogIfError(ctx, rows, "GetRecentPresence: failed to close rows") for rows.Next() { qryRes := &types.PresenceInternal{} if err := rows.Scan(&qryRes.StreamPos, &qryRes.UserID, &qryRes.Presence, &qryRes.ClientFields.StatusMsg, &qryRes.LastActiveTS); err != nil { - return nil, err + return nil, 0, err } qryRes.ClientFields.Presence = qryRes.Presence.String() presences[qryRes.UserID] = qryRes + if qryRes.StreamPos > latest { + latest = qryRes.StreamPos + } } - return presences, rows.Err() + return presences, latest, rows.Err() } diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index abc6f325b..f4ab33ce7 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -189,5 +189,5 @@ type Presence interface { GetPresenceForUser(ctx context.Context, txn *sql.Tx, userID string) (presence *types.PresenceInternal, err error) GetMaxPresenceID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) GetPresenceAfter(ctx context.Context, txn *sql.Tx, after types.StreamPosition) (presences map[string]*types.PresenceInternal, err error) - GetRecentPresence(ctx context.Context, txn *sql.Tx) (presences map[string]*types.PresenceInternal, err error) + GetRecentPresence(ctx context.Context, txn *sql.Tx) (presences map[string]*types.PresenceInternal, latest types.StreamPosition, err error) } diff --git a/syncapi/streams/stream_presence.go b/syncapi/streams/stream_presence.go index 23d27b3fd..11316fa09 100644 --- a/syncapi/streams/stream_presence.go +++ b/syncapi/streams/stream_presence.go @@ -18,6 +18,7 @@ import ( "context" "database/sql" "encoding/json" + "fmt" "sync" "github.com/matrix-org/dendrite/syncapi/notifier" @@ -46,44 +47,18 @@ func (p *PresenceStreamProvider) CompleteSync( ctx context.Context, req *types.SyncRequest, ) types.StreamPosition { - presences, err := p.DB.RecentPresence(ctx) + presences, latest, err := p.DB.RecentPresence(ctx) if err != nil { req.Log.WithError(err).Error("p.DB.RecentPresence failed") return 0 } - - for i := range presences { - presence := presences[i] - // Ignore users we don't share a room with - if req.Device.UserID != presence.UserID && !p.notifier.IsSharedUser(req.Device.UserID, presence.UserID) { - continue - } - cacheKey := req.Device.UserID + req.Device.ID + presence.UserID - - if _, known := types.PresenceFromString(presence.ClientFields.Presence); known { - presence.ClientFields.LastActiveAgo = presence.LastActiveAgo() - if presence.ClientFields.Presence == "online" { - currentlyActive := presence.CurrentlyActive() - presence.ClientFields.CurrentlyActive = ¤tlyActive - } - } else { - presence.ClientFields.Presence = "offline" - } - - content, err := json.Marshal(presence.ClientFields) - if err != nil { - return 0 - } - - req.Response.Presence.Events = append(req.Response.Presence.Events, gomatrixserverlib.ClientEvent{ - Content: content, - Sender: presence.UserID, - Type: gomatrixserverlib.MPresence, - }) - p.cache.Store(cacheKey, presence) + if len(presences) == 0 { + return latest } - - return p.LatestPosition(ctx) + if err := p.populatePresence(ctx, req, presences, true); err != nil { + return 0 + } + return latest } func (p *PresenceStreamProvider) IncrementalSync( @@ -96,19 +71,28 @@ func (p *PresenceStreamProvider) IncrementalSync( req.Log.WithError(err).Error("p.DB.PresenceAfter failed") return from } - if len(presences) == 0 { return to } + if err := p.populatePresence(ctx, req, presences, false); err != nil { + return from + } + return to +} +func (p *PresenceStreamProvider) populatePresence( + ctx context.Context, + req *types.SyncRequest, + presences map[string]*types.PresenceInternal, + ignoreCache bool, +) error { // add newly joined rooms user presences newlyJoined := joinedRooms(req.Response, req.Device.UserID) if len(newlyJoined) > 0 { // TODO: This refreshes all lists and is quite expensive // The notifier should update the lists itself - if err = p.notifier.Load(ctx, p.DB); err != nil { - req.Log.WithError(err).Error("unable to refresh notifier lists") - return from + if err := p.notifier.Load(ctx, p.DB); err != nil { + return err } for _, roomID := range newlyJoined { roomUsers := p.notifier.JoinedUsers(roomID) @@ -117,19 +101,18 @@ func (p *PresenceStreamProvider) IncrementalSync( if _, ok := presences[roomUsers[i]]; ok { continue } + var err error presences[roomUsers[i]], err = p.DB.GetPresence(ctx, roomUsers[i]) if err != nil { if err == sql.ErrNoRows { continue } - req.Log.WithError(err).Error("unable to query presence for user") - return from + return err } } } } - lastPos := to for i := range presences { presence := presences[i] // Ignore users we don't share a room with @@ -137,15 +120,15 @@ func (p *PresenceStreamProvider) IncrementalSync( continue } cacheKey := req.Device.UserID + req.Device.ID + presence.UserID - pres, ok := p.cache.Load(cacheKey) - if ok { - // skip already sent presence - prevPresence := pres.(*types.PresenceInternal) - currentlyActive := prevPresence.CurrentlyActive() - skip := prevPresence.Equals(presence) && currentlyActive && req.Device.UserID != presence.UserID - if skip { - req.Log.Debugf("Skipping presence, no change (%s)", presence.UserID) - continue + if !ignoreCache { + pres, ok := p.cache.Load(cacheKey) + if ok { + // skip already sent presence + prevPresence := pres.(*types.PresenceInternal) + currentlyActive := prevPresence.CurrentlyActive() + if prevPresence.Equals(presence) && currentlyActive && req.Device.UserID != presence.UserID { + continue + } } } @@ -161,7 +144,7 @@ func (p *PresenceStreamProvider) IncrementalSync( content, err := json.Marshal(presence.ClientFields) if err != nil { - return from + return fmt.Errorf("json.Unmarshal: %w", err) } req.Response.Presence.Events = append(req.Response.Presence.Events, gomatrixserverlib.ClientEvent{ @@ -169,13 +152,10 @@ func (p *PresenceStreamProvider) IncrementalSync( Sender: presence.UserID, Type: gomatrixserverlib.MPresence, }) - if presence.StreamPos > lastPos { - lastPos = presence.StreamPos - } p.cache.Store(cacheKey, presence) } - return lastPos + return nil } func joinedRooms(res *types.Response, userID string) []string { diff --git a/syncapi/sync/requestpool_test.go b/syncapi/sync/requestpool_test.go index 1c6c9558b..8e75f979c 100644 --- a/syncapi/sync/requestpool_test.go +++ b/syncapi/sync/requestpool_test.go @@ -34,8 +34,8 @@ func (d dummyDB) PresenceAfter(ctx context.Context, after types.StreamPosition) return map[string]*types.PresenceInternal{}, nil } -func (d dummyDB) RecentPresence(ctx context.Context) (map[string]*types.PresenceInternal, error) { - return map[string]*types.PresenceInternal{}, nil +func (d dummyDB) RecentPresence(ctx context.Context) (map[string]*types.PresenceInternal, types.StreamPosition, error) { + return map[string]*types.PresenceInternal{}, 0, nil } func (d dummyDB) MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error) {