diff --git a/data_juicer/config/config.py b/data_juicer/config/config.py index f252f4999..c8f8f9ded 100644 --- a/data_juicer/config/config.py +++ b/data_juicer/config/config.py @@ -567,12 +567,14 @@ def update_op_process(cfg, parser): # update op params of cfg.process internal_op_para = temp_cfg.get(op_in_process_name) - - cfg.process[i] = { - op_in_process_name: - None if internal_op_para is None else - namespace_to_dict(internal_op_para) - } + if internal_op_para is not None: + num_proc = internal_op_para.get('num_proc') + if 'num_proc' in internal_op_para: + internal_op_para['num_proc'] = num_proc or cfg.np + internal_op_para = namespace_to_dict(internal_op_para) + else: + internal_op_para = None + cfg.process[i] = {op_in_process_name: internal_op_para} # check the op params via type hint temp_parser = copy.deepcopy(parser) diff --git a/data_juicer/core/ray_data.py b/data_juicer/core/ray_data.py index 95c8874c9..d224a9b0a 100644 --- a/data_juicer/core/ray_data.py +++ b/data_juicer/core/ray_data.py @@ -71,7 +71,7 @@ def process_batch_arrow(table: pa.Table) -> pa.Table: def get_num_gpus(op, op_proc): - if op.use_cuda(): + if not op.use_cuda(): return 0 proc_per_gpu = op_proc / cuda_device_count() return 1.0 / proc_per_gpu diff --git a/data_juicer/ops/filter/video_ocr_area_ratio_filter.py b/data_juicer/ops/filter/video_ocr_area_ratio_filter.py index 9cece53d2..2a647541d 100644 --- a/data_juicer/ops/filter/video_ocr_area_ratio_filter.py +++ b/data_juicer/ops/filter/video_ocr_area_ratio_filter.py @@ -94,6 +94,7 @@ def __init__(self, def get_reader(self, rank): if self.use_cuda(): + rank = 0 if rank is None else rank device = f'cuda:{rank % cuda_device_count()}' self.reader.detector = self.reader.detector.to(device) self.reader.device = device diff --git a/data_juicer/utils/process_utils.py b/data_juicer/utils/process_utils.py index 2dd1e7d3a..33d0a9f68 100644 --- a/data_juicer/utils/process_utils.py +++ b/data_juicer/utils/process_utils.py @@ -55,13 +55,16 @@ def calculate_np(name, num_proc=None, use_cuda=False): """Calculate the optimum number of processes for the given OP""" + eps = 1e-9 # about 1 byte + if num_proc is None: num_proc = psutil.cpu_count() + if use_cuda: cuda_mem_available = get_min_cuda_memory() / 1024 op_proc = min( num_proc, - math.floor(cuda_mem_available / (mem_required + 0.1)) * + math.floor(cuda_mem_available / (mem_required + eps)) * cuda_device_count()) if use_cuda and mem_required == 0: logger.warning(f'The required cuda memory of Op[{name}] ' @@ -84,9 +87,9 @@ def calculate_np(name, cpu_available = psutil.cpu_count() mem_available = psutil.virtual_memory().available mem_available = mem_available / 1024**3 - op_proc = min(op_proc, math.floor(cpu_available / cpu_required)) + op_proc = min(op_proc, math.floor(cpu_available / cpu_required + eps)) op_proc = min(op_proc, - math.floor(mem_available / (mem_required + 0.1))) + math.floor(mem_available / (mem_required + eps))) if op_proc < 1.0: logger.warning(f'The required CPU number:{cpu_required} ' f'and memory:{mem_required}GB might ' diff --git a/tests/config/test_config_funcs.py b/tests/config/test_config_funcs.py index 23f2f0122..cf7e21f3d 100644 --- a/tests/config/test_config_funcs.py +++ b/tests/config/test_config_funcs.py @@ -46,7 +46,7 @@ def test_yaml_cfg_file(self): 'audio_key': 'audios', 'video_key': 'videos', 'accelerator': None, - 'num_proc': None, + 'num_proc': 4, 'cpu_required': 1, 'mem_required': 0, } @@ -61,7 +61,7 @@ def test_yaml_cfg_file(self): 'audio_key': 'audios', 'video_key': 'videos', 'accelerator': None, - 'num_proc': None, + 'num_proc': 4, 'stats_export_path': None, 'cpu_required': 1, 'mem_required': 0, @@ -127,7 +127,7 @@ def test_mixture_cfg(self): 'audio_key': 'audios', 'video_key': 'videos', 'accelerator': None, - 'num_proc': None, + 'num_proc': 4, 'stats_export_path': None, 'cpu_required': 1, 'mem_required': 0, @@ -143,7 +143,7 @@ def test_mixture_cfg(self): 'audio_key': 'audios', 'video_key': 'videos', 'accelerator': None, - 'num_proc': None, + 'num_proc': 4, 'stats_export_path': None, 'cpu_required': 1, 'mem_required': 0, @@ -159,7 +159,7 @@ def test_mixture_cfg(self): 'audio_key': 'audios', 'video_key': 'videos', 'accelerator': None, - 'num_proc': None, + 'num_proc': 4, 'stats_export_path': None, 'cpu_required': 1, 'mem_required': 0, @@ -175,7 +175,7 @@ def test_mixture_cfg(self): 'audio_key': 'audios', 'video_key': 'videos', 'accelerator': None, - 'num_proc': None, + 'num_proc': 4, 'stats_export_path': None, 'cpu_required': 1, 'mem_required': 0, @@ -191,7 +191,7 @@ def test_mixture_cfg(self): 'audio_key': 'audios', 'video_key': 'videos', 'accelerator': None, - 'num_proc': None, + 'num_proc': 4, 'stats_export_path': None, 'cpu_required': 1, 'mem_required': 0,