diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go index 757f90f30..6c8424c48 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go @@ -26,7 +26,19 @@ import ( "github.com/matrix-org/util" ) -var randomMessageEvent gomatrixserverlib.Event +var ( + randomMessageEvent gomatrixserverlib.Event + aliceInviteBobEvent gomatrixserverlib.Event +) + +var ( + streamPositionVeryOld = types.StreamPosition(5) + streamPositionBefore = types.StreamPosition(11) + streamPositionAfter = types.StreamPosition(12) + roomID = "!test:localhost" + alice = "@alice:localhost" + bob = "@bob:localhost" +) func init() { var err error @@ -37,9 +49,23 @@ func init() { "msgtype": "m.text" }, "sender": "@noone:localhost", - "room_id": "!test:localhost", + "room_id": "`+roomID+`", "origin_server_ts": 12345, - "event_id": "$something:localhost" + "event_id": "$randomMessageEvent:localhost" + }`), false) + if err != nil { + panic(err) + } + aliceInviteBobEvent, err = gomatrixserverlib.NewEventFromTrustedJSON([]byte(`{ + "type": "m.room.member", + "state_key": "`+bob+`", + "content": { + "membership": "invite" + }, + "sender": "`+alice+`", + "room_id": "`+roomID+`", + "origin_server_ts": 12345, + "event_id": "$aliceInviteBobEvent:localhost" }`), false) if err != nil { panic(err) @@ -48,55 +74,70 @@ func init() { // Test that the current position is returned if a request is already behind. func TestImmediateNotification(t *testing.T) { - currPos := types.StreamPosition(11) - n := NewNotifier(currPos) - pos, err := waitForEvents(n, newTestSyncRequest("@alice:localhost", types.StreamPosition(9))) + n := NewNotifier(streamPositionBefore) + pos, err := waitForEvents(n, newTestSyncRequest(alice, streamPositionVeryOld)) if err != nil { t.Fatalf("TestImmediateNotification error: %s", err) } - if pos != currPos { - t.Fatalf("TestImmediateNotification want %d, got %d", currPos, pos) + if pos != streamPositionBefore { + t.Fatalf("TestImmediateNotification want %d, got %d", streamPositionBefore, pos) } } // Test that new events to a joined room unblocks the request. func TestNewEventAndJoinedToRoom(t *testing.T) { - currPos := types.StreamPosition(11) - newPos := types.StreamPosition(12) - n := NewNotifier(currPos) + n := NewNotifier(streamPositionBefore) n.usersJoinedToRooms(map[string][]string{ - "!test:localhost": []string{"@alice:localhost", "@bob:localhost"}, + roomID: []string{alice, bob}, }) var wg sync.WaitGroup wg.Add(1) go func() { - pos, err := waitForEvents(n, newTestSyncRequest("@bob:localhost", types.StreamPosition(11))) + pos, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionBefore)) if err != nil { t.Errorf("TestNewEventAndJoinedToRoom error: %s", err) } - if pos != newPos { - t.Errorf("TestNewEventAndJoinedToRoom want %d, got %d", newPos, pos) + if pos != streamPositionAfter { + t.Errorf("TestNewEventAndJoinedToRoom want %d, got %d", streamPositionAfter, pos) } wg.Done() }() - stream := n.fetchUserStream("@bob:localhost", true) + stream := n.fetchUserStream(bob, true) waitForBlocking(stream, 1) - n.OnNewEvent(&randomMessageEvent, types.StreamPosition(12)) + n.OnNewEvent(&randomMessageEvent, streamPositionAfter) wg.Wait() } -// Test that new events to a not joined room does not unblock the request. -func TestNewEventAndNotJoinedToRoom(t *testing.T) { - -} - // Test that an invite unblocks the request func TestNewInviteEventForUser(t *testing.T) { + n := NewNotifier(streamPositionBefore) + n.usersJoinedToRooms(map[string][]string{ + roomID: []string{alice, bob}, + }) + var wg sync.WaitGroup + wg.Add(1) + go func() { + pos, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionBefore)) + if err != nil { + t.Errorf("TestNewInviteEventForUser error: %s", err) + } + if pos != streamPositionAfter { + t.Errorf("TestNewInviteEventForUser want %d, got %d", streamPositionAfter, pos) + } + wg.Done() + }() + + stream := n.fetchUserStream(bob, true) + waitForBlocking(stream, 1) + + n.OnNewEvent(&aliceInviteBobEvent, streamPositionAfter) + + wg.Wait() } // Test that all blocked requests get woken up on a new event. diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/userstream.go b/src/github.com/matrix-org/dendrite/syncapi/sync/userstream.go index 5a624443f..ab91161f3 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/userstream.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/userstream.go @@ -45,10 +45,10 @@ func NewUserStream(userID string) *UserStream { // Wait blocks until there is a new stream position for this user, which is then returned. func (s *UserStream) Wait() (pos types.StreamPosition) { s.cond.L.Lock() - s.numWaiting += 1 + s.numWaiting++ s.cond.Wait() pos = s.pos - s.numWaiting -= 1 + s.numWaiting-- s.cond.L.Unlock() return }