
Reserving CPU resource in CPU inference #27321

Merged
Changes from all commits (47 commits)
dd848c2
add property enable_cpu_reservation
sunxiaoxia2022 Oct 29, 2024
23a6e42
update enable_cpu_pinning when enable_cpu_reservation=true && enable_…
sunxiaoxia2022 Oct 29, 2024
ce804ee
update test case
sunxiaoxia2022 Oct 30, 2024
bf63bf9
support cpu_reservation=true,cpu_pinning=false
sunxiaoxia2022 Nov 12, 2024
dd43e4a
change comments
sunxiaoxia2022 Nov 12, 2024
13a146e
add enable_cpu_reservation to CpuExecNetworkSupportedPropertiesAreAva…
sunxiaoxia2022 Nov 13, 2024
79099f0
initial implementation
wangleis Dec 3, 2024
b6ba3a6
update current_socket_id
wangleis Dec 4, 2024
ae3e1f0
Merge branch 'master' into xiaoxia/cpu_reservation
sunxiaoxia2022 Dec 4, 2024
7c53ce3
Revert "update current_socket_id"
wangleis Dec 4, 2024
e27cc45
Merge branch 'master' into xiaoxia/cpu_reservation
sunxiaoxia2022 Dec 6, 2024
967f2fa
Merge branch 'pr27873' into xiaoxia/cpu_reservation
sunxiaoxia2022 Dec 6, 2024
3ccc8c6
Merge branch 'master' into xiaoxia/cpu_reservation
peterchen-intel Dec 16, 2024
9e7ed1c
initial implementation
wangleis Dec 18, 2024
5cdfc10
refactor streams calculation
wangleis Dec 18, 2024
e684864
fix code style issue
wangleis Dec 18, 2024
d1f50d2
set cpu_pinning yes if user only set cpu_reservation yes
sunxiaoxia2022 Dec 23, 2024
64e8114
Merge branch 'xiaoxia/cpu_reservation' of https://github.com/sunxiaox…
sunxiaoxia2022 Dec 23, 2024
bea7d8e
Merge branch 'master' into update_proc_type_table
wangleis Dec 24, 2024
93fcb21
revert pr27873
sunxiaoxia2022 Dec 24, 2024
6412749
Merge branch 'master' into xiaoxia/cpu_reservation
sunxiaoxia2022 Dec 24, 2024
e171818
Merge branch 'pr28117' into xiaoxia/cpu_reservation
sunxiaoxia2022 Dec 24, 2024
a575038
Merge commit 'refs/pull/28117/head' of https://github.com/openvinotoo…
sunxiaoxia2022 Dec 24, 2024
bd4a132
update for cpu reservation
wangleis Dec 24, 2024
885d837
Merge branch 'master' into update_proc_type_table
wangleis Dec 24, 2024
a6d7ea1
Merge branch 'master' into xiaoxia/cpu_reservation
wangleis Dec 30, 2024
8cc37b1
add enable_cpu_reservation condition in creating executor
sunxiaoxia2022 Jan 3, 2025
daf179b
Merge branch 'xiaoxia/cpu_reservation' of https://github.com/sunxiaox…
sunxiaoxia2022 Jan 3, 2025
f6e5867
Merge branch 'master' into xiaoxia/cpu_reservation
wangleis Jan 6, 2025
34b1aff
update pinning in windows
sunxiaoxia2022 Jan 6, 2025
f3bcc0c
add lock to guarantee thread safity
sunxiaoxia2022 Jan 10, 2025
8c86f52
fix conflict
sunxiaoxia2022 Jan 10, 2025
4e9d6a1
fix ci issue
sunxiaoxia2022 Jan 10, 2025
2e0295f
fix ci test issue
sunxiaoxia2022 Jan 11, 2025
76bb972
add test case of reservation
sunxiaoxia2022 Jan 13, 2025
a0c8793
add parallel running multiple compiled model test case
sunxiaoxia2022 Jan 13, 2025
af4cf60
Merge branch 'master' into xiaoxia/cpu_reservation
sunxiaoxia2022 Jan 13, 2025
9a36e8d
rm invalid log
sunxiaoxia2022 Jan 13, 2025
ab7484d
add test case
sunxiaoxia2022 Jan 14, 2025
aab8519
fix conflict
sunxiaoxia2022 Jan 14, 2025
f46103c
fix ci test issue
sunxiaoxia2022 Jan 14, 2025
85b03f6
change test case
sunxiaoxia2022 Jan 15, 2025
b1143c6
fix conflicts
sunxiaoxia2022 Jan 15, 2025
bb622bb
change test case
sunxiaoxia2022 Jan 16, 2025
d0e19e1
change _cpu_ids_all to Impl()
sunxiaoxia2022 Jan 16, 2025
c0f2e69
fix ci test
sunxiaoxia2022 Jan 16, 2025
e06fbdd
remove smoke_CpuExecNetworkCheckCpuReservation
sunxiaoxia2022 Jan 16, 2025
Original file line number Diff line number Diff line change
@@ -355,6 +355,7 @@ All parameters must be set before calling ``ov::Core::compile_model()`` in order
- ``ov::hint::num_request``
- ``ov::hint::scheduling_core_type``
- ``ov::hint::enable_hyper_threading``
- ``ov::hint::enable_cpu_reservation``
- ``ov::hint::enable_cpu_pinning``
- ``ov::num_streams``
- ``ov::inference_num_threads``
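The documentation hunk above lists the hints that must be passed before ``ov::Core::compile_model()``. A hedged usage sketch of that rule with the new reservation hint — the model path and the chosen values are placeholders, not from this PR:

```cpp
#include <openvino/openvino.hpp>

int main() {
    ov::Core core;
    auto model = core.read_model("model.xml");  // placeholder path
    // All listed hints, including the new enable_cpu_reservation, take
    // effect only when supplied at (or before) compile_model().
    auto compiled = core.compile_model(model,
                                       "CPU",
                                       ov::num_streams(4),
                                       ov::inference_num_threads(8),
                                       ov::hint::enable_cpu_reservation(true),
                                       ov::hint::enable_cpu_pinning(true));
    return 0;
}
```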
7 changes: 7 additions & 0 deletions src/inference/dev_api/openvino/runtime/system_conf.hpp
@@ -219,6 +219,13 @@ OPENVINO_RUNTIME_API std::vector<std::vector<int>> get_proc_type_table();
*/
OPENVINO_RUNTIME_API int get_current_socket_id();

/**
* @brief Returns the numa node ID in cpu mapping table of the currently running thread.
* @ingroup ov_dev_api_system_conf
* @return numa node ID in cpu mapping
*/
OPENVINO_RUNTIME_API int get_current_numa_node_id();

/**
* @brief Returns a table of original number of processor types without filtering other plugins occupying CPU
* resources. The difference from get_proc_type_table: This is used to get the configuration of current machine. For
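The new get_current_numa_node_id() above resolves the calling thread's numa node from the cpu mapping table. A self-contained sketch of that table lookup — the column indices, function name, and fall-back value here are assumptions for illustration, not the real implementation:

```cpp
#include <vector>

// Hypothetical two-column cpu mapping: [processor id, numa node id].
constexpr int COL_PROCESSOR_ID = 0;
constexpr int COL_NUMA_NODE_ID = 1;

// Find the row whose processor-id column matches and return its numa node.
int numa_node_of(const std::vector<std::vector<int>>& cpu_mapping, int processor_id) {
    for (const auto& row : cpu_mapping) {
        if (row[COL_PROCESSOR_ID] == processor_id) {
            return row[COL_NUMA_NODE_ID];
        }
    }
    return 0;  // unknown ids fall back to node 0 in this sketch
}
```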
@@ -57,6 +57,8 @@ class OPENVINO_RUNTIME_API CPUStreamsExecutor : public IStreamsExecutor {

std::vector<int> get_rank() override;

void cpu_reset() override;

private:
struct Impl;
std::unique_ptr<Impl> _impl;
@@ -12,6 +12,7 @@
#include <memory>
#include <string>
#include <vector>
#include <mutex>

#include "openvino/runtime/common.hpp"
#include "openvino/runtime/properties.hpp"
@@ -89,14 +90,15 @@ class OPENVINO_RUNTIME_API IStreamsExecutor : virtual public ITaskExecutor {
ov::hint::SchedulingCoreType::ANY_CORE; //!< PCORE_ONLY and ECORE_ONLY are valid in hybrid core machine,
//!< ANY_CORE is valid in all machines. Core type priority:
//!< physical PCore, ECore, logical PCore
bool _cpu_reservation = false; //!< Whether to reserve current cores which will not be used by other plugin.
//!< If it is true, cpu_pinning defaults to true.
bool _cpu_reservation = false; //!< Whether to reserve current cores which will not be used by other plugin or
//!< compiled model. If it is true, cpu_pinning defaults to true.
bool _cpu_pinning = false; //!< Whether to bind threads to cores.
bool _cores_limit = true; //!< Whether to limit the number of streams and threads by the number of cpu cores
std::vector<std::vector<int>> _streams_info_table = {};
std::vector<std::vector<int>> _stream_processor_ids;
int _sub_streams = 0;
std::vector<int> _rank = {};
bool _add_lock = true;

/**
* @brief Get and reserve cpu ids based on configuration and hardware information,
@@ -109,6 +111,8 @@ class OPENVINO_RUNTIME_API IStreamsExecutor : virtual public ITaskExecutor {
*/
void update_executor_config();

void update_executor_config(bool lock);

/**
* @brief Set _streams_info_table and _cpu_reservation in cpu streams executor config when nstreams = 0,
* that is, only create one thread with TBB
@@ -136,7 +140,8 @@ class OPENVINO_RUNTIME_API IStreamsExecutor : virtual public ITaskExecutor {
bool cpu_pinning = false,
bool cores_limit = true,
std::vector<std::vector<int>> streams_info_table = {},
std::vector<int> rank = {})
std::vector<int> rank = {},
bool add_lock = true)
: _name{std::move(name)},
_streams{streams},
_threads_per_stream{threads_per_stream},
@@ -145,8 +150,9 @@ class OPENVINO_RUNTIME_API IStreamsExecutor : virtual public ITaskExecutor {
_cpu_pinning{cpu_pinning},
_cores_limit{cores_limit},
_streams_info_table{std::move(streams_info_table)},
_rank{rank} {
update_executor_config();
_rank{rank},
_add_lock(add_lock) {
update_executor_config(_add_lock);
}

// These APIs which includes set_property and get_property can not be removed until they will never be called by
@@ -266,12 +272,19 @@ class OPENVINO_RUNTIME_API IStreamsExecutor : virtual public ITaskExecutor {
*/
virtual std::vector<int> get_rank() = 0;

/**
* @brief Reset cpu map table when user set enable_cpu_reservation = true
*/
virtual void cpu_reset() = 0;

/**
* @brief Execute the task in the current thread using streams executor configuration and constraints
* @param task A task to start
*/
virtual void execute(Task task) = 0;
};

static std::mutex _streams_executor_mutex;

} // namespace threading
} // namespace ov
17 changes: 17 additions & 0 deletions src/inference/include/openvino/runtime/properties.hpp
@@ -479,6 +479,23 @@ static constexpr Property<std::set<ModelDistributionPolicy>> model_distribution_
*/
static constexpr Property<bool> enable_cpu_pinning{"ENABLE_CPU_PINNING"};

/**
* @brief This property allows CPU reservation during inference.
* @ingroup ov_runtime_cpp_prop_api
*
* Cpu Reservation means reserve cpus which will not be used by other plugin or compiled model. Developer can use this
* property to enable or disable CPU reservation during inference on Windows and Linux. MacOS does not support CPU
* reservation, and this property is always disabled. This property defaults to false.
*
* The following code is example to use this property.
*
* @code
* ie.set_property(ov::hint::enable_cpu_reservation(true));
* ie.set_property(ov::hint::enable_cpu_reservation(false));
* @endcode
*/
static constexpr Property<bool> enable_cpu_reservation{"ENABLE_CPU_RESERVATION"};

/**
* @brief This property define if using hyper threading during inference.
* @ingroup ov_runtime_cpp_prop_api
26 changes: 18 additions & 8 deletions src/inference/src/dev/threading/cpu_streams_executor.cpp
@@ -92,14 +92,6 @@ struct CPUStreamsExecutor::Impl {
_impl->_streamIdQueue.push(_streamId);
}
#if OV_THREAD == OV_THREAD_TBB || OV_THREAD == OV_THREAD_TBB_AUTO
if (_impl->_config.get_name().find("StreamsExecutor") == std::string::npos) {
try {
set_cpu_used(_cpu_ids, NOT_USED);
} catch (const ov::Exception&) {
// Destructor should not throw - catch needed for static analysis.
// CPU::CPU() won't throw here as cpu_info() is called from Stream constructor.
}
}
if (nullptr != _observer) {
_observer->observe(false);
}
@@ -345,6 +337,7 @@ struct CPUStreamsExecutor::Impl {
_exectorMgr = executor_manager();
auto numaNodes = get_available_numa_nodes();
int streams_num = _config.get_streams();
auto processor_ids = _config.get_stream_processor_ids();
if (streams_num != 0) {
std::copy_n(std::begin(numaNodes),
std::min<std::size_t>(streams_num, numaNodes.size()),
@@ -353,6 +346,10 @@
_usedNumaNodes = std::move(numaNodes);
}
for (auto streamId = 0; streamId < streams_num; ++streamId) {
if (_config.get_cpu_reservation()) {
std::lock_guard<std::mutex> lock(_cpu_ids_mutex);
_cpu_ids_all.insert(_cpu_ids_all.end(), processor_ids[streamId].begin(), processor_ids[streamId].end());
}
_threads.emplace_back([this, streamId] {
openvino::itt::threadName(_config.get_name() + "_" + std::to_string(streamId));
for (bool stopped = false; !stopped;) {
@@ -457,6 +454,8 @@ struct CPUStreamsExecutor::Impl {
CustomThreadLocal _streams;
std::shared_ptr<ExecutorManager> _exectorMgr;
bool _isExit = false;
std::vector<int> _cpu_ids_all;
std::mutex _cpu_ids_mutex;
};

int CPUStreamsExecutor::get_stream_id() {
@@ -492,9 +491,20 @@ std::vector<int> CPUStreamsExecutor::get_rank() {
return stream->_rank;
}

void CPUStreamsExecutor::cpu_reset() {
if (!_impl->_cpu_ids_all.empty()) {
set_cpu_used(_impl->_cpu_ids_all, NOT_USED);
{
std::lock_guard<std::mutex> lock(_impl->_cpu_ids_mutex);
_impl->_cpu_ids_all.clear();
}
}
}

CPUStreamsExecutor::CPUStreamsExecutor(const IStreamsExecutor::Config& config) : _impl{new Impl{config}} {}

CPUStreamsExecutor::~CPUStreamsExecutor() {
cpu_reset();
{
std::lock_guard<std::mutex> lock(_impl->_mutex);
_impl->_isStopped = true;
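The new `_cpu_ids_all` bookkeeping in this file follows a simple pattern: append each stream's processor ids under a mutex, then release them exactly once from `cpu_reset()`, which the destructor now calls first. A stand-alone sketch of that pattern — the class name and the counting are illustrative, not the real executor:

```cpp
#include <cstddef>
#include <mutex>
#include <vector>

// Illustrative stand-in for the executor's reservation bookkeeping.
class ReservationTracker {
public:
    // Append one stream's processor ids under the lock.
    void reserve(const std::vector<int>& ids) {
        std::lock_guard<std::mutex> lock(_mutex);
        _cpu_ids_all.insert(_cpu_ids_all.end(), ids.begin(), ids.end());
    }

    // Mirrors cpu_reset(): release everything once, so a second call is a
    // no-op. The real code hands ids back via set_cpu_used(..., NOT_USED);
    // here we just report how many were held.
    std::size_t reset() {
        std::lock_guard<std::mutex> lock(_mutex);
        std::size_t released = _cpu_ids_all.size();
        _cpu_ids_all.clear();
        return released;
    }

private:
    std::vector<int> _cpu_ids_all;
    std::mutex _mutex;
};
```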
37 changes: 27 additions & 10 deletions src/inference/src/dev/threading/istreams_executor.cpp
@@ -159,27 +159,32 @@ void IStreamsExecutor::Config::update_executor_config() {
const auto proc_type_table = get_proc_type_table();
bool streams_info_available = false;

if (proc_type_table.empty()) {
return;
}

if (_cpu_reservation && !_cpu_pinning) {
_cpu_pinning = true;
if (proc_type_table.empty() || proc_type_table[0][ALL_PROC] == 0) {
if (_cpu_reservation) {
OPENVINO_THROW("[ Config ] proc_type_table is empty. No CPU resources available!");
} else {
return;
}
}

if (!_streams_info_table.empty()) {
streams_info_available = true;
std::vector<int> threads_proc_type(HYPER_THREADING_PROC + 1, 0);
int threads_all = 0;
for (size_t i = 0; i < _streams_info_table.size(); i++) {
if (_streams_info_table[i][NUMBER_OF_STREAMS] > 0) {
threads_proc_type[_streams_info_table[i][PROC_TYPE]] +=
int num_threads =
_streams_info_table[i][THREADS_PER_STREAM] * _streams_info_table[i][NUMBER_OF_STREAMS];
threads_proc_type[_streams_info_table[i][PROC_TYPE]] += num_threads;
threads_all += num_threads;
}
}
if (threads_all == 0) {
OPENVINO_THROW("streams_info_table is invalid!");
}
for (size_t i = ALL_PROC; i < threads_proc_type.size(); i++) {
if (threads_proc_type[i] > proc_type_table[0][i]) {
streams_info_available = false;
break;
OPENVINO_THROW("Not enough CPU resources!");
}
}
}
@@ -269,7 +274,7 @@ void IStreamsExecutor::Config::update_executor_config() {
}
}

if (_cpu_pinning) {
if (_cpu_pinning || _cpu_reservation) {
Contributor:

If user set cpu_reservation only, change cpu_pinning to yes internally.
If user set both cpu_reservation and cpu_pinning, keep user setting.

Contributor Author:

Ok, done.

reserve_available_cpus(_streams_info_table, _stream_processor_ids, _cpu_reservation ? CPU_USED : NOT_USED);
}

@@ -319,6 +324,17 @@ void IStreamsExecutor::Config::update_executor_config() {
#endif
}

void IStreamsExecutor::Config::update_executor_config(bool lock) {
if (lock) {
{
std::lock_guard<std::mutex> lock{_streams_executor_mutex};
update_executor_config();
}
} else {
update_executor_config();
}
}

void IStreamsExecutor::Config::set_config_zero_stream() {
std::vector<std::vector<int>> proc_type_table = get_proc_type_table();
int core_type = MAIN_CORE_PROC;
@@ -333,6 +349,7 @@ void IStreamsExecutor::Config::set_config_zero_stream() {
socket_id = std::max(0, proc_type_table[0][PROC_SOCKET_ID]);
}
_streams_info_table.push_back({1, core_type, 1, numa_id, socket_id});
_cpu_reservation = false;
_cpu_pinning = false;
}

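The `update_executor_config(bool lock)` wrapper added above takes the global mutex only when the caller asks for it, so callers already holding the lock can reuse the unlocked core routine. A minimal sketch of that shape — all names below are made up, only the locking pattern mirrors the PR:

```cpp
#include <mutex>

static std::mutex g_config_mutex;  // plays the role of _streams_executor_mutex
static int g_update_count = 0;     // stands in for the shared CPU map state

// The unlocked core routine, analogous to update_executor_config().
static void update_config() {
    ++g_update_count;
}

// Take the global lock only when asked to (e.g. at construction time);
// skip it when a caller higher up the chain already holds the mutex.
static void update_config(bool lock) {
    if (lock) {
        std::lock_guard<std::mutex> guard(g_config_mutex);
        update_config();
    } else {
        update_config();
    }
}
```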
32 changes: 0 additions & 32 deletions src/inference/src/os/cpu_map_info.hpp
@@ -54,38 +54,6 @@ class CPU {
std::mutex _cpu_mutex;
int _socket_idx = 0;

private:
/**
* @brief Sort proc_type_table by CPU ID on which application is running. The numa node containing this CPU ID
* will move to first row.
* @param[in] _processor_id CPU ID on which application is running.
* @param[in] _proc_type_table summary table of number of processors per type
* @param[in] _cpu_mapping_table CPU mapping table for each processor
* @return
*/
void sort_table_by_cpu_id(const int _processor_id,
std::vector<std::vector<int>>& _proc_type_table,
const std::vector<std::vector<int>>& _cpu_mapping_table) {
int current_numa_node = 0;
int current_socket = 0;

for (auto& row : _cpu_mapping_table) {
if (_processor_id == row[CPU_MAP_PROCESSOR_ID]) {
current_numa_node = row[CPU_MAP_NUMA_NODE_ID];
current_socket = row[CPU_MAP_SOCKET_ID];
break;
}
}
for (size_t i = 1; i < _proc_type_table.size(); i++) {
if ((current_numa_node == _proc_type_table[i][PROC_NUMA_NODE_ID]) &&
(current_socket == _proc_type_table[i][PROC_SOCKET_ID])) {
std::rotate(_proc_type_table.begin() + 1, _proc_type_table.begin() + i, _proc_type_table.end());
break;
}
}
};

friend class LinuxSortProcTableTests;
};

CPU& cpu_info();
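For reference, the removed sort_table_by_cpu_id() relied on std::rotate to bring the numa-node row matching the current CPU to row 1 while leaving the summary row 0 in place. That reordering can be sketched in isolation — the function name and table contents here are made up:

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

// Keep row 0 (the summary row) fixed and rotate rows 1..end so that the
// row at `target` becomes row 1 -- the effect the removed helper had.
std::vector<std::vector<int>> rotate_to_front(std::vector<std::vector<int>> table,
                                              std::size_t target) {
    if (target > 1 && target < table.size()) {
        std::rotate(table.begin() + 1, table.begin() + target, table.end());
    }
    return table;
}
```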
5 changes: 0 additions & 5 deletions src/inference/src/os/lin/lin_system_conf.cpp
@@ -320,11 +320,6 @@ CPU::CPU() {
OPENVINO_THROW("CPU affinity check failed. No CPU is eligible to run inference.");
};

if (_proc_type_table.size() > 1) {
int cur_processor_id = sched_getcpu();
sort_table_by_cpu_id(cur_processor_id, _proc_type_table, _cpu_mapping_table);
}

_org_proc_type_table = _proc_type_table;

cpu_debug();
5 changes: 0 additions & 5 deletions src/inference/src/os/win/win_system_conf.cpp
@@ -52,11 +52,6 @@ CPU::CPU() {
}
}

if (_proc_type_table.size() > 1) {
int cur_processor_id = GetCurrentProcessorNumber();
sort_table_by_cpu_id(cur_processor_id, _proc_type_table, _cpu_mapping_table);
}

cpu_debug();
}
