Skip to content

Commit

Permalink
fix video memory leak (#374)
Browse files Browse the repository at this point in the history
* fixed

* error traceback

* fix error

* fix error

* fix questions
  • Loading branch information
BeachWang authored Aug 1, 2024
1 parent ef38589 commit 8d53f23
Show file tree
Hide file tree
Showing 25 changed files with 96 additions and 54 deletions.
2 changes: 2 additions & 0 deletions data_juicer/ops/base_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def wrapper(samples, *args, **kwargs):
logger.error(
f'An error occurred in mapper operation when processing '
f'samples {samples}, {type(e)}: {e}')
traceback.print_exc()
ret = {key: [] for key in samples.keys()}
ret[Fields.stats] = []
ret[Fields.source_file] = []
Expand Down Expand Up @@ -97,6 +98,7 @@ def wrapper(sample, *args, **kwargs):
logger.error(
f'An error occurred in mapper operation when processing '
f'sample {sample}, {type(e)}: {e}')
traceback.print_exc()
ret = {key: [] for key in sample.keys()}
ret[Fields.stats] = []
ret[Fields.source_file] = []
Expand Down
6 changes: 5 additions & 1 deletion data_juicer/ops/deduplicator/ray_video_deduplicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

from jsonargparse.typing import PositiveInt

from data_juicer.utils.mm_utils import load_data_with_context, load_video
from data_juicer.utils.mm_utils import (close_video, load_data_with_context,
load_video)

from ..base_op import OPERATORS
from ..op_fusion import LOADED_VIDEOS
Expand Down Expand Up @@ -52,4 +53,7 @@ def calculate_hash(self, sample, context=False):
if packet.stream.type == 'video':
md5_hash.update(bytes(packet))

for key in videos:
close_video(videos[key])

return md5_hash.hexdigest()
6 changes: 5 additions & 1 deletion data_juicer/ops/deduplicator/video_deduplicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
from typing import Dict, Set, Tuple

from data_juicer.utils.constant import HashKeys
from data_juicer.utils.mm_utils import load_data_with_context, load_video
from data_juicer.utils.mm_utils import (close_video, load_data_with_context,
load_video)

from ..base_op import OPERATORS, Deduplicator
from ..op_fusion import LOADED_VIDEOS
Expand Down Expand Up @@ -61,6 +62,9 @@ def compute_hash(self, sample, context=False):
if packet.stream.type == 'video':
md5_hash.update(bytes(packet))

for key in videos:
close_video(videos[key])

sample[HashKeys.videohash] = md5_hash.hexdigest()
return sample

Expand Down
4 changes: 2 additions & 2 deletions data_juicer/ops/filter/video_aesthetics_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from data_juicer.utils.availability_utils import AvailabilityChecking
from data_juicer.utils.constant import Fields, StatsKeys
from data_juicer.utils.mm_utils import (extract_key_frames,
from data_juicer.utils.mm_utils import (close_video, extract_key_frames,
extract_video_frames_uniformly,
load_data_with_context, load_video)

Expand Down Expand Up @@ -181,7 +181,7 @@ def compute_stats(self, sample, rank=None, context=False):

if not context:
for vid_key in videos:
videos[vid_key].close()
close_video(videos[vid_key])

return sample

Expand Down
5 changes: 3 additions & 2 deletions data_juicer/ops/filter/video_aspect_ratio_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import numpy as np

from data_juicer.utils.constant import Fields, StatsKeys
from data_juicer.utils.mm_utils import load_data_with_context, load_video
from data_juicer.utils.mm_utils import (close_video, load_data_with_context,
load_video)

from ..base_op import OPERATORS, Filter
from ..op_fusion import LOADED_VIDEOS
Expand Down Expand Up @@ -67,7 +68,7 @@ def compute_stats(self, sample, context=False):
video_aspect_ratios[
key] = stream.codec_context.width / stream.codec_context.height
if not context:
video.close()
close_video(video)

sample[Fields.stats][StatsKeys.video_aspect_ratios] = [
video_aspect_ratios[key] for key in loaded_video_keys
Expand Down
5 changes: 3 additions & 2 deletions data_juicer/ops/filter/video_duration_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
from jsonargparse.typing import NonNegativeInt

from data_juicer.utils.constant import Fields, StatsKeys
from data_juicer.utils.mm_utils import load_data_with_context, load_video
from data_juicer.utils.mm_utils import (close_video, load_data_with_context,
load_video)

from ..base_op import OPERATORS, Filter
from ..op_fusion import LOADED_VIDEOS
Expand Down Expand Up @@ -68,7 +69,7 @@ def compute_stats(self, sample, context=False):
video_durations[video_key] = round(stream.duration *
stream.time_base)
if not context:
video.close()
close_video(video)

# get video durations
sample[Fields.stats][StatsKeys.video_duration] = [
Expand Down
5 changes: 3 additions & 2 deletions data_juicer/ops/filter/video_frames_text_similarity_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

from data_juicer.utils.availability_utils import AvailabilityChecking
from data_juicer.utils.constant import Fields, StatsKeys
from data_juicer.utils.mm_utils import (SpecialTokens, extract_key_frames,
from data_juicer.utils.mm_utils import (SpecialTokens, close_video,
extract_key_frames,
extract_video_frames_uniformly,
load_data_with_context, load_video,
remove_special_tokens)
Expand Down Expand Up @@ -195,7 +196,7 @@ def compute_stats(self, sample, rank=None, context=False):

if not context:
for vid_key in videos:
videos[vid_key].close()
close_video(videos[vid_key])

return sample

Expand Down
4 changes: 2 additions & 2 deletions data_juicer/ops/filter/video_nsfw_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from data_juicer.utils.availability_utils import AvailabilityChecking
from data_juicer.utils.constant import Fields, StatsKeys
from data_juicer.utils.mm_utils import (extract_key_frames,
from data_juicer.utils.mm_utils import (close_video, extract_key_frames,
extract_video_frames_uniformly,
load_data_with_context, load_video)
from data_juicer.utils.model_utils import get_model, prepare_model
Expand Down Expand Up @@ -156,7 +156,7 @@ def compute_stats(self, sample, rank=None, context=False):

if not context:
for vid_key in videos:
videos[vid_key].close()
close_video(videos[vid_key])

return sample

Expand Down
5 changes: 3 additions & 2 deletions data_juicer/ops/filter/video_ocr_area_ratio_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
from data_juicer import cuda_device_count
from data_juicer.utils.availability_utils import AvailabilityChecking
from data_juicer.utils.constant import Fields, StatsKeys
from data_juicer.utils.mm_utils import (extract_video_frames_uniformly,
from data_juicer.utils.mm_utils import (close_video,
extract_video_frames_uniformly,
load_data_with_context, load_video)

from ..base_op import OPERATORS, UNFORKABLE, Filter
Expand Down Expand Up @@ -171,7 +172,7 @@ def compute_stats(self, sample, rank=None, context=False):
video_ocr_area_ratios[video_key] = np.mean(frame_ocr_area_ratios)

if not context:
container.close()
close_video(container)

# get video OCR area ratios
sample[Fields.stats][StatsKeys.video_ocr_area_ratio] = [
Expand Down
5 changes: 3 additions & 2 deletions data_juicer/ops/filter/video_resolution_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
from jsonargparse.typing import PositiveInt

from data_juicer.utils.constant import Fields, StatsKeys
from data_juicer.utils.mm_utils import load_data_with_context, load_video
from data_juicer.utils.mm_utils import (close_video, load_data_with_context,
load_video)

from ..base_op import OPERATORS, Filter
from ..op_fusion import LOADED_VIDEOS
Expand Down Expand Up @@ -91,7 +92,7 @@ def compute_stats(self, sample, context=False):

if not context:
for vid_key in videos:
videos[vid_key].close()
close_video(videos[vid_key])

return sample

Expand Down
4 changes: 2 additions & 2 deletions data_juicer/ops/filter/video_watermark_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from data_juicer.utils.availability_utils import AvailabilityChecking
from data_juicer.utils.constant import Fields, StatsKeys
from data_juicer.utils.mm_utils import (extract_key_frames,
from data_juicer.utils.mm_utils import (close_video, extract_key_frames,
extract_video_frames_uniformly,
load_data_with_context, load_video)
from data_juicer.utils.model_utils import get_model, prepare_model
Expand Down Expand Up @@ -157,7 +157,7 @@ def compute_stats(self, sample, rank=None, context=False):

if not context:
for vid_key in videos:
videos[vid_key].close()
close_video(videos[vid_key])

return sample

Expand Down
6 changes: 4 additions & 2 deletions data_juicer/ops/mapper/video_captioning_from_frames_mapper.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# yapf: disable
import copy
import random

Expand All @@ -8,7 +9,8 @@

from data_juicer.utils.availability_utils import AvailabilityChecking
from data_juicer.utils.constant import HashKeys
from data_juicer.utils.mm_utils import (SpecialTokens, extract_key_frames,
from data_juicer.utils.mm_utils import (SpecialTokens, close_video,
extract_key_frames,
extract_video_frames_uniformly,
insert_texts_after_placeholders,
load_data_with_context, load_video,
Expand Down Expand Up @@ -285,7 +287,7 @@ def _process_single_sample(self, ori_sample, rank=None, context=False):

if not context:
for vid_key in videos:
videos[vid_key].close()
close_video(videos[vid_key])
return generated_samples

def _reduce_captions(self, chunk, generated_text_candidates_single_chunk):
Expand Down
6 changes: 4 additions & 2 deletions data_juicer/ops/mapper/video_captioning_from_video_mapper.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# yapf: disable
import copy
import random

Expand All @@ -8,7 +9,8 @@

from data_juicer.utils.availability_utils import AvailabilityChecking
from data_juicer.utils.constant import HashKeys
from data_juicer.utils.mm_utils import (SpecialTokens, extract_key_frames,
from data_juicer.utils.mm_utils import (SpecialTokens, close_video,
extract_key_frames,
extract_video_frames_uniformly,
insert_texts_after_placeholders,
load_data_with_context, load_video,
Expand Down Expand Up @@ -292,7 +294,7 @@ def _process_single_sample(self, ori_sample, rank=None, context=False):

if not context:
for vid_key in videos:
videos[vid_key].close()
close_video(videos[vid_key])
return generated_samples

def _reduce_captions(self, chunk, generated_text_candidates_single_chunk):
Expand Down
7 changes: 4 additions & 3 deletions data_juicer/ops/mapper/video_face_blur_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
from data_juicer.utils.availability_utils import AvailabilityChecking
from data_juicer.utils.constant import Fields
from data_juicer.utils.file_utils import transfer_filename
from data_juicer.utils.mm_utils import (load_data_with_context, load_video,
pil_to_opencv, process_each_frame)
from data_juicer.utils.mm_utils import (close_video, load_data_with_context,
load_video, pil_to_opencv,
process_each_frame)

from ..base_op import OPERATORS, Mapper
from ..op_fusion import LOADED_VIDEOS
Expand Down Expand Up @@ -93,7 +94,7 @@ def process(self, sample, context=False):
processed_video_keys[video_key] = output_video_key

if not context:
video.close()
close_video(video)

# when the file is modified, its source file needs to be updated.
for i, value in enumerate(loaded_video_keys):
Expand Down
5 changes: 3 additions & 2 deletions data_juicer/ops/mapper/video_remove_watermark_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
from data_juicer.utils.constant import Fields
from data_juicer.utils.file_utils import transfer_filename
from data_juicer.utils.logger_utils import HiddenPrints
from data_juicer.utils.mm_utils import (extract_video_frames_uniformly,
from data_juicer.utils.mm_utils import (close_video,
extract_video_frames_uniformly,
load_data_with_context, load_video,
parse_string_to_roi,
process_each_frame)
Expand Down Expand Up @@ -233,7 +234,7 @@ def process_frame_func(frame):

if not context:
for vid_key in videos:
videos[vid_key].close()
close_video(videos[vid_key])

# when the file is modified, its source file needs to be updated.
for i, value in enumerate(sample[self.video_key]):
Expand Down
4 changes: 2 additions & 2 deletions data_juicer/ops/mapper/video_resize_aspect_ratio_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from data_juicer.utils.constant import Fields
from data_juicer.utils.file_utils import transfer_filename
from data_juicer.utils.logger_utils import HiddenPrints
from data_juicer.utils.mm_utils import load_video
from data_juicer.utils.mm_utils import close_video, load_video

from ..base_op import OPERATORS, Mapper

Expand Down Expand Up @@ -117,7 +117,7 @@ def process(self, sample):
original_width = video.codec_context.width
original_height = video.codec_context.height
original_aspect_ratio = Fraction(original_width, original_height)
container.close()
close_video(container)

if (original_aspect_ratio >= self.min_ratio
and original_aspect_ratio <= self.max_ratio):
Expand Down
4 changes: 2 additions & 2 deletions data_juicer/ops/mapper/video_resize_resolution_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from data_juicer.utils.constant import Fields
from data_juicer.utils.file_utils import transfer_filename
from data_juicer.utils.logger_utils import HiddenPrints
from data_juicer.utils.mm_utils import load_video
from data_juicer.utils.mm_utils import close_video, load_video

from ..base_op import OPERATORS, Mapper
from ..op_fusion import LOADED_VIDEOS
Expand Down Expand Up @@ -102,7 +102,7 @@ def process(self, sample, context=False):
width = video.codec_context.width
height = video.codec_context.height
origin_ratio = width / height
container.close()
close_video(container)

if width >= self.min_width and width <= self.max_width and \
height >= self.min_height and height <= self.max_height:
Expand Down
5 changes: 3 additions & 2 deletions data_juicer/ops/mapper/video_split_by_duration_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
from data_juicer.utils.constant import Fields
from data_juicer.utils.file_utils import (add_suffix_to_filename,
transfer_filename)
from data_juicer.utils.mm_utils import (SpecialTokens, cut_video_by_seconds,
from data_juicer.utils.mm_utils import (SpecialTokens, close_video,
cut_video_by_seconds,
get_video_duration, load_video)

from ..base_op import OPERATORS, Mapper
Expand Down Expand Up @@ -123,7 +124,7 @@ def _process_single_sample(self, sample):
video = videos[video_key]
new_video_keys = self.split_videos_by_duration(
video_key, video)
video.close()
close_video(video)
split_video_keys.extend(new_video_keys)
place_holders.append(SpecialTokens.video *
len(new_video_keys))
Expand Down
5 changes: 3 additions & 2 deletions data_juicer/ops/mapper/video_split_by_key_frame_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
from data_juicer.utils.constant import Fields
from data_juicer.utils.file_utils import (add_suffix_to_filename,
transfer_filename)
from data_juicer.utils.mm_utils import (SpecialTokens, cut_video_by_seconds,
from data_juicer.utils.mm_utils import (SpecialTokens, close_video,
cut_video_by_seconds,
get_key_frame_seconds, load_video)

from ..base_op import OPERATORS, Mapper
Expand Down Expand Up @@ -105,7 +106,7 @@ def _process_single_sample(self, sample):
video_count]:
video = videos[video_key]
new_video_keys = self.get_split_key_frame(video_key, video)
video.close()
close_video(video)
split_video_keys.extend(new_video_keys)
place_holders.append(SpecialTokens.video *
len(new_video_keys))
Expand Down
7 changes: 6 additions & 1 deletion data_juicer/ops/mapper/video_tagging_from_frames_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from data_juicer.utils.availability_utils import AvailabilityChecking
from data_juicer.utils.constant import Fields
from data_juicer.utils.mm_utils import (extract_key_frames,
from data_juicer.utils.mm_utils import (close_video, extract_key_frames,
extract_video_frames_uniformly,
load_data_with_context, load_video)
from data_juicer.utils.model_utils import get_model, prepare_model
Expand Down Expand Up @@ -110,5 +110,10 @@ def process(self, sample, rank=None, context=False):
word_count = Counter(words)
sorted_word_list = [item for item, _ in word_count.most_common()]
video_tags.append(sorted_word_list)

if not context:
for vid_key in videos:
close_video(videos[vid_key])

sample[Fields.video_frame_tags] = video_tags
return sample
1 change: 1 addition & 0 deletions data_juicer/ops/op_fusion.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ def compute_stats(self, sample, rank=None):
for context_key in sample[Fields.context]:
if isinstance(sample[Fields.context][context_key],
av.container.InputContainer):
sample[Fields.context][context_key].streams.video[0].close()
sample[Fields.context][context_key].close()
_ = sample.pop(Fields.context)
return sample
Expand Down
Loading

0 comments on commit 8d53f23

Please sign in to comment.