Don't create fictitious presence entries (#2381)
* Don't create fictitious presence entries for users that don't have any * Update whitelist, since that test probably shouldn't be passing * Fix panics
This commit is contained in:
parent
6c5c6d73d7
commit
66b397b3c6
|
@ -88,6 +88,11 @@ func (s *PresenceConsumer) Start() error {
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if presence == nil {
|
||||||
|
presence = &types.PresenceInternal{
|
||||||
|
UserID: userID,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
deviceRes := api.QueryDevicesResponse{}
|
deviceRes := api.QueryDevicesResponse{}
|
||||||
if err = s.deviceAPI.QueryDevices(s.ctx, &api.QueryDevicesRequest{UserID: userID}, &deviceRes); err != nil {
|
if err = s.deviceAPI.QueryDevices(s.ctx, &api.QueryDevicesRequest{UserID: userID}, &deviceRes); err != nil {
|
||||||
|
@ -106,7 +111,9 @@ func (s *PresenceConsumer) Start() error {
|
||||||
|
|
||||||
m.Header.Set(jetstream.UserID, presence.UserID)
|
m.Header.Set(jetstream.UserID, presence.UserID)
|
||||||
m.Header.Set("presence", presence.ClientFields.Presence)
|
m.Header.Set("presence", presence.ClientFields.Presence)
|
||||||
m.Header.Set("status_msg", *presence.ClientFields.StatusMsg)
|
if presence.ClientFields.StatusMsg != nil {
|
||||||
|
m.Header.Set("status_msg", *presence.ClientFields.StatusMsg)
|
||||||
|
}
|
||||||
m.Header.Set("last_active_ts", strconv.Itoa(int(presence.LastActiveTS)))
|
m.Header.Set("last_active_ts", strconv.Itoa(int(presence.LastActiveTS)))
|
||||||
|
|
||||||
if err = msg.RespondMsg(m); err != nil {
|
if err = msg.RespondMsg(m); err != nil {
|
||||||
|
|
|
@ -127,6 +127,9 @@ func (p *presenceStatements) GetPresenceForUser(
|
||||||
}
|
}
|
||||||
stmt := sqlutil.TxStmt(txn, p.selectPresenceForUsersStmt)
|
stmt := sqlutil.TxStmt(txn, p.selectPresenceForUsersStmt)
|
||||||
err := stmt.QueryRowContext(ctx, userID).Scan(&result.Presence, &result.ClientFields.StatusMsg, &result.LastActiveTS)
|
err := stmt.QueryRowContext(ctx, userID).Scan(&result.Presence, &result.ClientFields.StatusMsg, &result.LastActiveTS)
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
result.ClientFields.Presence = result.Presence.String()
|
result.ClientFields.Presence = result.Presence.String()
|
||||||
return result, err
|
return result, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -142,6 +142,9 @@ func (p *presenceStatements) GetPresenceForUser(
|
||||||
}
|
}
|
||||||
stmt := sqlutil.TxStmt(txn, p.selectPresenceForUsersStmt)
|
stmt := sqlutil.TxStmt(txn, p.selectPresenceForUsersStmt)
|
||||||
err := stmt.QueryRowContext(ctx, userID).Scan(&result.Presence, &result.ClientFields.StatusMsg, &result.LastActiveTS)
|
err := stmt.QueryRowContext(ctx, userID).Scan(&result.Presence, &result.ClientFields.StatusMsg, &result.LastActiveTS)
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
result.ClientFields.Presence = result.Presence.String()
|
result.ClientFields.Presence = result.Presence.String()
|
||||||
return result, err
|
return result, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,7 +16,6 @@ package streams
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
@ -80,11 +79,10 @@ func (p *PresenceStreamProvider) IncrementalSync(
|
||||||
if _, ok := presences[roomUsers[i]]; ok {
|
if _, ok := presences[roomUsers[i]]; ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
// Bear in mind that this might return nil, but at least populating
|
||||||
|
// a nil means that there's a map entry so we won't repeat this call.
|
||||||
presences[roomUsers[i]], err = p.DB.GetPresence(ctx, roomUsers[i])
|
presences[roomUsers[i]], err = p.DB.GetPresence(ctx, roomUsers[i])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == sql.ErrNoRows {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
req.Log.WithError(err).Error("unable to query presence for user")
|
req.Log.WithError(err).Error("unable to query presence for user")
|
||||||
return from
|
return from
|
||||||
}
|
}
|
||||||
|
@ -93,8 +91,10 @@ func (p *PresenceStreamProvider) IncrementalSync(
|
||||||
}
|
}
|
||||||
|
|
||||||
lastPos := to
|
lastPos := to
|
||||||
for i := range presences {
|
for _, presence := range presences {
|
||||||
presence := presences[i]
|
if presence == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
// Ignore users we don't share a room with
|
// Ignore users we don't share a room with
|
||||||
if req.Device.UserID != presence.UserID && !p.notifier.IsSharedUser(req.Device.UserID, presence.UserID) {
|
if req.Device.UserID != presence.UserID && !p.notifier.IsSharedUser(req.Device.UserID, presence.UserID) {
|
||||||
continue
|
continue
|
||||||
|
|
|
@ -127,14 +127,23 @@ func (rp *RequestPool) updatePresence(db storage.Presence, presence string, user
|
||||||
if !ok { // this should almost never happen
|
if !ok { // this should almost never happen
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
newPresence := types.PresenceInternal{
|
newPresence := types.PresenceInternal{
|
||||||
ClientFields: types.PresenceClientResponse{
|
|
||||||
Presence: presenceID.String(),
|
|
||||||
},
|
|
||||||
Presence: presenceID,
|
Presence: presenceID,
|
||||||
UserID: userID,
|
UserID: userID,
|
||||||
LastActiveTS: gomatrixserverlib.AsTimestamp(time.Now()),
|
LastActiveTS: gomatrixserverlib.AsTimestamp(time.Now()),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ensure we also send the current status_msg to federated servers and not nil
|
||||||
|
dbPresence, err := db.GetPresence(context.Background(), userID)
|
||||||
|
if err != nil && err != sql.ErrNoRows {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if dbPresence != nil {
|
||||||
|
newPresence.ClientFields = dbPresence.ClientFields
|
||||||
|
}
|
||||||
|
newPresence.ClientFields.Presence = presenceID.String()
|
||||||
|
|
||||||
defer rp.presence.Store(userID, newPresence)
|
defer rp.presence.Store(userID, newPresence)
|
||||||
// avoid spamming presence updates when syncing
|
// avoid spamming presence updates when syncing
|
||||||
existingPresence, ok := rp.presence.LoadOrStore(userID, newPresence)
|
existingPresence, ok := rp.presence.LoadOrStore(userID, newPresence)
|
||||||
|
@ -145,13 +154,7 @@ func (rp *RequestPool) updatePresence(db storage.Presence, presence string, user
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ensure we also send the current status_msg to federated servers and not nil
|
if err := rp.producer.SendPresence(userID, presenceID, newPresence.ClientFields.StatusMsg); err != nil {
|
||||||
dbPresence, err := db.GetPresence(context.Background(), userID)
|
|
||||||
if err != nil && err != sql.ErrNoRows {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := rp.producer.SendPresence(userID, presenceID, dbPresence.ClientFields.StatusMsg); err != nil {
|
|
||||||
logrus.WithError(err).Error("Unable to publish presence message from sync")
|
logrus.WithError(err).Error("Unable to publish presence message from sync")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -681,8 +681,6 @@ GET /presence/:user_id/status fetches initial status
|
||||||
PUT /presence/:user_id/status updates my presence
|
PUT /presence/:user_id/status updates my presence
|
||||||
Presence change reports an event to myself
|
Presence change reports an event to myself
|
||||||
Existing members see new members' presence
|
Existing members see new members' presence
|
||||||
#Existing members see new member's presence
|
|
||||||
Newly joined room includes presence in incremental sync
|
|
||||||
Get presence for newly joined members in incremental sync
|
Get presence for newly joined members in incremental sync
|
||||||
User sees their own presence in a sync
|
User sees their own presence in a sync
|
||||||
User sees updates to presence from other users in the incremental sync.
|
User sees updates to presence from other users in the incremental sync.
|
||||||
|
|
Loading…
Reference in a new issue