From 269cab118a9eece701748ac281a1f6955bc60257 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 15 May 2017 11:51:04 +0100 Subject: [PATCH] Fix tests so they don't race --- .../dendrite/syncapi/sync/notifier_test.go | 12 +++++++++++- .../matrix-org/dendrite/syncapi/sync/userstream.go | 14 +++++++++++++- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go index 6b9e0abe8..757f90f30 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go @@ -80,7 +80,9 @@ func TestNewEventAndJoinedToRoom(t *testing.T) { } wg.Done() }() - time.Sleep(1 * time.Millisecond) + + stream := n.fetchUserStream("@bob:localhost", true) + waitForBlocking(stream, 1) n.OnNewEvent(&randomMessageEvent, types.StreamPosition(12)) @@ -125,6 +127,14 @@ func waitForEvents(n *Notifier, req syncRequest) (types.StreamPosition, error) { } } +// Wait until something is Wait()ing on the user stream. +func waitForBlocking(s *UserStream, numBlocking int) { + for numBlocking != s.NumWaiting() { + // This is horrible but I don't want to add a signalling mechanism JUST for testing. + time.Sleep(1 * time.Microsecond) + } +} + func newTestSyncRequest(userID string, since types.StreamPosition) syncRequest { return syncRequest{ userID: userID, diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/userstream.go b/src/github.com/matrix-org/dendrite/syncapi/sync/userstream.go index 34e7ed8dc..5a624443f 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/userstream.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/userstream.go @@ -11,6 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + package sync import ( @@ -25,10 +26,12 @@ import ( type UserStream struct { UserID string // Because this is a Cond, we can notify all waiting goroutines so this works - // across devices. Protects pos. + // across devices for the same user. Protects pos. cond *sync.Cond // The position to broadcast to callers of Wait(). pos types.StreamPosition + // The number of goroutines blocked on Wait() - used for testing and metrics + numWaiting int } // NewUserStream creates a new user stream @@ -42,8 +45,10 @@ func NewUserStream(userID string) *UserStream { // Wait blocks until there is a new stream position for this user, which is then returned. func (s *UserStream) Wait() (pos types.StreamPosition) { s.cond.L.Lock() + s.numWaiting += 1 s.cond.Wait() pos = s.pos + s.numWaiting -= 1 s.cond.L.Unlock() return } @@ -55,3 +60,10 @@ func (s *UserStream) Broadcast(pos types.StreamPosition) { s.cond.L.Unlock() s.cond.Broadcast() } + +// NumWaiting returns the number of goroutines waiting for Wait() to return. Used for metrics and testing. +func (s *UserStream) NumWaiting() int { + s.cond.L.Lock() + defer s.cond.L.Unlock() + return s.numWaiting +}