Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prevent task starvation #434

Merged
merged 29 commits into from
Dec 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
c29d395
changed process_task_ to alternate between ready and sleeping task queue
stevenzhu94 Dec 5, 2020
be2c0fd
added ready_queue to TaskQueue constructor and unit test for starvation
stevenzhu94 Dec 5, 2020
fa5024b
made fetch_ready_ boolean local rather than a class property
stevenzhu94 Dec 5, 2020
1e3cac1
Fix compatibility with new SCons
gavv Dec 5, 2020
8758253
Disable clang 11 warnings
gavv Dec 5, 2020
452b797
changed process_task_ to alternate between ready and sleeping task queue
stevenzhu94 Dec 5, 2020
da4547d
added ready_queue to TaskQueue constructor and unit test for starvation
stevenzhu94 Dec 5, 2020
a869a06
made fetch_ready_ boolean local rather than a class property
stevenzhu94 Dec 5, 2020
9ad6747
Merge branch 'develop' of https://github.com/stevenzhu94/roc-toolkit …
stevenzhu94 Dec 5, 2020
6d4916a
reimplemented fetch_ready_ flag as a class field and only flip when f…
stevenzhu94 Dec 5, 2020
b547569
fixed fetch_ready_ field name in header file
stevenzhu94 Dec 5, 2020
314b3b6
reordered fetch_ready_ member declaration in header file
stevenzhu94 Dec 5, 2020
c55ae53
added debugging printouts
stevenzhu94 Dec 5, 2020
918bdf4
fixed printout
stevenzhu94 Dec 5, 2020
d4c8d36
fixed test for all sleep test to have same deadline
stevenzhu94 Dec 5, 2020
1a58980
increased synchronizing wait time in test
stevenzhu94 Dec 5, 2020
372f92f
changed test
stevenzhu94 Dec 5, 2020
b9fec01
update test
stevenzhu94 Dec 6, 2020
b565d46
update test
stevenzhu94 Dec 6, 2020
82b8971
update test
stevenzhu94 Dec 6, 2020
7101d57
update test
stevenzhu94 Dec 6, 2020
91861df
update test
stevenzhu94 Dec 6, 2020
c7ddd2c
update test
stevenzhu94 Dec 6, 2020
7bf2824
update test
stevenzhu94 Dec 6, 2020
b553ba9
update unit test
stevenzhu94 Dec 6, 2020
5e8ae4a
increased the delay on the schedule_at tasks in test to prevent the t…
stevenzhu94 Dec 7, 2020
36ebde8
revert .travis.yml file to original
stevenzhu94 Dec 7, 2020
077c230
changed checks in starvation test to account for instances where a sl…
stevenzhu94 Dec 7, 2020
a6595f0
updated success checks in starvation test
stevenzhu94 Dec 7, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions SConstruct
Original file line number Diff line number Diff line change
Expand Up @@ -1284,6 +1284,12 @@ if compiler == 'clang':
'-Wno-implicit-int-float-conversion',
'-Wno-enum-float-conversion',
]})
if compiler_ver[:2] >= (11, 0):
for var in ['CXXFLAGS', 'CFLAGS']:
env.Append(**{var: [
'-Wno-suggest-override',
'-Wno-suggest-destructor-override',
]})

if platform == 'darwin':
if compiler_ver[:2] >= (10, 0):
Expand Down
8 changes: 7 additions & 1 deletion scripts/scons/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@ def _run_prog(context, src, suffix):
# RunProg may incorrectly use cached results from a previous run saved for
# different file contents but the same invocation number. To prevent this, we
# monkey patch its global counter with a hashsum of the file contents.
SCons.SConf._ac_build_counter = int(hashlib.md5(src.encode()).hexdigest(), 16)
# The workaround is needed only for older versions of SCons, where
# _ac_build_counter was an integer.
try:
if type(SCons.SConf._ac_build_counter) is int:
SCons.SConf._ac_build_counter = int(hashlib.md5(src.encode()).hexdigest(), 16)
except:
pass
return context.RunProg(src, suffix)

