Don't create fictitious presence entries for users that don't have any

This commit is contained in:
Neil Alexander 2022-04-27 10:33:53 +01:00
parent 6c5c6d73d7
commit d3abf04d47
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
4 changed files with 25 additions and 16 deletions

View file

@ -127,6 +127,9 @@ func (p *presenceStatements) GetPresenceForUser(
} }
stmt := sqlutil.TxStmt(txn, p.selectPresenceForUsersStmt) stmt := sqlutil.TxStmt(txn, p.selectPresenceForUsersStmt)
err := stmt.QueryRowContext(ctx, userID).Scan(&result.Presence, &result.ClientFields.StatusMsg, &result.LastActiveTS) err := stmt.QueryRowContext(ctx, userID).Scan(&result.Presence, &result.ClientFields.StatusMsg, &result.LastActiveTS)
if err == sql.ErrNoRows {
return nil, nil
}
result.ClientFields.Presence = result.Presence.String() result.ClientFields.Presence = result.Presence.String()
return result, err return result, err
} }

View file

@ -142,6 +142,9 @@ func (p *presenceStatements) GetPresenceForUser(
} }
stmt := sqlutil.TxStmt(txn, p.selectPresenceForUsersStmt) stmt := sqlutil.TxStmt(txn, p.selectPresenceForUsersStmt)
err := stmt.QueryRowContext(ctx, userID).Scan(&result.Presence, &result.ClientFields.StatusMsg, &result.LastActiveTS) err := stmt.QueryRowContext(ctx, userID).Scan(&result.Presence, &result.ClientFields.StatusMsg, &result.LastActiveTS)
if err == sql.ErrNoRows {
return nil, nil
}
result.ClientFields.Presence = result.Presence.String() result.ClientFields.Presence = result.Presence.String()
return result, err return result, err
} }

View file

@ -16,7 +16,6 @@ package streams
import ( import (
"context" "context"
"database/sql"
"encoding/json" "encoding/json"
"sync" "sync"
@ -80,11 +79,10 @@ func (p *PresenceStreamProvider) IncrementalSync(
if _, ok := presences[roomUsers[i]]; ok { if _, ok := presences[roomUsers[i]]; ok {
continue continue
} }
// Bear in mind that this might return nil, but at least populating
// a nil means that there's a map entry so we won't repeat this call.
presences[roomUsers[i]], err = p.DB.GetPresence(ctx, roomUsers[i]) presences[roomUsers[i]], err = p.DB.GetPresence(ctx, roomUsers[i])
if err != nil { if err != nil {
if err == sql.ErrNoRows {
continue
}
req.Log.WithError(err).Error("unable to query presence for user") req.Log.WithError(err).Error("unable to query presence for user")
return from return from
} }
@ -93,8 +91,10 @@ func (p *PresenceStreamProvider) IncrementalSync(
} }
lastPos := to lastPos := to
for i := range presences { for _, presence := range presences {
presence := presences[i] if presence == nil {
continue
}
// Ignore users we don't share a room with // Ignore users we don't share a room with
if req.Device.UserID != presence.UserID && !p.notifier.IsSharedUser(req.Device.UserID, presence.UserID) { if req.Device.UserID != presence.UserID && !p.notifier.IsSharedUser(req.Device.UserID, presence.UserID) {
continue continue

View file

@ -127,14 +127,23 @@ func (rp *RequestPool) updatePresence(db storage.Presence, presence string, user
if !ok { // this should almost never happen if !ok { // this should almost never happen
return return
} }
newPresence := types.PresenceInternal{ newPresence := types.PresenceInternal{
ClientFields: types.PresenceClientResponse{
Presence: presenceID.String(),
},
Presence: presenceID, Presence: presenceID,
UserID: userID, UserID: userID,
LastActiveTS: gomatrixserverlib.AsTimestamp(time.Now()), LastActiveTS: gomatrixserverlib.AsTimestamp(time.Now()),
} }
// ensure we also send the current status_msg to federated servers and not nil
dbPresence, err := db.GetPresence(context.Background(), userID)
if err != nil && err != sql.ErrNoRows {
return
}
if dbPresence != nil {
newPresence.ClientFields = dbPresence.ClientFields
}
newPresence.ClientFields.Presence = presenceID.String()
defer rp.presence.Store(userID, newPresence) defer rp.presence.Store(userID, newPresence)
// avoid spamming presence updates when syncing // avoid spamming presence updates when syncing
existingPresence, ok := rp.presence.LoadOrStore(userID, newPresence) existingPresence, ok := rp.presence.LoadOrStore(userID, newPresence)
@ -145,13 +154,7 @@ func (rp *RequestPool) updatePresence(db storage.Presence, presence string, user
} }
} }
// ensure we also send the current status_msg to federated servers and not nil if err := rp.producer.SendPresence(userID, presenceID, newPresence.ClientFields.StatusMsg); err != nil {
dbPresence, err := db.GetPresence(context.Background(), userID)
if err != nil && err != sql.ErrNoRows {
return
}
if err := rp.producer.SendPresence(userID, presenceID, dbPresence.ClientFields.StatusMsg); err != nil {
logrus.WithError(err).Error("Unable to publish presence message from sync") logrus.WithError(err).Error("Unable to publish presence message from sync")
return return
} }