Fix lock contention in sync notifier
This commit is contained in:
parent
87b1ed1338
commit
7902859bc1
|
@ -25,28 +25,28 @@ import (
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// NOTE: ALL FUNCTIONS IN THIS FILE PREFIXED WITH _ ARE NOT THREAD-SAFE
|
||||||
|
// AND MUST ONLY BE CALLED WHEN THE NOTIFIER LOCK IS HELD!
|
||||||
|
|
||||||
// Notifier will wake up sleeping requests when there is some new data.
|
// Notifier will wake up sleeping requests when there is some new data.
|
||||||
// It does not tell requests what that data is, only the sync position which
|
// It does not tell requests what that data is, only the sync position which
|
||||||
// they can use to get at it. This is done to prevent races whereby we tell the caller
|
// they can use to get at it. This is done to prevent races whereby we tell the caller
|
||||||
// the event, but the token has already advanced by the time they fetch it, resulting
|
// the event, but the token has already advanced by the time they fetch it, resulting
|
||||||
// in missed events.
|
// in missed events.
|
||||||
type Notifier struct {
|
type Notifier struct {
|
||||||
|
lock *sync.RWMutex
|
||||||
// A map of RoomID => Set<UserID> : Must only be accessed by the OnNewEvent goroutine
|
// A map of RoomID => Set<UserID> : Must only be accessed by the OnNewEvent goroutine
|
||||||
roomIDToJoinedUsers map[string]userIDSet
|
roomIDToJoinedUsers map[string]userIDSet
|
||||||
// A map of RoomID => Set<UserID> : Must only be accessed by the OnNewEvent goroutine
|
// A map of RoomID => Set<UserID> : Must only be accessed by the OnNewEvent goroutine
|
||||||
roomIDToPeekingDevices map[string]peekingDeviceSet
|
roomIDToPeekingDevices map[string]peekingDeviceSet
|
||||||
// Protects currPos and userStreams.
|
|
||||||
streamLock *sync.Mutex
|
|
||||||
// The latest sync position
|
// The latest sync position
|
||||||
currPos types.StreamingToken
|
currPos types.StreamingToken
|
||||||
// A map of user_id => device_id => UserStream which can be used to wake a given user's /sync request.
|
// A map of user_id => device_id => UserStream which can be used to wake a given user's /sync request.
|
||||||
userDeviceStreams map[string]map[string]*UserDeviceStream
|
userDeviceStreams map[string]map[string]*UserDeviceStream
|
||||||
// The last time we cleaned out stale entries from the userStreams map
|
// The last time we cleaned out stale entries from the userStreams map
|
||||||
lastCleanUpTime time.Time
|
lastCleanUpTime time.Time
|
||||||
// Protects roomIDToJoinedUsers and roomIDToPeekingDevices
|
|
||||||
mapLock *sync.RWMutex
|
|
||||||
// This map is reused to prevent allocations and GC pressure in SharedUsers.
|
// This map is reused to prevent allocations and GC pressure in SharedUsers.
|
||||||
_sharedUsers map[string]struct{}
|
_sharedUserMap map[string]struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewNotifier creates a new notifier set to the given sync position.
|
// NewNotifier creates a new notifier set to the given sync position.
|
||||||
|
@ -57,18 +57,17 @@ func NewNotifier() *Notifier {
|
||||||
roomIDToJoinedUsers: make(map[string]userIDSet),
|
roomIDToJoinedUsers: make(map[string]userIDSet),
|
||||||
roomIDToPeekingDevices: make(map[string]peekingDeviceSet),
|
roomIDToPeekingDevices: make(map[string]peekingDeviceSet),
|
||||||
userDeviceStreams: make(map[string]map[string]*UserDeviceStream),
|
userDeviceStreams: make(map[string]map[string]*UserDeviceStream),
|
||||||
streamLock: &sync.Mutex{},
|
lock: &sync.RWMutex{},
|
||||||
mapLock: &sync.RWMutex{},
|
|
||||||
lastCleanUpTime: time.Now(),
|
lastCleanUpTime: time.Now(),
|
||||||
_sharedUsers: map[string]struct{}{},
|
_sharedUserMap: map[string]struct{}{},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetCurrentPosition sets the current streaming positions.
|
// SetCurrentPosition sets the current streaming positions.
|
||||||
// This must be called directly after NewNotifier and initialising the streams.
|
// This must be called directly after NewNotifier and initialising the streams.
|
||||||
func (n *Notifier) SetCurrentPosition(currPos types.StreamingToken) {
|
func (n *Notifier) SetCurrentPosition(currPos types.StreamingToken) {
|
||||||
n.streamLock.Lock()
|
n.lock.Lock()
|
||||||
defer n.streamLock.Unlock()
|
defer n.lock.Unlock()
|
||||||
|
|
||||||
n.currPos = currPos
|
n.currPos = currPos
|
||||||
}
|
}
|
||||||
|
@ -89,17 +88,16 @@ func (n *Notifier) OnNewEvent(
|
||||||
) {
|
) {
|
||||||
// update the current position then notify relevant /sync streams.
|
// update the current position then notify relevant /sync streams.
|
||||||
// This needs to be done PRIOR to waking up users as they will read this value.
|
// This needs to be done PRIOR to waking up users as they will read this value.
|
||||||
n.streamLock.Lock()
|
n.lock.Lock()
|
||||||
defer n.streamLock.Unlock()
|
defer n.lock.Unlock()
|
||||||
|
|
||||||
n.currPos.ApplyUpdates(posUpdate)
|
n.currPos.ApplyUpdates(posUpdate)
|
||||||
n.removeEmptyUserStreams()
|
n._removeEmptyUserStreams()
|
||||||
|
|
||||||
if ev != nil {
|
if ev != nil {
|
||||||
// Map this event's room_id to a list of joined users, and wake them up.
|
// Map this event's room_id to a list of joined users, and wake them up.
|
||||||
usersToNotify := n.JoinedUsers(ev.RoomID())
|
usersToNotify := n._joinedUsers(ev.RoomID())
|
||||||
// Map this event's room_id to a list of peeking devices, and wake them up.
|
// Map this event's room_id to a list of peeking devices, and wake them up.
|
||||||
peekingDevicesToNotify := n.PeekingDevices(ev.RoomID())
|
peekingDevicesToNotify := n._peekingDevices(ev.RoomID())
|
||||||
// If this is an invite, also add in the invitee to this list.
|
// If this is an invite, also add in the invitee to this list.
|
||||||
if ev.Type() == "m.room.member" && ev.StateKey() != nil {
|
if ev.Type() == "m.room.member" && ev.StateKey() != nil {
|
||||||
targetUserID := *ev.StateKey()
|
targetUserID := *ev.StateKey()
|
||||||
|
@ -117,20 +115,20 @@ func (n *Notifier) OnNewEvent(
|
||||||
// Manually append the new user's ID so they get notified
|
// Manually append the new user's ID so they get notified
|
||||||
// along all members in the room
|
// along all members in the room
|
||||||
usersToNotify = append(usersToNotify, targetUserID)
|
usersToNotify = append(usersToNotify, targetUserID)
|
||||||
n.addJoinedUser(ev.RoomID(), targetUserID)
|
n._addJoinedUser(ev.RoomID(), targetUserID)
|
||||||
case gomatrixserverlib.Leave:
|
case gomatrixserverlib.Leave:
|
||||||
fallthrough
|
fallthrough
|
||||||
case gomatrixserverlib.Ban:
|
case gomatrixserverlib.Ban:
|
||||||
n.removeJoinedUser(ev.RoomID(), targetUserID)
|
n._removeJoinedUser(ev.RoomID(), targetUserID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
n.wakeupUsers(usersToNotify, peekingDevicesToNotify, n.currPos)
|
n._wakeupUsers(usersToNotify, peekingDevicesToNotify, n.currPos)
|
||||||
} else if roomID != "" {
|
} else if roomID != "" {
|
||||||
n.wakeupUsers(n.JoinedUsers(roomID), n.PeekingDevices(roomID), n.currPos)
|
n._wakeupUsers(n._joinedUsers(roomID), n._peekingDevices(roomID), n.currPos)
|
||||||
} else if len(userIDs) > 0 {
|
} else if len(userIDs) > 0 {
|
||||||
n.wakeupUsers(userIDs, nil, n.currPos)
|
n._wakeupUsers(userIDs, nil, n.currPos)
|
||||||
} else {
|
} else {
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
"posUpdate": posUpdate.String,
|
"posUpdate": posUpdate.String,
|
||||||
|
@ -141,22 +139,22 @@ func (n *Notifier) OnNewEvent(
|
||||||
func (n *Notifier) OnNewAccountData(
|
func (n *Notifier) OnNewAccountData(
|
||||||
userID string, posUpdate types.StreamingToken,
|
userID string, posUpdate types.StreamingToken,
|
||||||
) {
|
) {
|
||||||
n.streamLock.Lock()
|
n.lock.Lock()
|
||||||
defer n.streamLock.Unlock()
|
defer n.lock.Unlock()
|
||||||
|
|
||||||
n.currPos.ApplyUpdates(posUpdate)
|
n.currPos.ApplyUpdates(posUpdate)
|
||||||
n.wakeupUsers([]string{userID}, nil, posUpdate)
|
n._wakeupUsers([]string{userID}, nil, posUpdate)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Notifier) OnNewPeek(
|
func (n *Notifier) OnNewPeek(
|
||||||
roomID, userID, deviceID string,
|
roomID, userID, deviceID string,
|
||||||
posUpdate types.StreamingToken,
|
posUpdate types.StreamingToken,
|
||||||
) {
|
) {
|
||||||
n.streamLock.Lock()
|
n.lock.Lock()
|
||||||
defer n.streamLock.Unlock()
|
defer n.lock.Unlock()
|
||||||
|
|
||||||
n.currPos.ApplyUpdates(posUpdate)
|
n.currPos.ApplyUpdates(posUpdate)
|
||||||
n.addPeekingDevice(roomID, userID, deviceID)
|
n._addPeekingDevice(roomID, userID, deviceID)
|
||||||
|
|
||||||
// we don't wake up devices here given the roomserver consumer will do this shortly afterwards
|
// we don't wake up devices here given the roomserver consumer will do this shortly afterwards
|
||||||
// by calling OnNewEvent.
|
// by calling OnNewEvent.
|
||||||
|
@ -166,11 +164,11 @@ func (n *Notifier) OnRetirePeek(
|
||||||
roomID, userID, deviceID string,
|
roomID, userID, deviceID string,
|
||||||
posUpdate types.StreamingToken,
|
posUpdate types.StreamingToken,
|
||||||
) {
|
) {
|
||||||
n.streamLock.Lock()
|
n.lock.Lock()
|
||||||
defer n.streamLock.Unlock()
|
defer n.lock.Unlock()
|
||||||
|
|
||||||
n.currPos.ApplyUpdates(posUpdate)
|
n.currPos.ApplyUpdates(posUpdate)
|
||||||
n.removePeekingDevice(roomID, userID, deviceID)
|
n._removePeekingDevice(roomID, userID, deviceID)
|
||||||
|
|
||||||
// we don't wake up devices here given the roomserver consumer will do this shortly afterwards
|
// we don't wake up devices here given the roomserver consumer will do this shortly afterwards
|
||||||
// by calling OnRetireEvent.
|
// by calling OnRetireEvent.
|
||||||
|
@ -180,11 +178,11 @@ func (n *Notifier) OnNewSendToDevice(
|
||||||
userID string, deviceIDs []string,
|
userID string, deviceIDs []string,
|
||||||
posUpdate types.StreamingToken,
|
posUpdate types.StreamingToken,
|
||||||
) {
|
) {
|
||||||
n.streamLock.Lock()
|
n.lock.Lock()
|
||||||
defer n.streamLock.Unlock()
|
defer n.lock.Unlock()
|
||||||
|
|
||||||
n.currPos.ApplyUpdates(posUpdate)
|
n.currPos.ApplyUpdates(posUpdate)
|
||||||
n.wakeupUserDevice(userID, deviceIDs, n.currPos)
|
n._wakeupUserDevice(userID, deviceIDs, n.currPos)
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnNewReceipt updates the current position
|
// OnNewReceipt updates the current position
|
||||||
|
@ -192,11 +190,11 @@ func (n *Notifier) OnNewTyping(
|
||||||
roomID string,
|
roomID string,
|
||||||
posUpdate types.StreamingToken,
|
posUpdate types.StreamingToken,
|
||||||
) {
|
) {
|
||||||
n.streamLock.Lock()
|
n.lock.Lock()
|
||||||
defer n.streamLock.Unlock()
|
defer n.lock.Unlock()
|
||||||
|
|
||||||
n.currPos.ApplyUpdates(posUpdate)
|
n.currPos.ApplyUpdates(posUpdate)
|
||||||
n.wakeupUsers(n.JoinedUsers(roomID), nil, n.currPos)
|
n._wakeupUsers(n._joinedUsers(roomID), nil, n.currPos)
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnNewReceipt updates the current position
|
// OnNewReceipt updates the current position
|
||||||
|
@ -204,80 +202,84 @@ func (n *Notifier) OnNewReceipt(
|
||||||
roomID string,
|
roomID string,
|
||||||
posUpdate types.StreamingToken,
|
posUpdate types.StreamingToken,
|
||||||
) {
|
) {
|
||||||
n.streamLock.Lock()
|
n.lock.Lock()
|
||||||
defer n.streamLock.Unlock()
|
defer n.lock.Unlock()
|
||||||
|
|
||||||
n.currPos.ApplyUpdates(posUpdate)
|
n.currPos.ApplyUpdates(posUpdate)
|
||||||
n.wakeupUsers(n.JoinedUsers(roomID), nil, n.currPos)
|
n._wakeupUsers(n._joinedUsers(roomID), nil, n.currPos)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Notifier) OnNewKeyChange(
|
func (n *Notifier) OnNewKeyChange(
|
||||||
posUpdate types.StreamingToken, wakeUserID, keyChangeUserID string,
|
posUpdate types.StreamingToken, wakeUserID, keyChangeUserID string,
|
||||||
) {
|
) {
|
||||||
n.streamLock.Lock()
|
n.lock.Lock()
|
||||||
defer n.streamLock.Unlock()
|
defer n.lock.Unlock()
|
||||||
|
|
||||||
n.currPos.ApplyUpdates(posUpdate)
|
n.currPos.ApplyUpdates(posUpdate)
|
||||||
n.wakeupUsers([]string{wakeUserID}, nil, n.currPos)
|
n._wakeupUsers([]string{wakeUserID}, nil, n.currPos)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Notifier) OnNewInvite(
|
func (n *Notifier) OnNewInvite(
|
||||||
posUpdate types.StreamingToken, wakeUserID string,
|
posUpdate types.StreamingToken, wakeUserID string,
|
||||||
) {
|
) {
|
||||||
n.streamLock.Lock()
|
n.lock.Lock()
|
||||||
defer n.streamLock.Unlock()
|
defer n.lock.Unlock()
|
||||||
|
|
||||||
n.currPos.ApplyUpdates(posUpdate)
|
n.currPos.ApplyUpdates(posUpdate)
|
||||||
n.wakeupUsers([]string{wakeUserID}, nil, n.currPos)
|
n._wakeupUsers([]string{wakeUserID}, nil, n.currPos)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Notifier) OnNewNotificationData(
|
func (n *Notifier) OnNewNotificationData(
|
||||||
userID string,
|
userID string,
|
||||||
posUpdate types.StreamingToken,
|
posUpdate types.StreamingToken,
|
||||||
) {
|
) {
|
||||||
n.streamLock.Lock()
|
n.lock.Lock()
|
||||||
defer n.streamLock.Unlock()
|
defer n.lock.Unlock()
|
||||||
|
|
||||||
n.currPos.ApplyUpdates(posUpdate)
|
n.currPos.ApplyUpdates(posUpdate)
|
||||||
n.wakeupUsers([]string{userID}, nil, n.currPos)
|
n._wakeupUsers([]string{userID}, nil, n.currPos)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Notifier) OnNewPresence(
|
func (n *Notifier) OnNewPresence(
|
||||||
posUpdate types.StreamingToken, userID string,
|
posUpdate types.StreamingToken, userID string,
|
||||||
) {
|
) {
|
||||||
n.streamLock.Lock()
|
n.lock.Lock()
|
||||||
defer n.streamLock.Unlock()
|
defer n.lock.Unlock()
|
||||||
|
|
||||||
n.currPos.ApplyUpdates(posUpdate)
|
n.currPos.ApplyUpdates(posUpdate)
|
||||||
sharedUsers := n.SharedUsers(userID)
|
sharedUsers := n._sharedUsers(userID)
|
||||||
sharedUsers = append(sharedUsers, userID)
|
sharedUsers = append(sharedUsers, userID)
|
||||||
|
|
||||||
n.wakeupUsers(sharedUsers, nil, n.currPos)
|
n._wakeupUsers(sharedUsers, nil, n.currPos)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Notifier) SharedUsers(userID string) []string {
|
func (n *Notifier) SharedUsers(userID string) []string {
|
||||||
n.mapLock.RLock()
|
n.lock.RLock()
|
||||||
defer n.mapLock.RUnlock()
|
defer n.lock.RUnlock()
|
||||||
n._sharedUsers[userID] = struct{}{}
|
return n._sharedUsers(userID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *Notifier) _sharedUsers(userID string) []string {
|
||||||
|
n._sharedUserMap[userID] = struct{}{}
|
||||||
for roomID, users := range n.roomIDToJoinedUsers {
|
for roomID, users := range n.roomIDToJoinedUsers {
|
||||||
if _, ok := users[userID]; !ok {
|
if _, ok := users[userID]; !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
for _, userID := range n.JoinedUsers(roomID) {
|
for _, userID := range n._joinedUsers(roomID) {
|
||||||
n._sharedUsers[userID] = struct{}{}
|
n._sharedUserMap[userID] = struct{}{}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
sharedUsers := make([]string, 0, len(n._sharedUsers)+1)
|
sharedUsers := make([]string, 0, len(n._sharedUserMap)+1)
|
||||||
for userID := range n._sharedUsers {
|
for userID := range n._sharedUserMap {
|
||||||
sharedUsers = append(sharedUsers, userID)
|
sharedUsers = append(sharedUsers, userID)
|
||||||
delete(n._sharedUsers, userID)
|
delete(n._sharedUserMap, userID)
|
||||||
}
|
}
|
||||||
return sharedUsers
|
return sharedUsers
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Notifier) IsSharedUser(userA, userB string) bool {
|
func (n *Notifier) IsSharedUser(userA, userB string) bool {
|
||||||
n.mapLock.RLock()
|
n.lock.RLock()
|
||||||
defer n.mapLock.RUnlock()
|
defer n.lock.RUnlock()
|
||||||
var okA, okB bool
|
var okA, okB bool
|
||||||
for _, users := range n.roomIDToJoinedUsers {
|
for _, users := range n.roomIDToJoinedUsers {
|
||||||
_, okA = users[userA]
|
_, okA = users[userA]
|
||||||
|
@ -301,18 +303,18 @@ func (n *Notifier) GetListener(req types.SyncRequest) UserDeviceStreamListener {
|
||||||
// TODO: v1 /events 'peeking' has an 'explicit room ID' which is also tracked,
|
// TODO: v1 /events 'peeking' has an 'explicit room ID' which is also tracked,
|
||||||
// but given we don't do /events, let's pretend it doesn't exist.
|
// but given we don't do /events, let's pretend it doesn't exist.
|
||||||
|
|
||||||
n.streamLock.Lock()
|
n.lock.Lock()
|
||||||
defer n.streamLock.Unlock()
|
defer n.lock.Unlock()
|
||||||
|
|
||||||
n.removeEmptyUserStreams()
|
n._removeEmptyUserStreams()
|
||||||
|
|
||||||
return n.fetchUserDeviceStream(req.Device.UserID, req.Device.ID, true).GetListener(req.Context)
|
return n._fetchUserDeviceStream(req.Device.UserID, req.Device.ID, true).GetListener(req.Context)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load the membership states required to notify users correctly.
|
// Load the membership states required to notify users correctly.
|
||||||
func (n *Notifier) Load(ctx context.Context, db storage.Database) error {
|
func (n *Notifier) Load(ctx context.Context, db storage.Database) error {
|
||||||
n.mapLock.Lock()
|
n.lock.Lock()
|
||||||
defer n.mapLock.Unlock()
|
defer n.lock.Unlock()
|
||||||
roomToUsers, err := db.AllJoinedUsersInRooms(ctx)
|
roomToUsers, err := db.AllJoinedUsersInRooms(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -330,8 +332,8 @@ func (n *Notifier) Load(ctx context.Context, db storage.Database) error {
|
||||||
|
|
||||||
// CurrentPosition returns the current sync position
|
// CurrentPosition returns the current sync position
|
||||||
func (n *Notifier) CurrentPosition() types.StreamingToken {
|
func (n *Notifier) CurrentPosition() types.StreamingToken {
|
||||||
n.streamLock.Lock()
|
n.lock.RLock()
|
||||||
defer n.streamLock.Unlock()
|
defer n.lock.RUnlock()
|
||||||
|
|
||||||
return n.currPos
|
return n.currPos
|
||||||
}
|
}
|
||||||
|
@ -366,11 +368,11 @@ func (n *Notifier) setPeekingDevices(roomIDToPeekingDevices map[string][]types.P
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// wakeupUsers will wake up the sync strems for all of the devices for all of the
|
// _wakeupUsers will wake up the sync strems for all of the devices for all of the
|
||||||
// specified user IDs, and also the specified peekingDevices
|
// specified user IDs, and also the specified peekingDevices
|
||||||
func (n *Notifier) wakeupUsers(userIDs []string, peekingDevices []types.PeekingDevice, newPos types.StreamingToken) {
|
func (n *Notifier) _wakeupUsers(userIDs []string, peekingDevices []types.PeekingDevice, newPos types.StreamingToken) {
|
||||||
for _, userID := range userIDs {
|
for _, userID := range userIDs {
|
||||||
for _, stream := range n.fetchUserStreams(userID) {
|
for _, stream := range n._fetchUserStreams(userID) {
|
||||||
if stream == nil {
|
if stream == nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -380,28 +382,27 @@ func (n *Notifier) wakeupUsers(userIDs []string, peekingDevices []types.PeekingD
|
||||||
|
|
||||||
for _, peekingDevice := range peekingDevices {
|
for _, peekingDevice := range peekingDevices {
|
||||||
// TODO: don't bother waking up for devices whose users we already woke up
|
// TODO: don't bother waking up for devices whose users we already woke up
|
||||||
if stream := n.fetchUserDeviceStream(peekingDevice.UserID, peekingDevice.DeviceID, false); stream != nil {
|
if stream := n._fetchUserDeviceStream(peekingDevice.UserID, peekingDevice.DeviceID, false); stream != nil {
|
||||||
stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream
|
stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// wakeupUserDevice will wake up the sync stream for a specific user device. Other
|
// _wakeupUserDevice will wake up the sync stream for a specific user device. Other
|
||||||
// device streams will be left alone.
|
// device streams will be left alone.
|
||||||
// nolint:unused
|
// nolint:unused
|
||||||
func (n *Notifier) wakeupUserDevice(userID string, deviceIDs []string, newPos types.StreamingToken) {
|
func (n *Notifier) _wakeupUserDevice(userID string, deviceIDs []string, newPos types.StreamingToken) {
|
||||||
for _, deviceID := range deviceIDs {
|
for _, deviceID := range deviceIDs {
|
||||||
if stream := n.fetchUserDeviceStream(userID, deviceID, false); stream != nil {
|
if stream := n._fetchUserDeviceStream(userID, deviceID, false); stream != nil {
|
||||||
stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream
|
stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// fetchUserDeviceStream retrieves a stream unique to the given device. If makeIfNotExists is true,
|
// _fetchUserDeviceStream retrieves a stream unique to the given device. If makeIfNotExists is true,
|
||||||
// a stream will be made for this device if one doesn't exist and it will be returned. This
|
// a stream will be made for this device if one doesn't exist and it will be returned. This
|
||||||
// function does not wait for data to be available on the stream.
|
// function does not wait for data to be available on the stream.
|
||||||
// NB: Callers should have locked the mutex before calling this function.
|
func (n *Notifier) _fetchUserDeviceStream(userID, deviceID string, makeIfNotExists bool) *UserDeviceStream {
|
||||||
func (n *Notifier) fetchUserDeviceStream(userID, deviceID string, makeIfNotExists bool) *UserDeviceStream {
|
|
||||||
_, ok := n.userDeviceStreams[userID]
|
_, ok := n.userDeviceStreams[userID]
|
||||||
if !ok {
|
if !ok {
|
||||||
if !makeIfNotExists {
|
if !makeIfNotExists {
|
||||||
|
@ -422,11 +423,10 @@ func (n *Notifier) fetchUserDeviceStream(userID, deviceID string, makeIfNotExist
|
||||||
return stream
|
return stream
|
||||||
}
|
}
|
||||||
|
|
||||||
// fetchUserStreams retrieves all streams for the given user. If makeIfNotExists is true,
|
// _fetchUserStreams retrieves all streams for the given user. If makeIfNotExists is true,
|
||||||
// a stream will be made for this user if one doesn't exist and it will be returned. This
|
// a stream will be made for this user if one doesn't exist and it will be returned. This
|
||||||
// function does not wait for data to be available on the stream.
|
// function does not wait for data to be available on the stream.
|
||||||
// NB: Callers should have locked the mutex before calling this function.
|
func (n *Notifier) _fetchUserStreams(userID string) []*UserDeviceStream {
|
||||||
func (n *Notifier) fetchUserStreams(userID string) []*UserDeviceStream {
|
|
||||||
user, ok := n.userDeviceStreams[userID]
|
user, ok := n.userDeviceStreams[userID]
|
||||||
if !ok {
|
if !ok {
|
||||||
return []*UserDeviceStream{}
|
return []*UserDeviceStream{}
|
||||||
|
@ -438,51 +438,41 @@ func (n *Notifier) fetchUserStreams(userID string) []*UserDeviceStream {
|
||||||
return streams
|
return streams
|
||||||
}
|
}
|
||||||
|
|
||||||
// Not thread-safe: must be called on the OnNewEvent goroutine only
|
func (n *Notifier) _addJoinedUser(roomID, userID string) {
|
||||||
func (n *Notifier) addJoinedUser(roomID, userID string) {
|
|
||||||
n.mapLock.Lock()
|
|
||||||
defer n.mapLock.Unlock()
|
|
||||||
if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
|
if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
|
||||||
n.roomIDToJoinedUsers[roomID] = make(userIDSet)
|
n.roomIDToJoinedUsers[roomID] = make(userIDSet)
|
||||||
}
|
}
|
||||||
n.roomIDToJoinedUsers[roomID].add(userID)
|
n.roomIDToJoinedUsers[roomID].add(userID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Not thread-safe: must be called on the OnNewEvent goroutine only
|
func (n *Notifier) _removeJoinedUser(roomID, userID string) {
|
||||||
func (n *Notifier) removeJoinedUser(roomID, userID string) {
|
|
||||||
n.mapLock.Lock()
|
|
||||||
defer n.mapLock.Unlock()
|
|
||||||
if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
|
if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
|
||||||
n.roomIDToJoinedUsers[roomID] = make(userIDSet)
|
n.roomIDToJoinedUsers[roomID] = make(userIDSet)
|
||||||
}
|
}
|
||||||
n.roomIDToJoinedUsers[roomID].remove(userID)
|
n.roomIDToJoinedUsers[roomID].remove(userID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Not thread-safe: must be called on the OnNewEvent goroutine only
|
|
||||||
func (n *Notifier) JoinedUsers(roomID string) (userIDs []string) {
|
func (n *Notifier) JoinedUsers(roomID string) (userIDs []string) {
|
||||||
n.mapLock.RLock()
|
n.lock.RLock()
|
||||||
defer n.mapLock.RUnlock()
|
defer n.lock.RUnlock()
|
||||||
|
return n._joinedUsers(roomID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *Notifier) _joinedUsers(roomID string) (userIDs []string) {
|
||||||
if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
|
if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
return n.roomIDToJoinedUsers[roomID].values()
|
return n.roomIDToJoinedUsers[roomID].values()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Not thread-safe: must be called on the OnNewEvent goroutine only
|
func (n *Notifier) _addPeekingDevice(roomID, userID, deviceID string) {
|
||||||
func (n *Notifier) addPeekingDevice(roomID, userID, deviceID string) {
|
|
||||||
n.mapLock.Lock()
|
|
||||||
defer n.mapLock.Unlock()
|
|
||||||
if _, ok := n.roomIDToPeekingDevices[roomID]; !ok {
|
if _, ok := n.roomIDToPeekingDevices[roomID]; !ok {
|
||||||
n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet)
|
n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet)
|
||||||
}
|
}
|
||||||
n.roomIDToPeekingDevices[roomID].add(types.PeekingDevice{UserID: userID, DeviceID: deviceID})
|
n.roomIDToPeekingDevices[roomID].add(types.PeekingDevice{UserID: userID, DeviceID: deviceID})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Not thread-safe: must be called on the OnNewEvent goroutine only
|
func (n *Notifier) _removePeekingDevice(roomID, userID, deviceID string) {
|
||||||
// nolint:unused
|
|
||||||
func (n *Notifier) removePeekingDevice(roomID, userID, deviceID string) {
|
|
||||||
n.mapLock.Lock()
|
|
||||||
defer n.mapLock.Unlock()
|
|
||||||
if _, ok := n.roomIDToPeekingDevices[roomID]; !ok {
|
if _, ok := n.roomIDToPeekingDevices[roomID]; !ok {
|
||||||
n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet)
|
n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet)
|
||||||
}
|
}
|
||||||
|
@ -490,24 +480,26 @@ func (n *Notifier) removePeekingDevice(roomID, userID, deviceID string) {
|
||||||
n.roomIDToPeekingDevices[roomID].remove(types.PeekingDevice{UserID: userID, DeviceID: deviceID})
|
n.roomIDToPeekingDevices[roomID].remove(types.PeekingDevice{UserID: userID, DeviceID: deviceID})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Not thread-safe: must be called on the OnNewEvent goroutine only
|
|
||||||
func (n *Notifier) PeekingDevices(roomID string) (peekingDevices []types.PeekingDevice) {
|
func (n *Notifier) PeekingDevices(roomID string) (peekingDevices []types.PeekingDevice) {
|
||||||
n.mapLock.RLock()
|
n.lock.RLock()
|
||||||
defer n.mapLock.RUnlock()
|
defer n.lock.RUnlock()
|
||||||
|
return n._peekingDevices(roomID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *Notifier) _peekingDevices(roomID string) (peekingDevices []types.PeekingDevice) {
|
||||||
if _, ok := n.roomIDToPeekingDevices[roomID]; !ok {
|
if _, ok := n.roomIDToPeekingDevices[roomID]; !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
return n.roomIDToPeekingDevices[roomID].values()
|
return n.roomIDToPeekingDevices[roomID].values()
|
||||||
}
|
}
|
||||||
|
|
||||||
// removeEmptyUserStreams iterates through the user stream map and removes any
|
// _removeEmptyUserStreams iterates through the user stream map and removes any
|
||||||
// that have been empty for a certain amount of time. This is a crude way of
|
// that have been empty for a certain amount of time. This is a crude way of
|
||||||
// ensuring that the userStreams map doesn't grow forver.
|
// ensuring that the userStreams map doesn't grow forver.
|
||||||
// This should be called when the notifier gets called for whatever reason,
|
// This should be called when the notifier gets called for whatever reason,
|
||||||
// the function itself is responsible for ensuring it doesn't iterate too
|
// the function itself is responsible for ensuring it doesn't iterate too
|
||||||
// often.
|
// often.
|
||||||
// NB: Callers should have locked the mutex before calling this function.
|
func (n *Notifier) _removeEmptyUserStreams() {
|
||||||
func (n *Notifier) removeEmptyUserStreams() {
|
|
||||||
// Only clean up now and again
|
// Only clean up now and again
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
if n.lastCleanUpTime.Add(time.Minute).After(now) {
|
if n.lastCleanUpTime.Add(time.Minute).After(now) {
|
||||||
|
|
|
@ -175,7 +175,7 @@ func TestCorrectStreamWakeup(t *testing.T) {
|
||||||
time.Sleep(1 * time.Second)
|
time.Sleep(1 * time.Second)
|
||||||
|
|
||||||
wake := "two"
|
wake := "two"
|
||||||
n.wakeupUserDevice(alice, []string{wake}, syncPositionAfter)
|
n._wakeupUserDevice(alice, []string{wake}, syncPositionAfter)
|
||||||
|
|
||||||
if result := <-awoken; result != wake {
|
if result := <-awoken; result != wake {
|
||||||
t.Fatalf("expected to wake %q, got %q", wake, result)
|
t.Fatalf("expected to wake %q, got %q", wake, result)
|
||||||
|
@ -359,10 +359,10 @@ func waitForBlocking(s *UserDeviceStream, numBlocking uint) {
|
||||||
// lockedFetchUserStream invokes Notifier.fetchUserStream, respecting Notifier.streamLock.
|
// lockedFetchUserStream invokes Notifier.fetchUserStream, respecting Notifier.streamLock.
|
||||||
// A new stream is made if it doesn't exist already.
|
// A new stream is made if it doesn't exist already.
|
||||||
func lockedFetchUserStream(n *Notifier, userID, deviceID string) *UserDeviceStream {
|
func lockedFetchUserStream(n *Notifier, userID, deviceID string) *UserDeviceStream {
|
||||||
n.streamLock.Lock()
|
n.lock.Lock()
|
||||||
defer n.streamLock.Unlock()
|
defer n.lock.Unlock()
|
||||||
|
|
||||||
return n.fetchUserDeviceStream(userID, deviceID, true)
|
return n._fetchUserDeviceStream(userID, deviceID, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTestSyncRequest(userID, deviceID string, since types.StreamingToken) types.SyncRequest {
|
func newTestSyncRequest(userID, deviceID string, since types.StreamingToken) types.SyncRequest {
|
||||||
|
|
Loading…
Reference in a new issue