Skip to content

Commit

Permalink
fix: multipart uploads and downloads with empty files (#847)
Browse files Browse the repository at this point in the history
- When file size is 0, make just 1 presigned url upload part
- When upload is complete, call progress callback at least once to
indicate it completed
- Don't send any data to juicefs s3 gateway when chunk is empty
- Fix divide by zero errors with downloads

Resolve BE-2133
  • Loading branch information
nickpetrovic authored Jan 11, 2025
1 parent 529bf19 commit a585721
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 9 deletions.
5 changes: 5 additions & 0 deletions pkg/abstractions/volume/multipart.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,11 @@ func (s *GlobalVolumeService) CreateMultipartUpload(ctx context.Context, in *pb.
totalParts := math.Ceil(float64(in.FileSize) / float64(in.ChunkSize))
uploadParts := make([]*pb.FileUploadPart, int(totalParts))

// When the file size is 0, we still need to create a part
if len(uploadParts) == 0 {
uploadParts = make([]*pb.FileUploadPart, 1)
}

for i := range uploadParts {
res, err := s.CreatePresignedURL(ctx, &pb.CreatePresignedURLRequest{
VolumeName: in.VolumeName,
Expand Down
2 changes: 1 addition & 1 deletion sdk/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "beta9"
version = "0.1.149"
version = "0.1.150"
description = ""
authors = ["beam.cloud <[email protected]>"]
packages = [
Expand Down
24 changes: 16 additions & 8 deletions sdk/src/beta9/multipart.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from multiprocessing import Manager
from os import PathLike
from pathlib import Path
from queue import Queue
from queue import Empty, Queue
from threading import Thread
from typing import (
Any,
Expand Down Expand Up @@ -144,16 +144,24 @@ def _progress_updater(
"""

def target():
def cb(total: int, advance: int):
if callback is not None:
callback(total=total, advance=advance)

finished = 0
while finished < file_size:
try:
processed = queue.get_nowait()
except Exception:
processed = queue.get(timeout=1)
except Empty:
continue
except Exception:
break

finished += processed or 0
if callback is not None:
callback(total=file_size, advance=processed)
if processed:
finished += processed
cb(total=file_size, advance=processed)

cb(total=file_size, advance=0)

thread = Thread(target=target, daemon=True)
thread.start()
Expand Down Expand Up @@ -216,7 +224,7 @@ def read(self, size: Optional[int] = -1) -> bytes:
try:
response = session.put(
url=file_part.url,
data=QueueBuffer(chunk),
data=QueueBuffer(chunk) if chunk else None,
headers={
"Content-Length": str(len(chunk)),
},
Expand Down Expand Up @@ -356,7 +364,7 @@ def _calculate_file_ranges(file_size: int, chunk_size: int) -> List[FileRange]:
Returns:
List of byte ranges.
"""
ranges = math.ceil(file_size / chunk_size)
ranges = math.ceil(file_size / (chunk_size or 1))
return [
FileRange(
number=i + 1,
Expand Down

0 comments on commit a585721

Please sign in to comment.