Skip to content

Commit

Permalink
sdk/python: ObjectFileWriter (file-like writer)
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Koo <[email protected]>
  • Loading branch information
rkoo19 committed Dec 3, 2024
1 parent baa11bf commit 2aec518
Show file tree
Hide file tree
Showing 6 changed files with 476 additions and 1 deletion.
81 changes: 80 additions & 1 deletion python/aistore/sdk/obj/obj_file/object_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#

import requests
from io import BufferedIOBase
from io import BufferedIOBase, BufferedWriter
from typing import Optional
from overrides import override
from aistore.sdk.obj.content_iterator import ContentIterator
Expand All @@ -13,6 +13,7 @@
logger = get_logger(__name__)


# TODO: Rename to `ObjectFileReader`
class ObjectFile(BufferedIOBase):
"""
A sequential read-only file-like object extending `BufferedIOBase` for reading object data, with support for both
Expand Down Expand Up @@ -137,3 +138,81 @@ def read(self, size: Optional[int] = -1) -> bytes:
def close(self) -> None:
"""Close the file."""
self._closed = True


class ObjectFileWriter(BufferedWriter):
    """
    A file-like writer object for AIStore, extending `BufferedWriter`.

    Every `write` is forwarded to `Object.append_content`, and the append handle returned by
    that call is carried over to the next one. `flush` (and therefore `close`) sends a final
    empty append with `flush=True` and resets the handle.

    Args:
        obj (Object): The Object instance for handling write operations.
        mode (str): Specifies the mode in which the file is opened.
            - `'w'`: Write mode. Opens the object for writing, truncating any existing content.
            Writing starts from the beginning of the object.
            - `'a'`: Append mode. Opens the object for appending. Existing content is preserved,
            and writing starts from the end of the object.
    """

    def __init__(self, obj: "Object", mode: str):
        # NOTE: `BufferedWriter.__init__` is not called here; no raw stream is used and all
        # I/O in this class goes through `obj`.
        self._obj = obj
        self._mode = mode
        # Append handle returned by the previous `append_content` call ("" = no pending append).
        self._handle = ""
        self._closed = False
        if self._mode == "w":
            # Truncate exactly once, when the writer is opened (like built-in `open(..., "w")`).
            self._obj.put_content(b"")

    @override
    def __enter__(self, *args, **kwargs):
        """
        Enter the runtime context and return the writer itself.

        The object is NOT truncated again here: in `'w'` mode truncation already happened in
        `__init__`, and repeating it would cost an extra request and would wipe any data that
        was written and flushed before the `with` block was entered.
        """
        return self

    @override
    def write(self, buffer: bytes) -> int:
        """
        Write data to the object.

        Args:
            buffer (bytes): The data to write.

        Returns:
            int: Number of bytes written.

        Raises:
            ValueError: I/O operation on a closed file.
        """
        if self._closed:
            raise ValueError("I/O operation on closed file.")

        # Keep the returned handle so the next append continues the same append operation.
        self._handle = self._obj.append_content(buffer, handle=self._handle)

        return len(buffer)

    @override
    def flush(self) -> None:
        """
        Flush the writer, ensuring the object is finalized.

        This does not close the writer but makes the current state accessible.

        Raises:
            ValueError: I/O operation on a closed file.
        """
        if self._closed:
            raise ValueError("I/O operation on closed file.")

        if self._handle:
            # Finalize the current state with flush
            self._obj.append_content(content=b"", handle=self._handle, flush=True)
            # Reset the handle to prepare for further appends
            self._handle = ""

    @override
    def close(self) -> None:
        """
        Close the writer and finalize the object.

        Calling `close` on an already closed writer is a no-op.
        """
        if not self._closed:
            # Flush the data before closing
            self.flush()
            self._closed = True
13 changes: 13 additions & 0 deletions python/aistore/sdk/obj/object.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from aistore.sdk.provider import Provider
from aistore.sdk.obj.object_client import ObjectClient
from aistore.sdk.obj.object_reader import ObjectReader
from aistore.sdk.obj.object_writer import ObjectWriter
from aistore.sdk.request_client import RequestClient
from aistore.sdk.types import (
ActionMsg,
Expand Down Expand Up @@ -131,6 +132,7 @@ def head(self) -> CaseInsensitiveDict:
self._props = ObjectProps(headers)
return headers

# TODO: Rename to `get_reader` and deprecate `get` method (call `get_reader`)
# pylint: disable=too-many-arguments
def get(
self,
Expand Down Expand Up @@ -277,6 +279,17 @@ def _put_data(self, data) -> Response:
data=data,
)

# TODO: Move all object writing methods to ObjectWriter class (e.g.
# put_content, put_file, append_content, and set_custom_props).
def get_writer(self) -> ObjectWriter:
    """
    Build an ObjectWriter bound to this object.

    Returns:
        An ObjectWriter wrapping this object, usable for writing its contents and attributes.
    """
    writer = ObjectWriter(self)
    return writer

# pylint: disable=too-many-arguments
def promote(
self,
Expand Down
41 changes: 41 additions & 0 deletions python/aistore/sdk/obj/object_writer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#

from aistore.sdk.obj.obj_file.object_file import ObjectFileWriter

# pylint: disable=too-few-public-methods

# TODO: Move all object writing methods to this class (e.g.
# put_content, put_file, append_content, and set_custom_props).


class ObjectWriter:
    """
    Entry point for writing an object's contents and attributes.
    """

    def __init__(self, obj: "Object"):
        # The object all write operations are delegated to.
        self._obj = obj

    def as_file(self, mode: str = "a") -> ObjectFileWriter:
        """
        Open the object as a file-like writer.

        Args:
            mode (str): How the object is opened (defaults to 'a').
                - `'w'`: Write mode. Existing content is truncated and writing starts
                at the beginning of the object.
                - `'a'`: Append mode. Existing content is kept and writing starts
                at the end of the object.

        Returns:
            A file-like object for writing object data.

        Raises:
            ValueError: Invalid mode provided.
        """
        if mode in {"w", "a"}:
            return ObjectFileWriter(self._obj, mode)

        raise ValueError(f"Invalid mode: {mode}")
102 changes: 102 additions & 0 deletions python/examples/sdk/object_file/obj-file-writer-stress-test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
#
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#

# This script tests AIStore's ObjectFileWriter and its ability to handle interruptions
# during a write workload (e.g., simulating intermittent AIStore K8s node failures). Run with
# a one-node cluster to best and most consistently observe the behavior of the ObjectFileWriter.

import logging
import os
import time
import urllib3
import random
import string
from kubernetes import client as k8s_client, config as k8s_config
from aistore.sdk.client import Client
from utils import start_pod_killer, stop_pod_killer

# Configure root logging once for the whole script.
logging.basicConfig(level=logging.INFO) # Set to DEBUG for more detailed logs

# Binary size units (powers of 1024), in bytes.
KB = 1024
MB = 1024 * KB
GB = 1024 * MB

# Adjust the write size, pod kill interval, and write count to control the workload
# and frequency of interruptions based on your specific test machine configuration.
# AIS endpoint; overridable through the AIS_ENDPOINT environment variable.
AIS_ENDPOINT = os.getenv("AIS_ENDPOINT", "http://localhost:51080")
# Bucket and object used by the test; the bucket is deleted again at the end of `main`.
BUCKET_NAME = "writer-stress-test"
OBJECT_NAME = "writer-stress-test-object"
# Number of chunks written and the size of each chunk (1000 * 128 KiB = 125 MiB in total).
WRITE_COUNT = 1000
WRITE_SIZE = 128 * KB
# When True, a pod killer process runs alongside the write workload.
INTERRUPT = True
# Namespace and name of the pod handed to the pod killer.
POD_KILL_NAMESPACE = "ais"
POD_KILL_NAME = "ais-target-0"
# Interval passed to `start_pod_killer` (presumably seconds between kills -- TODO confirm in utils).
POD_KILL_INTERVAL = 1e-3


def generate_random_data(size: int) -> bytes:
    """Return `size` randomly chosen ASCII letters/digits, encoded as bytes."""
    alphabet = string.ascii_letters + string.digits
    picked = random.choices(alphabet, k=size)
    text = "".join(picked)
    return text.encode()


def test_with_interruptions(k8s_client: k8s_client.CoreV1Api, obj):
    """
    Test ObjectFileWriter with pod interruptions.

    Runs the write workload while (if `INTERRUPT` is set) a pod killer process is active,
    then reads the object back and compares it with the data that was written.

    Args:
        k8s_client: Kubernetes CoreV1Api client handed to the pod killer.
        obj: The object to write to and read back from.

    Raises:
        AssertionError: The content read back differs from the content written.
    """
    logging.info("Starting ObjectFileWriter stress test")

    # Start pod killer process
    pod_killer_process = None
    if INTERRUPT:
        logging.info("Starting pod killer process...")
        pod_killer_process = start_pod_killer(
            k8s_client, POD_KILL_NAMESPACE, POD_KILL_NAME, POD_KILL_INTERVAL
        )

    try:
        # Perform write workload
        expected_data = obj_file_write(obj)
    finally:
        # Stop the pod killer process even if the workload failed, so it does not
        # keep killing pods after the test has aborted.
        if INTERRUPT:
            logging.info("Stopping pod killer process...")
            stop_pod_killer(pod_killer_process)

    # Validate written data after interruptions
    logging.info("Validating written content...")
    actual_data = obj.get().read_all()
    if actual_data != expected_data:
        # Raised explicitly (rather than via `assert`) so validation also runs under `python -O`.
        raise AssertionError(
            "Validation Failed: Written content does not match expected content"
        )
    logging.info("Validation Passed: Written content matches expected content.")


def obj_file_write(obj):
    """
    Perform the write workload using ObjectFileWriter and return expected data.

    Writes `WRITE_COUNT` random chunks of `WRITE_SIZE` bytes each in append mode.

    Args:
        obj: The object to write to; its `get_writer()` provides the file-like writer.

    Returns:
        bytes: The concatenation of all chunks written, in write order.
    """
    expected_data = []  # Track all chunks written
    # `get_writer()` is the accessor `Object` exposes for obtaining an ObjectWriter.
    with obj.get_writer().as_file(mode="a") as writer:
        for i in range(WRITE_COUNT):
            chunk = generate_random_data(WRITE_SIZE)
            writer.write(chunk)
            expected_data.append(chunk)
            logging.info("Written chunk %d/%d", i + 1, WRITE_COUNT)
    return b"".join(expected_data)


def main():
    """Set up the clients and test bucket, run the stress test, and always clean up."""
    # HTTP retry policy handed to the AIS client (retries on 400 and 404 responses too).
    retry_policy = urllib3.Retry(
        total=10,
        backoff_factor=0.5,
        status_forcelist=[400, 404],
    )
    ais_client = Client(endpoint=AIS_ENDPOINT, retry=retry_policy)

    # Kubernetes API client, configured from the local kube config.
    k8s_config.load_kube_config()
    core_v1 = k8s_client.CoreV1Api()

    # Bucket and object the test writes to
    test_bucket = ais_client.bucket(BUCKET_NAME).create()
    target_obj = test_bucket.object(OBJECT_NAME)

    try:
        # Write to the object while pods are being interrupted
        test_with_interruptions(core_v1, target_obj)
    finally:
        # Remove the test bucket whether or not the test succeeded
        test_bucket.delete(missing_ok=True)
        logging.info("Cleanup completed.")


if __name__ == "__main__":
main()
Loading

0 comments on commit 2aec518

Please sign in to comment.