mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-16 19:33:09 -06:00
Fix syncapi/sync/notifier_test.go
Signed-off-by: Alex Chen <minecnly@gmail.com>
This commit is contained in:
parent
e742b7d9b5
commit
e6870279ed
|
|
@ -32,19 +32,36 @@ var (
|
||||||
randomMessageEvent gomatrixserverlib.Event
|
randomMessageEvent gomatrixserverlib.Event
|
||||||
aliceInviteBobEvent gomatrixserverlib.Event
|
aliceInviteBobEvent gomatrixserverlib.Event
|
||||||
bobLeaveEvent gomatrixserverlib.Event
|
bobLeaveEvent gomatrixserverlib.Event
|
||||||
|
syncPositionVeryOld types.SyncPosition
|
||||||
|
syncPositionBefore types.SyncPosition
|
||||||
|
syncPositionAfter types.SyncPosition
|
||||||
|
syncPositionAfter2 types.SyncPosition
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
streamPositionVeryOld = types.StreamPosition(5)
|
|
||||||
streamPositionBefore = types.StreamPosition(11)
|
|
||||||
streamPositionAfter = types.StreamPosition(12)
|
|
||||||
streamPositionAfter2 = types.StreamPosition(13)
|
|
||||||
roomID = "!test:localhost"
|
roomID = "!test:localhost"
|
||||||
alice = "@alice:localhost"
|
alice = "@alice:localhost"
|
||||||
bob = "@bob:localhost"
|
bob = "@bob:localhost"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
baseSyncPos := types.SyncPosition{
|
||||||
|
PDUPosition: 0,
|
||||||
|
TypingPosition: 0,
|
||||||
|
}
|
||||||
|
|
||||||
|
syncPositionVeryOld = baseSyncPos
|
||||||
|
syncPositionVeryOld.PDUPosition = 5
|
||||||
|
|
||||||
|
syncPositionBefore = baseSyncPos
|
||||||
|
syncPositionBefore.PDUPosition = 11
|
||||||
|
|
||||||
|
syncPositionAfter = baseSyncPos
|
||||||
|
syncPositionAfter.PDUPosition = 12
|
||||||
|
|
||||||
|
syncPositionAfter2 = baseSyncPos
|
||||||
|
syncPositionAfter2.PDUPosition = 13
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
randomMessageEvent, err = gomatrixserverlib.NewEventFromTrustedJSON([]byte(`{
|
randomMessageEvent, err = gomatrixserverlib.NewEventFromTrustedJSON([]byte(`{
|
||||||
"type": "m.room.message",
|
"type": "m.room.message",
|
||||||
|
|
@ -92,19 +109,19 @@ func init() {
|
||||||
|
|
||||||
// Test that the current position is returned if a request is already behind.
|
// Test that the current position is returned if a request is already behind.
|
||||||
func TestImmediateNotification(t *testing.T) {
|
func TestImmediateNotification(t *testing.T) {
|
||||||
n := NewNotifier(streamPositionBefore)
|
n := NewNotifier(syncPositionBefore)
|
||||||
pos, err := waitForEvents(n, newTestSyncRequest(alice, streamPositionVeryOld))
|
pos, err := waitForEvents(n, newTestSyncRequest(alice, syncPositionVeryOld))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("TestImmediateNotification error: %s", err)
|
t.Fatalf("TestImmediateNotification error: %s", err)
|
||||||
}
|
}
|
||||||
if pos != streamPositionBefore {
|
if pos != syncPositionBefore {
|
||||||
t.Fatalf("TestImmediateNotification want %d, got %d", streamPositionBefore, pos)
|
t.Fatalf("TestImmediateNotification want %d, got %d", syncPositionBefore, pos)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test that new events to a joined room unblocks the request.
|
// Test that new events to a joined room unblocks the request.
|
||||||
func TestNewEventAndJoinedToRoom(t *testing.T) {
|
func TestNewEventAndJoinedToRoom(t *testing.T) {
|
||||||
n := NewNotifier(streamPositionBefore)
|
n := NewNotifier(syncPositionBefore)
|
||||||
n.setUsersJoinedToRooms(map[string][]string{
|
n.setUsersJoinedToRooms(map[string][]string{
|
||||||
roomID: {alice, bob},
|
roomID: {alice, bob},
|
||||||
})
|
})
|
||||||
|
|
@ -112,12 +129,12 @@ func TestNewEventAndJoinedToRoom(t *testing.T) {
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
pos, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionBefore))
|
pos, err := waitForEvents(n, newTestSyncRequest(bob, syncPositionBefore))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("TestNewEventAndJoinedToRoom error: %s", err)
|
t.Errorf("TestNewEventAndJoinedToRoom error: %s", err)
|
||||||
}
|
}
|
||||||
if pos != streamPositionAfter {
|
if pos != syncPositionAfter {
|
||||||
t.Errorf("TestNewEventAndJoinedToRoom want %d, got %d", streamPositionAfter, pos)
|
t.Errorf("TestNewEventAndJoinedToRoom want %d, got %d", syncPositionAfter, pos)
|
||||||
}
|
}
|
||||||
wg.Done()
|
wg.Done()
|
||||||
}()
|
}()
|
||||||
|
|
@ -125,14 +142,14 @@ func TestNewEventAndJoinedToRoom(t *testing.T) {
|
||||||
stream := n.fetchUserStream(bob, true)
|
stream := n.fetchUserStream(bob, true)
|
||||||
waitForBlocking(stream, 1)
|
waitForBlocking(stream, 1)
|
||||||
|
|
||||||
n.OnNewEvent(&randomMessageEvent, "", streamPositionAfter)
|
n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter)
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test that an invite unblocks the request
|
// Test that an invite unblocks the request
|
||||||
func TestNewInviteEventForUser(t *testing.T) {
|
func TestNewInviteEventForUser(t *testing.T) {
|
||||||
n := NewNotifier(streamPositionBefore)
|
n := NewNotifier(syncPositionBefore)
|
||||||
n.setUsersJoinedToRooms(map[string][]string{
|
n.setUsersJoinedToRooms(map[string][]string{
|
||||||
roomID: {alice, bob},
|
roomID: {alice, bob},
|
||||||
})
|
})
|
||||||
|
|
@ -140,12 +157,12 @@ func TestNewInviteEventForUser(t *testing.T) {
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
pos, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionBefore))
|
pos, err := waitForEvents(n, newTestSyncRequest(bob, syncPositionBefore))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("TestNewInviteEventForUser error: %s", err)
|
t.Errorf("TestNewInviteEventForUser error: %s", err)
|
||||||
}
|
}
|
||||||
if pos != streamPositionAfter {
|
if pos != syncPositionAfter {
|
||||||
t.Errorf("TestNewInviteEventForUser want %d, got %d", streamPositionAfter, pos)
|
t.Errorf("TestNewInviteEventForUser want %d, got %d", syncPositionAfter, pos)
|
||||||
}
|
}
|
||||||
wg.Done()
|
wg.Done()
|
||||||
}()
|
}()
|
||||||
|
|
@ -153,14 +170,14 @@ func TestNewInviteEventForUser(t *testing.T) {
|
||||||
stream := n.fetchUserStream(bob, true)
|
stream := n.fetchUserStream(bob, true)
|
||||||
waitForBlocking(stream, 1)
|
waitForBlocking(stream, 1)
|
||||||
|
|
||||||
n.OnNewEvent(&aliceInviteBobEvent, "", streamPositionAfter)
|
n.OnNewEvent(&aliceInviteBobEvent, "", nil, syncPositionAfter)
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test that all blocked requests get woken up on a new event.
|
// Test that all blocked requests get woken up on a new event.
|
||||||
func TestMultipleRequestWakeup(t *testing.T) {
|
func TestMultipleRequestWakeup(t *testing.T) {
|
||||||
n := NewNotifier(streamPositionBefore)
|
n := NewNotifier(syncPositionBefore)
|
||||||
n.setUsersJoinedToRooms(map[string][]string{
|
n.setUsersJoinedToRooms(map[string][]string{
|
||||||
roomID: {alice, bob},
|
roomID: {alice, bob},
|
||||||
})
|
})
|
||||||
|
|
@ -168,12 +185,12 @@ func TestMultipleRequestWakeup(t *testing.T) {
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
wg.Add(3)
|
wg.Add(3)
|
||||||
poll := func() {
|
poll := func() {
|
||||||
pos, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionBefore))
|
pos, err := waitForEvents(n, newTestSyncRequest(bob, syncPositionBefore))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("TestMultipleRequestWakeup error: %s", err)
|
t.Errorf("TestMultipleRequestWakeup error: %s", err)
|
||||||
}
|
}
|
||||||
if pos != streamPositionAfter {
|
if pos != syncPositionAfter {
|
||||||
t.Errorf("TestMultipleRequestWakeup want %d, got %d", streamPositionAfter, pos)
|
t.Errorf("TestMultipleRequestWakeup want %d, got %d", syncPositionAfter, pos)
|
||||||
}
|
}
|
||||||
wg.Done()
|
wg.Done()
|
||||||
}
|
}
|
||||||
|
|
@ -184,7 +201,7 @@ func TestMultipleRequestWakeup(t *testing.T) {
|
||||||
stream := n.fetchUserStream(bob, true)
|
stream := n.fetchUserStream(bob, true)
|
||||||
waitForBlocking(stream, 3)
|
waitForBlocking(stream, 3)
|
||||||
|
|
||||||
n.OnNewEvent(&randomMessageEvent, "", streamPositionAfter)
|
n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter)
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
|
|
@ -198,7 +215,7 @@ func TestMultipleRequestWakeup(t *testing.T) {
|
||||||
func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
|
func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
|
||||||
// listen as bob. Make bob leave room. Make alice send event to room.
|
// listen as bob. Make bob leave room. Make alice send event to room.
|
||||||
// Make sure alice gets woken up only and not bob as well.
|
// Make sure alice gets woken up only and not bob as well.
|
||||||
n := NewNotifier(streamPositionBefore)
|
n := NewNotifier(syncPositionBefore)
|
||||||
n.setUsersJoinedToRooms(map[string][]string{
|
n.setUsersJoinedToRooms(map[string][]string{
|
||||||
roomID: {alice, bob},
|
roomID: {alice, bob},
|
||||||
})
|
})
|
||||||
|
|
@ -208,18 +225,18 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
|
||||||
// Make bob leave the room
|
// Make bob leave the room
|
||||||
leaveWG.Add(1)
|
leaveWG.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
pos, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionBefore))
|
pos, err := waitForEvents(n, newTestSyncRequest(bob, syncPositionBefore))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %s", err)
|
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %s", err)
|
||||||
}
|
}
|
||||||
if pos != streamPositionAfter {
|
if pos != syncPositionAfter {
|
||||||
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom want %d, got %d", streamPositionAfter, pos)
|
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom want %d, got %d", syncPositionAfter, pos)
|
||||||
}
|
}
|
||||||
leaveWG.Done()
|
leaveWG.Done()
|
||||||
}()
|
}()
|
||||||
bobStream := n.fetchUserStream(bob, true)
|
bobStream := n.fetchUserStream(bob, true)
|
||||||
waitForBlocking(bobStream, 1)
|
waitForBlocking(bobStream, 1)
|
||||||
n.OnNewEvent(&bobLeaveEvent, "", streamPositionAfter)
|
n.OnNewEvent(&bobLeaveEvent, "", nil, syncPositionAfter)
|
||||||
leaveWG.Wait()
|
leaveWG.Wait()
|
||||||
|
|
||||||
// send an event into the room. Make sure alice gets it. Bob should not.
|
// send an event into the room. Make sure alice gets it. Bob should not.
|
||||||
|
|
@ -227,19 +244,19 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
|
||||||
aliceStream := n.fetchUserStream(alice, true)
|
aliceStream := n.fetchUserStream(alice, true)
|
||||||
aliceWG.Add(1)
|
aliceWG.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
pos, err := waitForEvents(n, newTestSyncRequest(alice, streamPositionAfter))
|
pos, err := waitForEvents(n, newTestSyncRequest(alice, syncPositionAfter))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %s", err)
|
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %s", err)
|
||||||
}
|
}
|
||||||
if pos != streamPositionAfter2 {
|
if pos != syncPositionAfter2 {
|
||||||
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom want %d, got %d", streamPositionAfter2, pos)
|
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom want %d, got %d", syncPositionAfter2, pos)
|
||||||
}
|
}
|
||||||
aliceWG.Done()
|
aliceWG.Done()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
// this should timeout with an error (but the main goroutine won't wait for the timeout explicitly)
|
// this should timeout with an error (but the main goroutine won't wait for the timeout explicitly)
|
||||||
_, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionAfter))
|
_, err := waitForEvents(n, newTestSyncRequest(bob, syncPositionAfter))
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom expect error but got nil")
|
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom expect error but got nil")
|
||||||
}
|
}
|
||||||
|
|
@ -248,7 +265,7 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
|
||||||
waitForBlocking(aliceStream, 1)
|
waitForBlocking(aliceStream, 1)
|
||||||
waitForBlocking(bobStream, 1)
|
waitForBlocking(bobStream, 1)
|
||||||
|
|
||||||
n.OnNewEvent(&randomMessageEvent, "", streamPositionAfter2)
|
n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter2)
|
||||||
aliceWG.Wait()
|
aliceWG.Wait()
|
||||||
|
|
||||||
// it's possible that at this point alice has been informed and bob is about to be informed, so wait
|
// it's possible that at this point alice has been informed and bob is about to be informed, so wait
|
||||||
|
|
@ -256,14 +273,13 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
|
||||||
time.Sleep(1 * time.Millisecond)
|
time.Sleep(1 * time.Millisecond)
|
||||||
}
|
}
|
||||||
|
|
||||||
// same as Notifier.WaitForEvents but with a timeout.
|
func waitForEvents(n *Notifier, req syncRequest) (types.SyncPosition, error) {
|
||||||
func waitForEvents(n *Notifier, req syncRequest) (types.StreamPosition, error) {
|
|
||||||
listener := n.GetListener(req)
|
listener := n.GetListener(req)
|
||||||
defer listener.Close()
|
defer listener.Close()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-time.After(5 * time.Second):
|
case <-time.After(5 * time.Second):
|
||||||
return types.StreamPosition(0), fmt.Errorf(
|
return types.SyncPosition{}, fmt.Errorf(
|
||||||
"waitForEvents timed out waiting for %s (pos=%d)", req.device.UserID, req.since,
|
"waitForEvents timed out waiting for %s (pos=%d)", req.device.UserID, req.since,
|
||||||
)
|
)
|
||||||
case <-listener.GetNotifyChannel(*req.since):
|
case <-listener.GetNotifyChannel(*req.since):
|
||||||
|
|
@ -280,7 +296,7 @@ func waitForBlocking(s *UserStream, numBlocking uint) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTestSyncRequest(userID string, since types.StreamPosition) syncRequest {
|
func newTestSyncRequest(userID string, since types.SyncPosition) syncRequest {
|
||||||
return syncRequest{
|
return syncRequest{
|
||||||
device: authtypes.Device{UserID: userID},
|
device: authtypes.Device{UserID: userID},
|
||||||
timeout: 1 * time.Minute,
|
timeout: 1 * time.Minute,
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue