From 2ff85d678df7fe24dbfb549958ea90455a75b28f Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 15 May 2017 14:33:15 +0100 Subject: [PATCH] Finish notifier tests --- .../dendrite/syncapi/sync/notifier_test.go | 106 +++++++++++++++++- 1 file changed, 105 insertions(+), 1 deletion(-) diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go index 6c8424c48..d42cbf1ab 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go @@ -29,12 +29,14 @@ import ( var ( randomMessageEvent gomatrixserverlib.Event aliceInviteBobEvent gomatrixserverlib.Event + bobLeaveEvent gomatrixserverlib.Event ) var ( streamPositionVeryOld = types.StreamPosition(5) streamPositionBefore = types.StreamPosition(11) streamPositionAfter = types.StreamPosition(12) + streamPositionAfter2 = types.StreamPosition(13) roomID = "!test:localhost" alice = "@alice:localhost" bob = "@bob:localhost" @@ -70,6 +72,20 @@ func init() { if err != nil { panic(err) } + bobLeaveEvent, err = gomatrixserverlib.NewEventFromTrustedJSON([]byte(`{ + "type": "m.room.member", + "state_key": "`+bob+`", + "content": { + "membership": "leave" + }, + "sender": "`+bob+`", + "room_id": "`+roomID+`", + "origin_server_ts": 12345, + "event_id": "$bobLeaveEvent:localhost" + }`), false) + if err != nil { + panic(err) + } } // Test that the current position is returned if a request is already behind. @@ -142,12 +158,100 @@ func TestNewInviteEventForUser(t *testing.T) { // Test that all blocked requests get woken up on a new event. func TestMultipleRequestWakeup(t *testing.T) { + n := NewNotifier(streamPositionBefore) + n.usersJoinedToRooms(map[string][]string{ + roomID: []string{alice, bob}, + }) + var wg sync.WaitGroup + wg.Add(3) + poll := func() { + pos, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionBefore)) + if err != nil { + t.Errorf("TestMultipleRequestWakeup error: %s", err) + } + if pos != streamPositionAfter { + t.Errorf("TestMultipleRequestWakeup want %d, got %d", streamPositionAfter, pos) + } + wg.Done() + } + go poll() + go poll() + go poll() + + stream := n.fetchUserStream(bob, true) + waitForBlocking(stream, 3) + + n.OnNewEvent(&randomMessageEvent, streamPositionAfter) + + wg.Wait() + + numWaiting := stream.NumWaiting() + if numWaiting != 0 { + t.Errorf("TestMultipleRequestWakeup NumWaiting() want 0, got %d", numWaiting) + } } -// Test that you stop getting unblocked when you leave a room. +// Test that you stop getting woken up when you leave a room. func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) { + // listen as bob. Make bob leave room. Make alice send event to room. + // Make sure alice gets woken up only and not bob as well. + n := NewNotifier(streamPositionBefore) + n.usersJoinedToRooms(map[string][]string{ + roomID: []string{alice, bob}, + }) + var leaveWG sync.WaitGroup + + // Make bob leave the room + leaveWG.Add(1) + go func() { + pos, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionBefore)) + if err != nil { + t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %s", err) + } + if pos != streamPositionAfter { + t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom want %d, got %d", streamPositionAfter, pos) + } + leaveWG.Done() + }() + bobStream := n.fetchUserStream(bob, true) + waitForBlocking(bobStream, 1) + n.OnNewEvent(&bobLeaveEvent, streamPositionAfter) + leaveWG.Wait() + + // send an event into the room. Make sure alice gets it. Bob should not. + var aliceWG sync.WaitGroup + aliceStream := n.fetchUserStream(alice, true) + aliceWG.Add(1) + go func() { + pos, err := waitForEvents(n, newTestSyncRequest(alice, streamPositionAfter)) + if err != nil { + t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %s", err) + } + if pos != streamPositionAfter2 { + t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom want %d, got %d", streamPositionAfter2, pos) + } + aliceWG.Done() + }() + + go func() { + // this should timeout with an error (but the main goroutine won't wait for the timeout explicitly) + _, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionAfter)) + if err == nil { + t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom expect error but got nil") + } + }() + + waitForBlocking(aliceStream, 1) + waitForBlocking(bobStream, 1) + + n.OnNewEvent(&randomMessageEvent, streamPositionAfter2) + aliceWG.Wait() + + // it's possible that at this point alice has been informed and bob is about to be informed, so wait + // for a fraction of a second to account for this race + time.Sleep(1 * time.Millisecond) } // same as Notifier.WaitForEvents but with a timeout.