diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go index aaa56e700..b3753b7d7 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go @@ -35,9 +35,8 @@ type Notifier struct { // for concurrent reads on /sync requests currPos types.StreamPosition currPosMutex *sync.RWMutex - // A map of RoomID => Set : Map access is guarded by roomIDToJoinedUsersMutex. - roomIDToJoinedUsers map[string]set - roomIDToJoinedUsersMutex *sync.Mutex + // A map of RoomID => Set + roomIDToJoinedUsers map[string]set // A map of user_id => UserStream which can be used to wake a given user's /sync request. // Map access is guarded by userStreamsMutex. userStreams map[string]*UserStream @@ -49,12 +48,11 @@ type Notifier struct { // the joined users within each of them by calling Notifier.LoadFromDatabase(). func NewNotifier(pos types.StreamPosition) *Notifier { return &Notifier{ - currPos: pos, - currPosMutex: &sync.RWMutex{}, - roomIDToJoinedUsers: make(map[string]set), - roomIDToJoinedUsersMutex: &sync.Mutex{}, - userStreams: make(map[string]*UserStream), - userStreamsMutex: &sync.Mutex{}, + currPos: pos, + currPosMutex: &sync.RWMutex{}, + roomIDToJoinedUsers: make(map[string]set), + userStreams: make(map[string]*UserStream), + userStreamsMutex: &sync.Mutex{}, } } @@ -88,7 +86,7 @@ func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, pos types.StreamPosit case "leave": fallthrough case "ban": - n.userLeft(ev.RoomID(), userID) + n.removeJoinedUser(ev.RoomID(), userID) } } } @@ -139,9 +137,7 @@ func (n *Notifier) Load(db *storage.SyncServerDatabase) error { // these rooms will wake the given users /sync requests. This should be called prior to ANY calls to // OnNewEvent (eg on startup) to prevent racing. func (n *Notifier) setUsersJoinedToRooms(roomIDToUserIDs map[string][]string) { - // This is just the bulk form of userJoined where we only lock once. - n.roomIDToJoinedUsersMutex.Lock() - defer n.roomIDToJoinedUsersMutex.Unlock() + // This is just the bulk form of addJoinedUser for roomID, userIDs := range roomIDToUserIDs { if _, ok := n.roomIDToJoinedUsers[roomID]; !ok { n.roomIDToJoinedUsers[roomID] = make(set) @@ -180,27 +176,24 @@ func (n *Notifier) fetchUserStream(userID string, makeIfNotExists bool) *UserStr return stream } +// Not thread-safe: must be called on the OnNewEvent goroutine only func (n *Notifier) addJoinedUser(roomID, userID string) { - n.roomIDToJoinedUsersMutex.Lock() - defer n.roomIDToJoinedUsersMutex.Unlock() if _, ok := n.roomIDToJoinedUsers[roomID]; !ok { n.roomIDToJoinedUsers[roomID] = make(set) } n.roomIDToJoinedUsers[roomID].add(userID) } -func (n *Notifier) userLeft(roomID, userID string) { - n.roomIDToJoinedUsersMutex.Lock() - defer n.roomIDToJoinedUsersMutex.Unlock() +// Not thread-safe: must be called on the OnNewEvent goroutine only +func (n *Notifier) removeJoinedUser(roomID, userID string) { if _, ok := n.roomIDToJoinedUsers[roomID]; !ok { n.roomIDToJoinedUsers[roomID] = make(set) } n.roomIDToJoinedUsers[roomID].remove(userID) } +// Not thread-safe: must be called on the OnNewEvent goroutine only func (n *Notifier) joinedUsers(roomID string) (userIDs []string) { - n.roomIDToJoinedUsersMutex.Lock() - defer n.roomIDToJoinedUsersMutex.Unlock() if _, ok := n.roomIDToJoinedUsers[roomID]; !ok { return } @@ -218,11 +211,6 @@ func (s set) remove(str string) { delete(s, str) } -func (s set) has(str string) bool { - _, ok := s[str] - return ok -} - func (s set) values() (vals []string) { for str := range s { vals = append(vals, str)