mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-07 06:53:09 -06:00
Finish notifier tests
This commit is contained in:
parent
5c9eb9c13a
commit
2ff85d678d
|
|
@ -29,12 +29,14 @@ import (
|
|||
var (
|
||||
randomMessageEvent gomatrixserverlib.Event
|
||||
aliceInviteBobEvent gomatrixserverlib.Event
|
||||
bobLeaveEvent gomatrixserverlib.Event
|
||||
)
|
||||
|
||||
var (
|
||||
streamPositionVeryOld = types.StreamPosition(5)
|
||||
streamPositionBefore = types.StreamPosition(11)
|
||||
streamPositionAfter = types.StreamPosition(12)
|
||||
streamPositionAfter2 = types.StreamPosition(13)
|
||||
roomID = "!test:localhost"
|
||||
alice = "@alice:localhost"
|
||||
bob = "@bob:localhost"
|
||||
|
|
@ -70,6 +72,20 @@ func init() {
|
|||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
bobLeaveEvent, err = gomatrixserverlib.NewEventFromTrustedJSON([]byte(`{
|
||||
"type": "m.room.member",
|
||||
"state_key": "`+bob+`",
|
||||
"content": {
|
||||
"membership": "leave"
|
||||
},
|
||||
"sender": "`+bob+`",
|
||||
"room_id": "`+roomID+`",
|
||||
"origin_server_ts": 12345,
|
||||
"event_id": "$bobLeaveEvent:localhost"
|
||||
}`), false)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Test that the current position is returned if a request is already behind.
|
||||
|
|
@ -142,12 +158,100 @@ func TestNewInviteEventForUser(t *testing.T) {
|
|||
|
||||
// Test that all blocked requests get woken up on a new event.
|
||||
func TestMultipleRequestWakeup(t *testing.T) {
|
||||
n := NewNotifier(streamPositionBefore)
|
||||
n.usersJoinedToRooms(map[string][]string{
|
||||
roomID: []string{alice, bob},
|
||||
})
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(3)
|
||||
poll := func() {
|
||||
pos, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionBefore))
|
||||
if err != nil {
|
||||
t.Errorf("TestMultipleRequestWakeup error: %s", err)
|
||||
}
|
||||
if pos != streamPositionAfter {
|
||||
t.Errorf("TestMultipleRequestWakeup want %d, got %d", streamPositionAfter, pos)
|
||||
}
|
||||
wg.Done()
|
||||
}
|
||||
go poll()
|
||||
go poll()
|
||||
go poll()
|
||||
|
||||
stream := n.fetchUserStream(bob, true)
|
||||
waitForBlocking(stream, 3)
|
||||
|
||||
n.OnNewEvent(&randomMessageEvent, streamPositionAfter)
|
||||
|
||||
wg.Wait()
|
||||
|
||||
numWaiting := stream.NumWaiting()
|
||||
if numWaiting != 0 {
|
||||
t.Errorf("TestMultipleRequestWakeup NumWaiting() want 0, got %d", numWaiting)
|
||||
}
|
||||
}
|
||||
|
||||
// Test that you stop getting unblocked when you leave a room.
|
||||
// Test that you stop getting woken up when you leave a room.
|
||||
func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
|
||||
// listen as bob. Make bob leave room. Make alice send event to room.
|
||||
// Make sure alice gets woken up only and not bob as well.
|
||||
n := NewNotifier(streamPositionBefore)
|
||||
n.usersJoinedToRooms(map[string][]string{
|
||||
roomID: []string{alice, bob},
|
||||
})
|
||||
|
||||
var leaveWG sync.WaitGroup
|
||||
|
||||
// Make bob leave the room
|
||||
leaveWG.Add(1)
|
||||
go func() {
|
||||
pos, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionBefore))
|
||||
if err != nil {
|
||||
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %s", err)
|
||||
}
|
||||
if pos != streamPositionAfter {
|
||||
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom want %d, got %d", streamPositionAfter, pos)
|
||||
}
|
||||
leaveWG.Done()
|
||||
}()
|
||||
bobStream := n.fetchUserStream(bob, true)
|
||||
waitForBlocking(bobStream, 1)
|
||||
n.OnNewEvent(&bobLeaveEvent, streamPositionAfter)
|
||||
leaveWG.Wait()
|
||||
|
||||
// send an event into the room. Make sure alice gets it. Bob should not.
|
||||
var aliceWG sync.WaitGroup
|
||||
aliceStream := n.fetchUserStream(alice, true)
|
||||
aliceWG.Add(1)
|
||||
go func() {
|
||||
pos, err := waitForEvents(n, newTestSyncRequest(alice, streamPositionAfter))
|
||||
if err != nil {
|
||||
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %s", err)
|
||||
}
|
||||
if pos != streamPositionAfter2 {
|
||||
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom want %d, got %d", streamPositionAfter2, pos)
|
||||
}
|
||||
aliceWG.Done()
|
||||
}()
|
||||
|
||||
go func() {
|
||||
// this should timeout with an error (but the main goroutine won't wait for the timeout explicitly)
|
||||
_, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionAfter))
|
||||
if err == nil {
|
||||
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom expect error but got nil")
|
||||
}
|
||||
}()
|
||||
|
||||
waitForBlocking(aliceStream, 1)
|
||||
waitForBlocking(bobStream, 1)
|
||||
|
||||
n.OnNewEvent(&randomMessageEvent, streamPositionAfter2)
|
||||
aliceWG.Wait()
|
||||
|
||||
// it's possible that at this point alice has been informed and bob is about to be informed, so wait
|
||||
// for a fraction of a second to account for this race
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
}
|
||||
|
||||
// same as Notifier.WaitForEvents but with a timeout.
|
||||
|
|
|
|||
Loading…
Reference in a new issue