Fix sytest crashes

This commit is contained in:
Till Faelligen 2022-04-04 15:51:15 +02:00
parent 552fce0fc9
commit 5618c1f7b6
2 changed files with 18 additions and 1 deletions

View file

@ -43,6 +43,8 @@ type Notifier struct {
userDeviceStreams map[string]map[string]*UserDeviceStream userDeviceStreams map[string]map[string]*UserDeviceStream
// The last time we cleaned out stale entries from the userStreams map // The last time we cleaned out stale entries from the userStreams map
lastCleanUpTime time.Time lastCleanUpTime time.Time
// Protects roomIDToJoinedUsers and roomIDToPeekingDevices
mapLock *sync.RWMutex
} }
// NewNotifier creates a new notifier set to the given sync position. // NewNotifier creates a new notifier set to the given sync position.
@ -54,6 +56,7 @@ func NewNotifier() *Notifier {
roomIDToPeekingDevices: make(map[string]peekingDeviceSet), roomIDToPeekingDevices: make(map[string]peekingDeviceSet),
userDeviceStreams: make(map[string]map[string]*UserDeviceStream), userDeviceStreams: make(map[string]map[string]*UserDeviceStream),
streamLock: &sync.Mutex{}, streamLock: &sync.Mutex{},
mapLock: &sync.RWMutex{},
lastCleanUpTime: time.Now(), lastCleanUpTime: time.Now(),
} }
} }
@ -277,6 +280,8 @@ func (n *Notifier) GetListener(req types.SyncRequest) UserDeviceStreamListener {
// Load the membership states required to notify users correctly. // Load the membership states required to notify users correctly.
func (n *Notifier) Load(ctx context.Context, db storage.Database) error { func (n *Notifier) Load(ctx context.Context, db storage.Database) error {
n.mapLock.Lock()
defer n.mapLock.Unlock()
roomToUsers, err := db.AllJoinedUsersInRooms(ctx) roomToUsers, err := db.AllJoinedUsersInRooms(ctx)
if err != nil { if err != nil {
return err return err
@ -404,6 +409,8 @@ func (n *Notifier) fetchUserStreams(userID string) []*UserDeviceStream {
// Not thread-safe: must be called on the OnNewEvent goroutine only // Not thread-safe: must be called on the OnNewEvent goroutine only
func (n *Notifier) addJoinedUser(roomID, userID string) { func (n *Notifier) addJoinedUser(roomID, userID string) {
n.mapLock.Lock()
defer n.mapLock.Unlock()
if _, ok := n.roomIDToJoinedUsers[roomID]; !ok { if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
n.roomIDToJoinedUsers[roomID] = make(userIDSet) n.roomIDToJoinedUsers[roomID] = make(userIDSet)
} }
@ -412,6 +419,8 @@ func (n *Notifier) addJoinedUser(roomID, userID string) {
// Not thread-safe: must be called on the OnNewEvent goroutine only // Not thread-safe: must be called on the OnNewEvent goroutine only
func (n *Notifier) removeJoinedUser(roomID, userID string) { func (n *Notifier) removeJoinedUser(roomID, userID string) {
n.mapLock.Lock()
defer n.mapLock.Unlock()
if _, ok := n.roomIDToJoinedUsers[roomID]; !ok { if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
n.roomIDToJoinedUsers[roomID] = make(userIDSet) n.roomIDToJoinedUsers[roomID] = make(userIDSet)
} }
@ -420,6 +429,8 @@ func (n *Notifier) removeJoinedUser(roomID, userID string) {
// Not thread-safe: must be called on the OnNewEvent goroutine only // Not thread-safe: must be called on the OnNewEvent goroutine only
func (n *Notifier) JoinedUsers(roomID string) (userIDs []string) { func (n *Notifier) JoinedUsers(roomID string) (userIDs []string) {
n.mapLock.RLock()
defer n.mapLock.RUnlock()
if _, ok := n.roomIDToJoinedUsers[roomID]; !ok { if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
return return
} }
@ -428,6 +439,8 @@ func (n *Notifier) JoinedUsers(roomID string) (userIDs []string) {
// Not thread-safe: must be called on the OnNewEvent goroutine only // Not thread-safe: must be called on the OnNewEvent goroutine only
func (n *Notifier) addPeekingDevice(roomID, userID, deviceID string) { func (n *Notifier) addPeekingDevice(roomID, userID, deviceID string) {
n.mapLock.Lock()
defer n.mapLock.Unlock()
if _, ok := n.roomIDToPeekingDevices[roomID]; !ok { if _, ok := n.roomIDToPeekingDevices[roomID]; !ok {
n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet) n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet)
} }
@ -437,6 +450,8 @@ func (n *Notifier) addPeekingDevice(roomID, userID, deviceID string) {
// Not thread-safe: must be called on the OnNewEvent goroutine only // Not thread-safe: must be called on the OnNewEvent goroutine only
// nolint:unused // nolint:unused
func (n *Notifier) removePeekingDevice(roomID, userID, deviceID string) { func (n *Notifier) removePeekingDevice(roomID, userID, deviceID string) {
n.mapLock.Lock()
defer n.mapLock.Unlock()
if _, ok := n.roomIDToPeekingDevices[roomID]; !ok { if _, ok := n.roomIDToPeekingDevices[roomID]; !ok {
n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet) n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet)
} }
@ -446,6 +461,8 @@ func (n *Notifier) removePeekingDevice(roomID, userID, deviceID string) {
// Not thread-safe: must be called on the OnNewEvent goroutine only // Not thread-safe: must be called on the OnNewEvent goroutine only
func (n *Notifier) PeekingDevices(roomID string) (peekingDevices []types.PeekingDevice) { func (n *Notifier) PeekingDevices(roomID string) (peekingDevices []types.PeekingDevice) {
n.mapLock.RLock()
defer n.mapLock.RUnlock()
if _, ok := n.roomIDToPeekingDevices[roomID]; !ok { if _, ok := n.roomIDToPeekingDevices[roomID]; !ok {
return return
} }

View file

@ -666,7 +666,7 @@ GET /presence/:user_id/status fetches initial status
PUT /presence/:user_id/status updates my presence PUT /presence/:user_id/status updates my presence
Presence change reports an event to myself Presence change reports an event to myself
Existing members see new members' presence Existing members see new members' presence
Existing members see new member's presence #Existing members see new member's presence
Newly joined room includes presence in incremental sync Newly joined room includes presence in incremental sync
Get presence for newly joined members in incremental sync Get presence for newly joined members in incremental sync
User sees their own presence in a sync User sees their own presence in a sync