This commit is contained in:
Neil Alexander 2020-05-27 16:04:00 +01:00
parent 7581c886bd
commit 792e04d69d

View file

@ -175,7 +175,7 @@ func (n *Notifier) setUsersJoinedToRooms(roomIDToUserIDs map[string][]string) {
func (n *Notifier) wakeupUsers(userIDs []string, newPos types.StreamingToken) {
for _, userID := range userIDs {
for _, stream := range n.fetchUserStreams(userID, false) {
for deviceID, stream := range n.fetchUserStreams(userID) {
if stream != nil {
stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream
}
@ -196,11 +196,11 @@ func (n *Notifier) wakeupUserDevice(userDevices map[string]string, newPos types.
// function does not wait for data to be available on the stream.
// NB: Callers should have locked the mutex before calling this function.
func (n *Notifier) fetchUserDeviceStream(userID, deviceID string, makeIfNotExists bool) *UserDeviceStream {
user, ok := n.userDeviceStreams[userID]
_, ok := n.userDeviceStreams[userID]
if !ok && makeIfNotExists {
n.userDeviceStreams[userID] = map[string]*UserDeviceStream{}
}
stream, ok := user[deviceID]
stream, ok := n.userDeviceStreams[userID][deviceID]
if !ok && makeIfNotExists {
// TODO: Unbounded growth of streams (1 per user)
stream = NewUserDeviceStream(userID, deviceID, n.currPos)
@ -213,9 +213,9 @@ func (n *Notifier) fetchUserDeviceStream(userID, deviceID string, makeIfNotExist
// a stream will be made for this user if one doesn't exist and it will be returned. This
// function does not wait for data to be available on the stream.
// NB: Callers should have locked the mutex before calling this function.
func (n *Notifier) fetchUserStreams(userID string, makeIfNotExists bool) []*UserDeviceStream {
func (n *Notifier) fetchUserStreams(userID string) []*UserDeviceStream {
user, ok := n.userDeviceStreams[userID]
if !ok && makeIfNotExists {
if !ok {
return []*UserDeviceStream{}
}
streams := []*UserDeviceStream{}
@ -266,8 +266,8 @@ func (n *Notifier) removeEmptyUserStreams() {
deleteBefore := now.Add(-5 * time.Minute)
for user, byUser := range n.userDeviceStreams {
for device, value := range byUser {
if value.TimeOfLastNonEmpty().Before(deleteBefore) {
for device, stream := range byUser {
if stream.TimeOfLastNonEmpty().Before(deleteBefore) {
delete(n.userDeviceStreams[user], device)
}
if len(n.userDeviceStreams[user]) == 0 {