diff --git a/syncapi/notifier/notifier.go b/syncapi/notifier/notifier.go index 4e1a3c2a3..7c1e3c2db 100644 --- a/syncapi/notifier/notifier.go +++ b/syncapi/notifier/notifier.go @@ -22,6 +22,7 @@ import ( "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" log "github.com/sirupsen/logrus" ) @@ -64,6 +65,9 @@ func NewNotifier() *Notifier { // SetCurrentPosition sets the current streaming positions. // This must be called directly after NewNotifier and initialising the streams. func (n *Notifier) SetCurrentPosition(currPos types.StreamingToken) { + n.streamLock.Lock() + defer n.streamLock.Unlock() + n.currPos = currPos } @@ -257,7 +261,7 @@ func (n *Notifier) SharedUsers(userID string) (sharedUsers []string) { sharedUsers = append(sharedUsers, n.JoinedUsers(roomID)...) } } - return sharedUsers + return util.UniqueStrings(sharedUsers) } func (n *Notifier) IsSharedUser(userA, userB string) bool { @@ -328,7 +332,7 @@ func (n *Notifier) setUsersJoinedToRooms(roomIDToUserIDs map[string][]string) { // This is just the bulk form of addJoinedUser for roomID, userIDs := range roomIDToUserIDs { if _, ok := n.roomIDToJoinedUsers[roomID]; !ok { - n.roomIDToJoinedUsers[roomID] = make(userIDSet) + n.roomIDToJoinedUsers[roomID] = make(userIDSet, len(userIDs)) } for _, userID := range userIDs { n.roomIDToJoinedUsers[roomID].add(userID) @@ -343,7 +347,7 @@ func (n *Notifier) setPeekingDevices(roomIDToPeekingDevices map[string][]types.P // This is just the bulk form of addPeekingDevice for roomID, peekingDevices := range roomIDToPeekingDevices { if _, ok := n.roomIDToPeekingDevices[roomID]; !ok { - n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet) + n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet, len(peekingDevices)) } for _, peekingDevice := range peekingDevices { n.roomIDToPeekingDevices[roomID].add(peekingDevice) @@ -416,7 +420,7 @@ func (n *Notifier) fetchUserStreams(userID string) []*UserDeviceStream { if !ok { return []*UserDeviceStream{} } - streams := []*UserDeviceStream{} + streams := make([]*UserDeviceStream, 0, len(user)) for _, stream := range user { streams = append(streams, stream) } @@ -514,10 +518,10 @@ func (n *Notifier) removeEmptyUserStreams() { } // A string set, mainly existing for improving clarity of structs in this file. -type userIDSet map[string]bool +type userIDSet map[string]struct{} func (s userIDSet) add(str string) { - s[str] = true + s[str] = struct{}{} } func (s userIDSet) remove(str string) { @@ -534,10 +538,10 @@ func (s userIDSet) values() (vals []string) { // A set of PeekingDevices, similar to userIDSet -type peekingDeviceSet map[types.PeekingDevice]bool +type peekingDeviceSet map[types.PeekingDevice]struct{} func (s peekingDeviceSet) add(d types.PeekingDevice) { - s[d] = true + s[d] = struct{}{} } // nolint:unused