Skip to content

Commit

Permalink
fix: allow creating empty file with multipart upload
Browse files Browse the repository at this point in the history
- When file size is 0, make just 1 presigned url upload part
- When upload is complete, call progress callback at least once
- Don't send a file-like object to server when chunk is empty

Resolve BE-2133
  • Loading branch information
nickpetrovic committed Jan 11, 2025
1 parent 529bf19 commit e2a9d56
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 8 deletions.
5 changes: 5 additions & 0 deletions pkg/abstractions/volume/multipart.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,11 @@ func (s *GlobalVolumeService) CreateMultipartUpload(ctx context.Context, in *pb.
totalParts := math.Ceil(float64(in.FileSize) / float64(in.ChunkSize))
uploadParts := make([]*pb.FileUploadPart, int(totalParts))

// When the file size is 0, we still need to create a part
if len(uploadParts) == 0 {
uploadParts = make([]*pb.FileUploadPart, 1)
}

for i := range uploadParts {
res, err := s.CreatePresignedURL(ctx, &pb.CreatePresignedURLRequest{
VolumeName: in.VolumeName,
Expand Down
2 changes: 1 addition & 1 deletion sdk/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "beta9"
version = "0.1.149"
version = "0.1.150"
description = ""
authors = ["beam.cloud <[email protected]>"]
packages = [
Expand Down
22 changes: 15 additions & 7 deletions sdk/src/beta9/multipart.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from multiprocessing import Manager
from os import PathLike
from pathlib import Path
from queue import Queue
from queue import Empty, Queue
from threading import Thread
from typing import (
Any,
Expand Down Expand Up @@ -144,16 +144,24 @@ def _progress_updater(
"""

def target():
def cb(total: int, advance: int):
if callback is not None:
callback(total=total, advance=advance)

finished = 0
while finished < file_size:
try:
processed = queue.get_nowait()
except Exception:
processed = queue.get(timeout=1)
except Empty:
continue
except Exception:
break

finished += processed or 0
if callback is not None:
callback(total=file_size, advance=processed)
if processed:
finished += processed
cb(total=file_size, advance=processed)

cb(total=file_size, advance=0)

thread = Thread(target=target, daemon=True)
thread.start()
Expand Down Expand Up @@ -216,7 +224,7 @@ def read(self, size: Optional[int] = -1) -> bytes:
try:
response = session.put(
url=file_part.url,
data=QueueBuffer(chunk),
data=QueueBuffer(chunk) if chunk else None,
headers={
"Content-Length": str(len(chunk)),
},
Expand Down

0 comments on commit e2a9d56

Please sign in to comment.