Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingestion/s3): ignore depth mismatched path #12326

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

eagle-25
Copy link
Contributor

@eagle-25 eagle-25 commented Jan 12, 2025

Summary

Feature Addition:

Ignores the paths whose depth dose not match to path_spec.include while ingesting S3.

Changes

As-is

  • Retrieves all paths under the prefix.

To-be

  • Retrieves only paths whose depth matches path_spec.include.

Background

  • Warning messages were triggered during S3 ingestion.
  • This issue was caused by attempting to ingest paths whose depth does not match path_spec.include.
  • Since this behavior seems unintended, I propose adding this feature.

Logs

path_spec: s3://bucket/{table}/{partition0}/*.csv

s3_files:
1. s3://bucket/shop_log/year=2025/month=01/day=01/0000.csv
2. s3://bucket/shop_log/year=2025/month=01/day=02/0000.csv
3. s3://bucket/user_log/2025/0000.csv

============

**Before**

$ python s3_ingestion.py

INFO - Processing folder: datahub-test/shop_log
WARNING - Unable to find any files in the folder datahub-test/shop_log/year=2025. Skipping...
WARNING - Unable to find any files in the folder datahub-test/shop_log/year=2025/month=01. Skipping...
WARNING - Unable to find any files in the folder datahub-test/shop_log/year=2025/month=01/day=01. Skipping...
WARNING - Unable to find any files in the folder datahub-test/shop_log/year=2025/month=01/day=02. Skipping...
WARNING - Unable to find any files in the folder s3://bucket/datahub-test/shop_log/year=2025/. Skipping...


INFO - Processing folder: datahub-test/user_log
INFO - Getting files from folder: s3://bucket/datahub-test/user_log/2025/
INFO - Extracting table schema from file: s3://bucket/datahub-test/user_log/2025/ai_kpi.04_kst_v4_dag_runs.csv
INFO - Creating dataset urn with name: bucket/datahub-test/user_log

============

**After**

$ python s3_ingestion.py

INFO - Processing folder: datahub-test/shop_log
INFO - Getting files from folder: s3://bucket/datahub-test/shop_log/year=2025/
WARNING - Unable to find any files in the folder s3://bucket/datahub-test/shop_log/year=2025/. Skipping...


INFO - Processing folder: datahub-test/user_log
INFO - Getting files from folder: s3://bucket/datahub-test/user_log/2025/
INFO - Extracting table schema from file: s3://bucket/datahub-test/user_log/2025/ai_kpi.04_kst_v4_dag_runs.csv
INFO - Creating dataset urn with name: bucket/datahub-test/user_log

How to reproduce

You can reproduce this issue by running the code below.

# s3_ingestion.py

from datahub.ingestion.run.pipeline import Pipeline

_DATAHUB_GMS_ENDPOINT = "<your_gms_endpoint>"
_DATAHUB_ACCESS_TOKEN = "<your_token>"

_S3_PATH = "s3://<your-bucket>/datahub-test/{table}/{partition0}/*.csv"

s3_config = {
        "source": {
            "type": "s3",
            "config": {
                "path_spec": {
                    "include": _S3_PATH
                },
                "aws_config": {
                    "aws_region": "ap-northeast-2",
                },
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {
                "server": _DATAHUB_GMS_ENDPOINT,
                "token": _DATAHUB_ACCESS_TOKEN,
            },
        },
    }

pipeline = Pipeline.create(s3_config)
pipeline.run()

Checklist

  • The PR conforms to DataHub's Contributing Guideline (particularly Commit Message Format)
  • Links to related issues (if applicable)
  • Tests for the changes have been added/updated (if applicable)
  • Docs related to the changes have been added/updated (if applicable). If a new feature has been added a Usage Guide has been added for the same.
  • For any breaking change/potential downtime/deprecation/big changes an entry has been made in Updating DataHub

@github-actions github-actions bot added ingestion PR or Issue related to the ingestion of metadata community-contribution PR or Issue raised by member(s) of DataHub Community labels Jan 12, 2025
@eagle-25 eagle-25 force-pushed the feat/s3/ingestion/ignore-depth-mismatched-path branch 2 times, most recently from e6d168e to d5a4387 Compare January 12, 2025 15:37
Copy link

codecov bot commented Jan 12, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Files with missing lines Coverage Δ
...ngestion/src/datahub/ingestion/source/s3/source.py 86.45% <100.00%> (ø)

Continue to review full report in Codecov by Sentry.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update ef36837...23db0f8. Read the comment docs.

@eagle-25 eagle-25 force-pushed the feat/s3/ingestion/ignore-depth-mismatched-path branch 4 times, most recently from c63a02e to a2690ac Compare January 12, 2025 16:01
@eagle-25 eagle-25 force-pushed the feat/s3/ingestion/ignore-depth-mismatched-path branch from a2690ac to 98d9263 Compare January 14, 2025 12:50
@eagle-25 eagle-25 marked this pull request as ready for review January 14, 2025 13:50
@datahub-cyborg datahub-cyborg bot added the needs-review Label for PRs that need review from a maintainer. label Jan 14, 2025
Comment on lines +870 to +874
def _is_allowed_path(path_spec_: PathSpec, s3_uri: str) -> bool:
allowed = path_spec_.allowed(s3_uri)
if not allowed:
logger.debug(f"File {s3_uri} not allowed and skipping")
return allowed
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this function to log when a file is ignored.

If you have a better approach, please feel free to suggest it. 🙏

@eagle-25 eagle-25 force-pushed the feat/s3/ingestion/ignore-depth-mismatched-path branch from 98d9263 to 651892a Compare January 14, 2025 14:01
for key, group in group_s3_objects_by_dirname(s3_objects).items():
file_size = 0
creation_time = None
modification_time = None

for item in group:
file_path = self.create_s3_path(item.bucket_name, item.key)
if not path_spec.allowed(file_path):
Copy link
Contributor Author

@eagle-25 eagle-25 Jan 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To clarify my intention, I wrote it as qna format.

Q: Why path_spec.allowed moved to L879?
A: Since path_spec.allowed already implements depth comparison, I used it to determine whether each path should be skipped or not while iterating over s3_objects.

Q: Does moving this logic to L879 make any breaking changes?
A: No, because it simply skips each disallowed path even though it has been moved to L879.

@eagle-25 eagle-25 force-pushed the feat/s3/ingestion/ignore-depth-mismatched-path branch from 651892a to 98366cd Compare January 15, 2025 05:45
@eagle-25 eagle-25 force-pushed the feat/s3/ingestion/ignore-depth-mismatched-path branch from 98366cd to 60d83cd Compare January 17, 2025 02:30
@eagle-25 eagle-25 force-pushed the feat/s3/ingestion/ignore-depth-mismatched-path branch from 60d83cd to c1841f6 Compare January 19, 2025 15:43
@eagle-25 eagle-25 force-pushed the feat/s3/ingestion/ignore-depth-mismatched-path branch from c1841f6 to ab015f5 Compare January 20, 2025 04:56
@eagle-25 eagle-25 force-pushed the feat/s3/ingestion/ignore-depth-mismatched-path branch from ab015f5 to 23db0f8 Compare January 20, 2025 09:23
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
community-contribution PR or Issue raised by member(s) of DataHub Community ingestion PR or Issue related to the ingestion of metadata needs-review Label for PRs that need review from a maintainer.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant