From baa47b5a7112da73d169a5554df6c74fd36b2bf9 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 15 May 2017 11:30:15 +0100 Subject: [PATCH] Add UserStream type and unit test stub --- .../dendrite/syncapi/sync/notifier.go | 51 +++---- .../dendrite/syncapi/sync/notifier_test.go | 137 ++++++++++++++++++ .../dendrite/syncapi/sync/userstream.go | 57 ++++++++ 3 files changed, 218 insertions(+), 27 deletions(-) create mode 100644 src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go create mode 100644 src/github.com/matrix-org/dendrite/syncapi/sync/userstream.go diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go index 11bc2e656..a739e9126 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go @@ -38,11 +38,10 @@ type Notifier struct { // A map of RoomID => Set : Map access is guarded by roomIDToJoinedUsersMutex. roomIDToJoinedUsers map[string]set roomIDToJoinedUsersMutex *sync.Mutex - // A map of user_id => Cond which can be used to wake a given user's /sync request. - // Because this is a Cond, we can notify all waiting goroutines so this works - // across devices. Map access is guarded by userIDCondsMutex. - userIDConds map[string]*sync.Cond - userIDCondsMutex *sync.Mutex + // A map of user_id => UserStream which can be used to wake a given user's /sync request. + // Map access is guarded by userStreamsMutex. + userStreams map[string]*UserStream + userStreamsMutex *sync.Mutex } // NewNotifier creates a new notifier set to the given stream position. @@ -54,8 +53,8 @@ func NewNotifier(pos types.StreamPosition) *Notifier { currPosMutex: &sync.RWMutex{}, roomIDToJoinedUsers: make(map[string]set), roomIDToJoinedUsersMutex: &sync.Mutex{}, - userIDConds: make(map[string]*sync.Cond), - userIDCondsMutex: &sync.Mutex{}, + userStreams: make(map[string]*UserStream), + userStreamsMutex: &sync.Mutex{}, } } @@ -95,7 +94,7 @@ func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, pos types.StreamPosit } for _, userID := range userIDs { - n.wakeupUser(userID) + n.wakeupUser(userID, pos) } } @@ -154,36 +153,34 @@ func (n *Notifier) usersJoinedToRooms(roomIDToUserIDs map[string][]string) { } } -func (n *Notifier) wakeupUser(userID string) { - cond := n.fetchUserCond(userID, false) - if cond == nil { +func (n *Notifier) wakeupUser(userID string, newPos types.StreamPosition) { + stream := n.fetchUserStream(userID, false) + if stream == nil { return } - cond.Broadcast() // wakeup all goroutines Wait()ing on this Cond + stream.Broadcast(newPos) // wakeup all goroutines Wait()ing on this stream } func (n *Notifier) blockUser(userID string) { - cond := n.fetchUserCond(userID, true) - cond.L.Lock() - cond.Wait() - cond.L.Unlock() + stream := n.fetchUserStream(userID, true) + stream.Wait() } -// fetchUserCond retrieves a Cond unique to the given user. If makeIfNotExists is true, -// a Cond will be made for this user if one doesn't exist and it will be returned. This -// function does not lock the Cond. -func (n *Notifier) fetchUserCond(userID string, makeIfNotExists bool) *sync.Cond { +// fetchUserStream retrieves a stream unique to the given user. If makeIfNotExists is true, +// a stream will be made for this user if one doesn't exist and it will be returned. This +// function does not wait for data to be available on the stream. +func (n *Notifier) fetchUserStream(userID string, makeIfNotExists bool) *UserStream { // There is a bit of a locking dance here, we want to lock the mutex protecting the map // but NOT the Cond that we may be returning/creating. - n.userIDCondsMutex.Lock() - defer n.userIDCondsMutex.Unlock() - cond, ok := n.userIDConds[userID] + n.userStreamsMutex.Lock() + defer n.userStreamsMutex.Unlock() + stream, ok := n.userStreams[userID] if !ok { - // TODO: Unbounded growth of locks (1 per user) - cond = sync.NewCond(&sync.Mutex{}) - n.userIDConds[userID] = cond + // TODO: Unbounded growth of streams (1 per user) + stream = NewUserStream(userID) + n.userStreams[userID] = stream } - return cond + return stream } func (n *Notifier) userJoined(roomID, userID string) { diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go new file mode 100644 index 000000000..6b9e0abe8 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go @@ -0,0 +1,137 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sync + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" +) + +var randomMessageEvent gomatrixserverlib.Event + +func init() { + var err error + randomMessageEvent, err = gomatrixserverlib.NewEventFromTrustedJSON([]byte(`{ + "type": "m.room.message", + "content": { + "body": "Hello World", + "msgtype": "m.text" + }, + "sender": "@noone:localhost", + "room_id": "!test:localhost", + "origin_server_ts": 12345, + "event_id": "$something:localhost" + }`), false) + if err != nil { + panic(err) + } +} + +// Test that the current position is returned if a request is already behind. +func TestImmediateNotification(t *testing.T) { + currPos := types.StreamPosition(11) + n := NewNotifier(currPos) + pos, err := waitForEvents(n, newTestSyncRequest("@alice:localhost", types.StreamPosition(9))) + if err != nil { + t.Fatalf("TestImmediateNotification error: %s", err) + } + if pos != currPos { + t.Fatalf("TestImmediateNotification want %d, got %d", currPos, pos) + } +} + +// Test that new events to a joined room unblocks the request. +func TestNewEventAndJoinedToRoom(t *testing.T) { + currPos := types.StreamPosition(11) + newPos := types.StreamPosition(12) + n := NewNotifier(currPos) + n.usersJoinedToRooms(map[string][]string{ + "!test:localhost": []string{"@alice:localhost", "@bob:localhost"}, + }) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + pos, err := waitForEvents(n, newTestSyncRequest("@bob:localhost", types.StreamPosition(11))) + if err != nil { + t.Errorf("TestNewEventAndJoinedToRoom error: %s", err) + } + if pos != newPos { + t.Errorf("TestNewEventAndJoinedToRoom want %d, got %d", newPos, pos) + } + wg.Done() + }() + time.Sleep(1 * time.Millisecond) + + n.OnNewEvent(&randomMessageEvent, types.StreamPosition(12)) + + wg.Wait() +} + +// Test that new events to a not joined room does not unblock the request. +func TestNewEventAndNotJoinedToRoom(t *testing.T) { + +} + +// Test that an invite unblocks the request +func TestNewInviteEventForUser(t *testing.T) { + +} + +// Test that all blocked requests get woken up on a new event. +func TestMultipleRequestWakeup(t *testing.T) { + +} + +// Test that you stop getting unblocked when you leave a room. +func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) { + +} + +// same as Notifier.WaitForEvents but with a timeout. +func waitForEvents(n *Notifier, req syncRequest) (types.StreamPosition, error) { + done := make(chan types.StreamPosition, 1) + go func() { + newPos := n.WaitForEvents(req) + done <- newPos + close(done) + }() + select { + case <-time.After(5 * time.Second): + return types.StreamPosition(0), fmt.Errorf( + "waitForEvents timed out waiting for %s (pos=%d)", req.userID, req.since, + ) + case p := <-done: + return p, nil + } +} + +func newTestSyncRequest(userID string, since types.StreamPosition) syncRequest { + return syncRequest{ + userID: userID, + timeout: 1 * time.Minute, + since: since, + wantFullState: false, + limit: defaultTimelineLimit, + log: util.GetLogger(context.TODO()), + } +} diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/userstream.go b/src/github.com/matrix-org/dendrite/syncapi/sync/userstream.go new file mode 100644 index 000000000..34e7ed8dc --- /dev/null +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/userstream.go @@ -0,0 +1,57 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package sync + +import ( + "sync" + + "github.com/matrix-org/dendrite/syncapi/types" +) + +// UserStream represents a communication mechanism between the /sync request goroutine +// and the underlying sync server goroutines. Goroutines can Wait() for a stream position and +// goroutines can Broadcast(streamPosition) to other goroutines. +type UserStream struct { + UserID string + // Because this is a Cond, we can notify all waiting goroutines so this works + // across devices. Protects pos. + cond *sync.Cond + // The position to broadcast to callers of Wait(). + pos types.StreamPosition +} + +// NewUserStream creates a new user stream +func NewUserStream(userID string) *UserStream { + return &UserStream{ + UserID: userID, + cond: sync.NewCond(&sync.Mutex{}), + } +} + +// Wait blocks until there is a new stream position for this user, which is then returned. +func (s *UserStream) Wait() (pos types.StreamPosition) { + s.cond.L.Lock() + s.cond.Wait() + pos = s.pos + s.cond.L.Unlock() + return +} + +// Broadcast a new stream position for this user. +func (s *UserStream) Broadcast(pos types.StreamPosition) { + s.cond.L.Lock() + s.pos = pos + s.cond.L.Unlock() + s.cond.Broadcast() +}