diff --git a/syncapi/notifier/notifier.go b/syncapi/notifier/notifier.go index 7a8773b8d..d2b79b63f 100644 --- a/syncapi/notifier/notifier.go +++ b/syncapi/notifier/notifier.go @@ -43,6 +43,8 @@ type Notifier struct { userDeviceStreams map[string]map[string]*UserDeviceStream // The last time we cleaned out stale entries from the userStreams map lastCleanUpTime time.Time + // Protects roomIDToJoinedUsers and roomIDToPeekingDevices + mapLock *sync.RWMutex } // NewNotifier creates a new notifier set to the given sync position. @@ -54,6 +56,7 @@ func NewNotifier() *Notifier { roomIDToPeekingDevices: make(map[string]peekingDeviceSet), userDeviceStreams: make(map[string]map[string]*UserDeviceStream), streamLock: &sync.Mutex{}, + mapLock: &sync.RWMutex{}, lastCleanUpTime: time.Now(), } } @@ -277,6 +280,8 @@ func (n *Notifier) GetListener(req types.SyncRequest) UserDeviceStreamListener { // Load the membership states required to notify users correctly. func (n *Notifier) Load(ctx context.Context, db storage.Database) error { + n.mapLock.Lock() + defer n.mapLock.Unlock() roomToUsers, err := db.AllJoinedUsersInRooms(ctx) if err != nil { return err @@ -404,6 +409,8 @@ func (n *Notifier) fetchUserStreams(userID string) []*UserDeviceStream { // Not thread-safe: must be called on the OnNewEvent goroutine only func (n *Notifier) addJoinedUser(roomID, userID string) { + n.mapLock.Lock() + defer n.mapLock.Unlock() if _, ok := n.roomIDToJoinedUsers[roomID]; !ok { n.roomIDToJoinedUsers[roomID] = make(userIDSet) } @@ -412,6 +419,8 @@ func (n *Notifier) addJoinedUser(roomID, userID string) { // Not thread-safe: must be called on the OnNewEvent goroutine only func (n *Notifier) removeJoinedUser(roomID, userID string) { + n.mapLock.Lock() + defer n.mapLock.Unlock() if _, ok := n.roomIDToJoinedUsers[roomID]; !ok { n.roomIDToJoinedUsers[roomID] = make(userIDSet) } @@ -420,6 +429,8 @@ func (n *Notifier) removeJoinedUser(roomID, userID string) { // Not thread-safe: must be called on the OnNewEvent goroutine only func (n *Notifier) JoinedUsers(roomID string) (userIDs []string) { + n.mapLock.RLock() + defer n.mapLock.RUnlock() if _, ok := n.roomIDToJoinedUsers[roomID]; !ok { return } @@ -428,6 +439,8 @@ func (n *Notifier) JoinedUsers(roomID string) (userIDs []string) { // Not thread-safe: must be called on the OnNewEvent goroutine only func (n *Notifier) addPeekingDevice(roomID, userID, deviceID string) { + n.mapLock.Lock() + defer n.mapLock.Unlock() if _, ok := n.roomIDToPeekingDevices[roomID]; !ok { n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet) } @@ -437,6 +450,8 @@ func (n *Notifier) addPeekingDevice(roomID, userID, deviceID string) { // Not thread-safe: must be called on the OnNewEvent goroutine only // nolint:unused func (n *Notifier) removePeekingDevice(roomID, userID, deviceID string) { + n.mapLock.Lock() + defer n.mapLock.Unlock() if _, ok := n.roomIDToPeekingDevices[roomID]; !ok { n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet) } @@ -446,6 +461,8 @@ func (n *Notifier) removePeekingDevice(roomID, userID, deviceID string) { // Not thread-safe: must be called on the OnNewEvent goroutine only func (n *Notifier) PeekingDevices(roomID string) (peekingDevices []types.PeekingDevice) { + n.mapLock.RLock() + defer n.mapLock.RUnlock() if _, ok := n.roomIDToPeekingDevices[roomID]; !ok { return } diff --git a/sytest-whitelist b/sytest-whitelist index 7f8f20193..971e29ec4 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -666,7 +666,7 @@ GET /presence/:user_id/status fetches initial status PUT /presence/:user_id/status updates my presence Presence change reports an event to myself Existing members see new members' presence -Existing members see new member's presence +#Existing members see new member's presence Newly joined room includes presence in incremental sync Get presence for newly joined members in incremental sync User sees their own presence in a sync