diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws/s3_util.py b/metadata-ingestion/src/datahub/ingestion/source/aws/s3_util.py index 360f18aa448f27..5aa4322277063a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/aws/s3_util.py +++ b/metadata-ingestion/src/datahub/ingestion/source/aws/s3_util.py @@ -91,3 +91,7 @@ def group_s3_objects_by_dirname( dirname = "/" grouped_s3_objs[dirname].append(obj) return grouped_s3_objs + + +def get_path_depth(key: str, delimiter: str = "/") -> int: + return key.lstrip(delimiter).count(delimiter) + 1 diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py index 989d0d734352a2..5320b0b4bb6fb4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py @@ -40,6 +40,7 @@ get_bucket_name, get_bucket_relative_path, get_key_prefix, + get_path_depth, group_s3_objects_by_dirname, strip_s3_prefix, ) @@ -842,14 +843,14 @@ def get_dir_to_process( return [f"{protocol}{bucket_name}/{folder}"] return [f"{protocol}{bucket_name}/{folder}"] - def get_folder_info( + def get_folders_by_prefix_and_depth( self, path_spec: PathSpec, bucket: "Bucket", prefix: str, ) -> List[Folder]: """ - Retrieves all the folders in a path by listing all the files in the prefix. + Retrieves folders in the prefix whose depth matches path_spec.include. If the prefix is a full path then only that folder will be extracted. A folder has creation and modification times, size, and a sample file path. @@ -866,8 +867,14 @@ def get_folder_info( Returns: List[Folder]: A list of Folder objects representing the partitions found. """ + include_path_depth = get_path_depth(urlparse(path_spec.include).path) + s3_objects = ( + obj + for obj in bucket.objects.filter(Prefix=prefix).page_size(PAGE_SIZE) + if get_path_depth(obj.key) == include_path_depth + ) + partitions: List[Folder] = [] - s3_objects = bucket.objects.filter(Prefix=prefix).page_size(PAGE_SIZE) for key, group in group_s3_objects_by_dirname(s3_objects).items(): file_size = 0 creation_time = None @@ -993,7 +1000,7 @@ def s3_browser(self, path_spec: PathSpec, sample_size: int) -> Iterable[BrowsePa prefix_to_process = urlparse(dir).path.lstrip("/") folders.extend( - self.get_folder_info( + self.get_folders_by_prefix_and_depth( path_spec, bucket, prefix_to_process ) ) diff --git a/metadata-ingestion/tests/unit/s3/test_s3_source.py b/metadata-ingestion/tests/unit/s3/test_s3_source.py index 902987213e122f..846f56701741dd 100644 --- a/metadata-ingestion/tests/unit/s3/test_s3_source.py +++ b/metadata-ingestion/tests/unit/s3/test_s3_source.py @@ -1,17 +1,31 @@ from datetime import datetime +from itertools import tee from typing import List, Tuple -from unittest.mock import Mock +from unittest.mock import Mock, patch import pytest from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.ingestion.source.aws.s3_util import group_s3_objects_by_dirname from datahub.ingestion.source.data_lake_common.data_lake_utils import ContainerWUCreator from datahub.ingestion.source.data_lake_common.path_spec import PathSpec from datahub.ingestion.source.s3.source import S3Source, partitioned_folder_comparator +def _get_s3_source(path_spec_: PathSpec) -> S3Source: + return S3Source.create( + config_dict={ + "path_spec": { + "include": path_spec_.include, + "table_name": path_spec_.table_name, + }, + }, + ctx=PipelineContext(run_id="test-s3"), + ) + + def test_partition_comparator_numeric_folder_name(): folder1 = "3" folder2 = "12" @@ -245,22 +259,10 @@ def container_properties_filter(x: MetadataWorkUnit) -> bool: } -def test_get_folder_info(): +def test_get_folders_by_prefix_and_depth_returns_latest_file_in_each_folder(): """ - Test S3Source.get_folder_info returns the latest file in each folder + Test S3Source.get_folders_by_prefix_and_depth returns the latest file in each folder """ - - def _get_s3_source(path_spec_: PathSpec) -> S3Source: - return S3Source.create( - config_dict={ - "path_spec": { - "include": path_spec_.include, - "table_name": path_spec_.table_name, - }, - }, - ctx=PipelineContext(run_id="test-s3"), - ) - # arrange path_spec = PathSpec( include="s3://my-bucket/{table}/{partition0}/*.csv", @@ -295,7 +297,7 @@ def _get_s3_source(path_spec_: PathSpec) -> S3Source: ) # act - res = _get_s3_source(path_spec).get_folder_info( + res = _get_s3_source(path_spec).get_folders_by_prefix_and_depth( path_spec, bucket, prefix="/my-folder" ) @@ -303,3 +305,46 @@ def _get_s3_source(path_spec_: PathSpec) -> S3Source: assert len(res) == 2 assert res[0].sample_file == "s3://my-bucket/my-folder/dir1/0002.csv" assert res[1].sample_file == "s3://my-bucket/my-folder/dir2/0001.csv" + + +def test_get_folders_by_prefix_and_depth_ignores_depth_mismatch(): + """ + Test S3Source.get_folders_by_prefix_and_depth ignores folders that do not match depth of path_spec.include. + """ + # arrange + path_spec = PathSpec( + include="s3://my-bucket/{table}/{partition0}/*.csv", + table_name="{table}", + ) + + bucket = Mock() + bucket.objects.filter().page_size = Mock( + return_value=[ + Mock( + bucket_name="my-bucket", + key="my-folder/ignore/this/path/0001.csv", + creation_time=datetime(2025, 1, 1, 1), + last_modified=datetime(2025, 1, 1, 1), + size=100, + ), + ] + ) + + captured_group_s3_objects_by_dirname_arg0 = None + def _wrapped_group_s3_objects_by_dirname(s3_objects): + nonlocal captured_group_s3_objects_by_dirname_arg0 + captured_group_s3_objects_by_dirname_arg0, copy2 = tee(s3_objects) + return group_s3_objects_by_dirname(copy2) + + # act + with patch( + "datahub.ingestion.source.s3.source.group_s3_objects_by_dirname", + wraps=_wrapped_group_s3_objects_by_dirname, + ): + _get_s3_source(path_spec).get_folders_by_prefix_and_depth( + path_spec, bucket, prefix="/my-folder" + ) + + # assert + assert captured_group_s3_objects_by_dirname_arg0 is not None + assert len(list(captured_group_s3_objects_by_dirname_arg0)) == 0 diff --git a/metadata-ingestion/tests/unit/s3/test_s3_util.py b/metadata-ingestion/tests/unit/s3/test_s3_util.py index 7850d65ca8b01f..8b77ccbd505edd 100644 --- a/metadata-ingestion/tests/unit/s3/test_s3_util.py +++ b/metadata-ingestion/tests/unit/s3/test_s3_util.py @@ -1,6 +1,11 @@ from unittest.mock import Mock -from datahub.ingestion.source.aws.s3_util import group_s3_objects_by_dirname +import pytest + +from datahub.ingestion.source.aws.s3_util import ( + get_path_depth, + group_s3_objects_by_dirname, +) def test_group_s3_objects_by_dirname(): @@ -27,3 +32,11 @@ def test_group_s3_objects_by_dirname_files_in_root_directory(): assert len(grouped_objects) == 1 assert grouped_objects["/"] == s3_objects + + +@pytest.mark.parametrize( + "path, expected_depth", + [("dir1", 1), ("/dir1", 1), ("/dir1/", 2), ("/dir1/file.txt", 2)], +) +def test_get_path_depth(path: str, expected_depth: int) -> None: + assert get_path_depth(path) == expected_depth