sdk/python: rename ObjectFile to ObjectFileReader
Signed-off-by: Ryan Koo <[email protected]>
rkoo19 committed Dec 20, 2024
1 parent 78b1e2c commit 3dba53f
Showing 11 changed files with 70 additions and 71 deletions.
12 changes: 6 additions & 6 deletions python/aistore/sdk/obj/obj_file/errors.py
@@ -3,23 +3,23 @@
#


class ObjectFileMaxResumeError(Exception):
class ObjectFileReaderMaxResumeError(Exception):
"""
Raised when ObjectFile has exceeded the max number of stream resumes for an object
Raised when ObjectFileReader has exceeded the max number of stream resumes for an object
"""

def __init__(self, err, max_retries):
self.original_error = err
super().__init__(
f"Object file exceeded max number of stream resumptions: {max_retries}"
f"ObjectFileReader exceeded max number of stream resumptions: {max_retries}"
)


class ObjectFileStreamError(Exception):
class ObjectFileReaderStreamError(Exception):
"""
Raised when ObjectFile fails to establish a stream for an object
Raised when ObjectFileReader fails to establish a stream for an object
"""

def __init__(self, err):
self.original_error = err
super().__init__("Object file failed to establish stream connection")
super().__init__("ObjectFileReader failed to establish stream connection")
9 changes: 4 additions & 5 deletions python/aistore/sdk/obj/obj_file/object_file.py
@@ -13,8 +13,7 @@
logger = get_logger(__name__)


# TODO: Rename to `ObjectFileReader`
class ObjectFile(BufferedIOBase):
class ObjectFileReader(BufferedIOBase):
"""
A sequential read-only file-like object extending `BufferedIOBase` for reading object data, with support for both
reading a fixed size of data and reading until the end of file (EOF).
@@ -29,7 +28,7 @@ class ObjectFile(BufferedIOBase):
Args:
content_iterator (ContentIterator): An iterator that can fetch object data from AIS in chunks.
max_resume (int): Maximum number of resumes allowed for an ObjectFile instance.
max_resume (int): Maximum number of resumes allowed for an ObjectFileReader instance.
"""

def __init__(self, content_iterator: ContentIterator, max_resume: int):
@@ -67,8 +66,8 @@ def read(self, size: Optional[int] = -1) -> bytes:
bytes: The read data as a bytes object.
Raises:
ObjectFileStreamError if a connection cannot be made.
ObjectFileMaxResumeError if the stream is interrupted more than the allowed maximum.
ObjectFileReaderStreamError: If a connection cannot be made.
ObjectFileReaderMaxResumeError: If the stream is interrupted more than the allowed maximum.
ValueError: I/O operation on a closed file.
Exception: Any other errors while streaming and reading.
"""
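As a quick illustration of the `read()` semantics documented above, a hypothetical sketch (placeholder endpoint and object names; `get_reader()` assumed as in the rest of this commit):

# Hypothetical sketch of the read() semantics documented above.
from aistore.sdk.client import Client

client = Client("http://localhost:8080")  # placeholder endpoint
obj_file = client.bucket("my-bucket").object("large.bin").get_reader().as_file(max_resume=5)

header = obj_file.read(1024)  # fixed-size read: up to 1024 bytes
rest = obj_file.read()        # read() / read(-1): drain the remainder until EOF
obj_file.close()

try:
    obj_file.read(1)
except ValueError as err:
    print(err)                # I/O operation on a closed file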
14 changes: 7 additions & 7 deletions python/aistore/sdk/obj/obj_file/utils.py
@@ -7,8 +7,8 @@
from aistore.sdk.obj.content_iterator import ContentIterator
from aistore.sdk.utils import get_logger
from aistore.sdk.obj.obj_file.errors import (
ObjectFileStreamError,
ObjectFileMaxResumeError,
ObjectFileReaderStreamError,
ObjectFileReaderMaxResumeError,
)

logger = get_logger(__name__)
@@ -30,13 +30,13 @@ def reset_iterator(
Iterator[bytes]: An iterator to read chunks of data from the object stream.
Raises:
ObjectFileStreamError if a connection cannot be made.
ObjectFileReaderStreamError if a connection cannot be made.
"""
try:
return content_iterator.iter(offset=resume_position)
except Exception as err:
logger.error("Error establishing object stream: (%s)", err)
raise ObjectFileStreamError(err) from err
raise ObjectFileReaderStreamError(err) from err


def increment_resume(resume_total: int, max_resume: int, err: Exception) -> int:
@@ -52,11 +52,11 @@ def increment_resume(resume_total: int, max_resume: int, err: Exception) -> int:
int: The updated number of resume attempts.
Raises:
ObjectFileMaxResumeError: If the number of resume attempts exceeds the maximum allowed.
ObjectFileReaderMaxResumeError: If the number of resume attempts exceeds the maximum allowed.
"""
resume_total += 1
if resume_total > max_resume:
raise ObjectFileMaxResumeError(err, resume_total) from err
raise ObjectFileReaderMaxResumeError(err, resume_total) from err
return resume_total


@@ -82,7 +82,7 @@ def handle_chunked_encoding_error(
Tuple[Iterator[bytes], int]: The new iterator and the updated resume total.
Raises:
ObjectFileMaxResumeError: If the maximum number of resume attempts is exceeded.
ObjectFileReaderMaxResumeError: If the maximum number of resume attempts is exceeded.
"""
resume_total = increment_resume(resume_total, max_resume, err)
logger.warning(
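To make the control flow of these helpers concrete, a hypothetical sketch of a resume loop built from `reset_iterator` and `increment_resume`; the parameter names and the surrounding loop are assumptions, since the full signatures are elided in this diff:

# Hypothetical sketch of how the helpers above cooperate on a ChunkedEncodingError.
from requests.exceptions import ChunkedEncodingError
from aistore.sdk.obj.content_iterator import ContentIterator
from aistore.sdk.obj.obj_file.utils import increment_resume, reset_iterator

def read_all(content_iterator: ContentIterator, max_resume: int) -> bytes:
    resume_total = 0
    resume_position = 0
    chunks = reset_iterator(content_iterator, resume_position)  # may raise ObjectFileReaderStreamError
    data = bytearray()
    while True:
        try:
            chunk = next(chunks, None)
            if chunk is None:
                break
            data.extend(chunk)
            resume_position += len(chunk)
        except ChunkedEncodingError as err:
            # Raises ObjectFileReaderMaxResumeError once max_resume is exceeded.
            resume_total = increment_resume(resume_total, max_resume, err)
            chunks = reset_iterator(content_iterator, resume_position)
    return bytes(data)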
6 changes: 3 additions & 3 deletions python/aistore/sdk/obj/object_reader.py
@@ -9,7 +9,7 @@

from aistore.sdk.obj.content_iterator import ContentIterator
from aistore.sdk.obj.object_client import ObjectClient
from aistore.sdk.obj.obj_file.object_file import ObjectFile
from aistore.sdk.obj.obj_file.object_file import ObjectFileReader
from aistore.sdk.const import DEFAULT_CHUNK_SIZE
from aistore.sdk.obj.object_attributes import ObjectAttributes

@@ -96,7 +96,7 @@ def as_file(
max_resume: Optional[int] = 5,
) -> BufferedIOBase:
"""
Create a read-only, non-seekable `ObjectFile` instance for streaming object data in chunks.
Create a read-only, non-seekable `ObjectFileReader` instance for streaming object data in chunks.
This file-like object primarily implements the `read()` method to retrieve data sequentially,
with automatic retry/resumption in case of stream interruptions such as `ChunkedEncodingError`.
@@ -117,7 +117,7 @@ def as_file(
f"Invalid max_resume (must be a non-negative integer): {max_resume}."
)

return ObjectFile(self._content_iterator, max_resume=max_resume)
return ObjectFileReader(self._content_iterator, max_resume=max_resume)

def __iter__(self) -> Iterator[bytes]:
"""
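A small, hypothetical sketch of the `as_file()` entry point shown above, including the `max_resume` validation; the endpoint and object names are placeholders:

# Hypothetical sketch: as_file() validates max_resume before constructing an ObjectFileReader.
from aistore.sdk.client import Client

reader = Client("http://localhost:8080").bucket("my-bucket").object("my-object").get_reader()

try:
    reader.as_file(max_resume=-1)  # rejected: max_resume must be a non-negative integer
except ValueError as err:
    print(err)

with reader.as_file(max_resume=5) as f:  # 5 is the default shown in the signature above
    first_kb = f.read(1024)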
14 changes: 7 additions & 7 deletions python/examples/sdk/object_file/obj-file-stress-test.py
@@ -2,9 +2,9 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#

# This script tests AIStore's ObjectFile and its ability to resume object reading with interruptions
# This script tests AIStore's ObjectFileReader and its ability to resume object reading with interruptions
# to the underlying object stream (e.g. simulating intermittent AIStore K8s node failures). Run with
# a one-node cluster to best and most consistently observe the behavior of the ObjectFile's resume
# a one-node cluster to best and most consistently observe the behavior of the ObjectFileReader's resume
# functionality.

import logging
@@ -14,7 +14,7 @@
from kubernetes import client as k8s_client, config as k8s_config
from aistore.sdk.client import Client
from aistore.sdk.obj.object_reader import ObjectReader
from utils import create_and_put_object, obj_file_read, start_pod_killer, stop_pod_killer
from utils import create_and_put_object, obj_file_reader_read, start_pod_killer, stop_pod_killer

logging.basicConfig(level=logging.INFO) # Set to DEBUG for more detailed logs

@@ -25,7 +25,7 @@
# Adjust the object size, pod kill interval, and chunk size to control how often interruptions
# occur based on your specific test machine configuration and network setup. For example, increase
# object size or decrease chunk size / pod kill interval to trigger more frequent disruptions.
# Test on a one-node cluster to best observe the behavior of the ObjectFile's resume functionality.
# Test on a one-node cluster to best observe the behavior of the ObjectFileReader's resume functionality.

AIS_ENDPOINT = os.getenv("AIS_ENDPOINT", "http://localhost:51080")
BUCKET_NAME = "stress-test"
@@ -42,7 +42,7 @@


def test_with_interruptions(k8s_client: k8s_client.CoreV1Api, object_reader: ObjectReader, generated_data: bytes):
"""Test object file read with pod interruptions and validate data."""
"""Test ObjectFileReader read with pod interruptions and validate data."""
logging.info("Starting test")

start_time = time.time()
@@ -52,8 +52,8 @@ def test_with_interruptions(k8s_client: k8s_client.CoreV1Api, object_reader: Obj
logging.info("Starting pod killer process...")
pod_killer_process = start_pod_killer(k8s_client, POD_KILL_NAMESPACE, POD_KILL_NAME, POD_KILL_INTERVAL)

# Perform object file read
downloaded_data, resume_total = obj_file_read(object_reader, READ_SIZE, BUFFER_SIZE, MAX_RESUME)
# Perform ObjectFileReader read
downloaded_data, resume_total = obj_file_reader_read(object_reader, READ_SIZE, BUFFER_SIZE, MAX_RESUME)

end_time = time.time()

14 changes: 7 additions & 7 deletions python/examples/sdk/object_file/obj-read-benchmark.py
@@ -44,7 +44,7 @@ def benchmark_obj_reader(obj: Object) -> List[float]:
clear_directory(EXTRACT_PATH)
return times

def benchmark_obj_file(obj: Object) -> List[float]:
def benchmark_obj_file_reader(obj: Object) -> List[float]:
times = []
for _ in range(NUM_READS):
start_time = time.perf_counter()
@@ -71,18 +71,18 @@ def main():
reader_times = benchmark_obj_reader(obj)
reader_throughput = calculate_median_throughput(reader_times, TOTAL_SIZE) / MB

# Benchmark with ObjectFile NUM_READS times and calculate median throughput
file_times = benchmark_obj_file(obj)
file_throughput = calculate_median_throughput(file_times, TOTAL_SIZE) / MB
# Benchmark with ObjectFileReader NUM_READS times and calculate median throughput
file_reader_times = benchmark_obj_file_reader(obj)
file_reader_throughput = calculate_median_throughput(file_reader_times, TOTAL_SIZE) / MB

# Calculate the median % overhead
overhead = ((reader_throughput - file_throughput) / reader_throughput) * 100
overhead = ((reader_throughput - file_reader_throughput) / reader_throughput) * 100

# Save results to CSV
with open("benchmark_results.csv", mode="w", newline="") as file:
writer = csv.writer(file)
writer.writerow(["File Size (MB)", "File Count", "ObjectReader Throughput (MB/s)", "ObjectFile Throughput (MB/s)", "Overhead (%)"])
writer.writerow([FILE_SIZE / MB, NUM_FILES, reader_throughput, file_throughput, overhead])
writer.writerow(["File Size (MB)", "File Count", "ObjectReader Throughput (MB/s)", "ObjectFileReader Throughput (MB/s)", "Overhead (%)"])
writer.writerow([FILE_SIZE / MB, NUM_FILES, reader_throughput, file_reader_throughput, overhead])

finally:
bucket.delete()
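The body of `benchmark_obj_file_reader` is elided above; the following hypothetical completion mirrors `benchmark_obj_reader` (tar extraction into `EXTRACT_PATH`, timed over `NUM_READS` iterations) and assumes the same constants, `clear_directory` helper, and import paths used elsewhere in the script:

# Hypothetical completion of benchmark_obj_file_reader; the tar-extraction step, the
# EXTRACT_PATH / NUM_READS constants, the clear_directory helper, and the Object import
# path are assumed from context rather than shown in this diff.
import tarfile
import time
from typing import List
from aistore.sdk.obj.object import Object

def benchmark_obj_file_reader(obj: Object) -> List[float]:
    times = []
    for _ in range(NUM_READS):
        start_time = time.perf_counter()
        with obj.get_reader().as_file(max_resume=5) as f:
            with tarfile.open(fileobj=f, mode="r|*") as tar:  # sequential, non-seekable stream
                tar.extractall(path=EXTRACT_PATH)
        times.append(time.perf_counter() - start_time)
        clear_directory(EXTRACT_PATH)
    return times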
4 changes: 2 additions & 2 deletions python/examples/sdk/object_file/utils.py
@@ -46,8 +46,8 @@ def create_and_put_objects(bucket: Bucket, obj_size: int, num_objects: int) -> N
create_and_put_object(obj, obj_size)


def obj_file_read(object_reader: ObjectReader, read_size: int, buffer_size: int, max_resume: int) -> Tuple[bytes, int]:
"""Reads the object file from the bucket. Returns the downloaded data and total number of resumes."""
def obj_file_reader_read(object_reader: ObjectReader, read_size: int, buffer_size: int, max_resume: int) -> Tuple[bytes, int]:
"""Reads via ObjectFileReader instantiated from provided ObjectReader. Returns the downloaded data and total number of resumes."""
result = bytearray()
with object_reader.as_file(buffer_size=buffer_size, max_resume=max_resume) as obj_file:
while True:
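The read loop inside `obj_file_reader_read` is truncated above; a hypothetical completion follows. How the total number of resumes is surfaced (here via a private `_resume_total` attribute) is an assumption, not something this diff shows:

# Hypothetical completion of obj_file_reader_read from utils.py.
from typing import Tuple
from aistore.sdk.obj.object_reader import ObjectReader

def obj_file_reader_read(object_reader: ObjectReader, read_size: int, buffer_size: int, max_resume: int) -> Tuple[bytes, int]:
    """Reads via ObjectFileReader instantiated from provided ObjectReader. Returns the downloaded data and total number of resumes."""
    result = bytearray()
    with object_reader.as_file(buffer_size=buffer_size, max_resume=max_resume) as obj_file:
        while True:
            chunk = obj_file.read(read_size)
            if not chunk:  # b"" signals EOF
                break
            result.extend(chunk)
        resume_total = getattr(obj_file, "_resume_total", 0)  # assumed attribute name
    return bytes(result), resume_total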
10 changes: 5 additions & 5 deletions python/examples/sdk/resilient-streaming-object-file.ipynb
@@ -6,7 +6,7 @@
"source": [
"# Resilient Object Streaming in AIStore\n",
"\n",
"The following demo shows how to use `ObjectFile` (`aistore.sdk.obj.obj_file.object_file`) to stream large objects amidst potential instances of `ChunkedEncodingError` due to momentary issues with the cluster or its availability mid-read:"
"The following demo shows how to use `ObjectFileReader` (`aistore.sdk.obj.obj_file.object_file`) to stream large objects amidst potential instances of `ChunkedEncodingError` due to momentary issues with the cluster or its availability mid-read:"
]
},
{
@@ -38,7 +38,7 @@
"\n",
"AIS_ENDPOINT = \"http://localhost:8080\"\n",
"\n",
"# Define custom retry logic for requests to AIS. This will also be used when re-establishing streams (in the case of object.get().as_file()).\n",
"# Define custom retry logic for requests to AIS. This will also be used when re-establishing streams (in the case of object.get_reader().as_file()).\n",
"# If you want to retry in the case of total pod failure, be sure to force retries on specific HTTP response codes that are not typically retried\n",
"# In particular, 400 and 404 are what you might see as the client attempts to redirect requests to an object on a missing target\n",
"# The timing on each retry is determined by (backoff_factor * 2^retry_count) -- here the last and longest retry waits 512 seconds\n",
@@ -76,7 +76,7 @@
"id": "f0b6fc75",
"metadata": {},
"source": [
"The `ObjectFile` implementation catches instances of `ChunkedEncodingError` mid-read and retries a new object stream from the last known position to resume safely, where `max_resume` dictates the number of resumes we will allow:"
"The `ObjectFileReader` implementation catches instances of `ChunkedEncodingError` mid-read and retries a new object stream from the last known position to resume safely, where `max_resume` dictates the number of resumes we will allow:"
]
},
{
@@ -86,7 +86,7 @@
"metadata": {},
"outputs": [],
"source": [
"# Step 3: Read using ObjectFile\n",
"# Step 3: Read using ObjectFileReader (via object.get_reader().as_file())\n",
"\n",
"with client.bucket(BUCKET_NAME).object(OBJECT_NAME).get().as_file(max_resume=3) as f:\n",
" print(f.read(10)) # Read the first 10 bytes of the file\n",
@@ -98,7 +98,7 @@
"id": "bcb789bd",
"metadata": {},
"source": [
"`ObjectFile` can be used in any context where a non-seekable, sequential file-like object is expected, such as `tarfile.open` in streaming mode `r|*`:"
"`ObjectFileReader` can be used in any context where a non-seekable, sequential file-like object is expected, such as `tarfile.open` in streaming mode `r|*`:"
]
},
{
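As a concrete instance of the `tarfile` usage the notebook describes, a hypothetical sketch (placeholder endpoint and object names; `get_reader()` used per the updated comment above):

# Hypothetical sketch: ObjectFileReader as a non-seekable, sequential file-like object
# handed to tarfile.open in streaming mode "r|*".
import tarfile
from aistore.sdk.client import Client

client = Client("http://localhost:8080")                         # placeholder endpoint
obj = client.bucket("my-bucket").object("shards/batch-000.tar")  # placeholder names

with obj.get_reader().as_file(max_resume=3) as f:
    with tarfile.open(fileobj=f, mode="r|*") as tar:  # "r|*": sequential read, no seeking
        for member in tar:
            print(member.name, member.size)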
@@ -12,7 +12,7 @@
from tests.utils import create_and_put_object, random_string, case_matrix


class TestObjectFileOps(unittest.TestCase):
class TestObjectFileReaderOps(unittest.TestCase):
OBJECT_NAME = "test-object"
BUCKET_NAME = f"test-bucket-{random_string(8)}"
OBJECT_SIZE = 5242880