mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-15 10:53:09 -06:00
Fix data races reported by go test -race ./...
Signed-off-by: Alex Chen <minecnly@gmail.com>
This commit is contained in:
parent
bff60953f3
commit
9872921105
|
|
@ -143,7 +143,7 @@ func TestNewEventAndJoinedToRoom(t *testing.T) {
|
|||
wg.Done()
|
||||
}()
|
||||
|
||||
stream := n.fetchUserStream(bob, true)
|
||||
stream := lockedFetchUserStream(n, bob, true)
|
||||
waitForBlocking(stream, 1)
|
||||
|
||||
n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter)
|
||||
|
|
@ -171,7 +171,7 @@ func TestNewInviteEventForUser(t *testing.T) {
|
|||
wg.Done()
|
||||
}()
|
||||
|
||||
stream := n.fetchUserStream(bob, true)
|
||||
stream := lockedFetchUserStream(n, bob, true)
|
||||
waitForBlocking(stream, 1)
|
||||
|
||||
n.OnNewEvent(&aliceInviteBobEvent, "", nil, syncPositionAfter)
|
||||
|
|
@ -199,7 +199,7 @@ func TestEDUWakeup(t *testing.T) {
|
|||
wg.Done()
|
||||
}()
|
||||
|
||||
stream := n.fetchUserStream(bob, true)
|
||||
stream := lockedFetchUserStream(n, bob, true)
|
||||
waitForBlocking(stream, 1)
|
||||
|
||||
n.OnNewEvent(&aliceInviteBobEvent, "", nil, syncPositionNewEDU)
|
||||
|
|
@ -230,7 +230,7 @@ func TestMultipleRequestWakeup(t *testing.T) {
|
|||
go poll()
|
||||
go poll()
|
||||
|
||||
stream := n.fetchUserStream(bob, true)
|
||||
stream := lockedFetchUserStream(n, bob, true)
|
||||
waitForBlocking(stream, 3)
|
||||
|
||||
n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter)
|
||||
|
|
@ -266,14 +266,14 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
|
|||
}
|
||||
leaveWG.Done()
|
||||
}()
|
||||
bobStream := n.fetchUserStream(bob, true)
|
||||
bobStream := lockedFetchUserStream(n, bob, true)
|
||||
waitForBlocking(bobStream, 1)
|
||||
n.OnNewEvent(&bobLeaveEvent, "", nil, syncPositionAfter)
|
||||
leaveWG.Wait()
|
||||
|
||||
// send an event into the room. Make sure alice gets it. Bob should not.
|
||||
var aliceWG sync.WaitGroup
|
||||
aliceStream := n.fetchUserStream(alice, true)
|
||||
aliceStream := lockedFetchUserStream(n, alice, true)
|
||||
aliceWG.Add(1)
|
||||
go func() {
|
||||
pos, err := waitForEvents(n, newTestSyncRequest(alice, syncPositionAfter))
|
||||
|
|
@ -328,6 +328,14 @@ func waitForBlocking(s *UserStream, numBlocking uint) {
|
|||
}
|
||||
}
|
||||
|
||||
// lockedFetchUserStream invokes Notifier.fetchUserStream, respecting Notifier.streamLock.
|
||||
func lockedFetchUserStream(n *Notifier, userID string, makeIfNotExists bool) *UserStream {
|
||||
n.streamLock.Lock()
|
||||
defer n.streamLock.Unlock()
|
||||
|
||||
return n.fetchUserStream(userID, makeIfNotExists)
|
||||
}
|
||||
|
||||
func newTestSyncRequest(userID string, since types.SyncPosition) syncRequest {
|
||||
return syncRequest{
|
||||
device: authtypes.Device{UserID: userID},
|
||||
|
|
|
|||
Loading…
Reference in a new issue