From 602818460d3801b480f63a187514b2d9ba9185f1 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 6 Apr 2022 13:31:44 +0100 Subject: [PATCH] Reduce allocations in `/sync` presence stream (#2326) * Reduce allocations on presence * Try to reduce allocations further * Tweak `IsSharedUser` some more * Take map lock --- syncapi/notifier/notifier.go | 18 ++++++++++++++++++ syncapi/streams/stream_presence.go | 15 +-------------- 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/syncapi/notifier/notifier.go b/syncapi/notifier/notifier.go index d2b79b63f..4e1a3c2a3 100644 --- a/syncapi/notifier/notifier.go +++ b/syncapi/notifier/notifier.go @@ -250,6 +250,8 @@ func (n *Notifier) OnNewPresence( } func (n *Notifier) SharedUsers(userID string) (sharedUsers []string) { + n.mapLock.RLock() + defer n.mapLock.RUnlock() for roomID, users := range n.roomIDToJoinedUsers { if _, ok := users[userID]; ok { sharedUsers = append(sharedUsers, n.JoinedUsers(roomID)...) @@ -258,6 +260,20 @@ func (n *Notifier) SharedUsers(userID string) (sharedUsers []string) { return sharedUsers } +func (n *Notifier) IsSharedUser(userA, userB string) bool { + n.mapLock.RLock() + defer n.mapLock.RUnlock() + var okA, okB bool + for _, users := range n.roomIDToJoinedUsers { + _, okA = users[userA] + _, okB = users[userB] + if okA && okB { + return true + } + } + return false +} + // GetListener returns a UserStreamListener that can be used to wait for // updates for a user. Must be closed. // notify for anything before sincePos @@ -509,6 +525,7 @@ func (s userIDSet) remove(str string) { } func (s userIDSet) values() (vals []string) { + vals = make([]string, 0, len(s)) for str := range s { vals = append(vals, str) } @@ -529,6 +546,7 @@ func (s peekingDeviceSet) remove(d types.PeekingDevice) { } func (s peekingDeviceSet) values() (vals []types.PeekingDevice) { + vals = make([]types.PeekingDevice, 0, len(s)) for d := range s { vals = append(vals, d) } diff --git a/syncapi/streams/stream_presence.go b/syncapi/streams/stream_presence.go index a24edad59..6d5ec54bb 100644 --- a/syncapi/streams/stream_presence.go +++ b/syncapi/streams/stream_presence.go @@ -64,18 +64,6 @@ func (p *PresenceStreamProvider) IncrementalSync( return to } - // get all joined users - // TODO: SharedUsers might get out of sync - sharedUsers := p.notifier.SharedUsers(req.Device.UserID) - - sharedUsersMap := map[string]bool{ - req.Device.UserID: true, - } - // convert array to a map for easier checking if a user exists - for i := range sharedUsers { - sharedUsersMap[sharedUsers[i]] = true - } - // add newly joined rooms user presences newlyJoined := joinedRooms(req.Response, req.Device.UserID) if len(newlyJoined) > 0 { @@ -88,7 +76,6 @@ func (p *PresenceStreamProvider) IncrementalSync( for _, roomID := range newlyJoined { roomUsers := p.notifier.JoinedUsers(roomID) for i := range roomUsers { - sharedUsersMap[roomUsers[i]] = true // we already got a presence from this user if _, ok := presences[roomUsers[i]]; ok { continue @@ -109,7 +96,7 @@ func (p *PresenceStreamProvider) IncrementalSync( for i := range presences { presence := presences[i] // Ignore users we don't share a room with - if !sharedUsersMap[presence.UserID] { + if req.Device.UserID != presence.UserID && !p.notifier.IsSharedUser(req.Device.UserID, presence.UserID) { continue } cacheKey := req.Device.UserID + req.Device.ID + presence.UserID