Skip to content

Commit

Permalink
- fixed threaded downloading (again, but now progress is fixed too)
Browse files Browse the repository at this point in the history
  • Loading branch information
EchterAlsFake committed Feb 8, 2024
1 parent fef3cf4 commit d177739
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 54 deletions.
112 changes: 59 additions & 53 deletions xvideos_api/modules/download.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
# Thanks to: https://github.com/EchterAlsFake/PHUB/blob/master/src/phub/modules/download.py
# oh and of course ChatGPT lol

from ffmpeg_progress_yield import FfmpegProgress
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
from ffmpeg_progress_yield import FfmpegProgress
from typing import Callable
from multiprocessing import Value, Lock
from typing import Callable, List

CallbackType = Callable[[int, int], None]

Expand All @@ -15,62 +14,69 @@
"""


def download_segment(args, retry_count=5):
    """
    Download one video segment, retrying transient connection failures.

    :param args: Tuple of (url, index, length, callback, processed_segments):
        ``url`` — segment URL; ``index`` — its position in the playlist;
        ``length`` — total segment count; ``callback`` — progress hook called
        as callback(done, total); ``processed_segments`` — shared
        multiprocessing.Value counting finished segments across workers.
    :param retry_count: Maximum number of attempts per segment.
    :return: ``(index, content)`` where ``content`` is ``b''`` if every attempt
        failed — callers always get a 2-tuple they can unpack safely.
    """
    url, index, length, callback, processed_segments = args
    for attempt in range(retry_count):
        try:
            segment = requests.get(url, timeout=10)
            # Turn HTTP error statuses into exceptions handled below.
            segment.raise_for_status()
        except requests.exceptions.ConnectionError as e:
            # NOTE: must be requests' ConnectionError — the builtin
            # ConnectionError is NOT a base of requests' exceptions, so
            # catching the builtin would never retry anything.
            if attempt < retry_count - 1:
                print(f"Retry {attempt + 1} for segment due to HTTPSConnectionPool error.")
                continue  # transient network trouble: try again
            print(f"Error downloading segment after {attempt + 1} attempts: {e}")
        except requests.RequestException as e:
            # HTTP/protocol errors are not retried.
            print(f"Error downloading segment: {e}")
            break
        else:
            with processed_segments.get_lock():  # Ensure thread-safe increment
                processed_segments.value += 1
                current_processed = processed_segments.value
            callback(current_processed, length)
            return (index, segment.content)
    # Always return a tuple: the caller unpacks `index, data = future.result()`.
    return (index, b'')


def threaded(video, quality, callback, path, start: int = 0, num_workers: int = 10) -> bool:
segments = list(video.get_segments(quality))[start:]
length = len(segments) # Total number of segments

# Shared value for counting processed segments and a lock for thread-safe operations
processed_segments = Value('i', 0)
buffer_lock = Lock()
def _thread(url: str, timeout: int) -> bytes:
    """
    Fetch a single segment over HTTP.

    :param url: Segment URL to download.
    :param timeout: Per-request timeout in seconds.
    :return: The response body, or ``b''`` if the request failed for any reason.
    """
    try:
        reply = requests.get(url, timeout=timeout)
        # Promote non-2xx statuses to exceptions so they share the error path.
        reply.raise_for_status()
    except requests.RequestException as e:
        print(f"Failed to download segment {url}: {e}")
        return b''
    return reply.content


def _base_threaded(segments: List[str],
callback: CallbackType,
max_workers: int = 50,
timeout: int = 10) -> dict[str, bytes]:
'''
Base threaded downloader for threaded backends.
'''
length = len(segments)

with ThreadPoolExecutor(max_workers=num_workers) as executor:
# Map each future to its corresponding segment index
future_to_segment = {
executor.submit(download_segment, (url, i, length, callback, processed_segments)): i
for i, url in enumerate(segments)
}
with ThreadPoolExecutor(max_workers=max_workers) as executor:
buffer = {}
future_to_url = {executor.submit(_thread, url, timeout): url for url in segments}

for future in as_completed(future_to_segment):
segment_index = future_to_segment[future]
completed = 0
for future in as_completed(future_to_url):
url = future_to_url[future]
try:
_, segment_data = future.result()
segment_data = future.result()
if segment_data:
with buffer_lock:
# Write the segment data in the correct order
with open(path, 'ab') as file:
file.write(segment_data)
with processed_segments.get_lock():
processed_segments.value += 1
# Halve the progress for the purpose of correct display
adjusted_progress = processed_segments.value / 2
progress_percent = (adjusted_progress / length) * 100
callback(adjusted_progress, length)
buffer[url] = segment_data
completed += 1
callback(completed, length)
except Exception as e:
print(f"Exception in downloading segment: {e}")
print(f"Error downloading segment {url}: {e}")

return True
return buffer


def threaded(max_workers: int = 100,
             timeout: int = 30) -> Callable:
    """
    Build a threaded download backend configured with the given pool size and
    per-request timeout.

    :param max_workers: Thread-pool size used for the concurrent downloads.
    :param timeout: Per-request timeout in seconds.
    :return: A callable taking (video, quality, callback, path) that downloads
        every segment concurrently and writes them to ``path`` in playlist order.
    """
    def wrapper(video,
                quality,
                callback: CallbackType,
                path: str) -> None:
        # Materialise the playlist once so the write phase can preserve order.
        segment_urls = list(video.get_segments(quality))

        downloaded = _base_threaded(segments=segment_urls,
                                    callback=callback,
                                    max_workers=max_workers,
                                    timeout=timeout)

        # Reassemble in playlist order; a segment that failed to download
        # contributes nothing rather than corrupting the offset of the rest.
        with open(path, 'wb') as file:
            file.writelines(downloaded.get(url, b'') for url in segment_urls)

        print(f'Successfully wrote file to {path}')

    return wrapper


def default(video, quality, callback, path, start: int = 0) -> bool:
Expand Down
3 changes: 2 additions & 1 deletion xvideos_api/xvideos_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,8 @@ def download(self, downloader, quality, output_path, callback=None):
default(video=self, quality=quality, path=output_path, callback=callback)

elif downloader == threaded or str(downloader) == "threaded":
threaded(video=self, quality=quality, path=output_path, callback=callback)
download_video_threaded = threaded(20, 10)
download_video_threaded(video=self, quality=quality, path=output_path, callback=callback)

elif downloader == FFMPEG or str(downloader) == "FFMPEG":
FFMPEG(video=self, quality=quality, path=output_path, callback=callback)
Expand Down

0 comments on commit d177739

Please sign in to comment.