From d05d52df19752b64ee3f753c2e51f4c3021c23ba Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 27 May 2020 17:55:42 +0100 Subject: [PATCH] Add test for waking up single device --- syncapi/sync/notifier.go | 30 +++++++++++++++++++++--------- syncapi/sync/notifier_test.go | 31 ++++++++++++++++++++++++++++++- 2 files changed, 51 insertions(+), 10 deletions(-) diff --git a/syncapi/sync/notifier.go b/syncapi/sync/notifier.go index 7b1b49a18..9b410a0c4 100644 --- a/syncapi/sync/notifier.go +++ b/syncapi/sync/notifier.go @@ -173,16 +173,21 @@ func (n *Notifier) setUsersJoinedToRooms(roomIDToUserIDs map[string][]string) { } } +// wakeupUsers will wake up the sync strems for all of the devices for all of the +// specified user IDs. func (n *Notifier) wakeupUsers(userIDs []string, newPos types.StreamingToken) { for _, userID := range userIDs { for _, stream := range n.fetchUserStreams(userID) { - if stream != nil { - stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream + if stream == nil { + continue } + stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream } } } +// wakeupUserDevice will wake up the sync stream for a specific user device. Other +// device streams will be left alone. // nolint:unused func (n *Notifier) wakeupUserDevice(userDevices map[string]string, newPos types.StreamingToken) { for userID, deviceID := range userDevices { @@ -192,25 +197,32 @@ func (n *Notifier) wakeupUserDevice(userDevices map[string]string, newPos types. } } -// fetchUserDeviceStream retrieves a stream unique to the given user. If makeIfNotExists is true, -// a stream will be made for this user if one doesn't exist and it will be returned. This +// fetchUserDeviceStream retrieves a stream unique to the given device. If makeIfNotExists is true, +// a stream will be made for this device if one doesn't exist and it will be returned. This // function does not wait for data to be available on the stream. // NB: Callers should have locked the mutex before calling this function. func (n *Notifier) fetchUserDeviceStream(userID, deviceID string, makeIfNotExists bool) *UserDeviceStream { _, ok := n.userDeviceStreams[userID] - if !ok && makeIfNotExists { + if !ok { + if !makeIfNotExists { + return nil + } n.userDeviceStreams[userID] = map[string]*UserDeviceStream{} } stream, ok := n.userDeviceStreams[userID][deviceID] - if !ok && makeIfNotExists { + if !ok { + if !makeIfNotExists { + return nil + } // TODO: Unbounded growth of streams (1 per user) - stream = NewUserDeviceStream(userID, deviceID, n.currPos) - n.userDeviceStreams[userID][deviceID] = stream + if stream = NewUserDeviceStream(userID, deviceID, n.currPos); stream != nil { + n.userDeviceStreams[userID][deviceID] = stream + } } return stream } -// fetchUserDeviceStreams retrieves all streams for the given user. If makeIfNotExists is true, +// fetchUserStreams retrieves all streams for the given user. If makeIfNotExists is true, // a stream will be made for this user if one doesn't exist and it will be returned. This // function does not wait for data to be available on the stream. // NB: Callers should have locked the mutex before calling this function. diff --git a/syncapi/sync/notifier_test.go b/syncapi/sync/notifier_test.go index 43b76d901..001f2d233 100644 --- a/syncapi/sync/notifier_test.go +++ b/syncapi/sync/notifier_test.go @@ -153,6 +153,35 @@ func TestCorrectStream(t *testing.T) { } } +func TestCorrectStreamWakeup(t *testing.T) { + n := NewNotifier(syncPositionBefore) + awoken := make(chan string) + + streamone := lockedFetchUserStream(n, alice, "one") + streamtwo := lockedFetchUserStream(n, alice, "two") + + go waitForBlocking(streamone, 1) + go waitForBlocking(streamtwo, 1) + + go func() { + select { + case <-n.userDeviceStreams[alice]["one"].signalChannel: + awoken <- "one" + case <-n.userDeviceStreams[alice]["two"].signalChannel: + awoken <- "two" + } + }() + + time.Sleep(1 * time.Microsecond) + + wake := "two" + n.wakeupUserDevice(map[string]string{alice: wake}, syncPositionAfter) + + if result := <-awoken; result != wake { + t.Fatalf("expected to wake %q, got %q", wake, result) + } +} + // Test that an invite unblocks the request func TestNewInviteEventForUser(t *testing.T) { n := NewNotifier(syncPositionBefore) @@ -316,7 +345,7 @@ func waitForEvents(n *Notifier, req syncRequest) (types.StreamingToken, error) { func waitForBlocking(s *UserDeviceStream, numBlocking uint) { for numBlocking != s.NumWaiting() { // This is horrible but I don't want to add a signalling mechanism JUST for testing. - time.Sleep(1 * time.Microsecond) + time.Sleep(10 * time.Millisecond) } }