Commit
…open into patch-2

* 'develop' of https://github.com/RaRe-Technologies/smart_open:
  fix test, for real this time
  update integration test
  Add advanced usage sections to README.rst (piskvorky#741)
  Add logic for handling large files in MultipartWriter uploads to s3 (piskvorky#796)
  Fix __str__ method in SinglepartWriter (piskvorky#791)
ddelange committed Feb 22, 2024
2 parents 0c5da08 + 34ba502 commit fb8dc6d
Showing 4 changed files with 230 additions and 61 deletions.
41 changes: 38 additions & 3 deletions README.rst
@@ -328,7 +328,7 @@ You can customize the credentials when constructing the session for the client.
aws_session_token=SESSION_TOKEN,
)
client = session.client('s3', endpoint_url=..., config=...)
fin = open('s3://bucket/key', transport_params=dict(client=client))
fin = open('s3://bucket/key', transport_params={'client': client})
Your second option is to specify the credentials within the S3 URL itself:

@@ -341,6 +341,18 @@ Your second option is to specify the credentials within the S3 URL itself:
*Important*: ``smart_open`` ignores configuration files from the older ``boto`` library.
Port your old ``boto`` settings to ``boto3`` in order to use them with ``smart_open``.
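
For example, if your old ``boto`` credentials now live in a named profile in ``~/.aws/credentials``, a minimal sketch of the ``boto3`` equivalent could look like the following (the profile name ``my-profile`` is a hypothetical placeholder):

.. code-block:: python

    import boto3
    from smart_open import open

    # 'my-profile' is a hypothetical profile name in ~/.aws/credentials
    session = boto3.Session(profile_name='my-profile')
    client = session.client('s3')
    fin = open('s3://bucket/key', transport_params={'client': client})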

S3 Advanced Usage
-----------------

Additional keyword arguments can be propagated to the boto3 methods that ``smart_open`` uses under the hood via the ``client_kwargs`` transport parameter.

For instance, to upload a blob with ``Metadata``, an ``ACL``, and a ``StorageClass``, pass those keyword arguments to ``create_multipart_upload`` (`docs <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.create_multipart_upload>`__).

.. code-block:: python

    kwargs = {'Metadata': {'version': 2}, 'ACL': 'authenticated-read', 'StorageClass': 'STANDARD_IA'}
    fout = open('s3://bucket/key', 'wb', transport_params={'client_kwargs': {'S3.Client.create_multipart_upload': kwargs}})
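
As an illustrative follow-up sketch (not part of the original example; it assumes the bucket and key above exist and that your credentials allow reads), you can verify that the metadata and storage class were applied using plain ``boto3``:

.. code-block:: python

    import boto3

    # head_object fetches the object's metadata without downloading its body
    s3 = boto3.client('s3')
    response = s3.head_object(Bucket='bucket', Key='key')
    print(response['Metadata'])           # user-defined metadata; values are stored as strings
    print(response.get('StorageClass'))   # e.g. 'STANDARD_IA'
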
Iterating Over an S3 Bucket's Contents
--------------------------------------

@@ -392,7 +404,20 @@ and pass it to the Client. To create an API token for use in the example below, you
token = os.environ['GOOGLE_API_TOKEN']
credentials = Credentials(token=token)
client = Client(credentials=credentials)
fin = open('gs://gcp-public-data-landsat/index.csv.gz', transport_params=dict(client=client))
fin = open('gs://gcp-public-data-landsat/index.csv.gz', transport_params={'client': client})

GCS Advanced Usage
------------------

Additional keyword arguments can be propagated to the GCS open method (`docs <https://cloud.google.com/python/docs/reference/storage/latest/google.cloud.storage.blob.Blob#google_cloud_storage_blob_Blob_open>`__), which is used by ``smart_open`` under the hood, using the ``blob_open_kwargs`` transport parameter.

Additional blob properties (`docs <https://cloud.google.com/python/docs/reference/storage/latest/google.cloud.storage.blob.Blob#properties>`__) can be set before an upload, as long as they are not read-only, using the ``blob_properties`` transport parameter.

.. code-block:: python

    open_kwargs = {'predefined_acl': 'authenticated-read'}
    properties = {'metadata': {'version': 2}, 'storage_class': 'COLDLINE'}
    fout = open('gs://bucket/key', 'wb', transport_params={'blob_open_kwargs': open_kwargs, 'blob_properties': properties})
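
As an illustrative sketch only (it assumes default Google credentials and that the bucket and key above exist), the properties can be read back with the ``google-cloud-storage`` client:

.. code-block:: python

    from google.cloud.storage import Client

    # get_blob retrieves the blob's metadata without downloading its contents
    client = Client()
    blob = client.bucket('bucket').get_blob('key')
    print(blob.metadata)       # user-provided metadata
    print(blob.storage_class)  # e.g. 'COLDLINE'
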
Azure Credentials
-----------------
@@ -413,11 +438,21 @@ to setting up authentication.
from azure.storage.blob import BlobServiceClient
azure_storage_connection_string = os.environ['AZURE_STORAGE_CONNECTION_STRING']
client = BlobServiceClient.from_connection_string(azure_storage_connection_string)
fin = open('azure://my_container/my_blob.txt', transport_params=dict(client=client))
fin = open('azure://my_container/my_blob.txt', transport_params={'client': client})
If you need more credential options, refer to the
`Azure Storage authentication guide <https://docs.microsoft.com/en-us/azure/storage/common/storage-samples-python#authentication>`__.

Azure Advanced Usage
--------------------

Additional keyword arguments can be propagated to the ``commit_block_list`` method (`docs <https://azuresdkdocs.blob.core.windows.net/$web/python/azure-storage-blob/12.14.1/azure.storage.blob.html#azure.storage.blob.BlobClient.commit_block_list>`__), which is used by ``smart_open`` under the hood for uploads, using the ``blob_kwargs`` transport parameter.

.. code-block:: python

    kwargs = {'metadata': {'version': 2}}
    fout = open('azure://container/key', 'wb', transport_params={'blob_kwargs': kwargs})
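
As a hypothetical check (it assumes the same connection string as in the credentials example and that the container and blob exist), the metadata can be read back with the Azure SDK:

.. code-block:: python

    import os
    from azure.storage.blob import BlobServiceClient

    # get_blob_properties returns the blob's metadata along with its other properties
    connection_string = os.environ['AZURE_STORAGE_CONNECTION_STRING']
    service = BlobServiceClient.from_connection_string(connection_string)
    blob_client = service.get_blob_client(container='container', blob='key')
    print(blob_client.get_blob_properties().metadata)  # e.g. {'version': '2'}
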
Drop-in replacement of ``pathlib.Path.open``
--------------------------------------------

58 changes: 46 additions & 12 deletions integration-tests/test_s3_ported.py
@@ -15,6 +15,7 @@
import contextlib
import gzip
import io
import os
import unittest
import uuid
import warnings
@@ -203,20 +204,53 @@ def test_write(self):

def test_multipart(self):
"""Does s3 multipart chunking work correctly?"""
with smart_open.s3.MultipartWriter(BUCKET_NAME, self.key, min_part_size=10) as fout:
fout.write(b"test")
self.assertEqual(fout._buf.tell(), 4)
data_dir = os.path.join(os.path.dirname(__file__), "../smart_open/tests/test_data")
with open(os.path.join(data_dir, "crime-and-punishment.txt"), "rb") as fin:
crime = fin.read()
data = b''
ps = 5 * 1024 * 1024
while len(data) < ps:
data += crime

title = "Преступление и наказание\n\n".encode()
to_be_continued = "\n\n... продолжение следует ...\n\n".encode()

key = "WriterTest.test_multipart"
with smart_open.s3.MultipartWriter(BUCKET_NAME, key, part_size=ps) as fout:
#
# Write some data without triggering an upload
#
fout.write(title)
assert fout._total_parts == 0
assert fout._buf.tell() == 48

#
# Trigger a part upload
#
fout.write(data)
assert fout._total_parts == 1
assert fout._buf.tell() == 661

#
# Write _without_ triggering a part upload
#
fout.write(to_be_continued)
assert fout._total_parts == 1
assert fout._buf.tell() == 710

fout.write(b"test\n")
self.assertEqual(fout._buf.tell(), 9)
self.assertEqual(fout._total_parts, 0)

fout.write(b"test")
self.assertEqual(fout._buf.tell(), 0)
self.assertEqual(fout._total_parts, 1)
#
# We closed the writer, so the final part must have been uploaded
#
assert fout._buf.tell() == 0
assert fout._total_parts == 2

data = read_key(self.key)
self.assertEqual(data, b"testtest\ntest")
#
# read back the same key and check its content
#
with smart_open.s3.open(BUCKET_NAME, key, 'rb') as fin:
got = fin.read()
want = title + data + to_be_continued
assert want == got

def test_empty_key(self):
"""Does writing no data cause key with an empty value to be created?"""
116 changes: 88 additions & 28 deletions smart_open/s3.py
@@ -6,6 +6,7 @@
# from the MIT License (MIT).
#
"""Implements file-like objects for reading and writing from/to AWS S3."""
from __future__ import annotations

import http
import io
@@ -14,6 +15,12 @@
import time
import warnings

from typing import (
Callable,
List,
TYPE_CHECKING,
)

try:
import boto3
import botocore.client
@@ -28,18 +35,34 @@

from smart_open import constants

from typing import (
Callable,
List,
)

if TYPE_CHECKING:
from mypy_boto3_s3.client import S3Client
from typing_extensions import Buffer

logger = logging.getLogger(__name__)

DEFAULT_MIN_PART_SIZE = 50 * 1024**2
"""Default minimum part size for S3 multipart uploads"""
MIN_MIN_PART_SIZE = 5 * 1024 ** 2
#
# AWS puts restrictions on the part size for multipart uploads.
# Each part must be more than 5MB, and less than 5GB.
#
# On top of that, our MultipartWriter has a min_part_size option.
# In retrospect, it's an unfortunate name, because it conflicts with the
# minimum allowable part size (5MB), but it's too late to change it, because
# people are using that parameter (unlike the MIN, DEFAULT, MAX constants).
# It really just means "part size": as soon as you have this many bytes,
# write a part to S3 (see the MultipartWriter.write method).
#

MIN_PART_SIZE = 5 * 1024 ** 2
"""The absolute minimum permitted by Amazon."""

DEFAULT_PART_SIZE = 50 * 1024**2
"""The default part size for S3 multipart uploads, chosen carefully by smart_open"""

MAX_PART_SIZE = 5 * 1024 ** 3
"""The absolute maximum permitted by Amazon."""

SCHEMES = ("s3", "s3n", 's3u', "s3a")
DEFAULT_PORT = 443
DEFAULT_HOST = 's3.amazonaws.com'
@@ -286,7 +309,7 @@ def open(
mode,
version_id=None,
buffer_size=DEFAULT_BUFFER_SIZE,
min_part_size=DEFAULT_MIN_PART_SIZE,
min_part_size=DEFAULT_PART_SIZE,
multipart_upload=True,
defer_seek=False,
client=None,
@@ -306,12 +329,29 @@
buffer_size: int, optional
The buffer size to use when performing I/O.
min_part_size: int, optional
The minimum part size for multipart uploads. For writing only.
The minimum part size for multipart uploads, in bytes.
When the writebuffer contains this many bytes, smart_open will upload
the bytes to S3 as a single part of a multi-part upload, freeing the
buffer either partially or entirely. When you close the writer, it
will assemble the parts together.
The value determines the upper limit for the writebuffer. If buffer
space is short (e.g. you are buffering to memory), then use a smaller
value for min_part_size, or consider buffering to disk instead (see
the writebuffer option).
The value must be between 5MB and 5GB. If you specify a value outside
of this range, smart_open will adjust it for you, because otherwise the
upload _will_ fail.
For writing only. Does not apply if you set multipart_upload=False.
multipart_upload: bool, optional
Default: `True`
If set to `True`, will use multipart upload for writing to S3. If set
to `False`, S3 upload will use the S3 Single-Part Upload API, which
is better suited to small files.
For writing only.
version_id: str, optional
Version of the object, used when reading object.
@@ -358,10 +398,10 @@
fileobj = MultipartWriter(
bucket_id,
key_id,
min_part_size=min_part_size,
client=client,
client_kwargs=client_kwargs,
writebuffer=writebuffer,
part_size=min_part_size,
)
else:
fileobj = SinglepartWriter(
@@ -829,17 +869,21 @@ def __init__(
self,
bucket,
key,
min_part_size=DEFAULT_MIN_PART_SIZE,
part_size=DEFAULT_PART_SIZE,
client=None,
client_kwargs=None,
writebuffer=None,
writebuffer: io.BytesIO | None = None,
):
if min_part_size < MIN_MIN_PART_SIZE:
logger.warning("S3 requires minimum part size >= 5MB; \
multipart upload may fail")
self._min_part_size = min_part_size
adjusted_ps = smart_open.utils.clamp(part_size, MIN_PART_SIZE, MAX_PART_SIZE)
if part_size != adjusted_ps:
logger.warning(f"adjusting part_size from {part_size} to {adjusted_ps}")
part_size = adjusted_ps
self._part_size = part_size

_initialize_boto3(self, client, client_kwargs, bucket, key)
self._client: S3Client
self._bucket: str
self._key: str

try:
partial = functools.partial(
Expand All @@ -862,12 +906,12 @@ def __init__(

self._total_bytes = 0
self._total_parts = 0
self._parts = []
self._parts: list[dict[str, object]] = []

#
# This member is part of the io.BufferedIOBase interface.
#
self.raw = None
self.raw = None # type: ignore[assignment]

def flush(self):
pass
@@ -943,22 +987,38 @@ def tell(self):
def detach(self):
raise io.UnsupportedOperation("detach() not supported")

def write(self, b):
def write(self, b: Buffer) -> int:
"""Write the given buffer (bytes, bytearray, memoryview or any buffer
interface implementation) to the S3 file.
For more information about buffers, see https://docs.python.org/3/c-api/buffer.html
There's buffering happening under the covers, so this may not actually
do any HTTP transfer right away."""
offset = 0
mv = memoryview(b)
self._total_bytes += len(mv)

length = self._buf.write(b)
self._total_bytes += length
#
# botocore does not accept memoryview, otherwise we could've gotten
# away with not needing to write a copy to the buffer aside from cases
# where b is smaller than part_size
#
while offset < len(mv):
start = offset
end = offset + self._part_size - self._buf.tell()
self._buf.write(mv[start:end])
if self._buf.tell() < self._part_size:
#
# Not enough data to write a new part just yet. The assert
# ensures that we've consumed all of the input buffer.
#
assert end >= len(mv)
return len(mv)

if self._buf.tell() >= self._min_part_size:
self._upload_next_part()

return length
offset = end
return len(mv)

def terminate(self):
"""Cancel the underlying multipart upload."""
@@ -984,7 +1044,7 @@ def to_boto3(self, resource):
#
# Internal methods.
#
def _upload_next_part(self):
def _upload_next_part(self) -> None:
part_num = self._total_parts + 1
logger.info(
"%s: uploading part_num: %i, %i bytes (total %.3fGB)",
Expand Down Expand Up @@ -1033,10 +1093,10 @@ def __str__(self):
return "smart_open.s3.MultipartWriter(%r, %r)" % (self._bucket, self._key)

def __repr__(self):
return "smart_open.s3.MultipartWriter(bucket=%r, key=%r, min_part_size=%r)" % (
return "smart_open.s3.MultipartWriter(bucket=%r, key=%r, part_size=%r)" % (
self._bucket,
self._key,
self._min_part_size,
self._part_size,
)


@@ -1160,7 +1220,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
self.close()

def __str__(self):
return "smart_open.s3.SinglepartWriter(%r, %r)" % (self._object.bucket_name, self._object.key)
return "smart_open.s3.SinglepartWriter(%r, %r)" % (self._bucket, self._key)

def __repr__(self):
return "smart_open.s3.SinglepartWriter(bucket=%r, key=%r)" % (self._bucket, self._key)