Add UserStream type and unit test stub

This commit is contained in:
Kegan Dougal 2017-05-15 11:30:15 +01:00
parent 1c7f9f929b
commit baa47b5a71
3 changed files with 218 additions and 27 deletions

View file

@ -38,11 +38,10 @@ type Notifier struct {
// A map of RoomID => Set<UserID> : Map access is guarded by roomIDToJoinedUsersMutex.
roomIDToJoinedUsers map[string]set
roomIDToJoinedUsersMutex *sync.Mutex
// A map of user_id => Cond which can be used to wake a given user's /sync request.
// Because this is a Cond, we can notify all waiting goroutines so this works
// across devices. Map access is guarded by userIDCondsMutex.
userIDConds map[string]*sync.Cond
userIDCondsMutex *sync.Mutex
// A map of user_id => UserStream which can be used to wake a given user's /sync request.
// Map access is guarded by userStreamsMutex.
userStreams map[string]*UserStream
userStreamsMutex *sync.Mutex
}
// NewNotifier creates a new notifier set to the given stream position.
@ -54,8 +53,8 @@ func NewNotifier(pos types.StreamPosition) *Notifier {
currPosMutex: &sync.RWMutex{},
roomIDToJoinedUsers: make(map[string]set),
roomIDToJoinedUsersMutex: &sync.Mutex{},
userIDConds: make(map[string]*sync.Cond),
userIDCondsMutex: &sync.Mutex{},
userStreams: make(map[string]*UserStream),
userStreamsMutex: &sync.Mutex{},
}
}
@ -95,7 +94,7 @@ func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, pos types.StreamPosit
}
for _, userID := range userIDs {
n.wakeupUser(userID)
n.wakeupUser(userID, pos)
}
}
@ -154,36 +153,34 @@ func (n *Notifier) usersJoinedToRooms(roomIDToUserIDs map[string][]string) {
}
}
func (n *Notifier) wakeupUser(userID string) {
cond := n.fetchUserCond(userID, false)
if cond == nil {
func (n *Notifier) wakeupUser(userID string, newPos types.StreamPosition) {
stream := n.fetchUserStream(userID, false)
if stream == nil {
return
}
cond.Broadcast() // wakeup all goroutines Wait()ing on this Cond
stream.Broadcast(newPos) // wakeup all goroutines Wait()ing on this stream
}
func (n *Notifier) blockUser(userID string) {
cond := n.fetchUserCond(userID, true)
cond.L.Lock()
cond.Wait()
cond.L.Unlock()
stream := n.fetchUserStream(userID, true)
stream.Wait()
}
// fetchUserCond retrieves a Cond unique to the given user. If makeIfNotExists is true,
// a Cond will be made for this user if one doesn't exist and it will be returned. This
// function does not lock the Cond.
func (n *Notifier) fetchUserCond(userID string, makeIfNotExists bool) *sync.Cond {
// fetchUserStream retrieves a stream unique to the given user. If makeIfNotExists is true,
// a stream will be made for this user if one doesn't exist and it will be returned. This
// function does not wait for data to be available on the stream.
func (n *Notifier) fetchUserStream(userID string, makeIfNotExists bool) *UserStream {
// There is a bit of a locking dance here, we want to lock the mutex protecting the map
// but NOT the Cond that we may be returning/creating.
n.userIDCondsMutex.Lock()
defer n.userIDCondsMutex.Unlock()
cond, ok := n.userIDConds[userID]
n.userStreamsMutex.Lock()
defer n.userStreamsMutex.Unlock()
stream, ok := n.userStreams[userID]
if !ok {
// TODO: Unbounded growth of locks (1 per user)
cond = sync.NewCond(&sync.Mutex{})
n.userIDConds[userID] = cond
// TODO: Unbounded growth of streams (1 per user)
stream = NewUserStream(userID)
n.userStreams[userID] = stream
}
return cond
return stream
}
func (n *Notifier) userJoined(roomID, userID string) {

View file

@ -0,0 +1,137 @@
// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sync
import (
"context"
"fmt"
"sync"
"testing"
"time"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
var randomMessageEvent gomatrixserverlib.Event
func init() {
var err error
randomMessageEvent, err = gomatrixserverlib.NewEventFromTrustedJSON([]byte(`{
"type": "m.room.message",
"content": {
"body": "Hello World",
"msgtype": "m.text"
},
"sender": "@noone:localhost",
"room_id": "!test:localhost",
"origin_server_ts": 12345,
"event_id": "$something:localhost"
}`), false)
if err != nil {
panic(err)
}
}
// Test that the current position is returned if a request is already behind.
func TestImmediateNotification(t *testing.T) {
currPos := types.StreamPosition(11)
n := NewNotifier(currPos)
pos, err := waitForEvents(n, newTestSyncRequest("@alice:localhost", types.StreamPosition(9)))
if err != nil {
t.Fatalf("TestImmediateNotification error: %s", err)
}
if pos != currPos {
t.Fatalf("TestImmediateNotification want %d, got %d", currPos, pos)
}
}
// Test that new events to a joined room unblocks the request.
func TestNewEventAndJoinedToRoom(t *testing.T) {
currPos := types.StreamPosition(11)
newPos := types.StreamPosition(12)
n := NewNotifier(currPos)
n.usersJoinedToRooms(map[string][]string{
"!test:localhost": []string{"@alice:localhost", "@bob:localhost"},
})
var wg sync.WaitGroup
wg.Add(1)
go func() {
pos, err := waitForEvents(n, newTestSyncRequest("@bob:localhost", types.StreamPosition(11)))
if err != nil {
t.Errorf("TestNewEventAndJoinedToRoom error: %s", err)
}
if pos != newPos {
t.Errorf("TestNewEventAndJoinedToRoom want %d, got %d", newPos, pos)
}
wg.Done()
}()
time.Sleep(1 * time.Millisecond)
n.OnNewEvent(&randomMessageEvent, types.StreamPosition(12))
wg.Wait()
}
// Test that new events to a not joined room does not unblock the request.
func TestNewEventAndNotJoinedToRoom(t *testing.T) {
}
// Test that an invite unblocks the request
func TestNewInviteEventForUser(t *testing.T) {
}
// Test that all blocked requests get woken up on a new event.
func TestMultipleRequestWakeup(t *testing.T) {
}
// Test that you stop getting unblocked when you leave a room.
func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
}
// same as Notifier.WaitForEvents but with a timeout.
func waitForEvents(n *Notifier, req syncRequest) (types.StreamPosition, error) {
done := make(chan types.StreamPosition, 1)
go func() {
newPos := n.WaitForEvents(req)
done <- newPos
close(done)
}()
select {
case <-time.After(5 * time.Second):
return types.StreamPosition(0), fmt.Errorf(
"waitForEvents timed out waiting for %s (pos=%d)", req.userID, req.since,
)
case p := <-done:
return p, nil
}
}
func newTestSyncRequest(userID string, since types.StreamPosition) syncRequest {
return syncRequest{
userID: userID,
timeout: 1 * time.Minute,
since: since,
wantFullState: false,
limit: defaultTimelineLimit,
log: util.GetLogger(context.TODO()),
}
}

View file

@ -0,0 +1,57 @@
// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sync
import (
"sync"
"github.com/matrix-org/dendrite/syncapi/types"
)
// UserStream represents a communication mechanism between the /sync request goroutine
// and the underlying sync server goroutines. Goroutines can Wait() for a stream position and
// goroutines can Broadcast(streamPosition) to other goroutines.
type UserStream struct {
UserID string
// Because this is a Cond, we can notify all waiting goroutines so this works
// across devices. Protects pos.
cond *sync.Cond
// The position to broadcast to callers of Wait().
pos types.StreamPosition
}
// NewUserStream creates a new user stream
func NewUserStream(userID string) *UserStream {
return &UserStream{
UserID: userID,
cond: sync.NewCond(&sync.Mutex{}),
}
}
// Wait blocks until there is a new stream position for this user, which is then returned.
func (s *UserStream) Wait() (pos types.StreamPosition) {
s.cond.L.Lock()
s.cond.Wait()
pos = s.pos
s.cond.L.Unlock()
return
}
// Broadcast a new stream position for this user.
func (s *UserStream) Broadcast(pos types.StreamPosition) {
s.cond.L.Lock()
s.pos = pos
s.cond.L.Unlock()
s.cond.Broadcast()
}