mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-07 23:13:11 -06:00
Fix tests so they don't race
This commit is contained in:
parent
37ded4fe2f
commit
269cab118a
|
|
@ -80,7 +80,9 @@ func TestNewEventAndJoinedToRoom(t *testing.T) {
|
||||||
}
|
}
|
||||||
wg.Done()
|
wg.Done()
|
||||||
}()
|
}()
|
||||||
time.Sleep(1 * time.Millisecond)
|
|
||||||
|
stream := n.fetchUserStream("@bob:localhost", true)
|
||||||
|
waitForBlocking(stream, 1)
|
||||||
|
|
||||||
n.OnNewEvent(&randomMessageEvent, types.StreamPosition(12))
|
n.OnNewEvent(&randomMessageEvent, types.StreamPosition(12))
|
||||||
|
|
||||||
|
|
@ -125,6 +127,14 @@ func waitForEvents(n *Notifier, req syncRequest) (types.StreamPosition, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Wait until something is Wait()ing on the user stream.
|
||||||
|
func waitForBlocking(s *UserStream, numBlocking int) {
|
||||||
|
for numBlocking != s.NumWaiting() {
|
||||||
|
// This is horrible but I don't want to add a signalling mechanism JUST for testing.
|
||||||
|
time.Sleep(1 * time.Microsecond)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func newTestSyncRequest(userID string, since types.StreamPosition) syncRequest {
|
func newTestSyncRequest(userID string, since types.StreamPosition) syncRequest {
|
||||||
return syncRequest{
|
return syncRequest{
|
||||||
userID: userID,
|
userID: userID,
|
||||||
|
|
|
||||||
|
|
@ -11,6 +11,7 @@
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
package sync
|
package sync
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
|
@ -25,10 +26,12 @@ import (
|
||||||
type UserStream struct {
|
type UserStream struct {
|
||||||
UserID string
|
UserID string
|
||||||
// Because this is a Cond, we can notify all waiting goroutines so this works
|
// Because this is a Cond, we can notify all waiting goroutines so this works
|
||||||
// across devices. Protects pos.
|
// across devices for the same user. Protects pos.
|
||||||
cond *sync.Cond
|
cond *sync.Cond
|
||||||
// The position to broadcast to callers of Wait().
|
// The position to broadcast to callers of Wait().
|
||||||
pos types.StreamPosition
|
pos types.StreamPosition
|
||||||
|
// The number of goroutines blocked on Wait() - used for testing and metrics
|
||||||
|
numWaiting int
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewUserStream creates a new user stream
|
// NewUserStream creates a new user stream
|
||||||
|
|
@ -42,8 +45,10 @@ func NewUserStream(userID string) *UserStream {
|
||||||
// Wait blocks until there is a new stream position for this user, which is then returned.
|
// Wait blocks until there is a new stream position for this user, which is then returned.
|
||||||
func (s *UserStream) Wait() (pos types.StreamPosition) {
|
func (s *UserStream) Wait() (pos types.StreamPosition) {
|
||||||
s.cond.L.Lock()
|
s.cond.L.Lock()
|
||||||
|
s.numWaiting += 1
|
||||||
s.cond.Wait()
|
s.cond.Wait()
|
||||||
pos = s.pos
|
pos = s.pos
|
||||||
|
s.numWaiting -= 1
|
||||||
s.cond.L.Unlock()
|
s.cond.L.Unlock()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
@ -55,3 +60,10 @@ func (s *UserStream) Broadcast(pos types.StreamPosition) {
|
||||||
s.cond.L.Unlock()
|
s.cond.L.Unlock()
|
||||||
s.cond.Broadcast()
|
s.cond.Broadcast()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NumWaiting returns the number of goroutines waiting for Wait() to return. Used for metrics and testing.
|
||||||
|
func (s *UserStream) NumWaiting() int {
|
||||||
|
s.cond.L.Lock()
|
||||||
|
defer s.cond.L.Unlock()
|
||||||
|
return s.numWaiting
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue