Sync notifier tweaks (#2327)

* Micro-optimisations, lock fixes

* Refactor `SharedUsers`

* Reuse map to reduce allocations/GC pressure

* oh yeah, initialise it

* Leave room for the user ID we'll no doubt append afterward
This commit is contained in:
Neil Alexander 2022-04-06 15:04:41 +01:00 committed by GitHub
parent 602818460d
commit c28f00bf96
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -45,6 +45,8 @@ type Notifier struct {
lastCleanUpTime time.Time lastCleanUpTime time.Time
// Protects roomIDToJoinedUsers and roomIDToPeekingDevices // Protects roomIDToJoinedUsers and roomIDToPeekingDevices
mapLock *sync.RWMutex mapLock *sync.RWMutex
// This map is reused to prevent allocations and GC pressure in SharedUsers.
_sharedUsers map[string]struct{}
} }
// NewNotifier creates a new notifier set to the given sync position. // NewNotifier creates a new notifier set to the given sync position.
@ -58,12 +60,16 @@ func NewNotifier() *Notifier {
streamLock: &sync.Mutex{}, streamLock: &sync.Mutex{},
mapLock: &sync.RWMutex{}, mapLock: &sync.RWMutex{},
lastCleanUpTime: time.Now(), lastCleanUpTime: time.Now(),
_sharedUsers: map[string]struct{}{},
} }
} }
// SetCurrentPosition sets the current streaming positions. // SetCurrentPosition sets the current streaming positions.
// This must be called directly after NewNotifier and initialising the streams. // This must be called directly after NewNotifier and initialising the streams.
func (n *Notifier) SetCurrentPosition(currPos types.StreamingToken) { func (n *Notifier) SetCurrentPosition(currPos types.StreamingToken) {
n.streamLock.Lock()
defer n.streamLock.Unlock()
n.currPos = currPos n.currPos = currPos
} }
@ -249,13 +255,22 @@ func (n *Notifier) OnNewPresence(
n.wakeupUsers(sharedUsers, nil, n.currPos) n.wakeupUsers(sharedUsers, nil, n.currPos)
} }
func (n *Notifier) SharedUsers(userID string) (sharedUsers []string) { func (n *Notifier) SharedUsers(userID string) []string {
n.mapLock.RLock() n.mapLock.RLock()
defer n.mapLock.RUnlock() defer n.mapLock.RUnlock()
n._sharedUsers[userID] = struct{}{}
for roomID, users := range n.roomIDToJoinedUsers { for roomID, users := range n.roomIDToJoinedUsers {
if _, ok := users[userID]; ok { if _, ok := users[userID]; !ok {
sharedUsers = append(sharedUsers, n.JoinedUsers(roomID)...) continue
} }
for _, userID := range n.JoinedUsers(roomID) {
n._sharedUsers[userID] = struct{}{}
}
}
sharedUsers := make([]string, 0, len(n._sharedUsers)+1)
for userID := range n._sharedUsers {
sharedUsers = append(sharedUsers, userID)
delete(n._sharedUsers, userID)
} }
return sharedUsers return sharedUsers
} }
@ -328,7 +343,7 @@ func (n *Notifier) setUsersJoinedToRooms(roomIDToUserIDs map[string][]string) {
// This is just the bulk form of addJoinedUser // This is just the bulk form of addJoinedUser
for roomID, userIDs := range roomIDToUserIDs { for roomID, userIDs := range roomIDToUserIDs {
if _, ok := n.roomIDToJoinedUsers[roomID]; !ok { if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
n.roomIDToJoinedUsers[roomID] = make(userIDSet) n.roomIDToJoinedUsers[roomID] = make(userIDSet, len(userIDs))
} }
for _, userID := range userIDs { for _, userID := range userIDs {
n.roomIDToJoinedUsers[roomID].add(userID) n.roomIDToJoinedUsers[roomID].add(userID)
@ -343,7 +358,7 @@ func (n *Notifier) setPeekingDevices(roomIDToPeekingDevices map[string][]types.P
// This is just the bulk form of addPeekingDevice // This is just the bulk form of addPeekingDevice
for roomID, peekingDevices := range roomIDToPeekingDevices { for roomID, peekingDevices := range roomIDToPeekingDevices {
if _, ok := n.roomIDToPeekingDevices[roomID]; !ok { if _, ok := n.roomIDToPeekingDevices[roomID]; !ok {
n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet) n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet, len(peekingDevices))
} }
for _, peekingDevice := range peekingDevices { for _, peekingDevice := range peekingDevices {
n.roomIDToPeekingDevices[roomID].add(peekingDevice) n.roomIDToPeekingDevices[roomID].add(peekingDevice)
@ -416,7 +431,7 @@ func (n *Notifier) fetchUserStreams(userID string) []*UserDeviceStream {
if !ok { if !ok {
return []*UserDeviceStream{} return []*UserDeviceStream{}
} }
streams := []*UserDeviceStream{} streams := make([]*UserDeviceStream, 0, len(user))
for _, stream := range user { for _, stream := range user {
streams = append(streams, stream) streams = append(streams, stream)
} }
@ -514,10 +529,10 @@ func (n *Notifier) removeEmptyUserStreams() {
} }
// A string set, mainly existing for improving clarity of structs in this file. // A string set, mainly existing for improving clarity of structs in this file.
type userIDSet map[string]bool type userIDSet map[string]struct{}
func (s userIDSet) add(str string) { func (s userIDSet) add(str string) {
s[str] = true s[str] = struct{}{}
} }
func (s userIDSet) remove(str string) { func (s userIDSet) remove(str string) {
@ -534,10 +549,10 @@ func (s userIDSet) values() (vals []string) {
// A set of PeekingDevices, similar to userIDSet // A set of PeekingDevices, similar to userIDSet
type peekingDeviceSet map[types.PeekingDevice]bool type peekingDeviceSet map[types.PeekingDevice]struct{}
func (s peekingDeviceSet) add(d types.PeekingDevice) { func (s peekingDeviceSet) add(d types.PeekingDevice) {
s[d] = true s[d] = struct{}{}
} }
// nolint:unused // nolint:unused