diff --git a/pkg/abstractions/volume/multipart.go b/pkg/abstractions/volume/multipart.go index a619e10a7..f14ff744e 100644 --- a/pkg/abstractions/volume/multipart.go +++ b/pkg/abstractions/volume/multipart.go @@ -168,6 +168,11 @@ func (s *GlobalVolumeService) CreateMultipartUpload(ctx context.Context, in *pb. totalParts := math.Ceil(float64(in.FileSize) / float64(in.ChunkSize)) uploadParts := make([]*pb.FileUploadPart, int(totalParts)) + // When the file size is 0, we still need to create a part + if len(uploadParts) == 0 { + uploadParts = make([]*pb.FileUploadPart, 1) + } + for i := range uploadParts { res, err := s.CreatePresignedURL(ctx, &pb.CreatePresignedURLRequest{ VolumeName: in.VolumeName, diff --git a/sdk/pyproject.toml b/sdk/pyproject.toml index 7b3e10500..3e8ab2702 100644 --- a/sdk/pyproject.toml +++ b/sdk/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "beta9" -version = "0.1.149" +version = "0.1.150" description = "" authors = ["beam.cloud "] packages = [ diff --git a/sdk/src/beta9/multipart.py b/sdk/src/beta9/multipart.py index af333d3ca..3fbf4495e 100644 --- a/sdk/src/beta9/multipart.py +++ b/sdk/src/beta9/multipart.py @@ -12,7 +12,7 @@ from multiprocessing import Manager from os import PathLike from pathlib import Path -from queue import Queue +from queue import Empty, Queue from threading import Thread from typing import ( Any, @@ -144,16 +144,24 @@ def _progress_updater( """ def target(): + def cb(total: int, advance: int): + if callback is not None: + callback(total=total, advance=advance) + finished = 0 while finished < file_size: try: - processed = queue.get_nowait() - except Exception: + processed = queue.get(timeout=1) + except Empty: continue + except Exception: + break - finished += processed or 0 - if callback is not None: - callback(total=file_size, advance=processed) + if processed: + finished += processed + cb(total=file_size, advance=processed) + + cb(total=file_size, advance=0) thread = Thread(target=target, daemon=True) thread.start() @@ -216,7 +224,7 @@ def read(self, size: Optional[int] = -1) -> bytes: try: response = session.put( url=file_part.url, - data=QueueBuffer(chunk), + data=QueueBuffer(chunk) if chunk else None, headers={ "Content-Length": str(len(chunk)), }, @@ -356,7 +364,7 @@ def _calculate_file_ranges(file_size: int, chunk_size: int) -> List[FileRange]: Returns: List of byte ranges. """ - ranges = math.ceil(file_size / chunk_size) + ranges = math.ceil(file_size / (chunk_size or 1)) return [ FileRange( number=i + 1,