Review comments

This commit is contained in:
Kegan Dougal 2017-05-16 12:27:40 +01:00
parent 474e836b8b
commit d7de6c3f33

View file

@ -35,9 +35,8 @@ type Notifier struct {
// for concurrent reads on /sync requests
currPos types.StreamPosition
currPosMutex *sync.RWMutex
// A map of RoomID => Set<UserID> : Map access is guarded by roomIDToJoinedUsersMutex.
roomIDToJoinedUsers map[string]set
roomIDToJoinedUsersMutex *sync.Mutex
// A map of RoomID => Set<UserID>
roomIDToJoinedUsers map[string]set
// A map of user_id => UserStream which can be used to wake a given user's /sync request.
// Map access is guarded by userStreamsMutex.
userStreams map[string]*UserStream
@ -49,12 +48,11 @@ type Notifier struct {
// the joined users within each of them by calling Notifier.LoadFromDatabase().
func NewNotifier(pos types.StreamPosition) *Notifier {
return &Notifier{
currPos: pos,
currPosMutex: &sync.RWMutex{},
roomIDToJoinedUsers: make(map[string]set),
roomIDToJoinedUsersMutex: &sync.Mutex{},
userStreams: make(map[string]*UserStream),
userStreamsMutex: &sync.Mutex{},
currPos: pos,
currPosMutex: &sync.RWMutex{},
roomIDToJoinedUsers: make(map[string]set),
userStreams: make(map[string]*UserStream),
userStreamsMutex: &sync.Mutex{},
}
}
@ -88,7 +86,7 @@ func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, pos types.StreamPosit
case "leave":
fallthrough
case "ban":
n.userLeft(ev.RoomID(), userID)
n.removeJoinedUser(ev.RoomID(), userID)
}
}
}
@ -139,9 +137,7 @@ func (n *Notifier) Load(db *storage.SyncServerDatabase) error {
// these rooms will wake the given users /sync requests. This should be called prior to ANY calls to
// OnNewEvent (eg on startup) to prevent racing.
func (n *Notifier) setUsersJoinedToRooms(roomIDToUserIDs map[string][]string) {
// This is just the bulk form of userJoined where we only lock once.
n.roomIDToJoinedUsersMutex.Lock()
defer n.roomIDToJoinedUsersMutex.Unlock()
// This is just the bulk form of addJoinedUser
for roomID, userIDs := range roomIDToUserIDs {
if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
n.roomIDToJoinedUsers[roomID] = make(set)
@ -180,27 +176,24 @@ func (n *Notifier) fetchUserStream(userID string, makeIfNotExists bool) *UserStr
return stream
}
// Not thread-safe: must be called on the OnNewEvent goroutine only
func (n *Notifier) addJoinedUser(roomID, userID string) {
n.roomIDToJoinedUsersMutex.Lock()
defer n.roomIDToJoinedUsersMutex.Unlock()
if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
n.roomIDToJoinedUsers[roomID] = make(set)
}
n.roomIDToJoinedUsers[roomID].add(userID)
}
func (n *Notifier) userLeft(roomID, userID string) {
n.roomIDToJoinedUsersMutex.Lock()
defer n.roomIDToJoinedUsersMutex.Unlock()
// Not thread-safe: must be called on the OnNewEvent goroutine only
func (n *Notifier) removeJoinedUser(roomID, userID string) {
if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
n.roomIDToJoinedUsers[roomID] = make(set)
}
n.roomIDToJoinedUsers[roomID].remove(userID)
}
// Not thread-safe: must be called on the OnNewEvent goroutine only
func (n *Notifier) joinedUsers(roomID string) (userIDs []string) {
n.roomIDToJoinedUsersMutex.Lock()
defer n.roomIDToJoinedUsersMutex.Unlock()
if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
return
}
@ -218,11 +211,6 @@ func (s set) remove(str string) {
delete(s, str)
}
func (s set) has(str string) bool {
_, ok := s[str]
return ok
}
func (s set) values() (vals []string) {
for str := range s {
vals = append(vals, str)