Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BehaviorSubject mistakenly refers to check_unsubscribed as a class method #109

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ rvm:
- 2.3.0
- jruby-19mode # JRuby in 1.9 mode
- rbx-2.6
- rbx-3.82
before_install:
- gem update bundler
- ruby --version | grep -qF 'rubinius 2.6' || gem update bundler
14 changes: 7 additions & 7 deletions lib/rx/subjects/behavior_subject.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ def has_observers?

# Gets the current value or throws an exception.
def value
gate.synchronize do
self.check_unsubscribed
gate.synchronize do
check_unsubscribed
raise @error if @error
@value
end
Expand All @@ -41,8 +41,8 @@ def value
# Notifies all subscribed observers about the end of the sequence.
def on_completed
os = nil
@gate.synchronize do
self.check_unsubscribed
@gate.synchronize do
check_unsubscribed

unless @stopped
os = @observers.clone
Expand All @@ -60,7 +60,7 @@ def on_error(error)

os = nil
@gate.synchronize do
self.check_unsubscribed
check_unsubscribed

unless @stopped
os = @observers.clone
Expand All @@ -77,7 +77,7 @@ def on_error(error)
def on_next(value)
os = nil
@gate.synchronize do
self.check_unsubscribed
check_unsubscribed
@value = value
os = @observers.clone unless @stopped
end
Expand All @@ -91,7 +91,7 @@ def subscribe(observer)

err = nil
gate.synchronize do
self.check_unsubscribed
check_unsubscribed

unless @stopped
observers.push(observer)
Expand Down
18 changes: 11 additions & 7 deletions test/rx/concurrency/test_default_scheduler.rb
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
# Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

require 'test_helper'
require 'rx/subscriptions/helpers/await_helpers'

# DefaultScheduler creates new threads in which to run scheduled tasks; a short
# sleep is necessary to allow the thread scheduler to yield to the other
# threads.
class TestDefaultScheduler < Minitest::Test
include AwaitHelpers

def setup
@scheduler = Rx::DefaultScheduler.instance
end

INTERVAL = 0.05

def test_schedule_with_state
state = []
task = ->(_, s) { s << 1 }
@scheduler.schedule_with_state(state, task)
sleep 0.001
await_array_length(state, 1, INTERVAL)

assert_equal([1], state)
end
Expand All @@ -24,15 +25,18 @@ def test_schedule_relative_with_state
state = []
task = ->(_, s) { s << 1 }
@scheduler.schedule_relative_with_state(state, 0.05, task)
sleep 0.1
await_array_length(state, 1, INTERVAL)

assert_equal([1], state)
end

def test_default_schedule_runs_in_its_own_thread
state = []
id = Thread.current.object_id
@scheduler.schedule -> { refute_equal(id, Thread.current.object_id) }
sleep 0.001
@scheduler.schedule -> { state << Thread.current.object_id }
await_array_length(state, 1, INTERVAL)

refute_equal([id], state)
end

def test_schedule_action_cancel
Expand Down
12 changes: 3 additions & 9 deletions test/rx/concurrency/test_periodic_scheduler.rb
Original file line number Diff line number Diff line change
@@ -1,21 +1,15 @@
# Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

require 'test_helper'
require 'rx/subscriptions/helpers/await_helpers'

class PeriodicTestClass
include Rx::PeriodicScheduler
end

def await_array_length(array, expected, interval)
sleep (expected * interval) * 0.9
deadline = Time.now + interval * (expected + 1)
while Time.now < deadline
break if array.length == expected
sleep interval / 10
end
end

class TestPeriodicScheduler < Minitest::Test
include AwaitHelpers

def setup
@scheduler = PeriodicTestClass.new
end
Expand Down
31 changes: 31 additions & 0 deletions test/rx/subjects/test_behavior_subject.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

require 'test_helper'

class TestBehaviorSubject < Minitest::Test
def test_subscriber_notified_on_change
value = 0
subject = Rx::BehaviorSubject.new 0
subject.as_observable.subscribe { |update| value = update }
subject.on_next 1
assert_equal 1, value
end

def test_multiple_observers_notified_on_change
value1 = 0
value2 = 0
subject = Rx::BehaviorSubject.new 0
subject.as_observable.subscribe { |update| value1 = update }
subject.as_observable.subscribe { |update| value2 = update }
subject.on_next 1
assert_equal 1, value1
assert_equal 1, value2
end

def test_errors_on_next_when_unsubscribed
subject = Rx::BehaviorSubject.new 0
subject.as_observable.subscribe { }
subject.unsubscribe
assert_raises(RuntimeError) { subject.on_next 1 }
end
end
10 changes: 10 additions & 0 deletions test/rx/subscriptions/helpers/await_helpers.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
module AwaitHelpers
def await_array_length(array, expected, interval)
sleep (expected * interval) * 0.9
deadline = Time.now + interval * (expected + 1)
while Time.now < deadline
break if array.length == expected
sleep interval / 10
end
end
end