Reduce allocations in /sync presence stream (#2326)

* Reduce allocations on presence

* Try to reduce allocations further

* Tweak `IsSharedUser` some more

* Take map lock
This commit is contained in:
Neil Alexander 2022-04-06 13:31:44 +01:00 committed by GitHub
parent e5e3350ce1
commit 602818460d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 19 additions and 14 deletions

View file

@ -250,6 +250,8 @@ func (n *Notifier) OnNewPresence(
} }
func (n *Notifier) SharedUsers(userID string) (sharedUsers []string) { func (n *Notifier) SharedUsers(userID string) (sharedUsers []string) {
n.mapLock.RLock()
defer n.mapLock.RUnlock()
for roomID, users := range n.roomIDToJoinedUsers { for roomID, users := range n.roomIDToJoinedUsers {
if _, ok := users[userID]; ok { if _, ok := users[userID]; ok {
sharedUsers = append(sharedUsers, n.JoinedUsers(roomID)...) sharedUsers = append(sharedUsers, n.JoinedUsers(roomID)...)
@ -258,6 +260,20 @@ func (n *Notifier) SharedUsers(userID string) (sharedUsers []string) {
return sharedUsers return sharedUsers
} }
func (n *Notifier) IsSharedUser(userA, userB string) bool {
n.mapLock.RLock()
defer n.mapLock.RUnlock()
var okA, okB bool
for _, users := range n.roomIDToJoinedUsers {
_, okA = users[userA]
_, okB = users[userB]
if okA && okB {
return true
}
}
return false
}
// GetListener returns a UserStreamListener that can be used to wait for // GetListener returns a UserStreamListener that can be used to wait for
// updates for a user. Must be closed. // updates for a user. Must be closed.
// notify for anything before sincePos // notify for anything before sincePos
@ -509,6 +525,7 @@ func (s userIDSet) remove(str string) {
} }
func (s userIDSet) values() (vals []string) { func (s userIDSet) values() (vals []string) {
vals = make([]string, 0, len(s))
for str := range s { for str := range s {
vals = append(vals, str) vals = append(vals, str)
} }
@ -529,6 +546,7 @@ func (s peekingDeviceSet) remove(d types.PeekingDevice) {
} }
func (s peekingDeviceSet) values() (vals []types.PeekingDevice) { func (s peekingDeviceSet) values() (vals []types.PeekingDevice) {
vals = make([]types.PeekingDevice, 0, len(s))
for d := range s { for d := range s {
vals = append(vals, d) vals = append(vals, d)
} }

View file

@ -64,18 +64,6 @@ func (p *PresenceStreamProvider) IncrementalSync(
return to return to
} }
// get all joined users
// TODO: SharedUsers might get out of sync
sharedUsers := p.notifier.SharedUsers(req.Device.UserID)
sharedUsersMap := map[string]bool{
req.Device.UserID: true,
}
// convert array to a map for easier checking if a user exists
for i := range sharedUsers {
sharedUsersMap[sharedUsers[i]] = true
}
// add newly joined rooms user presences // add newly joined rooms user presences
newlyJoined := joinedRooms(req.Response, req.Device.UserID) newlyJoined := joinedRooms(req.Response, req.Device.UserID)
if len(newlyJoined) > 0 { if len(newlyJoined) > 0 {
@ -88,7 +76,6 @@ func (p *PresenceStreamProvider) IncrementalSync(
for _, roomID := range newlyJoined { for _, roomID := range newlyJoined {
roomUsers := p.notifier.JoinedUsers(roomID) roomUsers := p.notifier.JoinedUsers(roomID)
for i := range roomUsers { for i := range roomUsers {
sharedUsersMap[roomUsers[i]] = true
// we already got a presence from this user // we already got a presence from this user
if _, ok := presences[roomUsers[i]]; ok { if _, ok := presences[roomUsers[i]]; ok {
continue continue
@ -109,7 +96,7 @@ func (p *PresenceStreamProvider) IncrementalSync(
for i := range presences { for i := range presences {
presence := presences[i] presence := presences[i]
// Ignore users we don't share a room with // Ignore users we don't share a room with
if !sharedUsersMap[presence.UserID] { if req.Device.UserID != presence.UserID && !p.notifier.IsSharedUser(req.Device.UserID, presence.UserID) {
continue continue
} }
cacheKey := req.Device.UserID + req.Device.ID + presence.UserID cacheKey := req.Device.UserID + req.Device.ID + presence.UserID