def CheckLibWithHeaderExt(context, libs, headers, language, expr='1', run=True):
Expand Down
19 changes: 16 additions & 3 deletions src/modules/roc_ctl/task_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ TaskQueue::ICompletionHandler::~ICompletionHandler() {
TaskQueue::TaskQueue()
: started_(false)
, stop_(false)
, fetch_ready_(true)
, ready_queue_size_(0) {
started_ = Thread::start();
}
Expand Down Expand Up @@ -159,10 +160,22 @@ bool TaskQueue::process_tasks_() {
core::Mutex::Lock lock(task_mutex_);

for (;;) {
Task* task = fetch_ready_task_();

if (!task) {
Task* task = NULL;

if (fetch_ready_) {
task = fetch_ready_task_();
if (!task) {
task = fetch_sleeping_task_();
} else {
fetch_ready_ = !fetch_ready_;
}
} else {
task = fetch_sleeping_task_();
if (!task) {
task = fetch_ready_task_();
} else {
fetch_ready_ = !fetch_ready_;
}
}

if (!task) {
Expand Down
1 change: 1 addition & 0 deletions src/modules/roc_ctl/task_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ class TaskQueue : private core::Thread {

bool started_;
core::Atomic<int> stop_;
bool fetch_ready_;

core::Atomic<int> ready_queue_size_;
core::MpscQueue<Task, core::NoOwnership> ready_queue_;
Expand Down
77 changes: 75 additions & 2 deletions src/tests/roc_ctl/test_task_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -747,9 +747,9 @@ TEST(task_queue, schedule_at_and_schedule) {
const core::nanoseconds_t now = core::timestamp();

tq.schedule(tasks[0], &handler);
tq.schedule_at(tasks[1], now + core::Millisecond * 3, &handler);
tq.schedule_at(tasks[1], now + core::Millisecond * 7, &handler);
tq.schedule(tasks[2], &handler);
tq.schedule_at(tasks[3], now + core::Millisecond * 1, &handler);
tq.schedule_at(tasks[3], now + core::Millisecond * 5, &handler);

tq.unblock_one();
CHECK(handler.wait_called() == &tasks[0]);
Expand Down Expand Up @@ -1290,5 +1290,78 @@ TEST(task_queue, reschedule_cancelled) {
UNSIGNED_LONGS_EQUAL(1, tq.num_tasks());
}

TEST(task_queue, no_starvation) {
TestTaskQueue tq;
CHECK(tq.valid());

enum { NumTasks = 6 };

UNSIGNED_LONGS_EQUAL(0, tq.num_tasks());

TestHandler handler;
handler.expect_success(true);
handler.expect_n_calls(NumTasks);

TestTaskQueue::Task tasks[NumTasks];

tq.block();

const core::nanoseconds_t now = core::timestamp();
const core::nanoseconds_t WaitTime = core::Millisecond;

tq.schedule_at(tasks[0], now + WaitTime, &handler);
tq.schedule_at(tasks[1], now + WaitTime * 2, &handler);
tq.schedule_at(tasks[2], now + WaitTime * 3, &handler);
tq.schedule(tasks[3], &handler);
tq.schedule(tasks[4], &handler);
tq.schedule(tasks[5], &handler);

for (size_t i = 0; i < NumTasks; i++) {
tq.set_nth_result(i, true);
}

// wait for sleeping task to sync
core::sleep_for(WaitTime * (NumTasks / 2));

// check that the tasks are fetched from alternating queues
tq.unblock_one();
TaskQueue::Task* temp = handler.wait_called();
CHECK(temp == &tasks[0] || temp == &tasks[3]);
UNSIGNED_LONGS_EQUAL(1, tq.num_tasks());
CHECK(tasks[0].success() || tasks[3].success());

tq.unblock_one();
temp = handler.wait_called();
CHECK(temp == &tasks[0] || temp == &tasks[3]);
UNSIGNED_LONGS_EQUAL(2, tq.num_tasks());
CHECK(tasks[0].success() && tasks[3].success());

tq.unblock_one();
temp = handler.wait_called();
CHECK(temp == &tasks[1] || temp == &tasks[4]);
UNSIGNED_LONGS_EQUAL(3, tq.num_tasks());
CHECK(tasks[1].success() || tasks[4].success());

tq.unblock_one();
temp = handler.wait_called();
CHECK(temp == &tasks[1] || temp == &tasks[4]);
UNSIGNED_LONGS_EQUAL(4, tq.num_tasks());
CHECK(tasks[1].success() && tasks[4].success());

tq.unblock_one();
temp = handler.wait_called();
CHECK(temp == &tasks[2] || temp == &tasks[5]);
UNSIGNED_LONGS_EQUAL(5, tq.num_tasks());
CHECK(tasks[2].success() || tasks[5].success());

tq.unblock_one();
temp = handler.wait_called();
CHECK(temp == &tasks[2] || temp == &tasks[5]);
UNSIGNED_LONGS_EQUAL(6, tq.num_tasks());
CHECK(tasks[2].success() && tasks[5].success());

tq.check_all_unblocked();
}

} // namespace ctl
} // namespace roc