mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-20 05:13:11 -06:00
Add test for waking up single device
This commit is contained in:
parent
4ca8e4822f
commit
d05d52df19
|
|
@ -173,16 +173,21 @@ func (n *Notifier) setUsersJoinedToRooms(roomIDToUserIDs map[string][]string) {
|
|||
}
|
||||
}
|
||||
|
||||
// wakeupUsers will wake up the sync strems for all of the devices for all of the
|
||||
// specified user IDs.
|
||||
func (n *Notifier) wakeupUsers(userIDs []string, newPos types.StreamingToken) {
|
||||
for _, userID := range userIDs {
|
||||
for _, stream := range n.fetchUserStreams(userID) {
|
||||
if stream != nil {
|
||||
stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream
|
||||
if stream == nil {
|
||||
continue
|
||||
}
|
||||
stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// wakeupUserDevice will wake up the sync stream for a specific user device. Other
|
||||
// device streams will be left alone.
|
||||
// nolint:unused
|
||||
func (n *Notifier) wakeupUserDevice(userDevices map[string]string, newPos types.StreamingToken) {
|
||||
for userID, deviceID := range userDevices {
|
||||
|
|
@ -192,25 +197,32 @@ func (n *Notifier) wakeupUserDevice(userDevices map[string]string, newPos types.
|
|||
}
|
||||
}
|
||||
|
||||
// fetchUserDeviceStream retrieves a stream unique to the given user. If makeIfNotExists is true,
|
||||
// a stream will be made for this user if one doesn't exist and it will be returned. This
|
||||
// fetchUserDeviceStream retrieves a stream unique to the given device. If makeIfNotExists is true,
|
||||
// a stream will be made for this device if one doesn't exist and it will be returned. This
|
||||
// function does not wait for data to be available on the stream.
|
||||
// NB: Callers should have locked the mutex before calling this function.
|
||||
func (n *Notifier) fetchUserDeviceStream(userID, deviceID string, makeIfNotExists bool) *UserDeviceStream {
|
||||
_, ok := n.userDeviceStreams[userID]
|
||||
if !ok && makeIfNotExists {
|
||||
if !ok {
|
||||
if !makeIfNotExists {
|
||||
return nil
|
||||
}
|
||||
n.userDeviceStreams[userID] = map[string]*UserDeviceStream{}
|
||||
}
|
||||
stream, ok := n.userDeviceStreams[userID][deviceID]
|
||||
if !ok && makeIfNotExists {
|
||||
if !ok {
|
||||
if !makeIfNotExists {
|
||||
return nil
|
||||
}
|
||||
// TODO: Unbounded growth of streams (1 per user)
|
||||
stream = NewUserDeviceStream(userID, deviceID, n.currPos)
|
||||
n.userDeviceStreams[userID][deviceID] = stream
|
||||
if stream = NewUserDeviceStream(userID, deviceID, n.currPos); stream != nil {
|
||||
n.userDeviceStreams[userID][deviceID] = stream
|
||||
}
|
||||
}
|
||||
return stream
|
||||
}
|
||||
|
||||
// fetchUserDeviceStreams retrieves all streams for the given user. If makeIfNotExists is true,
|
||||
// fetchUserStreams retrieves all streams for the given user. If makeIfNotExists is true,
|
||||
// a stream will be made for this user if one doesn't exist and it will be returned. This
|
||||
// function does not wait for data to be available on the stream.
|
||||
// NB: Callers should have locked the mutex before calling this function.
|
||||
|
|
|
|||
|
|
@ -153,6 +153,35 @@ func TestCorrectStream(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestCorrectStreamWakeup(t *testing.T) {
|
||||
n := NewNotifier(syncPositionBefore)
|
||||
awoken := make(chan string)
|
||||
|
||||
streamone := lockedFetchUserStream(n, alice, "one")
|
||||
streamtwo := lockedFetchUserStream(n, alice, "two")
|
||||
|
||||
go waitForBlocking(streamone, 1)
|
||||
go waitForBlocking(streamtwo, 1)
|
||||
|
||||
go func() {
|
||||
select {
|
||||
case <-n.userDeviceStreams[alice]["one"].signalChannel:
|
||||
awoken <- "one"
|
||||
case <-n.userDeviceStreams[alice]["two"].signalChannel:
|
||||
awoken <- "two"
|
||||
}
|
||||
}()
|
||||
|
||||
time.Sleep(1 * time.Microsecond)
|
||||
|
||||
wake := "two"
|
||||
n.wakeupUserDevice(map[string]string{alice: wake}, syncPositionAfter)
|
||||
|
||||
if result := <-awoken; result != wake {
|
||||
t.Fatalf("expected to wake %q, got %q", wake, result)
|
||||
}
|
||||
}
|
||||
|
||||
// Test that an invite unblocks the request
|
||||
func TestNewInviteEventForUser(t *testing.T) {
|
||||
n := NewNotifier(syncPositionBefore)
|
||||
|
|
@ -316,7 +345,7 @@ func waitForEvents(n *Notifier, req syncRequest) (types.StreamingToken, error) {
|
|||
func waitForBlocking(s *UserDeviceStream, numBlocking uint) {
|
||||
for numBlocking != s.NumWaiting() {
|
||||
// This is horrible but I don't want to add a signalling mechanism JUST for testing.
|
||||
time.Sleep(1 * time.Microsecond)
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue